On Thu, Dec 1, 2016 at 5:37 AM, Matias Elo <matias....@nokia.com> wrote:
> Implement ordered locks using per lock atomic counters. The counter values
> are compared to the queue’s atomic context to guarantee ordered locking.
> Compared to the previous implementation this enables parallel processing of
> ordered events outside of the lock context.
>
> Signed-off-by: Matias Elo <matias....@nokia.com>
> ---
>  .../linux-generic/include/odp_queue_internal.h |  2 +
>  platform/linux-generic/odp_queue.c             |  6 +++
>  platform/linux-generic/odp_schedule.c          | 49 ++++++++++++++++++++--
>  3 files changed, 54 insertions(+), 3 deletions(-)
>
> diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
> index b905bd8..8b55de1 100644
> --- a/platform/linux-generic/include/odp_queue_internal.h
> +++ b/platform/linux-generic/include/odp_queue_internal.h
> @@ -59,6 +59,8 @@ struct queue_entry_s {
>  	struct {
>  		odp_atomic_u64_t ctx; /**< Current ordered context id */
>  		odp_atomic_u64_t next_ctx; /**< Next unallocated context id */
> +		/** Array of ordered locks */
> +		odp_atomic_u64_t lock[CONFIG_QUEUE_MAX_ORD_LOCKS];
>  	} ordered ODP_ALIGNED_CACHE;
>
>  	enq_func_t enqueue ODP_ALIGNED_CACHE;
> diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
> index 4c7f497..d9cb9f3 100644
> --- a/platform/linux-generic/odp_queue.c
> +++ b/platform/linux-generic/odp_queue.c
> @@ -77,8 +77,14 @@ static int queue_init(queue_entry_t *queue, const char *name,
>  		queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
>
>  	if (param->sched.sync == ODP_SCHED_SYNC_ORDERED) {
> +		unsigned i;
> +
>  		odp_atomic_init_u64(&queue->s.ordered.ctx, 0);
>  		odp_atomic_init_u64(&queue->s.ordered.next_ctx, 0);
> +
> +		for (i = 0; i < queue->s.param.sched.lock_count; i++)
> +			odp_atomic_init_u64(&queue->s.ordered.lock[i],
> +					    0);
>  	}
>  }
>  	queue->s.type = queue->s.param.type;
> diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
> index 4b33513..c628142 100644
> --- a/platform/linux-generic/odp_schedule.c
> +++ b/platform/linux-generic/odp_schedule.c
> @@ -126,6 +126,15 @@ typedef struct {
>  	int num;
>  } ordered_stash_t;
>
> +/* Ordered lock states */
> +typedef union {
> +	uint8_t u8[CONFIG_QUEUE_MAX_ORD_LOCKS];
> +	uint32_t all;
> +} lock_called_t;
> +
> +ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t),
> +		  "Lock_called_values_do_not_fit_in_uint32");
> +
>  /* Scheduler local data */
>  typedef struct {
>  	int thr;
> @@ -145,6 +154,7 @@ typedef struct {
>  		ordered_stash_t stash[MAX_ORDERED_STASH];
>  		int stash_num; /**< Number of stashed enqueue operations */
>  		uint8_t in_order; /**< Order status */
> +		lock_called_t lock_called; /**< States of ordered locks */
>  	} ordered;
>
> } sched_local_t;
> @@ -553,12 +563,21 @@ static inline void ordered_stash_release(void)
>
>  static inline void release_ordered(void)
>  {
> +	unsigned i;
>  	queue_entry_t *queue;
>
>  	queue = sched_local.ordered.src_queue;
>
>  	wait_for_order(queue);
>
> +	/* Release all ordered locks */
> +	for (i = 0; i < queue->s.param.sched.lock_count; i++) {
> +		if (!sched_local.ordered.lock_called.u8[i])
> +			odp_atomic_store_rel_u64(&queue->s.ordered.lock[i],
> +						 sched_local.ordered.ctx + 1);
> +	}
> +
> +	sched_local.ordered.lock_called.all = 0;
>  	sched_local.ordered.src_queue = NULL;
>  	sched_local.ordered.in_order = 0;
>
> @@ -923,19 +942,43 @@ static void order_unlock(void)
>  {
>  }
>
> -static void schedule_order_lock(unsigned lock_index ODP_UNUSED)
> +static void schedule_order_lock(unsigned lock_index)
>  {
> +	odp_atomic_u64_t *ord_lock;
>  	queue_entry_t *queue;
>
>  	queue = sched_local.ordered.src_queue;
>
>  	ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);
Sorry, I should have been more precise. The staleness test I was
referring to was to verify that the lock had not been previously used
in this ordered context. In the current code that's the following
assert (in odp_schedule_order.c):

	sync = sched_local.sync[lock_index];
	sync_out = odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index]);
	ODP_ASSERT(sync >= sync_out);

The test above should be open code since it's a validity check on the
call. The current code treats odp_schedule_order_lock/unlock as no-ops
if we aren't running in an ordered context, as that permits queue types
to be moved from, say, ordered to atomic without having to change the
application.

>
> -	wait_for_order(queue);
> +	ord_lock = &queue->s.ordered.lock[lock_index];
> +
> +	/* Busy loop to synchronize ordered processing */
> +	while (1) {
> +		uint64_t lock_seq;
> +
> +		lock_seq = odp_atomic_load_acq_u64(ord_lock);

In this logic that test would take the form:

	ODP_ASSERT(sched_local.ordered.ctx >= lock_seq);

Though given the existence of lock_called.u8, perhaps

	ODP_ASSERT(sched_local.ordered.lock_called.u8[lock_index] == 0);

would be equally effective.

> +
> +		if (lock_seq == sched_local.ordered.ctx) {
> +			sched_local.ordered.lock_called.u8[lock_index] = 1;
> +			return;
> +		}
> +		odp_cpu_pause();
> +	}
>  }
>
> -static void schedule_order_unlock(unsigned lock_index ODP_UNUSED)
> +static void schedule_order_unlock(unsigned lock_index)
>  {
> +	odp_atomic_u64_t *ord_lock;
> +	queue_entry_t *queue;
> +
> +	queue = sched_local.ordered.src_queue;
> +
> +	ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);
> +
> +	ord_lock = &queue->s.ordered.lock[lock_index];

The corresponding staleness test here would be:

	ODP_ASSERT(sched_local.ordered.ctx ==
		   odp_atomic_load_acq_u64(ord_lock));

The point being that you want to detect erroneous duplicate lock
acquires or frees, which can lead to very subtle bugs that are
difficult to find.

> +
> +	odp_atomic_store_rel_u64(ord_lock, sched_local.ordered.ctx + 1);
>  }
>
>  static void schedule_pause(void)
> --
> 2.7.4
>
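To make the suggestion concrete, here is a sketch of how the two
staleness asserts could slot into the functions from the patch. This is
illustrative only, not part of the patch: it reuses the patch's own
fields (sched_local.ordered.ctx, lock_called.u8, queue->s.ordered.lock)
and simply adds the review's proposed checks.

	static void schedule_order_lock(unsigned lock_index)
	{
		odp_atomic_u64_t *ord_lock;
		queue_entry_t *queue;

		queue = sched_local.ordered.src_queue;

		ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);

		/* Staleness check: this lock must not already have been
		 * taken within the current ordered context */
		ODP_ASSERT(sched_local.ordered.lock_called.u8[lock_index] == 0);

		ord_lock = &queue->s.ordered.lock[lock_index];

		/* Busy loop to synchronize ordered processing */
		while (1) {
			uint64_t lock_seq = odp_atomic_load_acq_u64(ord_lock);

			/* A lock sequence past our context means this context
			 * already released the lock: a duplicate acquire */
			ODP_ASSERT(sched_local.ordered.ctx >= lock_seq);

			if (lock_seq == sched_local.ordered.ctx) {
				sched_local.ordered.lock_called.u8[lock_index] = 1;
				return;
			}
			odp_cpu_pause();
		}
	}

	static void schedule_order_unlock(unsigned lock_index)
	{
		odp_atomic_u64_t *ord_lock;
		queue_entry_t *queue;

		queue = sched_local.ordered.src_queue;

		ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);

		ord_lock = &queue->s.ordered.lock[lock_index];

		/* Staleness check: an unlock is valid only while the lock
		 * sequence still matches the current ordered context */
		ODP_ASSERT(sched_local.ordered.ctx ==
			   odp_atomic_load_acq_u64(ord_lock));

		odp_atomic_store_rel_u64(ord_lock, sched_local.ordered.ctx + 1);
	}

Either form turns a duplicate lock or unlock within the same ordered
context into an immediate assertion failure instead of a silent, hard
to debug ordering violation.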