The previous entry gave the queue itself, and didn't worry about locks that might or might not be taken out by the ancillary code. To avoid allocations and deallocations of queue nodes in the steady state, get rid of the STL list
and keep them on a stack --
/*
            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
                    Version 2, December 2004

 Copyright (C) 2004 Sam Hocevar
  14 rue de Plaisance, 75014 Paris, France
 Everyone is permitted to copy and distribute verbatim or modified
 copies of this license document, and changing it is allowed as long
 as the name is changed.

            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION

  0. You just DO WHAT THE FUCK YOU WANT TO.
*/
/* This program is free software. It comes without any warranty, to
 * the extent permitted by applicable law. You can redistribute it
 * and/or modify it under the terms of the Do What The Fuck You Want
 * To Public License, Version 2, as published by Sam Hocevar. See
 * http://sam.zoy.org/wtfpl/COPYING for more details. */

#pragma once

// After Sutter in Dr. Dobbs -- http://ddj.com/cpp/210604448?pgno=2

#include <atomic>
#include <cstdio>
#include <list>

#ifdef _WIN32
#include "Windows.h"   // kept for existing includers; the queue itself no longer needs it
#endif

/**
 * Single-producer / single-consumer queue of raw T pointers.
 *
 * Exactly one thread may call Produce() and exactly one (other) thread may
 * call Consume(). The queue never owns the payloads it carries; it only owns
 * its internal list nodes, which are recycled through a producer-private
 * free stack so that no node is allocated or freed in the steady state.
 *
 * Layout of the linked list:
 *
 *     first -> ... -> divider -> ... -> last -> NULL
 *
 *   - [first, divider)  nodes already consumed; reclaimed by the producer.
 *   - divider           dummy separator; its payload has been handed out.
 *   - (divider, last]   items waiting to be consumed.
 *
 * `divider` is written only by the consumer and `last` only by the producer;
 * each side reads the other's pointer with acquire semantics, which is what
 * makes the plain `next` / `value` fields safe to read across threads.
 */
template <typename T>
class SimpleQueue
{
public:
    typedef T* Tptr;            // should really be a smart_pointer<T>
    typedef const T * Tcptr;

    /** Creates an empty queue holding just the dummy separator node. */
    SimpleQueue()
        : freeStack(NULL), first(new Node()), divider(first), last(first)
    {
    }

    /**
     * Frees every node (live list and free stack). Payloads are NOT deleted.
     * Require -- both the producer and the consumer have stopped using the
     * queue before it is destroyed.
     */
    ~SimpleQueue()
    {
        // The live list is NULL-terminated because Get() always hands out
        // nodes with a cleared `next`.
        while( first != NULL )
        {
            Node * doomed = first;
            first = doomed->next;
            delete doomed;
        }
        while( freeStack != NULL )
        {
            Node * doomed = freeStack;
            freeStack = doomed->next;
            delete doomed;
        }
    }

    /**
     * Appends `t` to the queue and recycles any nodes the consumer has
     * finished with. Must be called on the producer thread only.
     */
    void Produce( Tptr t )
    {
        Node * const fresh = Get(t);                       // may allocate; queue untouched if it throws
        Node * const tail  = last.load(std::memory_order_relaxed);   // only we ever write `last`
        tail->next = fresh;                                // add the new item
        last.store(fresh, std::memory_order_release);      // publish it

        // Burn the consumed part of the queue: everything strictly before
        // the divider is no longer visible to the consumer.
        while( first != divider.load(std::memory_order_acquire) )
        {
            Node * const spent = first;
            first = spent->next;
            Release(spent);
        }
    }

