You need to use the state field of the nodes to hold your data. The node
itself is merely a holder for that state. Here is a fixed main function:

int main()
{
    const int MAX = 20;
    mpscq_t q;
    mpscq_create(&q, new mpscq_node_t);

    mpscq_node_t* n = 0;

    printf("Push: ");
    mpscq_node_t* nodes[MAX];
    for(int i =0; i < MAX; i++){
        nodes[i] = new mpscq_node_t;
        nodes[i]->state = (void*)(long)(i + 42);
        spscq_push(&q, nodes[i]);
        printf("%d, ", i + 42);
    }
    printf("\nPoP:  ");

    for(int i =0; i < MAX+5; i++)
    {
        n = spscq_pop(&q);
        if(n !=0)
            printf("%d, ", (int)(long)n->state);
        else
            printf("\nNothing to pop!\n");
    }
}





On Thu, Mar 10, 2016 at 10:56 PM, Saman Barghi <sama...@gmail.com> wrote:
> Dimitry,
>
> Here is a version of the code you posted in this thread modified for gcc. I
> also changed the main function to print out what is being pushed and popped
> to/from the queue:
>
> ================================================================================
> #include <cstdint>
> #include <cassert>
> #include <unistd.h>
> #include <cstdio>
>
> template<typename T>
> T XCHG(T volatile* dest, T value)
> {
>     T t = (T)__sync_lock_test_and_set((long*)dest, (long)value);
>     __sync_synchronize();
>     return t;
> }
>
> template<typename T>
> bool CAS(T volatile* dest, T cmp, T xchg)
> {
>     return cmp == (T)__sync_val_compare_and_swap((long*)dest, (long)xchg,
> (long)cmp);
> }
>
> struct mpscq_node_t
> {
>     mpscq_node_t* volatile  next;
>     void*                   state;
>
> };
>
> struct mpscq_t
> {
>     mpscq_node_t* volatile  head;
>     mpscq_node_t*           tail;
> };
>
> void mpscq_create(mpscq_t* self, mpscq_node_t* stub)
> {
>     stub->next = 0;
>     self->tail = stub;
>     // mark it as empty
>     self->head = (mpscq_node_t*)((uintptr_t)stub | 1);
> }
>
> bool spscq_push(mpscq_t* self, mpscq_node_t* n)
> {
>     n->next = 0;
>     // serialization-point wrt producers
>     mpscq_node_t* prev = XCHG(&self->head, n);
>     // only 2 AND instructions on fast-path added
>     bool was_empty = ((uintptr_t)prev & 1) != 0;
>     prev = (mpscq_node_t*)((uintptr_t)prev & ~1);
>     prev->next = n; // serialization-point wrt consumer
>     return was_empty;
> }
>
> mpscq_node_t* spscq_pop(mpscq_t* self)
> {
>     mpscq_node_t* tail = self->tail;
> l_retry:
>     // fast-path is not modified
>     mpscq_node_t* next = tail->next; // serialization-point wrt producers
>     if (next)
>     {
>         self->tail = next;
>         tail->state = next->state;
>         return tail; //This should be replaced with return next;
>     }
>     mpscq_node_t* head = self->head;
>     // if head is marked as empty,
>     // then the queue had not be scheduled in the first place
>     assert(((uintptr_t)head & 1) == 0);
>     if (tail != head)
>     {
>         // there is just a temporal gap -> wait for the producer to update
> 'next' link
>         while (tail->next == 0)
>             usleep(10);
>         goto l_retry;
>     }
>     else
>     {
>         // the queue seems to be really empty -> try to mark it as empty
>         mpscq_node_t* xchg = (mpscq_node_t*)((uintptr_t)tail | 1);
>         if (CAS(&self->head, tail, xchg))
>             // if we succesfully marked it as empty -> return
>             // the following producer will re-schedule the queue for
> execution
>             return 0;
>         // producer had enqueued new item
>         goto l_retry;
>     }
> }
>
> int main()
> {
>     const int MAX = 20;
>     mpscq_t q;
>     mpscq_create(&q, new mpscq_node_t);
>
>     mpscq_node_t* n = 0;
>     //n = spscq_pop(&q);
>
>     printf("Push: ");
>     mpscq_node_t* nodes[MAX];
>     for(int i =0; i < MAX; i++){
>         nodes[i] = new mpscq_node_t;
>         nodes[i]->next = 0;
>         spscq_push(&q, nodes[i]);
>         printf("%p, ", nodes[i]);
>     }
>     printf("\nPoP: ");
>
>     for(int i =0; i < MAX+5; i++)
>     {
>         n = spscq_pop(&q);
>         if(n !=0)
>             printf("%p, ", n);
>         else
>             printf("\nNothing to pop!\n");
>     }
> }
>
> ================================================================================
>
> And here is the output:
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> Push: 0x91e030, 0x91e050, 0x91e070, 0x91e090, 0x91e0b0, 0x91e0d0, 0x91e0f0,
> 0x91e110, 0x91e130, 0x91e150, 0x91e170, 0x91e190, 0x91e1b0, 0x91e1d0,
> 0x91e1f0, 0x91e210, 0x91e230, 0x91e250, 0x91e270, 0x91e290,
> PoP:   0x91e010, 0x91e030, 0x91e050, 0x91e070, 0x91e090, 0x91e0b0, 0x91e0d0,
> 0x91e0f0, 0x91e110, 0x91e130, 0x91e150, 0x91e170, 0x91e190, 0x91e1b0,
> 0x91e1d0, 0x91e1f0, 0x91e210, 0x91e230, 0x91e250, 0x91e270,
> Nothing to pop!
>
> Nothing to pop!
>
> Nothing to pop!
>
> Nothing to pop!
>
> Nothing to pop!
>
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
>
> The stub item is being popped and the last item is not popped out of the
> queue. I just went over the code again and replacing the highlighted code
> above ( return tail;) with (return next;) would solve the problem.
>
> Cheers,
> Saman
>
>
> In your original implementation here, the stub item is
> On Thursday, 10 March 2016 12:54:22 UTC-5, Dmitry Vyukov wrote:
>>
>> Hi Saman,
>>
>> Please post full source code and point to the exact thing that should
>> not happen. I've already lost any context of this.
>>
>> Thanks
>>
>>
>> On Thu, Mar 10, 2016 at 6:47 PM, Saman Barghi <sam...@gmail.com> wrote:
>> > Hi Dmitriy,
>> >
>> > I got here from your blog post. Just wanted to point out that since the
>> > queue needs at least one item to stay in the queue for it to work
>> > properly,
>> > the last item in the queue stays in the queue until another item
>> > arrives.
>> > Trying to pop the last item returns 0 although there is 1 item left in
>> > the
>> > queue. In your blog post
>> > you have a stub member to replace the last item, but here the stub item
>> > is
>> > being popped too. Check with this code:
>> >
>> > int main()
>> > {
>> >     const int MAX = 20;
>> >     mpscq_t q;
>> >     mpscq_create(&q, new mpscq_node_t);
>> >
>> >     mpscq_node_t* n = 0;
>> >     //n = spscq_pop(&q);
>> >
>> >     mpscq_node_t* nodes[MAX];
>> >     for(int i =0; i < MAX; i++){
>> >         nodes[i] = new mpscq_node_t;
>> >         nodes[i]->next = 0;
>> >         spscq_push(&q, nodes[i]);
>> >         printf("%p, ", nodes[i]);
>> >     }
>> >     printf("\n");
>> >
>> >     for(int i =0; i < MAX+5; i++)
>> >     {
>> >         n = spscq_pop(&q);
>> >         if(n !=0)
>> >             printf("%p, ", n);
>> >         else
>> >             printf("\nNothing to pop!\n");
>> >     }
>> > }
>> >
>> >
>> > Thanks,
>> > Saman
>> >
>> >
>> > On Friday, 12 November 2010 03:42:14 UTC-5, Dmitry Vyukov wrote:
>> >>
>> >> On Nov 12, 4:05 am, Pierre Habouzit <pierre.habou...@intersec-
>> >> group.com> wrote:
>> >> > I'm using a work-stealing scheduler, with a per thread bounded
>> >> > dequeue
>> >> > that can be pushed/popped at the back from the local thread, and
>> >> > popped
>> >> > from the front by thieves. Yes this means that my tasks run in LIFO
>> >> > order most of the time, and I'm fine with that.
>> >> >
>> >> > When a task is queued in a full dequeue then the task is run
>> >> > immediately instead of being queued, and when thieves cannot steal
>> >> > anything they are blocked using an eventcount.
>> >> >
>> >> > In addition to that, I'd like to have serial queues of tasks that
>> >> > ensure that:
>> >> > - tasks are run in a full FIFO fashion;
>> >> > - at most one of their task can be run at the same time.
>> >> > Though tasks from different serial queues can be run at the same time
>> >> > without problem. The idea is that a queue allows to protect a
>> >> > resource
>> >> > (think file, GUI, ...).
>> >> >
>> >> > My requirements are that:
>> >> > - queues are registered at most once in the scheduler at any time;
>> >> > - non empty queues are always registered within the scheduler;
>> >> > - empty queues are not registered in the scheduler (though it is
>> >> > acceptable that just after a queue becomes empty it remains
>> >> > registered
>> >> > when a race occurs, as long as it eventually finds its way out)
>> >> >
>> >> > I'm using a setup very similar to the low overhead mpsc queue, that
>> >> > I've tweaked this way:
>> >> >
>> >> > enum {
>> >> >     QUEUE_EMPTY,
>> >> >     QUEUE_NOT_EMPTY,
>> >> >     QUEUE_RUNNING,
>> >> >
>> >> > };
>> >> >
>> >> > struct mpsc_queue_t {
>> >> >     ... /* the usual stuff */
>> >> >     volatile unsigned state;
>> >> >     job_t run_queue;
>> >> >
>> >> > };
>> >> >
>> >> > struct mpsc_node_t {
>> >> >     ... /* the usual stuff */
>> >> >     job_t *job;
>> >> >
>> >> > };
>> >> >
>> >> > mpsc_node_t *mpsc_queue_push(mpsc_queue_t *q, job_t *job)
>> >> > {
>> >> >     mpsc_node_t *n = mpsc_node_alloc(); /* uses a per-thread cache */
>> >> >
>> >> >     n->job = job;
>> >> >     mpsc_queue_push(q, n);
>> >> >     if (XCHG(&q->state, QUEUE_NOT_EMPTY)) == QUEUE_EMPTY)
>> >> >         schedule_job(&q->run_queue); /* points to queue_run */
>> >> >
>> >> > }
>> >> >
>> >> > void queue_run(job_t *job)
>> >> > {
>> >> >     mpsc_queue_t *q = container_of(job, mpsc_queue_t, run_queue);
>> >> >     mpsc_node_t *n;
>> >> >
>> >> >     do {
>> >> >         XCHG(&q->state, QUEUE_RUNNING);
>> >> >
>> >> >         while ((n = mpsc_queue_pop(q))) {
>> >> >             job_t *job = n->job;
>> >> >
>> >> >             mpsc_node_free(n); /* releases in cache first, so that
>> >> > it's hot if quickly reused */
>> >> >             job_run(job);
>> >> >         }
>> >> >     } while (!CAS(&q->state, QUEUE_RUNNING, QUEUE_EMPTY));
>> >> >
>> >> > }
>> >> >
>> >> > I think this works, but I'm also pretty sure that this is spoiling
>> >> > the
>> >> > underlying mpsc a lot, because I create contention on q->state for
>> >> > everyone involved which is ugly, not to mention the ugly loop in the
>> >> > consumer. So, is there someone that has any idea on how to make that
>> >> > idea better, or does it look like it's good enough?
>> >> >
>> >> > Note: since the scheduler uses a LIFO, registering into the
>> >> > scheduler from queue_run() will just do the same as the loop on the
>> >> > CAS, because jobs run in LIFO order most of the time; it just hides
>> >> > the loop in the scheduler, and it still doesn't fix the contention
>> >> > due
>> >> > to the additional XCHG in the _push operation :/
>> >>
>> >> Hi Pierre,
>> >>
>> >> Yes, it's way too much overhead for RUNNABLE/NONRUNNABLE detection.
>> >> It can be done with ~1 cycle overhead for producers and 0 cycle
>> >> overhead for the consumer's fast-path. The idea is that we only need to
>> >> track state changes between RUNNABLE<->NONRUNNABLE; moreover, on the
>> >> producer's side it can be combined with the XCHG in enqueue(), while
>> >> on the consumer's side we need an additional RMW which is executed only
>> >> when the queue becomes empty. The key is that we can use the low bit of
>> >> the queue tail pointer as the RUNNABLE/NONRUNNABLE flag.
>> >>
>> >> Here is the code (and yes, here I still call queue's tail as 'head'):
>> >>
>> >> template<typename T>
>> >> T XCHG(T volatile* dest, T value)
>> >> {
>> >>     return (T)_InterlockedExchange((long*)dest, (long)value);
>> >> }
>> >>
>> >> template<typename T>
>> >> bool CAS(T volatile* dest, T cmp, T xchg)
>> >> {
>> >>     return cmp == (T)_InterlockedCompareExchange((long*)dest,
>> >> (long)xchg, (long)cmp);
>> >> }
>> >>
>> >> struct mpscq_node_t
>> >> {
>> >>     mpscq_node_t* volatile  next;
>> >>     void*                   state;
>> >> };
>> >>
>> >> struct mpscq_t
>> >> {
>> >>     mpscq_node_t* volatile  head;
>> >>     mpscq_node_t*           tail;
>> >> };
>> >>
>> >> void mpscq_create(mpscq_t* self, mpscq_node_t* stub)
>> >> {
>> >>     stub->next = 0;
>> >>     self->tail = stub;
>> >>     // mark it as empty
>> >>     self->head = (mpscq_node_t*)((uintptr_t)stub | 1);
>> >> }
>> >>
>> >> bool spscq_push(mpscq_t* self, mpscq_node_t* n)
>> >> {
>> >>     n->next = 0;
>> >>     // serialization-point wrt producers
>> >>     mpscq_node_t* prev = XCHG(&self->head, n);
>> >>     // only 2 AND instructions on fast-path added
>> >>     bool was_empty = ((uintptr_t)prev & 1) != 0;
>> >>     prev = (mpscq_node_t*)((uintptr_t)prev & ~1);
>> >>     prev->next = n; // serialization-point wrt consumer
>> >>     return was_empty;
>> >> }
>> >>
>> >> mpscq_node_t* spscq_pop(mpscq_t* self)
>> >> {
>> >>     mpscq_node_t* tail = self->tail;
>> >> l_retry:
>> >>     // fast-path is not modified
>> >>     mpscq_node_t* next = tail->next; // serialization-point wrt
>> >> producers
>> >>     if (next)
>> >>     {
>> >>         self->tail = next;
>> >>         tail->state = next->state;
>> >>         return tail;
>> >>     }
>> >>     mpscq_node_t* head = self->head;
>> >>     // if head is marked as empty,
>> >>     // then the queue had not be scheduled in the first place
>> >>     assert(((uintptr_t)head & 1) == 0);
>> >>     if (tail != head)
>> >>     {
>> >>         // there is just a temporal gap -> wait for the producer to
>> >> update 'next' link
>> >>         while (tail->next == 0)
>> >>             _mm_pause();
>> >>         goto l_retry;
>> >>     }
>> >>     else
>> >>     {
>> >>         // the queue seems to be really empty -> try to mark it as
>> >> empty
>> >>         mpscq_node_t* xchg = (mpscq_node_t*)((uintptr_t)tail | 1);
>> >>         if (CAS(&self->head, tail, xchg))
>> >>             // if we succesfully marked it as empty -> return
>> >>             // the following producer will re-schedule the queue for
>> >> execution
>> >>             return 0;
>> >>         // producer had enqueued new item
>> >>         goto l_retry;
>> >>     }
>> >> }
>> >>
>> >> int main()
>> >> {
>> >>     mpscq_t q;
>> >>     mpscq_create(&q, new mpscq_node_t);
>> >>
>> >>     mpscq_node_t* n = 0;
>> >>     //n = spscq_pop(&q);
>> >>
>> >>     spscq_push(&q, new mpscq_node_t);
>> >>     n = spscq_pop(&q);
>> >>     n = spscq_pop(&q);
>> >>
>> >>     spscq_push(&q, new mpscq_node_t);
>> >>     spscq_push(&q, new mpscq_node_t);
>> >>     n = spscq_pop(&q);
>> >>     n = spscq_pop(&q);
>> >>     n = spscq_pop(&q);
>> >>
>> >>     spscq_push(&q, new mpscq_node_t);
>> >>     spscq_push(&q, new mpscq_node_t);
>> >>     n = spscq_pop(&q);
>> >>     spscq_push(&q, new mpscq_node_t);
>> >>     n = spscq_pop(&q);
>> >>     n = spscq_pop(&q);
>> >>     n = spscq_pop(&q);
>> >> }
>> >>
>> >> Now, whenever spscq_push() returns true, the thread has to put the queue
>> >> into the scheduler for execution. Once the queue is scheduled for
>> >> execution, a thread pops from the queue until it returns 0.
>> >> The nice thing is that the consumer is not obliged to process all items in
>> >> a queue. It can process, for example, 100 items, and then just return
>> >> the queue to the scheduler and switch to other starving queues.
>> >> Also, with regard to that _mm_pause() loop in pop(): instead of waiting
>> >> for the producer to restore the 'next' pointer, the consumer can decide to
>> >> just return the queue to the scheduler. It can then handle some other
>> >> queues, and by the time it gets back to the first queue, the 'next' link
>> >> will most likely already be restored. So no senseless waiting.
>
> --
>
> ---
> You received this message because you are subscribed to the Google Groups
> "Scalable Synchronization Algorithms" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to lock-free+unsubscr...@googlegroups.com.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/lock-free/6f655220-1af6-41a8-bb67-a37350803a85%40googlegroups.com.
>
> For more options, visit https://groups.google.com/d/optout.



-- 
Dmitry Vyukov

All about lockfree/waitfree algorithms, multicore, scalability,
parallel computing and related topics:
http://www.1024cores.net

-- 

--- 
You received this message because you are subscribed to the Google Groups 
"Scalable Synchronization Algorithms" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to lock-free+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/lock-free/CAEeQi3uTNPFMGm2SfhasixbcmJFXgfx0WcwrF_Tf0KM540FMwA%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to