The current futex implementation effectively busy-waits on the callback list, because the futex is never set to -1. This patch instead piggybacks on the flags field, so that the same futex can also be used to wait for the STOP flag.

Signed-off-by: Paolo Bonzini <[email protected]>
---
 urcu-call-rcu-impl.h |   43 +++++++++++++++++++++++++------------------
 urcu-call-rcu.h      |    1 +
 2 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/urcu-call-rcu-impl.h b/urcu-call-rcu-impl.h
index ca597d0..9824515 100644
--- a/urcu-call-rcu-impl.h
+++ b/urcu-call-rcu-impl.h
@@ -45,8 +45,7 @@
 
 struct call_rcu_data {
 	struct cds_wfq_queue cbs;
-	unsigned long flags;
-	int futex;
+	int flags;
 	pthread_t tid;
 	int cpu_affinity;
 	struct cds_list_head list;
@@ -89,22 +88,18 @@ static long maxcpus;
 
 static void call_rcu_wait(struct call_rcu_data *crdp)
 {
-	/* Read call_rcu list before read futex */
-	cmm_smp_mb();
-	if (uatomic_read(&crdp->futex) == -1)
-		futex_async(&crdp->futex, FUTEX_WAIT, -1,
-			    NULL, NULL, 0);
+	int flags;
+	for (;;) {
+		flags = uatomic_read(&crdp->flags);
+		if (flags & (URCU_CALL_RCU_BUSY | URCU_CALL_RCU_STOP))
+			break;
+		futex_async(&crdp->flags, FUTEX_WAIT, flags, NULL, 0, 0);
+	}
 }
 
 static void call_rcu_wake_up(struct call_rcu_data *crdp)
 {
-	/* Write to call_rcu list before reading/writing futex */
-	cmm_smp_mb();
-	if (unlikely(uatomic_read(&crdp->futex) == -1)) {
-		uatomic_set(&crdp->futex, 0);
-		futex_async(&crdp->futex, FUTEX_WAKE, 1,
-			    NULL, NULL, 0);
-	}
+	futex_async(&crdp->flags, FUTEX_WAKE, 1, NULL, NULL, 0);
 }
 
 /* Allocate the array if it has not already been allocated. */
@@ -209,7 +204,16 @@ static void *call_rcu_thread(void *arg)
 
 	thread_call_rcu_data = crdp;
 	for (;;) {
-		if (&crdp->cbs.head != _CMM_LOAD_SHARED(crdp->cbs.tail)) {
+		for (;;) {
+			if (&crdp->cbs.head
+			    == _CMM_LOAD_SHARED(crdp->cbs.tail)) {
+				uatomic_and(&crdp->flags, ~URCU_CALL_RCU_BUSY);
+				/* Write flags before rechecking the list. */
+				cmm_smp_mb();
+				if (&crdp->cbs.head
+				    == _CMM_LOAD_SHARED(crdp->cbs.tail))
+					break;
+			}
 			while ((cbs = _CMM_LOAD_SHARED(crdp->cbs.head)) == NULL)
 				poll(NULL, 0, 1);
 			_CMM_STORE_SHARED(crdp->cbs.head, NULL);
@@ -229,13 +233,14 @@ static void *call_rcu_thread(void *arg)
 				rhp->func(rhp);
 			} while (cbs != NULL);
 		}
+		/* Read call_rcu list before reading the flags. */
+		cmm_smp_mb();
 		if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_STOP)
 			break;
 		if (uatomic_read(&crdp->flags) & URCU_CALL_RCU_RT)
 			poll(NULL, 0, 10);
 		else {
-			if (&crdp->cbs.head == _CMM_LOAD_SHARED(crdp->cbs.tail))
-				call_rcu_wait(crdp);
+			call_rcu_wait(crdp);
 			poll(NULL, 0, 10);
 		}
 	}
@@ -262,7 +267,6 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp,
 	}
 	memset(crdp, '\0', sizeof(*crdp));
 	cds_wfq_init(&crdp->cbs);
-	crdp->futex = 0;
 	crdp->flags = flags;
 	cds_list_add(&crdp->list, &call_rcu_data_list);
 	crdp->cpu_affinity = cpu_affinity;
@@ -513,6 +517,9 @@ void call_rcu(struct rcu_head *head,
 	head->func = func;
 	crdp = get_call_rcu_data();
 	cds_wfq_enqueue(&crdp->cbs, &head->next);
+	/* Write list before writing the flags. */
+	cmm_smp_mb();
+	uatomic_or(&crdp->flags, URCU_CALL_RCU_BUSY);
 	wake_call_rcu_thread(crdp);
 }
 
diff --git a/urcu-call-rcu.h b/urcu-call-rcu.h
index e76a844..bfd56ec 100644
--- a/urcu-call-rcu.h
+++ b/urcu-call-rcu.h
@@ -48,6 +48,7 @@ struct call_rcu_data;
 #define URCU_CALL_RCU_RUNNING	0x2
 #define URCU_CALL_RCU_STOP	0x4
 #define URCU_CALL_RCU_STOPPED	0x8
+#define URCU_CALL_RCU_BUSY	0x10
 
 /*
  * The rcu_head data structure is placed in the structure to be freed
-- 
1.7.4.4
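
For reference, here is a minimal standalone sketch of the wake-up scheme the patch switches to. This is not liburcu code: C11 atomics and the raw Linux futex(2) syscall stand in for uatomic_*(), cmm_smp_mb() and futex_async(), and the names (enqueue_work, worker_idle, queue_len, BUSY, STOP) are invented for the illustration. The point is the pairing: the producer publishes the work item, sets BUSY in the flags word and wakes; the worker clears BUSY, re-checks the queue, and only then sleeps with FUTEX_WAIT on the flags value it last read, so a concurrent wake-up cannot be lost.

/*
 * Standalone sketch of the flags-as-futex scheme (illustration only,
 * not liburcu code).
 */
#include <linux/futex.h>
#include <stdatomic.h>
#include <sys/syscall.h>
#include <unistd.h>

#define BUSY 0x1			/* set by producers after enqueuing */
#define STOP 0x2			/* set to make the worker exit */

static atomic_int flags;		/* doubles as the futex word */
static atomic_int queue_len;		/* stand-in for the callback list */

static long futex(atomic_int *uaddr, int op, int val)
{
	/* futex(2): (uaddr, futex_op, val, timeout, uaddr2, val3) */
	return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
}

/*
 * Producer: publish the work item first, then set BUSY and wake the
 * worker.  The seq_cst RMW provides the ordering that the patch gets
 * from cmm_smp_mb() between cds_wfq_enqueue() and uatomic_or().
 */
static void enqueue_work(void)
{
	atomic_fetch_add(&queue_len, 1);	/* "cds_wfq_enqueue()" */
	atomic_fetch_or(&flags, BUSY);
	futex(&flags, FUTEX_WAKE, 1);
}

/*
 * Worker, once the queue looks empty: clear BUSY, re-check the queue,
 * and only then sleep.  A producer that slipped in between the two
 * checks has already set BUSY again, so FUTEX_WAIT returns immediately
 * because the futex word no longer matches 'val'.
 */
static void worker_idle(void)
{
	atomic_fetch_and(&flags, ~BUSY);	/* ordered before the re-check */
	if (atomic_load(&queue_len) != 0)
		return;				/* more work arrived; don't sleep */

	for (;;) {
		int val = atomic_load(&flags);

		if (val & (BUSY | STOP))	/* woken up, or asked to stop */
			break;
		futex(&flags, FUTEX_WAIT, val);
	}
}

Sleeping on the flags word itself, rather than on a dedicated futex field, is also what lets URCU_CALL_RCU_STOP interrupt the wait: setting the STOP bit changes the futex value, so a worker about to sleep either sees the bit in its flags check or fails the FUTEX_WAIT value comparison and re-checks.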