    /**
     * Removes the oldest item, storing it in `result`.
     * Must be called on the consumer thread only.
     *
     * @return true if an item was taken; false if the queue was empty
     *         (`result` is left untouched in that case).
     */
    bool Consume( Tptr & result )
    {
        Node * const dummy = divider.load(std::memory_order_relaxed); // only we ever write `divider`
        if( dummy == last.load(std::memory_order_acquire) )
            return false;                                  // report empty

        Node * const head = dummy->next;
        if( !head || !head->value )
            std::printf("help!");                          // diagnostic retained from the original
        if( !head )
            return false;                                  // broken invariant; nothing we can hand out

        result = head->value;                              // C: copy it back
        // Ownership of the payload has passed to the caller, so drop our
        // reference. The link from the old divider is deliberately left
        // intact: the producer still has to walk through it while burning
        // consumed nodes, and severing it here strands `first` at NULL and
        // leaks every subsequent node.
        head->value = NULL;
        divider.store(head, std::memory_order_release);    // D: publish that we took it
        return true;                                       // and report success
    }

private:
    SimpleQueue(const SimpleQueue &) = delete;             // Not copyable
    SimpleQueue & operator= (const SimpleQueue &) = delete; // Not assignable

    struct Node
    {
        explicit Node( Tptr val ) : value(val), next(NULL) { }
        Node() : value(NULL), next(NULL) { }
        Tptr value;
        Node* next;
    };

    // NOTE: declaration order matters -- the constructor initialises
    // `divider` and `last` from `first`.
    Node * freeStack;               // for producer only: recycled nodes
    Node * first;                   // for producer only: oldest node still allocated
    std::atomic<Node*> divider;     // shared: written by consumer, read by producer
    std::atomic<Node*> last;        // shared: written by producer, read by consumer

    // Allocator/Deallocator for nodes --
    // only used in the producer thread
    // OR in the destructor.

    /** Returns a node carrying `val` with a cleared `next`, reusing a shelved node if possible. */
    Node * Get(Tptr val)
    {
        if( freeStack == NULL )
            return new Node(val);       // clean by construction

        Node * const node = freeStack;
        freeStack = node->next;
        node->value = val;
        node->next = NULL;              // was the free-stack link; must not leak into the live list
        return node;
    }

    /** Shelves `node` on the free stack. Avoids costly free() while running. */
    void Release(Node * node)
    {
        node->value = NULL;
        node->next = freeStack;
        freeStack = node;
    }
};
Next add a recycling class to avoid having to keep allocating and reallocating payload objects. C++ sucks as a functional language (without all the heavyweight Boost machinery), so import the functions we need via static polymorphic constraints on templated types.
/* DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE Version 2, December 2004 Copyright (C) 2004 Sam Hocevar 14 rue de Plaisance, 75014 Paris, France Everyone is permitted to copy and distribute verbatim or modified copies of this license document, and changing it is allowed as long as the name is changed. DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION 0. You just DO WHAT THE FUCK YOU WANT TO. */ /* This program is free software. It comes without any warranty, to * the extent permitted by applicable law. You can redistribute it * and/or modify it under the terms of the Do What The Fuck You Want * To Public License, Version 2, as published by Sam Hocevar. See * http://sam.zoy.org/wtfpl/COPYING for more details. */ #pragma once #include "SimpleQueue.h" template <typename T, typename TSource, typename TReceiver> class ConveyorBelt { SimpleQueue<T> forward; SimpleQueue<T> reverse; public: ConveyorBelt() {} ~ConveyorBelt() { SimpleQueue<T>::Tptr t; while(forward.Consume(t)) { delete t; } while(reverse.Consume(t)) { delete t; } } void Accept(TSource & source) { SimpleQueue<T>::Tptr t; if(!reverse.Consume(t)) { t = new T(); } source.Set(t); forward.Produce(t); } bool Display(TReceiver & sink) { SimpleQueue<T>::Tptr t; if(!forward.Consume(t)) return false; SimpleQueue<T>::Tcptr tconst = t; sink.Use(tconst); t->Reset(); reverse.Produce(t); return true; } };
All T records are owned by the ConveyorBelt, and we just loan them out to the TSource and TReceiver objects to write into or read from. Once read, they are recycled back to the producing thread.
A test harness looks like this --
/*
            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
                    Version 2, December 2004

 Copyright (C) 2004 Sam Hocevar
  14 rue de Plaisance, 75014 Paris, France
 Everyone is permitted to copy and distribute verbatim or modified
 copies of this license document, and changing it is allowed as long
 as the name is changed.

            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION

  0. You just DO WHAT THE FUCK YOU WANT TO.
*/
/* This program is free software. It comes without any warranty, to
 * the extent permitted by applicable law. You can redistribute it
 * and/or modify it under the terms of the Do What The Fuck You Want
 * To Public License, Version 2, as published by Sam Hocevar. See
 * http://sam.zoy.org/wtfpl/COPYING for more details. */

