My hobbyist coding updates and releases as the mysterious "Mr. Tines"

Friday, 24 July 2009

The lock-free queue revisited

The previous entry gave the queue itself, and didn't worry about locks that might or might not be taken out by the ancillary code. To avoid allocations and deallocations of queue nodes in the steady state, get rid of the STL list and keep them on a stack --

/*
            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
                    Version 2, December 2004

 Copyright (C) 2004 Sam Hocevar
  14 rue de Plaisance, 75014 Paris, France
 Everyone is permitted to copy and distribute verbatim or modified
 copies of this license document, and changing it is allowed as long
 as the name is changed.

            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION

  0. You just DO WHAT THE FUCK YOU WANT TO. */

/* This program is free software. It comes without any warranty, to
 * the extent permitted by applicable law. You can redistribute it
 * and/or modify it under the terms of the Do What The Fuck You Want
 * To Public License, Version 2, as published by Sam Hocevar. See
 * http://sam.zoy.org/wtfpl/COPYING for more details. */ 

#pragma once

// suppress 64-bit ready warning about casting on the win32 platform (only)
#pragma warning(disable:4311)
#pragma warning(disable:4312)

// After Sutter in Dr. Dobbs -- http://ddj.com/cpp/210604448?pgno=2
#include <list>
#include "Windows.h"

template <typename T>
class SimpleQueue {
public:
    typedef T* Tptr;                                // should really be a smart_pointer<T>
    typedef const T * Tcptr;

private:
    SimpleQueue(const SimpleQueue &);               // Not copyable
     SimpleQueue & operator= (const SimpleQueue &); // Not assignable

  struct Node {
     Node( Tptr val ) : value(val), next(NULL) { }
    Node() : next(NULL) { value = NULL; }
    Tptr value;
    Node* next;
  };

  Node * freeStack;             // for producer only
  Node* first;                  // for producer only
  Node *divider, *last;         // shared -- Use explicit atomic compares only

  // Allocator/Deallocator for nodes -- 
  // only used in the producer thread
  // OR in the destructor.
  Node * Get(Tptr val)
  {
     if(freeStack)
     {
        // Clean because of Release
        Node * next = freeStack;
        freeStack = next->next;
        next->value = val;
        return next;
     }

     // clean by construction
     return new Node(val);     
  }

  // Avoids costly free() while running
  void Release(Node * node)
  {
        // reset the node to clean before shelving it
        node->value = NULL;
        node->next = freeStack;
        freeStack = node;
  }


public:
    SimpleQueue() : freeStack(NULL) {
    first = divider = last = Get(NULL);                         // add dummy separator
  }

  ~SimpleQueue() {
    while( first != NULL ) {               // release the list
      Node* tmp = first;
      first = tmp->next;
      delete tmp;
    }

     // Require -- Producer thread calls this or is dead
     while(freeStack)
     {
          delete Get(NULL);
     }
  }

  // Produce is called on the producer thread only:
  void Produce( Tptr t ) {
    last->next = Get(t);                            // add the new item
    InterlockedExchangePointer(&last, last->next);  // publish it : atomic last = last->next;


    // Burn the consumed part of the queue
    for( PVOID looper = first;                     // non-null; pointer read is atomic; atomic while( first != divider )
           InterlockedCompareExchangePointer(&looper, NULL, divider), looper;
           looper = first)
     {
      Node* tmp = first;
      first = first->next;

      Release(tmp);
    }
  }

  // Consume is called on the consumer thread only:
  bool Consume( Tptr & result ) {

     PVOID choice = divider;                                  // non-null; pointer read is atomic
     InterlockedCompareExchangePointer(&choice, NULL, last);

     if(choice)
     {
        if(!divider || ! divider->next || ! divider->next->value) 
            printf("help!");

        result = divider->next->value;                        // C: copy it back
        choice = divider;
        InterlockedExchangePointer(&divider, divider->next);  // D: publish that we took it : atomic divider = divider->next;
        reinterpret_cast<Node*>(choice)->next = NULL;
        return true;                                          // and report success
    }

    return false;                                             // else report empty
  }
};

#pragma warning(default:4312)
#pragma warning(default:4311)

Next add a recycling class to avoid having to keep allocating and reallocating payload objects. C++ sucks as a functional language (without all the heavyweight Boost machinery), so import the functions we need via static polymorphic constraints on templated types.

/*
            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
                    Version 2, December 2004

 Copyright (C) 2004 Sam Hocevar
  14 rue de Plaisance, 75014 Paris, France
 Everyone is permitted to copy and distribute verbatim or modified
 copies of this license document, and changing it is allowed as long
 as the name is changed.

            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION

  0. You just DO WHAT THE FUCK YOU WANT TO. */

/* This program is free software. It comes without any warranty, to
 * the extent permitted by applicable law. You can redistribute it
 * and/or modify it under the terms of the Do What The Fuck You Want
 * To Public License, Version 2, as published by Sam Hocevar. See
 * http://sam.zoy.org/wtfpl/COPYING for more details. */ 
