My hobbyist coding updates and releases as the mysterious "Mr. Tines"

Wednesday, 22 July 2009

A simple LockFree Queue (C++, Windows)

Prompted by some discussion on StackOverflow today...

This is the simple single-producer, single-consumer version of the Lock-Free Queue as published by Herb Sutter in DDJ, but adapted to replace the C++0x style atomic<Node*> with Win32 API calls to InterlockedExchangePointer and InterlockedCompareExchangePointer. Converting the multiple producers/consumers case is left as a trivial exercise for the reader.

So is doing the free-list manually, as a stack of Nodes rather than bringing in a gratuitous std::list.

/*
            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
                    Version 2, December 2004

 Copyright (C) 2004 Sam Hocevar
  14 rue de Plaisance, 75014 Paris, France
 Everyone is permitted to copy and distribute verbatim or modified
 copies of this license document, and changing it is allowed as long
 as the name is changed.

            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION

  0. You just DO WHAT THE FUCK YOU WANT TO. */

/* This program is free software. It comes without any warranty, to
 * the extent permitted by applicable law. You can redistribute it
 * and/or modify it under the terms of the Do What The Fuck You Want
 * To Public License, Version 2, as published by Sam Hocevar. See
 * http://sam.zoy.org/wtfpl/COPYING for more details. */ 

#pragma once

// suppress 64-bit ready warning about casting on the win32 platform (only)
#pragma warning(disable:4311)
#pragma warning(disable:4312)

// After Sutter in Dr. Dobbs -- http://ddj.com/cpp/210604448?pgno=2
#include <list>
#include "Windows.h"

template <typename T>
class SimpleQueue {

    typedef T* Tptr;                                // should really be a smart_pointer<T>

private:
    SimpleQueue(const SimpleQueue &);               // Not copyable
     SimpleQueue & operator= (const SimpleQueue &); // Not assignable

  struct Node {
     Node( Tptr val ) : value(val), next(NULL) { }
    Node() : next(NULL) { value = NULL; }
    Tptr value;
    Node* next;
  };

  std::list<Node *> freeList;   // for producer only
  Node* first;                  // for producer only
  Node *divider, *last;         // shared -- Use explicit atomic compares only

  // Allocator/Deallocator for nodes -- 
  // only used in the producer thread
  // OR in the destructor.
  Node * Get(Tptr val)
  {
     if(!freeList.empty())
     {
        // Clean because of Release
        Node * next = freeList.front();
        freeList.pop_front();
        next->value = val;
        return next;
     }

     // clean by construction
     return new Node(val);     
  }

  // Avoids costly free() while running
  void Release(Node * node)
  {
        // reset the node to clean before shelving it
        node->value = NULL;
        node->next = NULL;
        freeList.push_back(node);
  }


public:
  SimpleQueue() {
    first = divider = last = Get(NULL);                         // add dummy separator
  }

  ~SimpleQueue() {
    while( first != NULL ) {               // release the list
      Node* tmp = first;
      first = tmp->next;
      delete tmp;
    }

     // Require -- Producer thread calls this or is dead
     while(!freeList.empty())
     {
          delete Get(NULL);
     }
  }

  // Produce is called on the producer thread only:
  void Produce( Tptr t ) {
    last->next = Get(t);                            // add the new item
    InterlockedExchangePointer(&last, last->next);  // publish it

    // Burn the consumed part of the queue
    for( PVOID looper = first;                     // non-null; pointer read is atomic
           InterlockedCompareExchangePointer(&looper, NULL, divider), looper;
           looper = first)
     {
      Node* tmp = first;
      first = first->next;
      Release(tmp);
    }
  }

  // Consume is called on the consumer thread only:
  bool Consume( Tptr & result ) {

     PVOID choice = divider;                                  // non-null; pointer read is atomic
     InterlockedCompareExchangePointer(&choice, NULL, last);

     if(choice)
     {
        result = divider->next->value;                        // C: copy it back
        choice = divider;

        InterlockedExchangePointer(&divider, divider->next);  // D: publish that we took it
        reinterpret_cast<Node*>(choice)->next = NULL;
        return true;                                          // and report success
    }

    return false;                                             // else report empty
  }
};

#pragma warning(default:4312)
#pragma warning(default:4311)

And here is a simple test harness, using C++/CLI to make the threading simpler to write:

/*
            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
                    Version 2, December 2004

 Copyright (C) 2004 Sam Hocevar
  14 rue de Plaisance, 75014 Paris, France
 Everyone is permitted to copy and distribute verbatim or modified
 copies of this license document, and changing it is allowed as long
 as the name is changed.

            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION

  0. You just DO WHAT THE FUCK YOU WANT TO. */

/* This program is free software. It comes without any warranty, to
 * the extent permitted by applicable law. You can redistribute it
 * and/or modify it under the terms of the Do What The Fuck You Want
 * To Public License, Version 2, as published by Sam Hocevar. See
 * http://sam.zoy.org/wtfpl/COPYING for more details. */ 

#include "stdafx.h"
#include "SimpleQueue.h"

using namespace System;
using namespace System::Threading;
// Thread entry points for the queue test: one static method per thread,
// both operating on the queue that q points to.
ref class Work
{
public:
   // Queue under test; must be set before either thread is started.
   static SimpleQueue<int> * q;

   // Producer side: pushes the integers 0..32767, each in its own
   // malloc'd int, pausing for 20 ms on every 1000th item.
   static void Produce()
   {
      Console::WriteLine( "Producer thread procedure." );

      int i = 0;
      while(i < 32768)
      {
         int * payload = reinterpret_cast<int*>(malloc(sizeof(int)));
         *payload = i;
         q->Produce(payload);

         if((i % 1000) == 0)
         {
            Thread::Sleep(20);
         }
         ++i;
      }

      Console::WriteLine( "Producer thread finished." );
   }

   // Consumer side: polls the queue until the value 32767 has been seen,
   // reporting any value that is not exactly one more than the previous
   // one, and frees each payload after reading it.
   static void Consume()
   {
      Console::WriteLine( "Consumer thread procedure." );

      for(int latest = -1; latest < 32767; )
      {
         int * data;
         if(!q->Consume(data))
         {
            continue;                      // nothing waiting; poll again
         }

         const int received = *data;
         if(received != latest + 1)
         {
            Console::WriteLine( "Consumer thread procedure -- failure." );
         }
         latest = received;
         free(data);
      }

      Console::WriteLine( "Consumer thread finished." );
   }

};


// Runs the producer and consumer on two managed threads against one queue
// and waits for both to finish before reporting completion.
int main(array<System::String ^> ^)
{
     {
          // The inner scope ends the queue's lifetime once both threads
          // have been joined, before the final message is printed.
          SimpleQueue<int> queue;
          Work::q = &queue;

          Thread ^ producerThread =
               gcnew Thread( gcnew ThreadStart( &Work::Produce ) );
          Thread ^ consumerThread =
               gcnew Thread( gcnew ThreadStart( &Work::Consume ) );

          producerThread->Start();
          consumerThread->Start();

          producerThread->Join();
          consumerThread->Join();
     }

     Console::WriteLine( "Completed." );
     return 0;
}

6 comments:

Anonymous said...

Now, what's going to be the best way to avoid the locking within malloc/free?

Steve said...

That truly isn't an issue of the queue locking (after an initial hit for allocating Nodes until some steady state, it isn't allocating), but rather one for the consuming application.

If your producer isn't forced to allocate at any point, but can re-use the T records, then you can use a second queue with the roles reversed to send "empty" records back from the original consumer for re-use by the producer. After the initial hits for allocating to a steady state high-water mark, that ceases to be an issue -- deallocation is deferred until after the queue terminates.

ivan said...

Steve,

I was able to get this to work within a CLR console application in Visual C++.Net.

However, when I try to use it in my C++.Net application, the inclusion of Windows.h causes problems.

Is there a version of this code that would work with a Visual C++.Net CLR?

Thank you.

Steve said...

If cutting down to just winbase.h doesn't work, then you may have to manually extract just that part that defines the Interlocked methods. If that fails, you probably have to resort to System.Threading.Interlocked methods instead.

Anonymous said...

Just a warning to anyone who thinks about using this code: it's wrong. Nothing prevents the compiler from reordering instructions, the CPU from executing out of order, or the memory bus from flushing the caches out of order.

To be fixed it would need volatile variables and/or memory barriers to tell compiler and hardware where to prevent reordering.

Steve Gilham said...

Did you check the docs for InterlockedExchangePointer and InterlockedCompareExchangePointer?

"This function generates a full memory barrier (or fence) to ensure that memory operations are completed in order."

Of course in 2013, we can now use atomic<Node*> explicitly in any reasonably recent compiler, rather than having to dive into platform specific APIs like those.