Prompted by some discussion on StackOverflow today...
This is the simple single-producer, single-consumer version of the Lock-Free Queue as published by Herb Sutter in DDJ, but adapted to replace the C++0x-style atomic<Node*> with Win32 API calls to InterlockedExchangePointer and InterlockedCompareExchangePointer. Converting the multiple producers/consumers case is left as a trivial exercise for the reader. So is doing the free-list manually, as a stack of Nodes, rather than bringing in a gratuitous std::list.
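The manual free-list could look something like the following minimal sketch. It assumes a stand-in Node<T> with the same value and next fields as SimpleQueue's private Node; the names FreeStack, Push, Pop and Get are illustrative, not taken from the original code. Shelved nodes are chained through their own next pointers, so recycling a node never touches the heap.

```cpp
#include <cassert>

// Hypothetical stand-in for SimpleQueue's private Node.
template <typename T>
struct Node {
    T* value = nullptr;
    Node* next = nullptr;
};

// Producer-private free list kept as an intrusive stack: shelved nodes are
// linked through their own `next` field, so Push/Pop never allocate
// (std::list, by contrast, allocates a list node on every push_back).
// Not thread-safe: only the producer thread (or the destructor) may use it.
template <typename T>
class FreeStack {
    Node<T>* top_ = nullptr;
public:
    FreeStack() = default;
    FreeStack(const FreeStack&) = delete;
    FreeStack& operator=(const FreeStack&) = delete;
    ~FreeStack() { while (Node<T>* n = Pop()) delete n; }

    // Counterpart of SimpleQueue::Release: reset the node and shelve it.
    void Push(Node<T>* n) {
        assert(n != nullptr);
        n->value = nullptr;
        n->next = top_;
        top_ = n;
    }

    // Returns a clean shelved node, or nullptr if none is available.
    Node<T>* Pop() {
        Node<T>* n = top_;
        if (n != nullptr) {
            top_ = n->next;
            n->next = nullptr;
        }
        return n;
    }

    // Counterpart of SimpleQueue::Get: reuse a shelved node, else allocate.
    Node<T>* Get(T* val) {
        Node<T>* n = Pop();
        if (n == nullptr) n = new Node<T>;
        n->value = val;
        return n;
    }
};
```

Swapping this in for the std::list member would mean Release calls Push and Get calls Get; as with the list, it remains producer-only state.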
/*
            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
                    Version 2, December 2004
 Copyright (C) 2004 Sam Hocevar
  14 rue de Plaisance, 75014 Paris, France
 Everyone is permitted to copy and distribute verbatim or modified
 copies of this license document, and changing it is allowed as long
 as the name is changed.
            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
  0. You just DO WHAT THE FUCK YOU WANT TO.
*/
/* This program is free software. It comes without any warranty, to
 * the extent permitted by applicable law. You can redistribute it
 * and/or modify it under the terms of the Do What The Fuck You Want
 * To Public License, Version 2, as published by Sam Hocevar. See
 * http://sam.zoy.org/wtfpl/COPYING for more details. */
#pragma once
// suppress 64-bit ready warning about casting on the win32 platform (only)
#pragma warning(disable:4311)
#pragma warning(disable:4312)
// After Sutter in Dr. Dobbs -- http://ddj.com/cpp/210604448?pgno=2
#include <list>
#include "Windows.h"
template <typename T>
class SimpleQueue {
  typedef T* Tptr; // should really be a smart_pointer<T>
private:
  SimpleQueue(const SimpleQueue &); // Not copyable
  SimpleQueue & operator= (const SimpleQueue &); // Not assignable
  struct Node {
    Node( Tptr val ) : value(val), next(NULL) { }
    Node() : next(NULL) { value = NULL; }
    Tptr value;
    Node* next;
  };
  std::list<Node *> freeList; // for producer only
  Node* first;                // for producer only
  Node *divider, *last;       // shared -- Use explicit atomic compares only
  // Allocator/Deallocator for nodes --
  // only used in the producer thread
  // OR in the destructor.
  Node * Get(Tptr val) {
    if(!freeList.empty()) {
      // Clean because of Release
      Node * next = freeList.front();
      freeList.pop_front();
      next->value = val;
      return next;
    }
    // clean by construction
    return new Node(val);
  }
  // Avoids costly free() while running
  void Release(Node * node) {
    // reset the node to clean before shelving it
    node->value = NULL;
    node->next = NULL;
    freeList.push_back(node);
  }
public:
  SimpleQueue() {
    first = divider = last = Get(NULL); // add dummy separator
  }
  ~SimpleQueue() {
    while( first != NULL ) { // release the list
      Node* tmp = first;
      first = tmp->next;
      delete tmp;
    }
    // Require -- Producer thread calls this or is dead
    while(!freeList.empty()) {
      delete Get(NULL);
    }
  }
  // Produce is called on the producer thread only:
  void Produce( Tptr t ) {
    last->next = Get(t); // add the new item
    // publish it (the cast lets this also compile where the API is a real
    // function taking PVOID volatile *, as on x64)
    InterlockedExchangePointer(reinterpret_cast<PVOID volatile *>(&last), last->next);
    // Burn the consumed part of the queue: the compare-exchange on the local
    // copy sets looper to NULL once first is equal to divider
    for( PVOID looper = first; // non-null; pointer read is atomic
         InterlockedCompareExchangePointer(&looper, NULL, divider), looper;
         looper = first) {
      Node* tmp = first;
      first = first->next;
      Release(tmp);
    }
  }
  // Consume is called on the consumer thread only:
  bool Consume( Tptr & result ) {
    PVOID choice = divider; // non-null; pointer read is atomic
    // choice becomes NULL if divider == last, i.e. the queue is empty
    InterlockedCompareExchangePointer(&choice, NULL, last);
    if(choice) {
      result = divider->next->value; // C: copy it back
      // D: publish that we took it. Leave the old divider's next link alone:
      // the producer follows it from first when recycling consumed nodes.
      InterlockedExchangePointer(reinterpret_cast<PVOID volatile *>(&divider), divider->next);
      return true; // and report success
    }
    return false; // else report empty
  }
};
#pragma warning(default:4312)
#pragma warning(default:4311)
And here is a simple test harness, using C++/CLI to make the threading simpler to write:
/*
            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
                    Version 2, December 2004
 Copyright (C) 2004 Sam Hocevar
  14 rue de Plaisance, 75014 Paris, France
 Everyone is permitted to copy and distribute verbatim or modified
 copies of this license document, and changing it is allowed as long
 as the name is changed.
            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
  0. You just DO WHAT THE FUCK YOU WANT TO.
*/
/* This program is free software. It comes without any warranty, to
 * the extent permitted by applicable law. You can redistribute it
 * and/or modify it under the terms of the Do What The Fuck You Want
 * To Public License, Version 2, as published by Sam Hocevar. See
 * http://sam.zoy.org/wtfpl/COPYING for more details. */
#include "stdafx.h"
#include <stdlib.h> // malloc, free
#include "SimpleQueue.h"
using namespace System;
using namespace System::Threading;
ref class Work {
public:
  static void Produce() {
    Console::WriteLine( "Producer thread procedure." );
    for(int i=0; i<32768; ++i) {
      int * data = static_cast<int*>(malloc(sizeof(int)));
      *data = i;
      q->Produce(data);
      if(0 == (i%1000)) Thread::Sleep(20); // let the consumer catch up now and then
    }
    Console::WriteLine( "Producer thread finished." );
  }
  static void Consume() {
    Console::WriteLine( "Consumer thread procedure." );
    int latest = -1;
    while(latest < 32767) { // spin until the last item has arrived
      int * data;
      if(q->Consume(data)) {
        if(data[0] != latest+1) { // items must arrive in FIFO order
          Console::WriteLine( "Consumer thread procedure -- failure." );
        }
        latest = data[0];
        free(data);
      }
    }
    Console::WriteLine( "Consumer thread finished." );
  }
  static SimpleQueue<int> * q;
};
int main(array<System::String ^> ^) {
  {
    SimpleQueue<int> queue;
    Work::q = &queue;
    ThreadStart ^ producerDelegate = gcnew ThreadStart( &Work::Produce );
    Thread ^ producerThread = gcnew Thread( producerDelegate );
    ThreadStart ^ consumerDelegate = gcnew ThreadStart( &Work::Consume );
    Thread ^ consumerThread = gcnew Thread( consumerDelegate );
    producerThread->Start();
    consumerThread->Start();
    producerThread->Join();
    consumerThread->Join();
  } // queue destroyed here, after both threads have finished
  Console::WriteLine( "Completed." );
  return 0;
}
6 comments:
Now, what's going to be the best way to avoid the locking within malloc/free?
That truly isn't an issue of locking in the queue (after an initial run of Node allocations up to some steady state, it stops allocating), but rather one for the consuming application.
If your producer isn't forced to allocate new records, but can reuse the T records, then you can use a second queue with the roles reversed to send "empty" records back from the original consumer for reuse by the producer. After the initial allocations up to a steady-state high-water mark, that ceases to be an issue: deallocation is deferred until after the queue terminates.
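A sketch of that record-recycling scheme, under some stated assumptions: a hypothetical Record type stands in for T, and to keep the example short the single-producer/single-consumer queue is a small fixed-capacity ring buffer (SpscRing, written here purely for illustration) rather than the linked SimpleQueue; any SPSC queue with the same Produce/Consume shape would serve. RunRecyclingPipeline is likewise a hypothetical demo function. The producer calls new only when nothing is waiting on the return queue, and everything is freed once both threads have finished.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

// Stand-in SPSC queue: a fixed-capacity ring buffer holding up to N-1 pointers.
template <typename T, std::size_t N>
class SpscRing {
    T* slots_[N];
    std::atomic<std::size_t> head_{0};  // next slot to read; written by the consumer
    std::atomic<std::size_t> tail_{0};  // next slot to write; written by the producer
public:
    bool Produce(T* p) {                // producer thread only; false if full
        std::size_t t = tail_.load(std::memory_order_relaxed);
        std::size_t next = (t + 1) % N;
        if (next == head_.load(std::memory_order_acquire)) return false;
        slots_[t] = p;
        tail_.store(next, std::memory_order_release);
        return true;
    }
    bool Consume(T*& out) {             // consumer thread only; false if empty
        std::size_t h = head_.load(std::memory_order_relaxed);
        if (h == tail_.load(std::memory_order_acquire)) return false;
        out = slots_[h];
        head_.store((h + 1) % N, std::memory_order_release);
        return true;
    }
};

struct Record { int payload; };         // hypothetical record type

// Sends `items` records from a producer thread to a consumer thread. The
// consumer hands each used record back on a second queue with the roles
// reversed; the producer allocates only when nothing is waiting for reuse.
// Returns true if every payload arrived in order; `allocations` counts the
// Records ever created and `reclaimed` the Records freed at the end.
bool RunRecyclingPipeline(int items, int& allocations, int& reclaimed) {
    SpscRing<Record, 64> forward;       // producer -> consumer: 63 usable slots
    SpscRing<Record, 128> back;         // consumer -> producer: sized so it never fills
    allocations = 0;
    bool inOrder = true;

    std::thread producer([&] {
        for (int i = 0; i < items; ++i) {
            Record* r = nullptr;
            if (!back.Consume(r)) { r = new Record; ++allocations; }
            r->payload = i;
            while (!forward.Produce(r)) std::this_thread::yield();
        }
    });
    std::thread consumer([&] {
        for (int expected = 0; expected < items; ++expected) {
            Record* r = nullptr;
            while (!forward.Consume(r)) std::this_thread::yield();
            if (r->payload != expected) inOrder = false;
            while (!back.Produce(r)) std::this_thread::yield();  // return it for reuse
        }
    });
    producer.join();
    consumer.join();

    // Both threads are done, so every Record is parked on `back`: free them now.
    reclaimed = 0;
    Record* r = nullptr;
    while (back.Consume(r)) { delete r; ++reclaimed; }
    return inOrder;
}
```

With 63 usable forward slots, at most 65 Records can ever exist here: the producer allocates only when it finds the return ring empty, and at that moment every existing record is either in the forward ring or in the consumer's hand. That bound is why a 128-slot return ring can never block the consumer.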
Steve,
I was able to get this to work within a CLR console application in Visual C++.Net.
However, when I try to use it in my C++.Net application, the inclusion of Windows.h causes problems.
Is there a version of this code that would work with a Visual C++.Net CLR?
Thank you.
If cutting down to just winbase.h doesn't work, then you may have to manually extract just the part that defines the Interlocked functions. If that fails, you will probably have to resort to the System.Threading.Interlocked methods instead.
Just a warning to anyone who thinks about using this code: it's wrong. Nothing prevents the compiler from reordering instructions, the CPU from executing them out of order, or the memory bus from flushing the caches out of order.
To fix it, you would need volatile variables and/or memory barriers to tell the compiler and the hardware where reordering must not happen.
Did you check the docs for InterlockedExchangePointer and InterlockedCompareExchangePointer?
"This function generates a full memory barrier (or fence) to ensure that memory operations are completed in order."
Of course, in 2013 we can now use atomic<Node*> explicitly with any reasonably recent compiler, rather than having to dive into platform-specific APIs like those.
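A sketch of what that might look like, assuming a C++11 compiler: the same algorithm as SimpleQueue, with divider and last held in std::atomic<Node*> and explicit acquire/release ordering in place of the Interlocked calls, and the free-list done by hand as an intrusive stack. The class name AtomicSimpleQueue and the demo helper RunTwoThreadFifoCheck are illustrative, not from the original post.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Sutter-style single-producer/single-consumer queue using std::atomic<Node*>
// instead of the Win32 Interlocked calls, plus a hand-rolled free list.
template <typename T>
class AtomicSimpleQueue {
    struct Node {
        T* value = nullptr;
        Node* next = nullptr;
    };
    Node* first;                 // producer only: oldest node not yet recycled
    Node* freeTop = nullptr;     // producer only: top of the intrusive free-list stack
    std::atomic<Node*> divider;  // shared: the node the consumer took most recently
    std::atomic<Node*> last;     // shared: the node the producer published most recently

    Node* Get(T* val) {          // producer only: reuse a shelved node, else allocate
        Node* n = freeTop;
        if (n != nullptr) { freeTop = n->next; n->next = nullptr; }
        else n = new Node;
        n->value = val;
        return n;
    }
    void Release(Node* n) {      // producer only: reset and shelve a consumed node
        n->value = nullptr;
        n->next = freeTop;
        freeTop = n;
    }

public:
    AtomicSimpleQueue() {
        Node* dummy = new Node;  // dummy separator
        first = dummy;
        divider.store(dummy);
        last.store(dummy);
    }
    AtomicSimpleQueue(const AtomicSimpleQueue&) = delete;
    AtomicSimpleQueue& operator=(const AtomicSimpleQueue&) = delete;

    ~AtomicSimpleQueue() {       // requires: producer and consumer have finished
        while (first != nullptr) { Node* tmp = first; first = tmp->next; delete tmp; }
        while (freeTop != nullptr) { Node* tmp = freeTop; freeTop = tmp->next; delete tmp; }
    }

    void Produce(T* item) {      // producer thread only
        Node* tail = last.load(std::memory_order_relaxed);  // only this thread writes last
        tail->next = Get(item);                              // add the new item
        last.store(tail->next, std::memory_order_release);  // publish it
        Node* taken = divider.load(std::memory_order_acquire);
        while (first != taken) {                             // recycle consumed nodes
            Node* tmp = first;
            first = first->next;
            Release(tmp);
        }
    }

    bool Consume(T*& result) {   // consumer thread only
        Node* d = divider.load(std::memory_order_relaxed);  // only this thread writes divider
        if (d == last.load(std::memory_order_acquire)) return false;  // empty
        result = d->next->value;                             // copy it out
        divider.store(d->next, std::memory_order_release);  // publish that we took it
        return true;
    }
};

// Illustrative demo: a producer thread enqueues pointers to 0..n-1 while the
// calling thread dequeues them; returns true if they all arrive in order.
bool RunTwoThreadFifoCheck(int n) {
    std::vector<int> data(n);
    for (int i = 0; i < n; ++i) data[i] = i;
    AtomicSimpleQueue<int> q;
    std::thread producer([&] { for (int i = 0; i < n; ++i) q.Produce(&data[i]); });
    bool inOrder = true;
    for (int expected = 0; expected < n;) {
        int* p = nullptr;
        if (q.Consume(p)) { if (*p != expected) inOrder = false; ++expected; }
        else std::this_thread::yield();
    }
    producer.join();
    return inOrder;
}
```

The acquire load of last in Consume pairs with the release store in Produce, so the consumer sees a node's value and next link before it sees the node published; the acquire load of divider in Produce pairs with the consumer's release store, so the producer only recycles nodes the consumer has finished reading.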