#pragma once

#include "SimpleQueue.h"

template <typename T, typename TSource, typename TReceiver>
class ConveyorBelt {
    SimpleQueue<T> forward;
    SimpleQueue<T> reverse;

public:
    

    ConveyorBelt() {}
    ~ConveyorBelt() 
    {
        SimpleQueue<T>::Tptr t;
        while(forward.Consume(t))
        {
            delete t;
        }
        while(reverse.Consume(t))
        {
            delete t;
        }
    }

    void Accept(TSource & source)
    {
        SimpleQueue<T>::Tptr t;
        if(!reverse.Consume(t))
        {
            t = new T();
        }
        source.Set(t);
        forward.Produce(t);
    }

    bool Display(TReceiver & sink)
    {
        SimpleQueue<T>::Tptr t;
        if(!forward.Consume(t))
            return false;

        SimpleQueue<T>::Tcptr tconst = t;
        sink.Use(tconst);
        t->Reset();


        reverse.Produce(t);
        return true;
    }
};

All T records are owned by the ConveyorBelt, and we just loan them out to the TSource and TReceiver objects to write into or read from. Once read, they are recycled back to the producing thread.

A test harness looks like this --

/*
            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
                    Version 2, December 2004

 Copyright (C) 2004 Sam Hocevar
  14 rue de Plaisance, 75014 Paris, France
 Everyone is permitted to copy and distribute verbatim or modified
 copies of this license document, and changing it is allowed as long
 as the name is changed.

            DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
   TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION

  0. You just DO WHAT THE FUCK YOU WANT TO. */

/* This program is free software. It comes without any warranty, to
 * the extent permitted by applicable law. You can redistribute it
 * and/or modify it under the terms of the Do What The Fuck You Want
 * To Public License, Version 2, as published by Sam Hocevar. See
 * http://sam.zoy.org/wtfpl/COPYING for more details. */ 

#include "stdafx.h"
#include "SimpleQueue.h"
#include "ConveyorBelt.h"

using namespace System;
using namespace System::Threading;


class MarkedBuffer {
public:
    size_t start;
    size_t end;
    char data[16384];

    MarkedBuffer() : start(0), end(0) 
    { Reset(); }

    void Reset()
    { memset(data, 0, sizeof(data)); }
};

class Endpoints {
public:
    size_t source;
    size_t sink;

    Endpoints() : source(0), sink(0) {}
    void Set(MarkedBuffer * t)
    {
        t->start = source;
    }
    void Use(MarkedBuffer const * t)
    {
        sink = t->start;
    }
};


ref class Work
{
public:

   static ConveyorBelt<MarkedBuffer, Endpoints, Endpoints> * q2;
   static Endpoints * e;

   static void Produce2()
   {
      Console::WriteLine( "Producer thread procedure." );
       for(int i=0; i<32768; ++i)
       {
            e->source = i;
            q2->Accept(*e);
            if(0 == (i%100)) Thread::Sleep(200);
            else Thread::Sleep(0);
       }
      Console::WriteLine( "Producer thread finishhd." );
   }

   static void Consume2()
   {
      Console::WriteLine( "Consumer thread procedure." );
       int latest = -1;
       while(latest < 32767)
       {
           if(q2->Display(*e))
            {
                 if(e->sink != size_t(latest+1))
                 {
      Console::WriteLine( "Consumer thread procedure -- failure." );
                 }
                 latest = int(e->sink);
            }
       }
      Console::WriteLine( "Consumer thread finished." );
   }
};


int main(array<System::String ^> ^)
{

     {
          ConveyorBelt<MarkedBuffer, Endpoints, Endpoints> queue;
          Endpoints e;

          Work::q2 = &queue;
          Work::e = &e;

          ThreadStart ^ producerDelegate = gcnew ThreadStart( &Work::Produce2 );
          Thread ^ producerThread = gcnew Thread( producerDelegate );
          ThreadStart ^ consumerDelegate = gcnew ThreadStart( &Work::Consume2 );
          Thread ^ consumerThread = gcnew Thread( consumerDelegate );


          producerThread->Start();
          consumerThread->Start();

          producerThread->Join();
          consumerThread->Join();
     }

      Console::WriteLine( "Completed." );

    return 0;
}

Where the MarkedBuffer type is representative of the uses I have made of such a queue inside a proxying application -- a packet read from the network into the data array or composed for writing to the network, with start and end offsets as marked; the data being conveyed between threads service client facing and server facing sockets.

Using the ConveyorBelt to carry data, allocations will take place until a steady state is reached, and may infrequently happen as spikes load the queue; all deallocations are deferred to the destruction of the system.

1 comment:

oo said...

Thanks for your really helpful code.

I think there is an error in "bool Consume( Tptr & result )"

the line:

reinterpret_cast(choice)->next = NULL;

should be:

reinterpret_cast(choice)->next->value = NULL;

Without this change, your code is leaking a lot.