I have been working on a project and needed a good concurrent queue, so I wrote one. Is there anyone more familiar with the atomic arts who could tell me if there is any way I can further improve it?

module container.concurrentqueue;

import std.typetuple;
import core.atomic;

/**
 * Unbounded FIFO queue guarded by two spin locks: one serialises producers,
 * the other serialises consumers.
 *
 * The linked list always starts with a dummy node. `first` refers to that
 * dummy and `last` to the most recently appended node, so the queue is empty
 * exactly when `first.next` is null.
 *
 * Params:
 *   items = the types of the values carried by each queue element
 */
class ConcurrentQueue( items...  ) {

        /// Singly linked list node holding the values of one element.
        align(64) class nodeType {
                align(1):
                /// Creates a node with default-initialised values and no successor.
                this( ) { atomicStore( this.next, cast(shared nodeType) null ); }

                /// Creates a node carrying `value`, with no successor.
                this( TypeTuple!items value ) {
                        // Delegate before touching any field: the constructor
                        // call has to come before other uses of `this`.
                        this();
                        foreach( k, v ; value ) {
                                this.value[k] = v;
                        }
                }

                TypeTuple!items value;   /// payload; never written after construction
                shared nodeType next;    /// successor, or null for the tail node
        }

        /// Holder for the values of one dequeued element; usable as the tuple itself.
        class ConsumerResult {
                TypeTuple!items value;

                alias value this;
        }

        /// Creates an empty queue consisting of the dummy node only, with both locks released.
        public this() {
                shared nodeType temp = cast(shared)new nodeType( );

                atomicStore( first, temp );
                atomicStore( last , temp );
                atomicStore( producerLock, false );
                atomicStore( consumerLock, false );
        }

        /**
         * Appends one element to the tail of the queue.
         *
         * The node is built before the producer lock is taken, so only the
         * two pointer updates run inside the critical section.
         *
         * Params:
         *   item = the values of the new element
         */
        public void Produce( items item ) {

                TypeTuple!items t = item;
                shared nodeType temp = cast(shared)new nodeType ( t );

                // Busy-wait until this thread owns the producer lock.
                while( !cas(&producerLock, false, true ) ) { }

                atomicStore( last.next   , temp );   // publish the node to consumers
                atomicStore( last        , temp );   // then make it the new tail
                atomicStore( producerLock, false );
        }

        /**
         * Removes the element at the head of the queue.
         *
         * Returns: a `ConsumerResult` holding the element's values, or `null`
         *          when the queue is empty.
         */
        public ConsumerResult Consume( ) {
                // Busy-wait until this thread owns the consumer lock.
                while( !cas(&consumerLock, false, true ) ) { }

                shared nodeType temp = cast(shared)atomicLoad( first );
                shared nodeType next = cast(shared)atomicLoad( temp.next );

                if( next is null ) {
                        // Only the dummy node is present: nothing to hand out.
                        atomicStore( consumerLock, false );
                        return null;
                }

                // `next` becomes the new dummy node. Store it atomically, like
                // every other write to the shared head/tail references.
                atomicStore( first, next );
                atomicStore( consumerLock, false );

                // Allocate and copy outside the critical section. This is safe
                // because a node's payload is never modified once constructed,
                // and the local reference `next` keeps the node reachable.
                ConsumerResult result = new ConsumerResult();
                foreach( k, v ; items ) {
                        result.value[k] = cast(v)next.value[k];
                }
                return result;
        }

        // The padding arrays separate the head, the tail and the two lock
        // flags from each other -- presumably so that producers and consumers
        // do not share a cache line (64 bytes assumed; TODO confirm for target).
        private shared nodeType first;

        private byte[64] padd1;

        private shared nodeType last;

        private byte[64] padd2;

        private shared bool consumerLock;

        private byte[64] padd3;

        private shared bool producerLock;
}

Reply via email to