ringBufferA
Dennis Lang
lang.dennis @ comcast.net

Thread-Optimized Ring Buffer
Compared to the queues published in DDJ, Jan 2009, by Herb Sutter
Updated: 25-Jan-2015


Dr. Dobb's has published several articles on thread-safe queues and queue performance.

The graph shows the performance (transactions per second) of several queue and ring buffer implementations when exercised by two threads: one thread is the producer and fills the buffer, the other is the consumer and empties it. This is a common programming workflow.

The primary reason for this article is to demonstrate that a lock-free buffer is possible and that it outperforms the other implementations. The graph clearly shows this, and further shows that Boost's lock-free queue is the slowest in this test scenario.

The key part of the test program spawns two threads which share the container. The producer thread fills the buffer with numbers stored in an object container, and the consumer thread removes the objects and sums the values. The total, memory usage, and number of transactions are recorded. The test includes various container sizes to measure how size impacts the buffer's performance.

Memory is monitored by globally overloading new and delete. The maximum memory usage is shown in the table. The StackMS object shows no memory use, probably because it uses malloc rather than new, so its memory usage is unknown.
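The measurement hook described above can be sketched as a global new/delete overload. This is an illustrative version, not the article's actual test code; the names `g_inUse` and `g_peak` are hypothetical:

```cpp
// Sketch of memory tracking via replacement global new/delete.
// Each block is prefixed with its size so delete can subtract it.
#include <atomic>
#include <cstdlib>
#include <new>

static std::atomic<std::size_t> g_inUse(0);  // bytes currently allocated
static std::atomic<std::size_t> g_peak(0);   // high-water mark

void* operator new(std::size_t n)
{
    void* p = std::malloc(n + sizeof(std::size_t));
    if (p == NULL)
        throw std::bad_alloc();
    *static_cast<std::size_t*>(p) = n;       // remember the size

    std::size_t now = (g_inUse += n);
    std::size_t peak = g_peak.load();
    while (now > peak && !g_peak.compare_exchange_weak(peak, now))
        ;                                    // race-free peak update
    return static_cast<std::size_t*>(p) + 1;
}

void operator delete(void* p) noexcept
{
    if (p != NULL)
    {
        std::size_t* base = static_cast<std::size_t*>(p) - 1;
        g_inUse -= *base;
        std::free(base);
    }
}
```

The default array forms (`new[]`/`delete[]`) forward to these scalar overloads, so a single pair is enough to see every heap allocation made through `new`; allocations made directly with `malloc` (as StackMS apparently does) bypass it, which matches the table's blank entry.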

Tests were run with increasing ring buffer sizes, and there was no change in the performance order of the containers: the lock-free RingBuffer always outperformed the others. Similarly, as the data object size was increased the buffers' relative performance order was unchanged, but all transaction rates decreased as the overhead of copying large objects started to dominate the test.

[Graph and table of results]

The test was run on Windows 7/x64, dual quad-core Intel Xeon at 2.83 GHz, 12 GB RAM, with a 64-bit release x64 build compiled with VS2013.

RingBuffer is the baseline container. It has no locks and only supports a single producer and a single consumer (2 threads). This container has the best performance and uses no extra memory beyond its fixed queue-size allocation. Data objects are held by value.

StackMS is a thread-safe stack using Microsoft's singly linked list API. This is a very fast thread-safe container. It uses slightly more memory than RingBuffer but is still very light. This container supports multiple threads and may be the best general-purpose solution on Windows, but it is a stack (LIFO, last in first out).

RingBufferLock is a fixed-size ring buffer that stores the data objects by value, avoiding calls to malloc. This container is similar to RingBuffer but uses locks to make it thread safe. This buffer works well for small data objects.

BoostFixedszQueue is Boost's lock-free queue configured to use a fixed-size buffer, making it similar to RingBuffer. This container has the worst performance.

Benefits of a fixed-size ring buffer over a dynamically growing container:

- No per-item allocation: objects are held by value in a preallocated block, so new/malloc is never called while the buffer is in use.
- Bounded, predictable memory usage, fixed at construction.
- A full buffer naturally throttles the producer instead of growing without limit.


The non-locking baseline RingBuffer template. Under testing it is thread safe with two threads: one producer and one consumer.

//
//  Simple fixed size ring buffer.
//  Manage objects by value.
//  Thread safe for single Producer and single Consumer.
//


#pragma once
#include <cassert>
#include <cstddef>

template <class T, size_t RingSize>
class RingBuffer
{
public:
    RingBuffer(size_t size = RingSize) 
        : m_buffer(new T[size]), m_size(size), m_rIndex(0), m_wIndex(0) 
        { assert(size > 1 && m_buffer != NULL); }

    ~RingBuffer() 
        { delete [] m_buffer; }

    size_t Next(size_t n) const 
        { return (n+1)%m_size; }
    bool Empty() const 
        { return (m_rIndex == m_wIndex); }
    bool Full() const
        { return (Next(m_wIndex) == m_rIndex); }

    bool Put(const T& value)
    {
        if (Full()) 
            return false;
        m_buffer[m_wIndex] = value;
        m_wIndex = Next(m_wIndex);
        return true;
    }

    bool Get(T& value)
    {
        if (Empty())
            return false;
        value = m_buffer[m_rIndex];
        m_rIndex = Next(m_rIndex);
        return true;
    }

private:
    T*              m_buffer;
    size_t          m_size;

    // volatile is only used to keep compiler from placing values in registers.
    // volatile does NOT make the index thread safe.
    volatile size_t m_rIndex;   
    volatile size_t m_wIndex;
};
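One subtlety of the Full() test above: with Next(w)==r as the full condition, a buffer constructed with size N holds at most N-1 elements, since one slot is kept empty to distinguish a full buffer from an empty one. A self-contained demonstration, using a condensed copy of the class (renamed RingDemo, with the array held inline instead of allocated, so the snippet stands alone):

```cpp
#include <cstddef>

// Condensed copy of the RingBuffer above, kept only so this
// capacity demo is self-contained.
template <class T, size_t RingSize>
class RingDemo
{
public:
    RingDemo() : m_rIndex(0), m_wIndex(0) { }

    size_t Next(size_t n) const { return (n + 1) % RingSize; }
    bool Empty() const { return m_rIndex == m_wIndex; }
    bool Full() const  { return Next(m_wIndex) == m_rIndex; }

    bool Put(const T& value)
    {
        if (Full())
            return false;
        m_buffer[m_wIndex] = value;
        m_wIndex = Next(m_wIndex);
        return true;
    }

    bool Get(T& value)
    {
        if (Empty())
            return false;
        value = m_buffer[m_rIndex];
        m_rIndex = Next(m_rIndex);
        return true;
    }

private:
    T m_buffer[RingSize];
    size_t m_rIndex;
    size_t m_wIndex;
};

// A RingDemo<int, 4> accepts exactly 3 Puts before Full() reports
// true, and Get() returns the values in FIFO order.
```

This wasted slot is the usual price of index-only full/empty detection; the alternative, a separate count, would need atomic updates from both threads and defeat the lock-free design.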


The StackMS template, a thread-safe stack (Microsoft singly linked list):

#pragma once
#include <windows.h>
#include "Lock.h"
#include "IBuffer.h"

// Fixed-size thread-safe stack using Microsoft's Singly Linked List API.
//
// Creates two linked lists (free and data).
// The free list is filled at construction and provides containers to hold data added by the caller to the data list.
// When data is popped off, the container is placed back on the free list.
//
// T = data type
// RingSize = number of entries
template <class T, size_t RingSize>
class StackMS : public IBuffer<T>
{
private:
// SLIST entries must be aligned to MEMORY_ALLOCATION_ALIGNMENT;
// pragma pack can only lower alignment, so use __declspec(align) instead.

    /// List Item
    template <typename TYPE>
    struct __declspec(align(MEMORY_ALLOCATION_ALIGNMENT)) ListItem
    {
        /// Next entry in the singly-linked list.
        SLIST_ENTRY Next;
        TYPE value;
    };
    typedef ListItem<T> ListValue;

    /// Head of free linked list.
    SLIST_HEADER m_freeList;
    ListValue m_freeValues[RingSize];

    /// Head of data linked list.
    SLIST_HEADER m_dataList;

    // no copying is supported
    StackMS& operator=(const StackMS&);
    StackMS(const StackMS&);

public:
    StackMS()
    {
        InitializeSListHead(&m_freeList);
        for (size_t i = 0; i < RingSize; i++)
        {
            InterlockedPushEntrySList(&m_freeList, &m_freeValues[i].Next);
        }
        InitializeSListHead(&m_dataList);
    }

    ~StackMS()
    {
        InterlockedFlushSList(&m_freeList);
        InterlockedFlushSList(&m_dataList);
    }
    
    size_t Size() 
    {
        return QueryDepthSList(&m_dataList);
    }

    // Get object off the stack (LIFO)
    bool Get(T& value)
    {
        ListValue* pListValue = reinterpret_cast<ListValue*>(InterlockedPopEntrySList(&m_dataList));
        if (NULL == pListValue)
            return false;
        value = pListValue->value;
        InterlockedPushEntrySList(&m_freeList, &pListValue->Next);
        return true;
    }

    // Put object onto the stack (LIFO)
    bool Put(const T& value)  
    {
        ListValue* pListValue = reinterpret_cast<ListValue*>(InterlockedPopEntrySList(&m_freeList));
        if (NULL == pListValue)
            return false;

        pListValue->value = value;
        InterlockedPushEntrySList(&m_dataList, &pListValue->Next);
        return true;
    }

    // Reverse order of linked list items (follows the SLIST_ENTRY.Next links).
    ListValue* ReverseSList(ListValue* pHead)
    {
        ListValue* pPrev = NULL;
        ListValue* pThis = pHead;

        while (pThis != NULL)
        {
            ListValue* pNext = reinterpret_cast<ListValue*>(pThis->Next.Next);
            pThis->Next.Next = reinterpret_cast<PSLIST_ENTRY>(pPrev);
            pPrev = pThis;
            pThis = pNext;
        }

        return pPrev;
    }

    // Get all items in LIFO order.
    ListValue* GetAll()
    {
        ListValue* pItems = reinterpret_cast<ListValue*>(InterlockedFlushSList(&m_dataList));
        PutAllOnFree(pItems);
        return pItems;
    }

    // Get all items in FIFO order.
    ListValue* GetAllReverse()
    {
        ListValue* pItems = ReverseSList(reinterpret_cast<ListValue*>(InterlockedFlushSList(&m_dataList)));
        PutAllOnFree(pItems);
        return pItems;
    }

    // Return a chain of items to the free list.
    void PutAllOnFree(ListValue* pItems)
    {
        while (pItems != NULL)
        {
            ListValue* pNextItem = reinterpret_cast<ListValue*>(pItems->Next.Next);
            InterlockedPushEntrySList(&m_freeList, &pItems->Next);
            pItems = pNextItem;
        }
    }
};
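The ReverseSList step is ordinary in-place reversal of a singly linked list: it turns the LIFO flush order into FIFO. A portable sketch of the same idea (the `Node` type here is illustrative, standing in for the SLIST entries):

```cpp
#include <cstddef>

// Minimal singly linked list node for the demo.
struct Node
{
    Node* next;
    int   value;
};

// In-place reversal: walk the list, pointing each node back at its
// predecessor. O(n) time, no extra memory.
Node* Reverse(Node* head)
{
    Node* prev = NULL;
    while (head != NULL)
    {
        Node* next = head->next;   // remember the rest of the list
        head->next = prev;         // reverse this link
        prev = head;
        head = next;
    }
    return prev;                   // prev is the new head
}
```

Given a chain 3 -> 2 -> 1 (the order a stack flush produces after pushes of 1, 2, 3), Reverse yields 1 -> 2 -> 3, restoring insertion order.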


The RingBufferLock template, a thread-safe RingBuffer that holds data by value:

#pragma once
#include <cassert>
#include <cstddef>
#include "Lock.h"
#include "IBuffer.h"

template <class T, size_t RingSize, class L>
class RingBufferLock : public IBuffer<T>
{
public:
    RingBufferLock(size_t size = RingSize) 
        : m_buffer(new T[size]), m_size(size), m_rIndex(0), m_wIndex(0) 
        { assert(size > 1 && m_buffer != NULL); }

    ~RingBufferLock() 
        { delete [] m_buffer; }

    size_t Next(size_t n) const 
        { return (n+1)%m_size; }
    bool Empty() const 
        { return (m_rIndex == m_wIndex); }
    bool Full() const
        { return (Next(m_wIndex) == m_rIndex); }

    bool Put(const T& value)
    {
        LockIt lockit(m_wLock);
        if (!Full())
        {
            m_buffer[m_wIndex] = value;
            m_wIndex = Next(m_wIndex);
            return true;
        }

        return false;
    }

    bool Get(T& value)
    {
        LockIt lockit(m_rLock);
        if (!Empty())
        {
            value = m_buffer[m_rIndex];
            m_rIndex = Next(m_rIndex);
            return true;
        }

        return false;
    }

private:
    T*              m_buffer;
    size_t          m_size;

    // volatile is only used to keep compiler from placing values in registers.
    // volatile does NOT make the index thread safe.

    volatile size_t m_rIndex;
    L m_rLock;

    volatile size_t m_wIndex;
    L m_wLock;
};
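Lock.h is not shown, so the exact shape of the lock type L and the LockIt guard is an assumption. A minimal sketch of what they presumably look like, using std::mutex for portability (the article's version likely wraps a Windows CRITICAL_SECTION instead):

```cpp
#include <mutex>

// Hypothetical stand-in for a Lock.h lock type usable as the L
// template parameter of RingBufferLock.
class MutexLock
{
public:
    void Lock()   { m_mutex.lock(); }
    void Unlock() { m_mutex.unlock(); }
private:
    std::mutex m_mutex;
};

// Scoped guard matching the "LockIt lockit(m_wLock);" usage above:
// locks on construction, unlocks on destruction (RAII), so every
// return path out of Put/Get releases the lock.
class LockIt
{
public:
    explicit LockIt(MutexLock& lock) : m_lock(lock) { m_lock.Lock(); }
    ~LockIt() { m_lock.Unlock(); }
private:
    MutexLock& m_lock;

    LockIt(const LockIt&);             // no copying
    LockIt& operator=(const LockIt&);
};
```

With these definitions, RingBufferLock<int, 1000, MutexLock> compiles and is safe for multiple producers and consumers, since each index is only modified under its lock.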

