On Thu, Aug 09, 2012 at 04:46:26PM +0800, Lai Jiangshan wrote: > Some guys would be surprised by this fact: > There are already TWO implementations of wfqueue in urcu. > > The first one is in urcu/static/wfqueue.h: > 1) enqueue: exchange the tail and then update previous->next > 2) dequeue: wait for first node's next pointer and them shift, a dummy node > is introduced to avoid the queue->tail become NULL when shift. > > The second one shares some code with the first one, and the left code > are spreading in urcu-call-rcu-impl.h: > 1) enqueue: share with the first one > 2) no dequeue operation: and no shift, so it don't need dummy node, > Although the dummy node is queued when initialization, but it is removed > after the first dequeue_all operation in call_rcu_thread(). > call_rcu_data_free() forgets to handle the dummy node if it is not > removed. > 3)dequeue_all: record the old head and tail, and queue->head become the > special > tail node.(atomic record the tail and change the tail). > > The second implementation's code are spreading, bad for review, and it is not > tested by tests/test_urcu_wfq. > > So we need a better implementation avoid the dummy node dancing and can > service > both generic wfqueue APIs and dequeue_all API for call rcu. > > The new implementation: > 1) enqueue: share with the first one/original implementation. > 2) dequeue: shift when node count >= 2, cmpxchg when node count = 1. > no dummy node, save memory. > 3) dequeue_all: simply set queue->head.next to NULL, xchg the tail > and return the old head.next.
Looks good at first glance! The reason for xchg and the dummy node is old history involving parallel systems based on 80386, which did not have a cmpxchg instruction. ;-) Thanx, Paul > More implementation details are in the code. > tests/test_urcu_wfq will be update in future for testing new APIs. > > > Signed-off-by: Lai Jiangshan <la...@cn.fujitsu.com> > --- > urcu-call-rcu-impl.h | 50 ++++++++++-------------- > urcu/static/wfqueue.h | 104 ++++++++++++++++++++++++++++++++++++------------ > urcu/wfqueue.h | 25 ++++++++++-- > wfqueue.c | 29 ++++++++++++++ > 4 files changed, 149 insertions(+), 59 deletions(-) > > diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h > index 13b24ff..dbfb410 100644 > --- a/urcu-call-rcu-impl.h > +++ b/urcu-call-rcu-impl.h > @@ -221,7 +221,7 @@ static void *call_rcu_thread(void *arg) > { > unsigned long cbcount; > struct cds_wfq_node *cbs; > - struct cds_wfq_node **cbs_tail; > + struct cds_wfq_node *cbs_tail; > struct call_rcu_data *crdp = (struct call_rcu_data *)arg; > struct rcu_head *rhp; > int rt = !!(uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT); > @@ -243,24 +243,18 @@ static void *call_rcu_thread(void *arg) > cmm_smp_mb(); > } > for (;;) { > - if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) { > - while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL) > - poll(NULL, 0, 1); > - _CMM_STORE_SHARED(crdp->cbs.head, NULL); > - cbs_tail = (struct cds_wfq_node **) > - uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head); > + cbs = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &cbs_tail); > + if (cbs) { > synchronize_rcu(); > cbcount = 0; > do { > - while (cbs->next == NULL && > - &cbs->next != cbs_tail) > - poll(NULL, 0, 1); > - if (cbs == &crdp->cbs.dummy) { > - cbs = cbs->next; > - continue; > - } > rhp = (struct rcu_head *)cbs; > - cbs = cbs->next; > + > + if (cbs != cbs_tail) > + cbs = __cds_wfq_node_sync_next(cbs); > + else > + cbs = NULL; > + > rhp->func(rhp); > cbcount++; > } while (cbs != NULL); > @@ -270,8 +264,7 @@ static void *call_rcu_thread(void *arg) > break; > rcu_thread_offline(); > if (!rt) { > - if (&crdp->cbs.head > - == _CMM_LOAD_SHARED(crdp->cbs.tail)) { > + if (cds_wfq_empty(&crdp->cbs)) { > call_rcu_wait(crdp); > poll(NULL, 0, 10); > uatomic_dec(&crdp->futex); > @@ -625,32 +618,31 @@ void call_rcu(struct rcu_head *head, > */ > void call_rcu_data_free(struct call_rcu_data *crdp) > { > - struct cds_wfq_node *cbs; > - struct cds_wfq_node **cbs_tail; > - struct cds_wfq_node **cbs_endprev; > + struct cds_wfq_node *head, *tail; > > if (crdp == NULL || crdp == default_call_rcu_data) { > return; > } > + > if ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == 0) { > uatomic_or(&crdp->flags, URCU_CALL_RCU_STOP); > wake_call_rcu_thread(crdp); > while ((uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOPPED) == > 0) > poll(NULL, 0, 1); > } > - if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) { > - while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL) > - poll(NULL, 0, 1); > - _CMM_STORE_SHARED(crdp->cbs.head, NULL); > - cbs_tail = (struct cds_wfq_node **) > - uatomic_xchg(&crdp->cbs.tail, &crdp->cbs.head); > + > + if (!cds_wfq_empty(&crdp->cbs)) { > + head = __cds_wfq_dequeue_all_blocking(&crdp->cbs, &tail); > + assert(head); > + > /* Create default call rcu data if need be */ > (void) get_default_call_rcu_data(); > - cbs_endprev = (struct cds_wfq_node **) > - uatomic_xchg(&default_call_rcu_data, cbs_tail); > - *cbs_endprev = cbs; > + > + __cds_wfq_append_list(&default_call_rcu_data->cbs, head, tail); > + > uatomic_add(&default_call_rcu_data->qlen, > uatomic_read(&crdp->qlen)); > + > wake_call_rcu_thread(default_call_rcu_data); > } > > diff --git a/urcu/static/wfqueue.h b/urcu/static/wfqueue.h > index 636e1af..15ea9fc 100644 > --- a/urcu/static/wfqueue.h > +++ b/urcu/static/wfqueue.h > @@ -10,6 +10,7 @@ > * dynamically with the userspace rcu library. > * > * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoy...@efficios.com> > + * Copyright 2011-2012 - Lai Jiangshan <la...@cn.fujitsu.com> > * > * This library is free software; you can redistribute it and/or > * modify it under the terms of the GNU Lesser General Public > @@ -29,6 +30,7 @@ > #include <pthread.h> > #include <assert.h> > #include <poll.h> > +#include <stdbool.h> > #include <urcu/compiler.h> > #include <urcu/uatomic.h> > > @@ -38,8 +40,6 @@ extern "C" { > > /* > * Queue with wait-free enqueue/blocking dequeue. > - * This implementation adds a dummy head node when the queue is empty to > ensure > - * we can always update the queue locklessly. > * > * Inspired from half-wait-free/half-blocking queue implementation done by > * Paul E. McKenney. > @@ -57,31 +57,43 @@ static inline void _cds_wfq_init(struct cds_wfq_queue *q) > { > int ret; > > - _cds_wfq_node_init(&q->dummy); > /* Set queue head and tail */ > - q->head = &q->dummy; > - q->tail = &q->dummy.next; > + _cds_wfq_node_init(&q->head); > + q->tail = &q->head; > ret = pthread_mutex_init(&q->lock, NULL); > assert(!ret); > } > > -static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q, > - struct cds_wfq_node *node) > +static inline bool _cds_wfq_empty(struct cds_wfq_queue *q) > { > - struct cds_wfq_node **old_tail; > + /* > + * Queue is empty if no node is pointed by q->head.next nor q->tail. > + */ > + return q->head.next == NULL && CMM_LOAD_SHARED(q->tail) == &q->head; > +} > > +static inline void ___cds_wfq_append_list(struct cds_wfq_queue *q, > + struct cds_wfq_node *head, struct cds_wfq_node *tail) > +{ > /* > * uatomic_xchg() implicit memory barrier orders earlier stores to data > * structure containing node and setting node->next to NULL before > * publication. > */ > - old_tail = uatomic_xchg(&q->tail, &node->next); > + tail = uatomic_xchg(&q->tail, tail); > + > /* > - * At this point, dequeuers see a NULL old_tail->next, which indicates > + * At this point, dequeuers see a NULL tail->next, which indicates > * that the queue is being appended to. The following store will append > * "node" to the queue from a dequeuer perspective. > */ > - CMM_STORE_SHARED(*old_tail, node); > + CMM_STORE_SHARED(tail->next, head); > +} > + > +static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q, > + struct cds_wfq_node *node) > +{ > + ___cds_wfq_append_list(q, node, node); > } > > /* > @@ -120,27 +132,46 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q) > { > struct cds_wfq_node *node, *next; > > - /* > - * Queue is empty if it only contains the dummy node. > - */ > - if (q->head == &q->dummy && CMM_LOAD_SHARED(q->tail) == &q->dummy.next) > + if (_cds_wfq_empty(q)) > return NULL; > - node = q->head; > > - next = ___cds_wfq_node_sync_next(node); > + node = ___cds_wfq_node_sync_next(&q->head); > + > + if ((next = CMM_LOAD_SHARED(node->next)) == NULL) { > + if (CMM_LOAD_SHARED(q->tail) == node) { > + /* > + * @node is the only node in the queue. > + * Try to move the tail to &q->head > + */ > + _cds_wfq_node_init(&q->head); > + if (uatomic_cmpxchg(&q->tail, node, &q->head) == node) > + return node; > + } > + next = ___cds_wfq_node_sync_next(node); > + } > > /* > * Move queue head forward. > */ > - q->head = next; > - /* > - * Requeue dummy node if we just dequeued it. > - */ > - if (node == &q->dummy) { > - _cds_wfq_node_init(node); > - _cds_wfq_enqueue(q, node); > - return ___cds_wfq_dequeue_blocking(q); > - } > + q->head.next = next; > + > + return node; > +} > + > +/* dequeue all nodes, the nodes are not synchronized for the next pointer */ > +static inline struct cds_wfq_node * > +___cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q, > + struct cds_wfq_node **tail) > +{ > + struct cds_wfq_node *node; > + > + if (_cds_wfq_empty(q)) > + return NULL; > + > + node = ___cds_wfq_node_sync_next(&q->head); > + _cds_wfq_node_init(&q->head); > + *tail = uatomic_xchg(&q->tail, &q->head); > + > return node; > } > > @@ -158,6 +189,27 @@ _cds_wfq_dequeue_blocking(struct cds_wfq_queue *q) > return retnode; > } > > +static inline struct cds_wfq_node * > +_cds_wfq_dequeue_all_blocking(struct cds_wfq_queue *q, > + struct cds_wfq_node **tail) > +{ > + struct cds_wfq_node *node, *next; > + int ret; > + > + ret = pthread_mutex_lock(&q->lock); > + assert(!ret); > + node = ___cds_wfq_dequeue_all_blocking(q, tail); > + ret = pthread_mutex_unlock(&q->lock); > + assert(!ret); > + > + /* synchronize all nodes' next pointer */ > + next = node; > + while (next != *tail) > + next = ___cds_wfq_node_sync_next(next); > + > + return node; > +} > + > #ifdef __cplusplus > } > #endif > diff --git a/urcu/wfqueue.h b/urcu/wfqueue.h > index 03a73f1..985f540 100644 > --- a/urcu/wfqueue.h > +++ b/urcu/wfqueue.h > @@ -7,6 +7,7 @@ > * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue > * > * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoy...@efficios.com> > + * Copyright 2011-2012 - Lai Jiangshan <la...@cn.fujitsu.com> > * > * This library is free software; you can redistribute it and/or > * modify it under the terms of the GNU Lesser General Public > @@ -25,6 +26,7 @@ > > #include <pthread.h> > #include <assert.h> > +#include <stdbool.h> > #include <urcu/compiler.h> > > #ifdef __cplusplus > @@ -33,8 +35,6 @@ extern "C" { > > /* > * Queue with wait-free enqueue/blocking dequeue. > - * This implementation adds a dummy head node when the queue is empty to > ensure > - * we can always update the queue locklessly. > * > * Inspired from half-wait-free/half-blocking queue implementation done by > * Paul E. McKenney. > @@ -45,8 +45,7 @@ struct cds_wfq_node { > }; > > struct cds_wfq_queue { > - struct cds_wfq_node *head, **tail; > - struct cds_wfq_node dummy; /* Dummy node */ > + struct cds_wfq_node head, *tail; > pthread_mutex_t lock; > }; > > @@ -56,18 +55,36 @@ struct cds_wfq_queue { > > #define cds_wfq_node_init _cds_wfq_node_init > #define cds_wfq_init _cds_wfq_init > +#define cds_wfq_empty _cds_wfq_empty > +#define __cds_wfq_append_list ___cds_wfq_append_list > #define cds_wfq_enqueue _cds_wfq_enqueue > #define __cds_wfq_dequeue_blocking ___cds_wfq_dequeue_blocking > #define cds_wfq_dequeue_blocking _cds_wfq_dequeue_blocking > +#define __cds_wfq_node_sync_next ___cds_wfq_node_sync_next > +#define __cds_wfq_dequeue_all_blocking ___cds_wfq_dequeue_all_blocking > +#define cds_wfq_dequeue_all_blocking _cds_wfq_dequeue_all_blocking > > #else /* !_LGPL_SOURCE */ > > extern void cds_wfq_node_init(struct cds_wfq_node *node); > extern void cds_wfq_init(struct cds_wfq_queue *q); > +extern bool cds_wfq_empty(struct cds_wfq_queue *q); > +/* __cds_wfq_append_list: caller ensures mutual exclusion between dequeues */ > +extern void __cds_wfq_append_list(struct cds_wfq_queue *q, > + struct cds_wfq_node *head, struct cds_wfq_node *tail); > extern void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node > *node); > /* __cds_wfq_dequeue_blocking: caller ensures mutual exclusion between > dequeues */ > extern struct cds_wfq_node *__cds_wfq_dequeue_blocking(struct cds_wfq_queue > *q); > extern struct cds_wfq_node *cds_wfq_dequeue_blocking(struct cds_wfq_queue > *q); > +extern struct cds_wfq_node *__cds_wfq_node_sync_next(struct cds_wfq_node > *node); > +/* > + * __cds_wfq_dequeue_all_blocking: caller ensures mutual exclusion between > + * dequeues, and need synchronize next pointer berfore use it. > + */ > +extern struct cds_wfq_node *__cds_wfq_dequeue_all_blocking( > + struct cds_wfq_queue *q, struct cds_wfq_node **tail); > +extern struct cds_wfq_node *cds_wfq_dequeue_all_blocking( > + struct cds_wfq_queue *q, struct cds_wfq_node **tail); > > #endif /* !_LGPL_SOURCE */ > > diff --git a/wfqueue.c b/wfqueue.c > index 3337171..28a7b58 100644 > --- a/wfqueue.c > +++ b/wfqueue.c > @@ -4,6 +4,7 @@ > * Userspace RCU library - Queue with Wait-Free Enqueue/Blocking Dequeue > * > * Copyright 2010 - Mathieu Desnoyers <mathieu.desnoy...@efficios.com> > + * Copyright 2011-2012 - Lai Jiangshan <la...@cn.fujitsu.com> > * > * This library is free software; you can redistribute it and/or > * modify it under the terms of the GNU Lesser General Public > @@ -38,6 +39,17 @@ void cds_wfq_init(struct cds_wfq_queue *q) > _cds_wfq_init(q); > } > > +bool cds_wfq_empty(struct cds_wfq_queue *q) > +{ > + return _cds_wfq_empty(q); > +} > + > +void __cds_wfq_append_list(struct cds_wfq_queue *q, > + struct cds_wfq_node *head, struct cds_wfq_node *tail) > +{ > + return ___cds_wfq_append_list(q, head, tail); > +} > + > void cds_wfq_enqueue(struct cds_wfq_queue *q, struct cds_wfq_node *node) > { > _cds_wfq_enqueue(q, node); > @@ -52,3 +64,20 @@ struct cds_wfq_node *cds_wfq_dequeue_blocking(struct > cds_wfq_queue *q) > { > return _cds_wfq_dequeue_blocking(q); > } > + > +struct cds_wfq_node *__cds_wfq_node_sync_next(struct cds_wfq_node *node) > +{ > + return ___cds_wfq_node_sync_next(node); > +} > + > +struct cds_wfq_node *__cds_wfq_dequeue_all_blocking( > + struct cds_wfq_queue *q, struct cds_wfq_node **tail) > +{ > + return ___cds_wfq_dequeue_all_blocking(q, tail); > +} > + > +struct cds_wfq_node *cds_wfq_dequeue_all_blocking( > + struct cds_wfq_queue *q, struct cds_wfq_node **tail) > +{ > + return _cds_wfq_dequeue_all_blocking(q, tail); > +} > -- > 1.7.7 > _______________________________________________ lttng-dev mailing list lttng-dev@lists.lttng.org http://lists.lttng.org/cgi-bin/mailman/listinfo/lttng-dev