#include "stdafx.h"

#include <string.h>   // memset

#include "SimpleQueue.h"
#include "ConveyorBelt.h"

using namespace System;
using namespace System::Threading;

// Payload carried by the belt: a fixed-size buffer plus start/end offsets.
class MarkedBuffer
{
public:
    size_t start;
    size_t end;
    char data[16384];

    MarkedBuffer() : start(0), end(0)
    {
        Reset();
    }

    // Zeroes the data area only; start/end are left as they are.
    void Reset()
    {
        memset(data, 0, sizeof(data));
    }
};

// Acts as both the TSource and the TReceiver of the belt: Set() stamps the
// current `source` value into a buffer, Use() copies it back out to `sink`.
class Endpoints
{
public:
    size_t source;
    size_t sink;

    Endpoints() : source(0), sink(0) {}

    void Set(MarkedBuffer * t)
    {
        t->start = source;
    }

    void Use(MarkedBuffer const * t)
    {
        sink = t->start;
    }
};

// Thread entry points. The belt and the endpoints are reached through static
// native pointers that main() sets before starting the threads.
ref class Work
{
public:
    static ConveyorBelt<MarkedBuffer, Endpoints, Endpoints> * q2;
    static Endpoints * e;

    // Sends the values 0..32767 down the belt, pausing 200 ms after every
    // hundredth item and merely yielding otherwise.
    static void Produce2()
    {
        Console::WriteLine( "Producer thread procedure." );
        for(int i=0; i<32768; ++i)
        {
            e->source = i;
            q2->Accept(*e);
            if(0 == (i%100))
                Thread::Sleep(200);
            else
                Thread::Sleep(0);
        }
        Console::WriteLine( "Producer thread finished." );
    }

    // Receives values until 32767 has been seen, reporting any value that
    // does not directly follow its predecessor.
    static void Consume2()
    {
        Console::WriteLine( "Consumer thread procedure." );
        int latest = -1;
        while(latest < 32767)
        {
            if(!q2->Display(*e))
            {
                // Nothing waiting: give up the time slice instead of
                // spinning flat out while the producer sleeps.
                Thread::Sleep(0);
                continue;
            }

            if(e->sink != size_t(latest+1))
            {
                Console::WriteLine( "Consumer thread procedure -- failure." );
            }
            latest = int(e->sink);
        }
        Console::WriteLine( "Consumer thread finished." );
    }
};

int main(array<System::String ^> ^)
{
    {
        // Scoped so that the belt is destroyed (and its records freed)
        // before "Completed." is printed; both threads are joined first.
        ConveyorBelt<MarkedBuffer, Endpoints, Endpoints> queue;
        Endpoints e;
        Work::q2 = &queue;
        Work::e = &e;

        ThreadStart ^ producerDelegate = gcnew ThreadStart( &Work::Produce2 );
        Thread ^ producerThread = gcnew Thread( producerDelegate );
        ThreadStart ^ consumerDelegate = gcnew ThreadStart( &Work::Consume2 );
        Thread ^ consumerThread = gcnew Thread( consumerDelegate );

        producerThread->Start();
        consumerThread->Start();
        producerThread->Join();
        consumerThread->Join();
    }
    Console::WriteLine( "Completed." );
    return 0;
}
Where the MarkedBuffer type is representative of the uses I have made of such a queue inside a proxying application -- a packet read from the network into the data array, or composed for writing to the network, with start and end offsets as marked; the data is conveyed between the threads servicing the client-facing and server-facing sockets.
Using the ConveyorBelt to carry data, allocations will take place until a steady state is reached, and may occasionally recur when load spikes fill the queue; all deallocations are deferred to the destruction of the system.
1 comment:
Thanks for your really helpful code.
I think there is an error in "bool Consume( Tptr & result )"
the line:
reinterpret_cast<Node*>(choice)->next = NULL;
should be:
reinterpret_cast<Node*>(choice)->next->value = NULL;
Without this change, your code is leaking a lot.
Post a Comment