From: Ola Liljedahl <ola.liljed...@arm.com>

                A Monkeys Can Code Production

                <*- Locks are for Lamers -*>

            A high performance SW scheduler for ODP


A queue and scheduler design attempting to use lock-free and lock-less
synchronisation where possible and to minimise ordering and synchronisation
between threads.

Optimised for ARM (specifically Cortex-A53) targets. Also builds and runs on
x86-64, but no attempt has been made to optimise performance there.

Simple performance benchmark, pushing 2048 events through 20 queues (which
takes a few milliseconds).
Average cycles for single-event enqueue/schedule operations on Cortex-A53 @ 1.5GHz:
CPUs    atomic  parallel        ordered
 1      183     222             388
 2      254     282             450
 3      269     333             489

A presentation and discussion is scheduled for the ODP Design Sprint at
Linaro Connect Las Vegas.

Signed-off-by: Ola Liljedahl <ola.liljed...@arm.com>
---
 LICENSE     |   28 +
 Makefile    |  164 +++++
 llqueue.c   |  363 +++++++++++
 llsc.c      |  254 ++++++++
 scheduler.c | 2042 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 2851 insertions(+)
 create mode 100644 LICENSE
 create mode 100644 Makefile
 create mode 100644 llqueue.c
 create mode 100644 llsc.c
 create mode 100644 scheduler.c

diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..15fdb21
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,28 @@
+Copyright (c) 2016, ARM Limited. All rights reserved.
+
+SPDX-License-Identifier:       BSD-3-Clause
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this
+list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright notice, this
+list of conditions and the following disclaimer in the documentation and/or
+other materials provided with the distribution.
+
+Neither the name of ARM Limited nor the names of its contributors may be
+used to endorse or promote products derived from this software without specific
+prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..ac7cd6b
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,164 @@
+###############################################################################
+# Copyright (c) 2016, ARM Limited. All rights reserved.
+#
+# SPDX-License-Identifier:        BSD-3-Clause
+################################################################################
+
+###############################################################################
+# Project specific definitions
+################################################################################
+
+#Name of directory and also Dropbox source tar file
+DIRNAME = scheduler
+#List of executable files to build
+TARGETS = scheduler
+#List object files for each target
+OBJECTS_scheduler = scheduler.o
+
+#Customizable compiler and linker flags
+#Leave GCCTARGET empty for a native build, or set it (here or on the command
+#line, e.g. "make all GCCTARGET=aarch64-linux-gnu") to cross compile
+GCCTARGET =
+#GCCTARGET = aarch64-linux-gnu
+ifeq ($(strip $(GCCTARGET)),)
+#Native build (assumed to be x86-64)
+CCFLAGS += -mcx16#Required for CMPXCHG16 on x86
+else
+#-mcx16 is an x86-only option and must not reach an ARM cross compiler
+#CCFLAGS += -mcpu=cortex-a53
+endif
+DEFINE += -DNDEBUG#disable assertions
+CCFLAGS += -std=c99
+CCFLAGS += -g -ggdb -Wall
+CCFLAGS += -O2 -fno-stack-check -fno-stack-protector
+LDFLAGS += -g -ggdb -pthread
+LIBS = -lrt
+
+#Where to find the source files
+VPATH += .
+
+#Default to non-verbose mode (command lines are not echoed)
+#Run "make VERB=" to see the commands being executed
+VERB = @
+
+#Location of object and other derived/temporary files
+OBJDIR = obj#Must not be .
+
+###############################################################################
+# Make actions (phony targets)
+################################################################################
+
+.PHONY : default all clean tags etags
+
+#Default goal: only lists the available targets, nothing is built
+default:
+	@echo "Make targets:"
+	@echo "all         build all targets ($(TARGETS))"
+	@echo "clean       remove derived files"
+	@echo "tags        generate vi tags file"
+	@echo "etags       generate emacs tags file"
+
+all : $(TARGETS)
+
+#Make sure we don't remove current directory with all source files
+ifeq ($(strip $(OBJDIR)),.)
+$(error invalid OBJDIR=$(OBJDIR))
+endif
+ifeq ($(strip $(TARGETS)),.)
+$(error invalid TARGETS=$(TARGETS))
+endif
+#Recipe lines must start with a hard tab, spaces give "missing separator"
+clean:
+	@echo "--- Removing derived files"
+	$(VERB)-rm -rf $(OBJDIR) $(TARGETS) tags TAGS perf.data perf.data.old
+
+tags :
+	$(VERB)ctags -R .
+
+etags :
+	$(VERB)ctags -e -R .
+
+################################################################################
+# Setup tool commands and flags
+################################################################################
+
+#Defaults to be overridden by compiler makefragment
+CCOUT = -o $@
+ASOUT = -o $@
+LDOUT = -o $@
+
+ifneq ($(strip $(GCCTARGET)),)
+#Some experimental cross compiling support
+#GCCLIB = $(GCCROOT)/lib/gcc/$(GCCTARGET)/4.7.3
+#GCCROOT may be preset in the environment to select another toolchain
+GCCROOT ?= /opt/gcc-linaro-5.3-2016.02-x86_64_aarch64-linux-gnu
+GCCSETUP = PATH=$(GCCROOT)/bin:$(GCCROOT)/$(GCCTARGET)/bin:/bin:/usr/bin
+CC = $(GCCSETUP) $(GCCROOT)/bin/$(GCCTARGET)-gcc
+CXX = $(GCCSETUP) $(GCCROOT)/bin/$(GCCTARGET)-g++
+#Use the target assembler, the host "as" cannot assemble for GCCTARGET
+AS = $(GCCSETUP) $(GCCROOT)/bin/$(GCCTARGET)-as
+LD = $(GCCSETUP) $(GCCROOT)/bin/$(GCCTARGET)-g++
+else
+#Native compilation
+ifeq ($(CLANG),yes)
+CC = clang
+CXX = clang++
+AS = as
+LD = clang++
+else
+CC = gcc
+CXX = g++
+AS = as
+LD = g++
+endif
+endif
+#GROUPSTART = -Wl,--start-group
+#GROUPEND = -Wl,--end-group
+BIN2C = bin2c
+
+#Important compilation flags
+#-c: compile only, -MMD -MP: emit .d dependency files next to the objects
+CCFLAGS += -c -MMD -MP
+
+################################################################################
+# Post-process some variables and definitions, generate dependencies
+################################################################################
+
+CCFLAGS += $(DEFINE) $(INCLUDE)
+#Generate list of all object files (for all targets)
+#Each function call must stay on one logical line, a line break inside
+#$(foreach ...) or $(eval ...) is a syntax error
+override OBJECTS := $(addprefix $(OBJDIR)/,$(foreach var,$(TARGETS),$(OBJECTS_$(var))))
+#Generate target:objects dependencies for all targets
+$(foreach target,$(TARGETS),$(eval $(target) : $$(addprefix $$(OBJDIR)/,$$(OBJECTS_$(target)))))
+#Special dependency for object files on object directory
+#(order-only: the directory must exist but its timestamp is ignored)
+$(OBJECTS) : | $(OBJDIR)
+
+################################################################################
+# Build recipes
+################################################################################
+
+#Create the directory that receives all derived files
+$(OBJDIR) :
+	$(VERB)mkdir -p $(OBJDIR)
+
+#Keep intermediate pcap C-files
+.PRECIOUS : $(OBJDIR)/%_pcap.c
+
+#Compile a C file that was generated from a pcap file
+$(OBJDIR)/%_pcap.o : $(OBJDIR)/%_pcap.c
+	@echo "--- Compiling $<"
+	$(VERB)$(CC) $(CCFLAGS) $(CCOUT) $<
+
+#Convert a pcap file into a C source file using $(BIN2C)
+$(OBJDIR)/%_pcap.c : %.pcap
+	@echo "--- Generating $@"
+	$(VERB)$(BIN2C) -n $(notdir $(basename $@)) -o $@ $<
+
+#C++ sources, CCFLAGS_<name> may add flags for a single source file
+$(OBJDIR)/%.o : %.cc
+	@echo "--- Compiling $<"
+	$(VERB)$(CXX) $(CXXFLAGS) $(CCFLAGS) $(CCFLAGS_$(basename $<)) $(CCOUT) $<
+
+#C sources, CCFLAGS_<name> may add flags for a single source file
+$(OBJDIR)/%.o : %.c
+	@echo "--- Compiling $<"
+	$(VERB)$(CC) $(CCFLAGS) $(CCFLAGS_$(basename $<)) $(CCOUT) $<
+
+#Assembler sources passed directly to the assembler
+$(OBJDIR)/%.o : %.s
+	@echo "--- Compiling $<"
+	$(VERB)$(AS) $(ASFLAGS) $(ASONLYFLAGS) $(ASOUT) $<
+
+#Assembler sources that are run through the C compiler driver
+$(OBJDIR)/%.o : %.S
+	@echo "--- Compiling $<"
+	$(VERB)$(CC) $(CCFLAGS) $(addprefix $(ASPREFIX),$(ASFLAGS)) $(CCOUT) $<
+
+#Link each target from the objects listed in its OBJECTS_<target> variable
+$(TARGETS) :
+	@echo "--- Linking $@ from $(OBJECTS_$@) $(LIBS)"
+	$(VERB)$(LD) $(LDFLAGS) $(LDOUT) $(addprefix $(OBJDIR)/,$(OBJECTS_$@)) $(GROUPSTART) $(LIBS) $(GROUPEND) $(LDMAP)
+
+################################################################################
+# Include generated dependencies
+################################################################################
+
+#Pull in the .d files written by the compiler, silently skipping missing ones
+-include $(OBJECTS:.o=.d)
+# DO NOT DELETE
diff --git a/llqueue.c b/llqueue.c
new file mode 100644
index 0000000..1fecab7
--- /dev/null
+++ b/llqueue.c
@@ -0,0 +1,363 @@
+//Copyright (c) 2016, ARM Limited. All rights reserved.
+//
+//SPDX-License-Identifier:        BSD-3-Clause
+
+#define _GNU_SOURCE
+#include <assert.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+//Branch prediction hints, any earlier definitions are replaced
+#undef likely
+#undef unlikely
+#ifdef __GNUC__
+#define likely(x)    __builtin_expect(!!(x), 1)
+#define unlikely(x)  __builtin_expect(!!(x), 0)
+#else
+//No compiler support, the hints degrade to the plain expression
+#define likely(x)    (x)
+#define unlikely(x)  (x)
+#endif
+
+/******************************************************************************
+ * Linked list queues
+ *****************************************************************************/
+
+struct llnode//Link header of an element that can be stored in a llqueue
+{
+    struct llnode *next;//NULL when not enqueued, SENTINEL while last in queue
+    uint32_t tag;//For consistency checks (rewritten by llq_assert())
+};
+
+union llht//Head and tail pointers overlaid with one double-width scalar
+{
+    struct
+    {
+       struct llnode *head, *tail;//First and last node, both NULL when empty
+    } st;
+    dintptr_t ui;//Same storage as st, the operand type passed to lld()/scd()
+};
+
+struct llqueue//A queue: head/tail pair plus the lock of the llq_*_l() variants
+{
+    union llht u;//Updated as one unit by the lock-free functions
+//x86-64 seems faster using spin lock instead of CMPXCHG16
+    pthread_spinlock_t lock;//Taken by the llq_*_l() functions
+};
+
+#define SENTINEL ((void *)~(uintptr_t)0)//All-ones pointer kept in 'next' of the last node
+
+//static void llq_enqueue(struct llqueue *llq, struct llnode *node, uint32_t *numfailed) __attribute__((noinline));
+//Append 'node' to the tail of 'llq' without taking llq->lock.
+//llq:       queue to append to
+//node:      element to enqueue, node->next must be NULL on entry
+//numfailed: optional (may be NULL) counter, incremented for every failed
+//           SC/CAS attempt
+//The head/tail pair is replaced in one double-width atomic update (LL/SC when
+//USE_LLSC is defined, compare-and-swap otherwise). The previous tail is
+//linked to 'node' only after that update has succeeded, until then its next
+//pointer remains SENTINEL.
+static inline void llq_enqueue(struct llqueue *llq, struct llnode *node, uint32_t *numfailed)
+{
+    union llht old;
+    assert(node->next == NULL);
+    node->next = SENTINEL;
+#ifdef USE_LLSC
+retry: //Failed SC requires new LL
+    old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
+#else
+    __atomic_load(&llq->u, &old, __ATOMIC_RELAXED);
+retry: //Failed CAS returns existing value
+    (void)0;//Need statement after label
+#endif
+    union llht neu;
+    //An empty queue gets 'node' as head as well, otherwise head is unchanged
+    neu.st.head = old.st.head == NULL ? node : old.st.head;
+    neu.st.tail = node;
+#ifdef USE_LLSC
+    if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELEASE)))
+#else
+    if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu,
+                                            /*weak=*/false,
+                                            __ATOMIC_RELEASE,
+                                            __ATOMIC_RELAXED)))
+#endif
+    {
+        //Failed
+        doze();
+        if (numfailed != NULL)
+            (*numfailed)++;
+        goto retry;
+    }
+    if (old.st.tail != NULL)
+    {
+        //List was not empty
+        assert(old.st.tail->next == SENTINEL);
+        //Dequeuers poll this field concurrently using atomic loads so the
+        //store must be atomic too (a plain store would be a data race)
+        __atomic_store_n(&old.st.tail->next, node, __ATOMIC_RELAXED);
+    }
+}
+
+//static void llq_enqueue_l(struct llqueue *llq, struct llnode *node, uint32_t *numfailed) __attribute__((noinline));
+//Spin lock protected variant of llq_enqueue(): append 'node' to 'llq'.
+//node->next must be NULL on entry. 'numfailed' is not used by this variant.
+static inline void llq_enqueue_l(struct llqueue *llq, struct llnode *node, uint32_t *numfailed)
+{
+    assert(node->next == NULL);
+    //The new node always ends up last in the queue
+    node->next = SENTINEL;
+
+    pthread_spin_lock(&llq->lock);
+    struct llnode *last = llq->u.st.tail;
+    if (llq->u.st.head != NULL)
+    {
+        //Non-empty queue, link behind the current tail
+        last->next = node;
+    }
+    else
+    {
+        //Empty queue, the node becomes the head as well
+        llq->u.st.head = node;
+    }
+    llq->u.st.tail = node;
+    pthread_spin_unlock(&llq->lock);
+}
+
+//static struct llnode *llq_dequeue(struct llqueue *llq, uint32_t *numfailed) __attribute__((noinline));
+//Remove and return the first node of 'llq' without taking llq->lock.
+//llq:       queue to dequeue from
+//numfailed: optional (may be NULL) counter, incremented for every retry
+//Returns the dequeued node with its next pointer reset to NULL, or NULL if
+//the queue was found empty.
+//NOTE(review): the next pointer of a node is read before the update that
+//claims the node; this presumably requires that nodes are never freed while
+//other threads may still be inside this function - confirm with the caller.
+static inline struct llnode *llq_dequeue(struct llqueue *llq, uint32_t *numfailed)
+{
+    struct llnode *head;
+
+    //llq_dequeue() may be used in a busy-waiting fashion
+    //Read head using plain load to avoid disturbing remote LL/SC
+    if ((head = __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED)) == NULL)
+    {
+        return NULL;
+    }
+    //Read head->next before LL to minimize cache miss latency in LL/SC below
+    (void)__atomic_load_n(&head->next, __ATOMIC_RELAXED);
+
+    union llht old;
+#ifdef USE_LLSC
+retry: //Failed SC requires new LL
+    old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
+#else
+    __atomic_load(&llq->u, &old, __ATOMIC_RELAXED);
+retry: //Failed CAS returns existing value
+#endif
+    if (unlikely(old.st.head == NULL)) //Empty list
+    {
+        clrex();
+        return NULL;
+    }
+    else if (unlikely(old.st.head == old.st.tail))//Single-element in list
+    {
+        union llht neu;
+        neu.st.head = NULL;
+        neu.st.tail = NULL;
+#ifdef USE_LLSC
+        if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)))
+#else
+        if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu,
+                                                /*weak=*/false,
+                                                __ATOMIC_RELAXED,
+                                                __ATOMIC_RELAXED)))
+#endif
+        {
+            //Failed
+            doze();
+            if (numfailed != NULL)
+                (*numfailed)++;
+            goto retry;
+        }
+        assert(old.st.head->next == SENTINEL);
+    }
+    else//Multi-element list, dequeue head
+    {
+        struct llnode *next = __atomic_load_n(&old.st.head->next,
+                                              __ATOMIC_RELAXED);
+        //Check if llq_enqueue() has yet written true next pointer
+        if (unlikely(next == SENTINEL))
+        {
+            //Sorry, can't continue
+            clrex();
+            doze();
+            if (numfailed != NULL)
+                (*numfailed)++;
+#ifndef USE_LLSC
+            //No CAS was attempted so 'old' has not been refreshed. Reload it
+            //or we could keep polling a head that has since been dequeued
+            __atomic_load(&llq->u, &old, __ATOMIC_RELAXED);
+#endif
+            goto retry;
+        }
+        union llht neu;
+        neu.st.head = next;
+        neu.st.tail = old.st.tail;
+#ifdef USE_LLSC
+        if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)))
+#else
+        if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu,
+                                                /*weak=*/false,
+                                                __ATOMIC_RELAXED,
+                                                __ATOMIC_RELAXED)))
+#endif
+        {
+            //Failed
+            doze();
+            if (numfailed != NULL)
+                (*numfailed)++;
+            goto retry;
+        }
+        assert(old.st.head->next != SENTINEL);
+    }
+    //Other dequeuers may still be reading this field using atomic loads so
+    //the store is atomic as well
+    __atomic_store_n(&old.st.head->next, NULL, __ATOMIC_RELAXED);
+    return old.st.head;
+}
+
+//static struct llnode *llq_dequeue_l(struct llqueue *llq, uint32_t *numfailed) __attribute__((noinline));
+//Spin lock protected variant of llq_dequeue(): remove and return the first
+//node of 'llq', or NULL if the queue is empty. 'numfailed' is not used.
+static inline struct llnode *llq_dequeue_l(struct llqueue *llq, uint32_t *numfailed)
+{
+    //Unlocked peek so that polling an empty queue never takes the lock
+    if (__atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED) == NULL)
+        return NULL;
+
+    pthread_spin_lock(&llq->lock);
+    //The queue may have been emptied since the peek, look again
+    struct llnode *first = llq->u.st.head;
+    if (first != NULL)
+    {
+        if (first != llq->u.st.tail)
+        {
+            //More nodes follow, the successor becomes the new head
+            assert(first->next != SENTINEL);
+            llq->u.st.head = first->next;
+        }
+        else
+        {
+            //Last node removed, the queue is now empty
+            assert(first->next == SENTINEL);
+            llq->u.st.tail = NULL;
+            llq->u.st.head = NULL;
+        }
+        first->next = NULL;
+    }
+    pthread_spin_unlock(&llq->lock);
+    return first;
+}
+
+static struct llnode *llq_dequeue_cond(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed) __attribute__((always_inline));
+//Remove and return the first node of 'llq', but only if that node is 'exp'.
+//llq:       queue to dequeue from
+//exp:       node that is expected to be first in the queue
+//numfailed: optional (may be NULL) counter, incremented for every retry
+//Returns 'exp' with its next pointer reset to NULL, or NULL if the queue is
+//empty or has a different head. Does not take llq->lock.
+static inline struct llnode *llq_dequeue_cond(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed)
+{
+    union llht old;
+#ifdef USE_LLSC
+retry: //Failed SC requires new LL
+    old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
+#else
+    __atomic_load(&llq->u, &old, __ATOMIC_RELAXED);
+retry: //Failed CAS returns existing value
+#endif
+    //Empty list or wrong head
+    if (unlikely(old.st.head == NULL || old.st.head != exp))
+    {
+        clrex();
+        return NULL;
+    }
+    else if (unlikely(old.st.head == old.st.tail))//Single-element in list
+    {
+        union llht neu;
+        neu.st.head = NULL;
+        neu.st.tail = NULL;
+#ifdef USE_LLSC
+        if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)))
+#else
+        if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu,
+                                                /*weak=*/false,
+                                                __ATOMIC_RELAXED,
+                                                __ATOMIC_RELAXED)))
+#endif
+        {
+            //Failed
+            doze();
+            if (numfailed != NULL)
+                (*numfailed)++;
+            goto retry;
+        }
+        assert(old.st.head->next == SENTINEL);
+    }
+    else//Multi-element list, dequeue head
+    {
+        struct llnode *next = __atomic_load_n(&old.st.head->next,
+                                              __ATOMIC_RELAXED);
+        //Check if llq_enqueue() has yet written true next pointer
+        if (unlikely(next == SENTINEL))
+        {
+            //Sorry, can't continue
+            clrex();
+            doze();
+            if (numfailed != NULL)
+                (*numfailed)++;
+#ifndef USE_LLSC
+            //No CAS was attempted so 'old' has not been refreshed. Reload it
+            //or we could keep polling a head that has since been dequeued
+            __atomic_load(&llq->u, &old, __ATOMIC_RELAXED);
+#endif
+            goto retry;
+        }
+        union llht neu;
+        neu.st.head = next;
+        neu.st.tail = old.st.tail;
+#ifdef USE_LLSC
+        if (unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)))
+#else
+        if (unlikely(!__atomic_compare_exchange(&llq->u, &old, &neu,
+                                                /*weak=*/false,
+                                                __ATOMIC_RELAXED,
+                                                __ATOMIC_RELAXED)))
+#endif
+        {
+            //Failed
+            doze();
+            if (numfailed != NULL)
+                (*numfailed)++;
+            goto retry;
+        }
+        assert(old.st.head->next != SENTINEL);
+    }
+    //Other dequeuers may still be reading this field using atomic loads so
+    //the store is atomic as well
+    __atomic_store_n(&old.st.head->next, NULL, __ATOMIC_RELAXED);
+    return old.st.head;
+}
+
+//static struct llnode *llq_dequeue_cond_l(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed) __attribute__((noinline));
+//Spin lock protected variant of llq_dequeue_cond(): remove and return the
+//first node of 'llq' only if it is 'exp', otherwise return NULL.
+//'numfailed' is not used by this variant.
+static inline struct llnode *llq_dequeue_cond_l(struct llqueue *llq, struct llnode *exp, uint32_t *numfailed)
+{
+    struct llnode *removed = NULL;
+
+    pthread_spin_lock(&llq->lock);
+    struct llnode *first = llq->u.st.head;
+    if (likely(first != NULL && first == exp))
+    {
+        if (first != llq->u.st.tail)
+        {
+            //More nodes follow, the successor becomes the new head
+            assert(first->next != SENTINEL);
+            llq->u.st.head = first->next;
+        }
+        else
+        {
+            //Last node removed, the queue is now empty
+            assert(first->next == SENTINEL);
+            llq->u.st.tail = NULL;
+            llq->u.st.head = NULL;
+        }
+        first->next = NULL;
+        removed = first;
+    }
+    pthread_spin_unlock(&llq->lock);
+    return removed;
+}
+
+//Peek at the first node of 'llq' without removing it (NULL if empty).
+//This is a plain read, neither the lock nor any atomic operation is used.
+static inline struct llnode *llq_head(struct llqueue *llq)
+{
+    struct llnode *first = llq->u.st.head;
+    return first;
+}
+
+//Consistency check: walk 'llq' from head to tail and return the number of
+//nodes. Every visited node gets the same fresh tag value so that reaching a
+//node twice trips an assertion. Uses plain accesses only and overwrites the
+//tag field of all nodes.
+static inline uint32_t llq_assert(struct llqueue *llq)
+{
+    struct llnode *cur = llq->u.st.head;
+    if (cur == NULL)
+    {
+        //No elements in list
+        assert(llq->u.st.tail == NULL);
+        return 0;
+    }
+
+    //Pick a marker that differs from the head's current tag
+    const uint32_t mark = cur->tag + 1;
+    uint32_t count = 1;
+    cur->tag = mark;
+    //Follow the links until the last element
+    while (cur->next != SENTINEL)
+    {
+        cur = cur->next;
+        assert(cur->tag != mark);
+        cur->tag = mark;
+        count++;
+    }
+    //Tail must point to last element
+    assert(llq->u.st.tail == cur);
+    return count;
+}
+
+//Initialise 'llq' as an empty queue and set up its process-private spin lock
+static void llqueue_init(struct llqueue *llq)
+{
+    pthread_spin_init(&llq->lock, PTHREAD_PROCESS_PRIVATE);
+    //Empty queue: neither head nor tail
+    llq->u.st.tail = NULL;
+    llq->u.st.head = NULL;
+}
diff --git a/llsc.c b/llsc.c
new file mode 100644
index 0000000..b09d122
--- /dev/null
+++ b/llsc.c
@@ -0,0 +1,254 @@
+//Copyright (c) 2016, ARM Limited. All rights reserved.
+//
+//SPDX-License-Identifier:        BSD-3-Clause
+
+#define _GNU_SOURCE
+#include <assert.h>
+#include <inttypes.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+//Branch prediction hints, any earlier definitions are replaced
+#undef likely
+#undef unlikely
+#ifdef __GNUC__
+#define likely(x)    __builtin_expect(!!(x), 1)
+#define unlikely(x)  __builtin_expect(!!(x), 0)
+#else
+//No compiler support, the hints degrade to the plain expression
+#define likely(x)    (x)
+#define unlikely(x)  (x)
+#endif
+
+#define ALIGNED(x) __attribute__((__aligned__(x)))//Request an alignment of x bytes
+#define CACHE_LINE 64//Presumably the cache line size in bytes - TODO confirm
+
+/******************************************************************************
+ * LL/SC primitives
+ *****************************************************************************/
+
+#if defined __ARM_ARCH && __ARM_ARCH == 7
+//Issue a DMB instruction, the "memory" clobber makes it a compiler barrier too
+static inline void dmb(void)
+{
+    __asm __volatile("dmb" : : : "memory");
+}
+
+//Load-linked (LDREX) of the 32-bit word at 'var', returns the loaded value.
+//mm: __ATOMIC_ACQUIRE adds a DMB after the load, any other value adds none
+static inline uint32_t ll(uint32_t *var, int mm)
+{
+    uint32_t value;
+    __asm __volatile("ldrex %0, [%1]"
+                     : "=&r" (value)
+                     : "r" (var)
+                     : );
+    if (mm == __ATOMIC_ACQUIRE)
+    {
+        //Barrier after an acquiring load
+        dmb();
+    }
+    return value;
+}
+#define ll32(a, b) ll((a), (b))
+
+//Store-conditional (STREX) of 'neu' to the 32-bit word at 'var'.
+//mm: __ATOMIC_RELEASE adds a DMB before the store, any other value adds none
+//Return 0 on success, 1 on failure
+static inline uint32_t sc(uint32_t *var, uint32_t neu, int mm)
+{
+    uint32_t status;
+    if (mm == __ATOMIC_RELEASE)
+    {
+        //Barrier before a releasing store
+        dmb();
+    }
+    __asm __volatile("strex %0, %1, [%2]"
+                     : "=&r" (status)
+                     : "r" (neu), "r" (var)
+                     : );
+    return status;
+}
+#define sc32(a, b, c) sc((a), (b), (c))
+
+//Load-linked (LDREXD) of the 64-bit doubleword at 'var', returns the value.
+//mm: __ATOMIC_ACQUIRE adds a DMB after the load, any other value adds none
+static inline uint64_t lld(uint64_t *var, int mm)
+{
+    uint64_t value;
+    __asm __volatile("ldrexd %0, %H0, [%1]"
+                     : "=&r" (value)
+                     : "r" (var)
+                     : );
+    if (mm == __ATOMIC_ACQUIRE)
+    {
+        //Barrier after an acquiring load
+        dmb();
+    }
+    return value;
+}
+#define ll64(a, b) lld((a), (b))
+
+//Store-conditional (STREXD) of 'neu' to the 64-bit doubleword at 'var'.
+//mm: __ATOMIC_RELEASE adds a DMB before the store, any other value adds none
+//Return 0 on success, 1 on failure
+static inline uint32_t scd(uint64_t *var, uint64_t neu, int mm)
+{
+    uint32_t status;
+    if (mm == __ATOMIC_RELEASE)
+    {
+        //Barrier before a releasing store
+        dmb();
+    }
+    __asm __volatile("strexd %0, %1, %H1, [%2]"
+                     : "=&r" (status)
+                     : "r" (neu), "r" (var)
+                     : );
+    return status;
+}
+#define sc64(a, b, c) scd((a), (b), (c))
+
+#endif
+
+#if defined __ARM_ARCH && __ARM_ARCH == 8
+//Load-exclusive of the 32-bit word at 'var', returns the loaded value.
+//mm: __ATOMIC_ACQUIRE selects LDAXR and __ATOMIC_RELAXED selects LDXR, any
+//other memory model aborts the program
+static inline uint32_t ll32(uint32_t *var, int mm)
+{
+    uint32_t value;
+    switch (mm)
+    {
+    case __ATOMIC_ACQUIRE:
+        __asm __volatile("ldaxr %w0, [%1]"
+                         : "=&r" (value)
+                         : "r" (var)
+                         : "memory");
+        break;
+    case __ATOMIC_RELAXED:
+        __asm __volatile("ldxr %w0, [%1]"
+                         : "=&r" (value)
+                         : "r" (var)
+                         : );
+        break;
+    default:
+        abort();
+    }
+    return value;
+}
+
+//Store-exclusive of 'neu' to the 32-bit word at 'var'.
+//mm: __ATOMIC_RELEASE selects STLXR and __ATOMIC_RELAXED selects STXR, any
+//other memory model aborts the program
+//Return 0 on success, 1 on failure
+static inline uint32_t sc32(uint32_t *var, uint32_t neu, int mm)
+{
+    uint32_t status;
+    switch (mm)
+    {
+    case __ATOMIC_RELEASE:
+        __asm __volatile("stlxr %w0, %w1, [%2]"
+                         : "=&r" (status)
+                         : "r" (neu), "r" (var)
+                         : "memory");
+        break;
+    case __ATOMIC_RELAXED:
+        __asm __volatile("stxr %w0, %w1, [%2]"
+                         : "=&r" (status)
+                         : "r" (neu), "r" (var)
+                         : );
+        break;
+    default:
+        abort();
+    }
+    return status;
+}
+
+//Load-exclusive of the 64-bit word at 'var', returns the loaded value.
+//mm: __ATOMIC_ACQUIRE selects LDAXR and __ATOMIC_RELAXED selects LDXR, any
+//other memory model aborts the program
+static inline uint64_t ll(uint64_t *var, int mm)
+{
+    uint64_t value;
+    switch (mm)
+    {
+    case __ATOMIC_ACQUIRE:
+        __asm __volatile("ldaxr %0, [%1]"
+                         : "=&r" (value)
+                         : "r" (var)
+                         : "memory");
+        break;
+    case __ATOMIC_RELAXED:
+        __asm __volatile("ldxr %0, [%1]"
+                         : "=&r" (value)
+                         : "r" (var)
+                         : );
+        break;
+    default:
+        abort();
+    }
+    return value;
+}
+#define ll64(a, b) ll((a), (b))
+
+//Store-exclusive of 'neu' to the 64-bit word at 'var'.
+//mm: __ATOMIC_RELEASE selects STLXR and __ATOMIC_RELAXED selects STXR, any
+//other memory model aborts the program
+//Return 0 on success, 1 on failure
+static inline uint32_t sc(uint64_t *var, uint64_t neu, int mm)
+{
+    uint32_t status;
+    switch (mm)
+    {
+    case __ATOMIC_RELEASE:
+        __asm __volatile("stlxr %w0, %1, [%2]"
+                         : "=&r" (status)
+                         : "r" (neu), "r" (var)
+                         : "memory");
+        break;
+    case __ATOMIC_RELAXED:
+        __asm __volatile("stxr %w0, %1, [%2]"
+                         : "=&r" (status)
+                         : "r" (neu), "r" (var)
+                         : );
+        break;
+    default:
+        abort();
+    }
+    return status;
+}
+#define sc64(a, b, c) sc((a), (b), (c))
+
+//Load-exclusive pair: read the 128-bit value at 'var' and return it.
+//mm: __ATOMIC_ACQUIRE selects LDAXP and __ATOMIC_RELAXED selects LDXP, any
+//other memory model aborts the program
+static inline __int128 lld(__int128 *var, int mm)
+{
+    __int128 value;
+    switch (mm)
+    {
+    case __ATOMIC_ACQUIRE:
+        __asm __volatile("ldaxp %0, %H0, [%1]"
+                         : "=&r" (value)
+                         : "r" (var)
+                         : "memory");
+        break;
+    case __ATOMIC_RELAXED:
+        __asm __volatile("ldxp %0, %H0, [%1]"
+                         : "=&r" (value)
+                         : "r" (var)
+                         : );
+        break;
+    default:
+        abort();
+    }
+    return value;
+}
+
+//Store-exclusive pair: write the 128-bit value 'neu' to 'var'.
+//mm: __ATOMIC_RELEASE selects STLXP and __ATOMIC_RELAXED selects STXP, any
+//other memory model aborts the program
+//Return 0 on success, 1 on failure
+static inline uint32_t scd(__int128 *var, __int128 neu, int mm)
+{
+    uint32_t status;
+    switch (mm)
+    {
+    case __ATOMIC_RELEASE:
+        __asm __volatile("stlxp %w0, %1, %H1, [%2]"
+                         : "=&r" (status)
+                         : "r" (neu), "r" (var)
+                         : "memory");
+        break;
+    case __ATOMIC_RELAXED:
+        __asm __volatile("stxp %w0, %1, %H1, [%2]"
+                         : "=&r" (status)
+                         : "r" (neu), "r" (var)
+                         : );
+        break;
+    default:
+        abort();
+    }
+    return status;
+}
+#endif
+
+//Clear exclusive monitor, used when LL is not followed by SC
+//Compiles to nothing when not building for ARM
+static inline void clrex(void)
+{
+#ifdef __ARM_ARCH
+    __asm __volatile("clrex" : : : );
+#endif
+}
+
+//Issue a SEVL instruction, compiles to nothing when not building for ARM
+//NOTE(review): SEVL looks to be an ARMv8 addition, confirm that this
+//assembles when __ARM_ARCH == 7
+static inline void sevl(void)
+{
+#ifdef __ARM_ARCH
+    __asm __volatile("sevl" : : : );
+#endif
+}
+
+//Issue a WFE (wait for event) instruction, compiles to nothing when not
+//building for ARM
+static inline void wfe(void)
+{
+#ifdef __ARM_ARCH
+    __asm __volatile("wfe" : : : );
+#endif
+}
+
+//Short back-off used between retries of a failed atomic update.
+//Acts as a compiler barrier as well ("memory" clobber).
+static inline void doze(void)
+{
+#ifdef __ARM_ARCH
+    //Alternative: YIELD hints the CPU to switch to another thread if
+    //available but otherwise executes as a NOP
+//    __asm __volatile("yield" : : : "memory");
+    //ISB is used instead. It flushes the pipeline, then restarts, which is
+    //guaranteed to stall the CPU a number of cycles
+    __asm __volatile("isb" : : : "memory");
+#else
+    //Assume x86
+    __asm __volatile("pause" : : : "memory");
+#endif
+}
+
+//The scalar equivalent of a double pointer
+//(64 bits for 4-byte pointers, 128 bits for 8-byte pointers)
+#if __SIZEOF_PTRDIFF_T__ == 4
+typedef uint64_t dintptr_t;
+#elif __SIZEOF_PTRDIFF_T__ == 8
+typedef __int128 dintptr_t;
+#endif
diff --git a/scheduler.c b/scheduler.c
new file mode 100644
index 0000000..7faee05
--- /dev/null
+++ b/scheduler.c
@@ -0,0 +1,2042 @@
+//Copyright (c) 2016, ARM Limited. All rights reserved.
+//
+//SPDX-License-Identifier:        BSD-3-Clause
+
+#define _GNU_SOURCE
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <inttypes.h>
+#include <limits.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+//#define LOG
+
+#ifdef __ARM_ARCH
+#define USE_LLSC
+#endif
+
+#if defined __GNUC__
+#define likely(x)    __builtin_expect(!!(x), 1)
+#define unlikely(x)  __builtin_expect(!!(x), 0)
+#else
+#define likely(x)    (x)
+#define unlikely(x)  (x)
+#endif
+
+//Function to set breakpoint on
+//Intentionally empty; noinline keeps it as a real function that a debugger
+//can break on
+void bp(void) __attribute((noinline));
+void bp(void)
+{
+}
+
+
+//Smaller of a and b. NOTE: each argument is evaluated twice, so do not pass
+//expressions with side effects. Mixed signed/unsigned arguments are compared
+//after the usual arithmetic conversions.
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+
+//far_atomic_store(ptr, val, mo): atomic store of val to *ptr with memory
+//order mo. Two implementations, selected at compile time below.
+//Enable for Cortex-A57!
+#if 0
+//Implement store-release (STLR) using DMB; STR (store-relaxed).
+//This alternative is interesting to test since it has proven more
+//performant in some cases on A57.
+//We implement this using a macro since it is used with different types of
+//parameters.
+//NOTE(review): DMB ISHST only orders stores against stores; unlike a real
+//store-release it does not order earlier loads. Confirm this is sufficient
+//for every caller before enabling.
+#define far_atomic_store(_ptr, _val, _mo) \
+do \
+{ \
+    if ((_mo) == __ATOMIC_RELEASE) \
+    { \
+       __asm __volatile("dmb ishst" ::: "memory"); \
+       __atomic_store_n((_ptr), (_val), __ATOMIC_RELAXED); \
+    } \
+    else \
+       __atomic_store_n((_ptr), (_val), (_mo)); \
+} \
+while (0)
+#else
+//Default: plain __atomic_store_n() with the requested memory order
+#define far_atomic_store(_ptr, _val, _mo) \
+       __atomic_store_n((_ptr), (_val), (_mo))
+#endif
+
+//Possibly, store-release a ticket after CAS can use store-relaxed
+//Possibly, this has less overhead for the issuing thread
+#define __ATOMIC_RELEASE_AFTER_CAS __ATOMIC_RELEASE
+
+#define CAS_WEAK false
+
+static bool VERBOSE = false;
+
+//Return true if n has exactly one bit set (1, 2, 4, 8, ...)
+//Zero is not a power of two
+static inline bool is_power_of_two(uint32_t n)
+{
+    if (n == 0)
+        return false;
+    //Clearing the lowest set bit leaves zero only if it was the sole bit
+    const uint32_t without_lowest_bit = n & (n - 1);
+    return without_lowest_bit == 0;
+}
+
+//Thread priority and scheduling
+#define PRIO 1
+#define SCHED SCHED_FIFO
+//#define SCHED SCHED_OTHER
+
+#define ALIGNED(x) __attribute__((__aligned__(x)))
+#define CACHE_LINE 64
+
+/******************************************************************************
+ * Linked list queue and its LL/SC support
+ *****************************************************************************/
+
+#include "llsc.c"
+#include "llqueue.c"
+
+/******************************************************************************
+ * Type and forward declarations
+ *****************************************************************************/
+
+//Max 64 threads
+typedef uint64_t odp_thrmask_t;
+#define ODP_THRMASK_ALL ((uint64_t)~0ULL)
+
+//A scheduler queue: a linked-list queue of scheduler objects plus the
+//priority of that queue. The union with dummy[] makes the type at least
+//CACHE_LINE bytes large.
+typedef union
+{
+    struct
+    {
+       struct llqueue llq;
+       uint32_t prio;//Higher value is higher priority
+    };//Anonymous struct, access members directly
+    char dummy[CACHE_LINE];//Required so that sched_queue is size of alignment
+} sched_queue ALIGNED(CACHE_LINE);
+
+struct odp_event_s;
+typedef struct odp_event_s *odp_event_t;
+#define ODP_EVENT_INVALID ((odp_event_t)NULL)
+
+struct sched_obj;//Scheduler objects are the elements of the scheduler queues
+typedef struct sched_obj *odp_queue_t;//ODP queues are scheduler objects
+#define ODP_QUEUE_INVALID ((odp_queue_t)NULL)
+
+struct sched_group;
+typedef uint64_t sched_group_mask_t;
+#define MAX_SCHED_GROUP (sizeof(sched_group_mask_t) * CHAR_BIT) //E.g. 64
+typedef uint32_t odp_schedule_group_t;//1..MAX_SCHED_GROUP
+#define ODP_SCHED_GROUP_INVALID 0
+
+static sched_queue *schedq_from_sched_group(odp_schedule_group_t grp,
+                                                      uint32_t prio);
+
+#define NUM_PRIO 4 //High, medium, low and below priorities
+#define PRIO_MED (NUM_PRIO / 2)
+
+static int odp_queue_enq(odp_queue_t q, const odp_event_t ev[], int num);
+
+/*******************************************************************************
+ * Per thread state
+ ******************************************************************************/
+
+struct reorder_context;
+struct reorder_window;
+struct odp_event_s;
+
+static inline bool rwin_reserve(struct reorder_window *rwin, uint32_t *sn);
+static void rwin_insert(struct reorder_window *rwin,
+                       struct reorder_context *rctx,
+                       uint32_t sn,
+                       void (*callback)(const struct reorder_context *));
+static struct odp_event_s *event_next_get(struct odp_event_s *evt);
+static odp_queue_t event_queue_get(struct odp_event_s *evt);
+static uint32_t event_number_get(struct odp_event_s *evt);
+
+//State for one reserved slot in a reorder window. Events enqueued while the
+//owning thread is out-of-order are linked onto head/tail and enqueued for
+//real when the context is retired (see rctx_retire()).
+struct reorder_context
+{
+    struct odp_event_s *head, *tail;//Linked list of deferred events
+    struct reorder_window *rwin;//Reorder window for source queue (or whatever)
+    uint32_t *rvec_free;//Pointer to TS->rvec_free
+    uint32_t sn;//Our slot in the reorder window
+    uint16_t idx;//Our index in thread_state rvec array
+    uint16_t olock_flags;//Bit i clear => ordered lock i not used (see olock_unlock())
+} ALIGNED(CACHE_LINE);
+
+//Prepare a reorder context for use and mark it as taken.
+//rctx: context to initialise
+//rvec_free: bitset in which bit 'idx' represents this context; the bit must
+//be set (free) on entry and is cleared here
+//idx: index of the context in the owner's rvec array
+//rwin: reorder window the context belongs to
+static inline void rctx_init(struct reorder_context *rctx,
+                             uint32_t *rvec_free,
+                             uint16_t idx,
+                             struct reorder_window *rwin)
+{
+    const uint32_t free_bit = 1U << idx;
+    //Start with an empty list of deferred events and no locks used
+    rctx->head = NULL;
+    rctx->tail = NULL;
+    rctx->sn = 0;
+    rctx->olock_flags = 0;
+    //Remember where we belong
+    rctx->rwin = rwin;
+    rctx->rvec_free = rvec_free;
+    rctx->idx = idx;
+    //The context must currently be free; claim it by clearing its bit
+    assert((*rvec_free & free_bit) != 0);
+    __atomic_fetch_and(rvec_free, ~free_bit, __ATOMIC_RELAXED);
+}
+
+//Mark a reorder context as free again by setting its bit in the rvec_free
+//bitset it points to. Called from rctx_retire(), which may run on any
+//thread, so the update is atomic.
+static inline void rctx_free(const struct reorder_context *rctx)
+{
+    assert(rctx->rwin != NULL);
+    const uint32_t free_bit = 1U << rctx->idx;
+    uint32_t *rvec_free = rctx->rvec_free;
+    //Set free bit
+    assert((*rvec_free & free_bit) == 0);
+    //Release order: we have not written to the reorder_context, but all our
+    //earlier READS of it must be complete before the free bit becomes
+    //visible, because the context may be reinitialised (rctx_init()) as soon
+    //as it is seen as free. Relaxed order would allow those reads to be
+    //reordered after this update.
+    __atomic_fetch_or(rvec_free, free_bit, __ATOMIC_RELEASE);
+}
+
+static void olock_release(const struct reorder_context *rctx);
+
+//Retire a reorder context: enqueue every event deferred on it (each on the
+//queue recorded in the event), release its ordered locks and mark the
+//context free. Aborts the process if an event cannot be enqueued.
+//rctx_retire may be called by any thread
+static void rctx_retire(const struct reorder_context *rctx)
+{
+    struct odp_event_s *evt = rctx->head;
+    while (likely(evt != NULL))
+    {
+        //Read the link before handing evt to the queue
+        struct odp_event_s *next = event_next_get(evt);
+        //Prefetch next event
+        __builtin_prefetch(next, 0, 0);
+        int rc = odp_queue_enq(event_queue_get(evt), &evt, 1);
+        if (unlikely(rc != 1))
+        {
+            //%p requires void pointers
+            fprintf(stderr,
+                    "rctx_retire: failed to enqueue event %p/%u on queue %p\n",
+                    (void *)evt,
+                    event_number_get(evt),
+                    (void *)event_queue_get(evt));
+            fflush(NULL); abort();
+        }
+        evt = next;
+    }
+    olock_release(rctx);
+    rctx_free(rctx);
+}
+
+//Hand a reorder context (which must be in use, i.e. its free bit is clear)
+//over to its reorder window.
+static inline void rctx_release(struct reorder_context *rctx)
+{
+    const uint32_t free_bit = 1U << rctx->idx;
+    assert((*rctx->rvec_free & free_bit) == 0);
+    struct reorder_window *window = rctx->rwin;
+    uint32_t slot = rctx->sn;
+    //rwin_insert() may call rctx_retire() for this and for other pending
+    //reorder contexts
+    rwin_insert(window, rctx, slot, rctx_retire);
+}
+
+#define TS_RVEC_SIZE 16
+
+//Scheduler state of one thread; see thread_state_init() for initial values
+struct thread_state
+{
+    struct sched_obj *atomq;//Atomic queue currently being processed or NULL
+    struct reorder_context *rctx;//Current reorder context or NULL
+    bool pause;//Initialised to false; NOTE(review): users not visible here
+    bool out_of_order;//When set, odp_queue_enq() defers events onto rctx
+    uint32_t tidx;//Thread index
+    uint32_t ticket;//Ticket for atomic queue or TICKET_INVALID
+    uint32_t rvec_free;//Bitset of free entries in rvec
+    uint16_t num_schedq;//Number of used entries in schedq_list
+    uint16_t sg_sem;//Set when sg_wanted is modified by other thread
+    sched_group_mask_t sg_actual[NUM_PRIO];//Current sched_group membership
+    sched_group_mask_t sg_wanted[NUM_PRIO];//Future sched_group membership
+#define SCHEDQ_PER_THREAD (MAX_SCHED_GROUP * NUM_PRIO)
+    //Kept ordered by insert_schedq_in_list(), highest prio value first
+    sched_queue *schedq_list[SCHEDQ_PER_THREAD];
+    //Reorder contexts; bit i of rvec_free corresponds to rvec[i]
+    struct reorder_context rvec[TS_RVEC_SIZE];
+} ALIGNED(CACHE_LINE);
+
+#define MAXTHREADS 32
+
+static struct thread_state thread_state[MAXTHREADS];
+static uint32_t NUMTHREADS = 2;
+static __thread struct thread_state *TS;
+
+//Initialise the state of thread 'tidx' and make it the calling thread's
+//state (TS). Must therefore be called by the thread itself.
+//tidx must be in the range 0..MAXTHREADS-1, otherwise the process is aborted.
+//sg_wanted and schedq_list are deliberately left untouched.
+static void thread_state_init(int tidx)
+{
+    //thread_state[] has MAXTHREADS entries, refuse to index outside it
+    if (tidx < 0 || tidx >= MAXTHREADS)
+    {
+        fprintf(stderr, "thread_state_init: invalid thread index %d\n", tidx);
+        abort();
+    }
+    struct thread_state *ts = &thread_state[tidx];
+    ts->atomq = ODP_QUEUE_INVALID;
+    ts->rctx = NULL;
+    ts->pause = false;
+    ts->out_of_order = false;
+    ts->tidx = tidx;
+    //All TS_RVEC_SIZE reorder contexts start out free (one bit each)
+    assert(TS_RVEC_SIZE <= sizeof(ts->rvec_free) * CHAR_BIT);
+    ts->rvec_free = (1ULL << TS_RVEC_SIZE) - 1;
+    ts->num_schedq = 0;
+    ts->sg_sem = 1;//Start with sched group semaphore changed
+    memset(ts->sg_actual, 0, sizeof ts->sg_actual);
+    //clear ts->sg_wanted;//This might already have been set
+    TS = ts;
+}
+
+//Add schedq to the thread's schedq_list. The new queue is placed in front of
+//the first entry whose prio is not higher than its own; the displaced entry
+//is then carried onwards and re-inserted the same way.
+//Aborts the process if the list is already full.
+static void insert_schedq_in_list(struct thread_state *ts,
+                                  sched_queue *schedq)
+{
+    sched_queue *carry = schedq;//The schedq still looking for a slot
+    const uint32_t count = ts->num_schedq;
+    for (uint32_t pos = 0; pos != count; pos++)
+    {
+        sched_queue *resident = ts->schedq_list[pos];
+        //Higher value is higher priority and closer to start of list
+        if (carry->prio < resident->prio)
+            continue;
+        //carry takes this slot, the previous occupant moves on
+        ts->schedq_list[pos] = carry;
+        carry = resident;
+    }
+    //Whatever we carry now goes last, provided there is room
+    if (ts->num_schedq == SCHEDQ_PER_THREAD)
+    {
+        fprintf(stderr, "insert_schedq_in_list: too many schedq's\n");
+        abort();
+    }
+    ts->schedq_list[ts->num_schedq++] = carry;
+}
+
+//Remove the first occurrence of schedq from the thread's schedq_list,
+//closing the gap so that the order of the remaining entries is kept.
+//Aborts the process if schedq is not in the list.
+static void remove_schedq_from_list(struct thread_state *ts,
+                                    sched_queue *schedq)
+{
+    //Locate schedq
+    uint32_t pos = 0;
+    while (pos < ts->num_schedq && ts->schedq_list[pos] != schedq)
+    {
+        pos++;
+    }
+    if (pos == ts->num_schedq)
+    {
+        //schedq not found, internal error
+        fprintf(stderr, "remove_schedq_from_list: schedq not found\n");
+        abort();
+    }
+    //One entry less; shift the entries behind it one step forward
+    ts->num_schedq--;
+    for (; pos < ts->num_schedq; pos++)
+    {
+        ts->schedq_list[pos] = ts->schedq_list[pos + 1];
+    }
+}
+
+/******************************************************************************
+ * Scheduler queues
+ *****************************************************************************/
+
+//Kind of scheduler object; for queues this is the synchronisation type
+//(qtype2str() maps the values to names)
+typedef enum
+{
+    pktio, parallel_q, ordered_q, atomic_q
+} sched_obj_type;
+
+//Initialise a scheduler queue: record its priority and set up the
+//underlying linked-list queue
+static inline void schedq_init(sched_queue *schedq, uint32_t prio)
+{
+    struct llqueue *list = &schedq->llq;
+    llqueue_init(list);
+    schedq->prio = prio;
+}
+
+//Return whatever llq_head() reports for the scheduler queue, as a sched_obj
+static inline struct sched_obj *schedq_peek(sched_queue *schedq)
+{
+    struct sched_obj *first = (struct sched_obj *)llq_head(&schedq->llq);
+    return first;
+}
+
+//Conditionally dequeue obj from the scheduler queue.
+//Returns true if llq_dequeue_cond() returned obj itself, false otherwise.
+static bool schedq_cond_pop(sched_queue *schedq, struct sched_obj *obj)
+    __attribute__((always_inline));
+static inline bool schedq_cond_pop(sched_queue *schedq, struct sched_obj *obj)
+{
+    struct llnode *wanted = (struct llnode *)obj;
+    struct llnode *removed = llq_dequeue_cond(&schedq->llq, wanted, NULL);
+    return removed == wanted;
+}
+
+//Enqueue obj on the scheduler queue (obj is passed to the linked-list queue
+//as a struct llnode)
+static void schedq_push(sched_queue *schedq,
+                        struct sched_obj *obj)
+{
+    struct llnode *node = (struct llnode *)obj;
+    llq_enqueue(&schedq->llq, node, NULL);
+}
+
+/******************************************************************************
+ * ODP event
+ *****************************************************************************/
+
+//An event; odp_event_t is a pointer to this structure
+struct odp_event_s
+{
+    struct odp_event_s *next;//Next pointer for linked list
+    odp_queue_t queue;//Queue this event is destined for
+    //Below are fields used by the application
+    unsigned fromqidx;
+    unsigned number;//Returned by event_number_get(), printed in diagnostics
+};
+
+//Allocate a cache line aligned event.
+//Returns the event, or ODP_EVENT_INVALID if memory could not be allocated.
+//The fields of the event are NOT initialised.
+static odp_event_t odp_event_alloc(void)
+{
+    //aligned_alloc() requires the size to be an integral multiple of the
+    //alignment, so round the event size up to whole cache lines
+    const size_t size = (sizeof(struct odp_event_s) + CACHE_LINE - 1) /
+                        CACHE_LINE * CACHE_LINE;
+    struct odp_event_s *evt = aligned_alloc(CACHE_LINE, size);
+    if (unlikely(evt == NULL))
+        return ODP_EVENT_INVALID;
+    return evt;
+}
+
+//Return the event linked after evt (the 'next' field)
+static inline struct odp_event_s *event_next_get(struct odp_event_s *evt)
+{
+    struct odp_event_s *successor = evt->next;
+    return successor;
+}
+
+//Link nxt after evt (set the 'next' field of evt)
+static inline void event_next_set(struct odp_event_s *evt,
+                                  struct odp_event_s *nxt)
+{
+    (*evt).next = nxt;
+}
+
+//Return the queue the event is destined for (the 'queue' field)
+static inline odp_queue_t event_queue_get(struct odp_event_s *evt)
+{
+    odp_queue_t destination = evt->queue;
+    return destination;
+}
+
+//Record q as the queue the event is destined for
+static inline void event_queue_set(struct odp_event_s *evt, odp_queue_t q)
+{
+    (*evt).queue = q;
+}
+
+//Return the application-assigned number of the event
+static inline uint32_t event_number_get(struct odp_event_s *evt)
+{
+    uint32_t number = evt->number;
+    return number;
+}
+
+/******************************************************************************
+ * Reorder window
+ *****************************************************************************/
+
+//Head and change indicator of a reorder window. The pair is 8 bytes, 8-byte
+//aligned, and is loaded and compare-exchanged as one unit in rwin_insert().
+struct hc
+{
+    uint32_t head;//First missing context
+    uint32_t chgi;//Change indicator
+} ALIGNED(sizeof(uint64_t));
+
+#define RWIN_SIZE 32 //Should be at least one per CPU
+
+#define NUM_OLOCKS 2
+
+//Reorder window: sequence numbers are handed out from 'tail' by
+//rwin_reserve() and contexts are retired in sequence number order from
+//'hc.head' by rwin_insert(). rwin_alloc() asserts the cache line layout.
+struct reorder_window
+{
+    struct hc hc;//head and chgi
+    uint32_t winmask;//RWIN_SIZE - 1, maps a sequence number to a ring slot
+    uint32_t tail;//Next sequence number to hand out
+    uint32_t turn;
+    uint16_t lock_count ALIGNED(CACHE_LINE);//Force new cache line
+    uint32_t olock[NUM_OLOCKS];//olock_unlock() stores sn + 1 here
+    //Contexts inserted out-of-order, waiting to be retired (NULL = empty slot)
+    struct reorder_context *ring[RWIN_SIZE] ALIGNED(CACHE_LINE);//Force new cache line
+};
+
+//Pass ordered lock 'lock_index' of rwin on to the next sequence number, but
+//only if rctx did not use the lock itself (its bit in olock_flags is clear).
+//The ts parameter is not used by the function body.
+static inline void olock_unlock(struct thread_state *ts,
+                                const struct reorder_context *rctx,
+                                struct reorder_window *rwin,
+                                unsigned lock_index)
+{
+    const unsigned used_bit = 1U << lock_index;
+    if ((rctx->olock_flags & used_bit) != 0)
+    {
+        //Lock was used, nothing to do here
+        return;
+    }
+    //Lock not used
+#ifdef LOG
+if (VERBOSE) printf("%u: release %p->olock[%u]=%u\n", TS->tidx, rwin, lock_index, rctx->sn + 1);
+#endif
+    //Use relaxed ordering, we are not releasing any updates
+    far_atomic_store(&rwin->olock[lock_index], rctx->sn + 1, __ATOMIC_RELAXED);
+}
+
+//Release the ordered locks of the reorder window of rctx that rctx did not
+//use itself. Handles at most two locks (NUM_OLOCKS == 2).
+static void olock_release(const struct reorder_context *rctx)
+{
+    struct reorder_window *rwin = rctx->rwin;
+#ifdef LOG
+if (VERBOSE) printf("%u: release sn=%u %p->olock[0]=%u olock_flags=%x\n", TS->tidx, rctx->sn, rwin, rwin->olock[0], rctx->olock_flags);
+#endif
+    //The code below is written for exactly two locks
+    assert(NUM_OLOCKS == 2);
+    if (likely(rwin->lock_count == 0))
+    {
+        //Window has no ordered locks
+        return;
+    }
+    olock_unlock(TS, rctx, rwin, 0);
+    if (rwin->lock_count != 1)
+    {
+        olock_unlock(TS, rctx, rwin, 1);
+    }
+}
+
+//Allocate and initialise an empty reorder window with 'lock_count' ordered
+//locks. Returns NULL if memory could not be allocated.
+static struct reorder_window *rwin_alloc(unsigned lock_count)
+{
+    assert(is_power_of_two(RWIN_SIZE));
+    struct reorder_window *win =
+        aligned_alloc(CACHE_LINE, sizeof(struct reorder_window));
+    if (win == NULL)
+    {
+        return NULL;
+    }
+    //Expected layout: hc first, lock_count at the start of the second cache
+    //line and ring at the start of the third
+    assert(offsetof(struct reorder_window, hc) == 0);
+    assert(offsetof(struct reorder_window, lock_count) == CACHE_LINE);
+    assert(offsetof(struct reorder_window, ring) == 2 * CACHE_LINE);
+    //Empty window: head == tail == 0
+    win->hc.head = 0;
+    win->hc.chgi = 0;
+    win->tail = 0;
+    win->turn = 0;
+    win->winmask = RWIN_SIZE - 1;
+    win->lock_count = (uint16_t)lock_count;
+    memset(win->olock, 0, sizeof win->olock);
+    //No contexts pending in the ring
+    for (uint32_t slot = 0; slot != RWIN_SIZE; slot++)
+    {
+        win->ring[slot] = NULL;
+    }
+    return win;
+}
+
+//Reserve the next sequence number in the reorder window by atomically
+//incrementing tail (LL/SC when USE_LLSC is defined, otherwise CAS).
+//On success the reserved number is stored in *sn and true is returned.
+//Returns false, leaving *sn untouched, when tail - head >= winmask, i.e. the
+//window is considered full.
+static inline bool rwin_reserve(struct reorder_window *rwin, uint32_t *sn)
+{
+    uint32_t head, oldt, newt;
+    //Read head and tail separately
+#ifndef USE_LLSC
+    //CAS variant: initial load only, a failed CAS refreshes oldt
+    oldt = __atomic_load_n(&rwin->tail, __ATOMIC_RELAXED);
+#endif
+    do
+    {
+       head = __atomic_load_n(&rwin->hc.head, __ATOMIC_RELAXED);
+#ifdef USE_LLSC
+       //LL/SC variant: the load-linked must be repeated on every attempt
+       oldt = ll32(&rwin->tail, __ATOMIC_RELAXED);
+#endif
+       //Unsigned subtraction, so the comparison survives wrap-around
+       if (unlikely(oldt - head >= rwin->winmask))
+       {
+           return false;
+       }
+       newt = oldt + 1;
+    }
+#ifdef USE_LLSC
+    while (unlikely(sc32(&rwin->tail, newt, __ATOMIC_RELAXED)));
+#else
+    while (!__atomic_compare_exchange(&rwin->tail,
+               &oldt,
+               &newt,
+               CAS_WEAK,
+               __ATOMIC_RELAXED,
+               __ATOMIC_RELAXED));
+#endif
+    //The old tail value is our sequence number
+    *sn = oldt;
+    return true;
+}
+
+//Insert reorder context rctx with sequence number sn into the reorder window
+//and retire (by calling callback) every context that is now in-order.
+//- If sn is not the current head, the context is parked in the ring and the
+//  change indicator is bumped; the function returns unless head has reached
+//  sn in the meantime.
+//- If sn is the head, the caller's context and then all consecutive contexts
+//  found in the ring are passed to callback, after which the new head is
+//  published. If the change indicator moved meanwhile, the ring is scanned
+//  again.
+//callback may thus be invoked for contexts belonging to other threads.
+static void rwin_insert(struct reorder_window *rwin,
+                       struct reorder_context *rctx,
+                       uint32_t sn,
+                       void (*callback)(const struct reorder_context *))
+{
+    struct hc old;
+    __atomic_load(&rwin->hc, &old, __ATOMIC_ACQUIRE);
+    uint32_t winmask = rwin->winmask;
+    if (old.head != sn)
+    {
+       //We are out-of-order
+       //Store context in reorder window, releasing its content
+       assert(rwin->ring[sn & winmask] == NULL);
+       __atomic_store_n(&rwin->ring[sn & winmask], rctx, __ATOMIC_RELEASE);
+       //The context now lives in the ring; NULL marks it as handed over
+       rctx = NULL;
+
+       do
+       {
+           struct hc new;
+           new.head = old.head;
+           new.chgi = old.chgi + 1;//Changed value
+           //Update head&chgi, fail if any has changed
+           if (__atomic_compare_exchange(&rwin->hc,
+                       &old,//Updated on failure
+                       &new,
+                       CAS_WEAK,
+                       __ATOMIC_RELEASE,//Release our ring update
+                       __ATOMIC_ACQUIRE))
+           {
+               //CAS succeeded => head same (we are not in-order), chgi updated
+               return;
+           }
+           //CAS failed => head and/or chgi changed
+           //We might not be out-of-order anymore
+       }
+       while (old.head != sn);
+       //old.head == sn => we are now in-order!
+    }
+
+    assert(old.head == sn);
+    //We are in-order so our responsibility to retire contexts
+    struct hc new;
+    new.head = old.head;
+    new.chgi = old.chgi + 1;//Changed value
+
+    //Retire our in-order context (if we still have it)
+    if (rctx != NULL)
+    {
+       callback(rctx);
+       new.head++;
+    }
+
+    //Retire in-order contexts in the ring
+    //The first context might actually be ours (if we were originally
+    //out-of-order)
+    do
+    {
+       for (;;)
+       {
+           rctx = __atomic_load_n(&rwin->ring[new.head & winmask],
+                                  __ATOMIC_ACQUIRE);
+           //Empty slot => the context for new.head has not arrived yet
+           if (rctx == NULL)
+               break;
+           //We are the only thread that are in-order (until head updated)
+           //so don't have to use atomic load-and-clear (exchange)
+           rwin->ring[new.head & winmask] = NULL;
+           callback(rctx);
+           new.head++;
+       }
+    }
+    //Update head&chgi, fail if chgi has changed (head cannot change)
+    //On failure 'old' is refreshed and the ring is scanned again from new.head
+    while (!__atomic_compare_exchange(&rwin->hc,
+                                     &old,//Updated on failure
+                                     &new,
+                                     /*weak=*/false,
+                                     __ATOMIC_RELEASE,//Release our ring updates
+                                     __ATOMIC_ACQUIRE));
+}
+
+/******************************************************************************
+ * sched_obj aka ODP queue
+ *****************************************************************************/
+
+//Number of events that can be stored in a queue
+#define RING_SIZE 2048
+
+//Ring indexes are free-running 32-bit counters; they are masked with the
+//ring mask when used to address a slot
+typedef uint32_t ringidx_t;
+//A read/write index pair; write - read is the number of used slots
+//(see ringstate_num_used())
+struct ringstate
+{
+    ringidx_t read;
+    ringidx_t write;
+} ALIGNED(8);
+#define RINGSIZE_MAX (1U << 31)
+
+//Event counter and ticket pair of a queue. The structure is 8 bytes and is
+//updated as one unit (via a uint64_t union with LL/SC, or with CAS).
+struct sharedstate
+{
+    uint32_t numevts;//Events in queue: enqueue adds, dequeue subtracts
+    uint16_t cur_ticket;//Ticket whose holder may update the schedq now
+    uint16_t nxt_ticket;//Next ticket to hand out
+} ALIGNED(sizeof(uint32_t) * 2);
+//No ticket taken; cannot collide with a (16-bit) ticket value
+#define TICKET_INVALID (uint32_t)(~0U)
+
+//Ring buffer of events with separate producer and consumer views
+struct ring
+{
+    //prod.write is claimed by producers when reserving slots;
+    //prod.read is published by consumers when slots have been emptied
+    struct ringstate prod;
+    //cons.read is claimed by consumers when taking events;
+    //cons.write is published by producers when events have been stored
+    struct ringstate cons;
+    struct sharedstate shared;
+    uint32_t mask;//RING_SIZE - 1, set by _odp_queue_create()
+    odp_event_t ring[RING_SIZE] ALIGNED(CACHE_LINE);
+};
+
+struct sched_obj//May actually be an ODP queue
+{
+    //Must stay the first member: schedq_push() and schedq_cond_pop() cast a
+    //struct sched_obj pointer to a struct llnode pointer
+    struct llnode node;
+    sched_queue *schedq;//Which schedq we belong to
+    sched_obj_type type;
+    void *user_ctx;//Opaque pointer supplied at creation
+    struct reorder_window *rwin;//Only allocated for ordered queues, else NULL
+    struct ring queue;//The events stored in the queue
+} ALIGNED(CACHE_LINE);
+
+//Return the reorder window of the queue (the 'rwin' field)
+static inline struct reorder_window *queue_rwin_get(const odp_queue_t q)
+{
+    struct reorder_window *window = q->rwin;
+    return window;
+}
+
+//Return true if the consumer view of the ring holds no events, i.e. the
+//consumer read and write indexes are equal (plain, non-atomic reads)
+static inline bool queue_is_empty(const odp_queue_t q)
+{
+    const struct ringstate *consumer = &q->queue.cons;
+    return consumer->read == consumer->write;
+}
+
+//Number of used slots described by rs: write - read, modulo 2^32
+static inline ringidx_t ringstate_num_used(struct ringstate rs)
+{
+    ringidx_t used = (ringidx_t)(rs.write - rs.read);
+    return used;
+}
+
+//Number of free slots described by rs: RING_SIZE minus the used slots
+static inline ringidx_t ringstate_num_free(struct ringstate rs)
+{
+    ringidx_t used = (ringidx_t)(rs.write - rs.read);
+    return RING_SIZE - used;
+}
+
+//Create an empty queue.
+//prio, group: select the scheduler queue the new queue belongs to
+//sync: type of the queue
+//lock_count: number of ordered locks; must be 0 unless sync is ordered_q,
+//and at most NUM_OLOCKS
+//user_ctx: opaque pointer stored in the queue
+//Returns the queue, or NULL if lock_count is not acceptable.
+//Exits the process if memory cannot be allocated.
+static odp_queue_t _odp_queue_create(uint32_t prio,
+                                     sched_obj_type sync,
+                                     odp_schedule_group_t group,
+                                     unsigned lock_count,
+                                     void *user_ctx)
+{
+    const unsigned max_locks = (sync == ordered_q) ? NUM_OLOCKS : 0;
+    if (lock_count > max_locks)
+    {
+        return NULL;
+    }
+
+    struct sched_obj *obj = aligned_alloc(CACHE_LINE, sizeof(struct sched_obj));
+    if (obj == NULL)
+    {
+        perror("aligned_alloc");
+        exit(EXIT_FAILURE);
+    }
+    obj->schedq = schedq_from_sched_group(group, prio);
+    obj->type = sync;
+    obj->user_ctx = user_ctx;
+
+    //Empty ring: all indexes and counters zero, all slots invalid
+    assert(is_power_of_two(RING_SIZE));
+    struct ring *rng = &obj->queue;
+    rng->prod.read = 0;
+    rng->prod.write = 0;
+    rng->cons.read = 0;
+    rng->cons.write = 0;
+    rng->shared.numevts = 0;
+    rng->shared.cur_ticket = 0;
+    rng->shared.nxt_ticket = 0;
+    rng->mask = RING_SIZE - 1;
+    for (uint32_t slot = 0; slot != RING_SIZE; slot++)
+    {
+        rng->ring[slot] = ODP_EVENT_INVALID;
+    }
+
+    //Only ordered queues get a reorder window
+    obj->rwin = NULL;
+    if (sync == ordered_q)
+    {
+        struct reorder_window *window = rwin_alloc(lock_count);
+        obj->rwin = window;
+        if (window == NULL)
+        {
+            perror("rwin_alloc");
+            exit(EXIT_FAILURE);
+        }
+    }
+    assert(queue_is_empty(obj));
+    return obj;
+}
+
+//Return a printable name for the type of queue q, or "?" if the type is not
+//a known sched_obj_type value
+static const char *qtype2str(odp_queue_t q)
+{
+    const sched_obj_type type = q->type;
+    if (type == pktio)
+        return "pktio";
+    if (type == parallel_q)
+        return "parallel";
+    if (type == ordered_q)
+        return "ordered";
+    if (type == atomic_q)
+        return "atomic";
+    return "?";
+}
+
+//Enqueue up to 'num' events from ev[] on queue q.
+//Returns the number of events enqueued, which can be less than 'num' when
+//the ring does not have room for all of them, and 0 when the ring is full or
+//num <= 0.
+//If the calling thread is out-of-order (TS->out_of_order), the events are
+//not stored in the ring; they are linked onto the thread's current reorder
+//context and all 'num' events are reported as enqueued. rctx_retire()
+//performs the real enqueue later.
+//On an empty -> non-empty transition the queue is pushed on its scheduler
+//queue (for atomic queues only if no ticket is outstanding).
+static int odp_queue_enq(odp_queue_t q, const odp_event_t ev[], int num)
+{
+    //An empty request must not reach the code below: the deferred path would
+    //access ev[-1], and a negative 'num' would be converted to a huge
+    //unsigned count by MIN()
+    if (unlikely(num <= 0))
+    {
+        return 0;
+    }
+
+    struct thread_state *ts = TS;
+    //unlikely() improves performance for atomic and parallel queues but
+    //degrades it for ordered queues
+    if (unlikely(ts->out_of_order))
+    {
+        //Defer: append the events to the list of the current reorder context
+        int i = 0;
+        struct reorder_context *rctx = ts->rctx;
+        assert(ts->rctx != NULL);
+        while (i < num)
+        {
+#ifdef LOG
+if (VERBOSE) printf("%u: Deferring enqueue event %p/%u on queue %p\n", TS->tidx, ev[i], ev[i]->number, q);
+#endif
+            //Remember the destination queue in the event itself
+            event_queue_set(ev[i], q);
+            if (rctx->head == NULL)
+            {
+                rctx->head = ev[i];
+                rctx->tail = ev[i];
+            }
+            else
+            {
+                event_next_set(rctx->tail, ev[i]);
+                rctx->tail = ev[i];
+            }
+            i++;
+        }
+        //Terminate the list at the last event (i >= 1 here)
+        event_next_set(ev[i - 1], NULL);
+        rctx->tail = ev[i - 1];
+        return i;
+    }
+
+    struct ringstate old;
+    ringidx_t new_write;
+    uint32_t actual;
+
+    //Load producer ring state (read & write index)
+#ifdef NDEBUG
+    //No debug => no assert => relaxed ordering OK
+#define ATOMIC_READ_ON_ASSERT __ATOMIC_RELAXED
+#else
+    //Debug => assert reads from the ring => needs acquire ordering
+#define ATOMIC_READ_ON_ASSERT __ATOMIC_ACQUIRE
+#endif
+
+#ifndef USE_LLSC
+    old.write = __atomic_load_n(&q->queue.prod.write, __ATOMIC_RELAXED);
+#endif
+    do
+    {
+#ifdef USE_LLSC
+        old.write = ll32(&q->queue.prod.write, ATOMIC_READ_ON_ASSERT);
+#endif
+        old.read = __atomic_load_n(&q->queue.prod.read, __ATOMIC_RELAXED);
+
+        actual = MIN(num, ringstate_num_free(old));
+        if (unlikely(actual == 0))
+        {
+            return 0;
+        }
+
+        //Attempt to reserve ring slot(s)
+        new_write = old.write + actual;
+    }
+#ifdef USE_LLSC
+    while (unlikely(sc32(&q->queue.prod.write, new_write, __ATOMIC_RELAXED)));
+#else
+    while (!__atomic_compare_exchange_n(&q->queue.prod.write,
+                                        &old.write,//Updated on failure
+                                        new_write,
+                                        CAS_WEAK,
+                                        ATOMIC_READ_ON_ASSERT,
+                                        __ATOMIC_RELAXED));
+#endif
+
+    //Store our event(s) in the ring
+    uint32_t index = old.write & q->queue.mask;
+    for (uint32_t i = 0; i < actual; i++)
+    {
+        //The following assert reads from the ring, needs acquire ordering above
+        assert(ev[i] != ODP_EVENT_INVALID);
+        assert(q->queue.ring[index] == ODP_EVENT_INVALID);
+#ifdef LOG
+if (VERBOSE) printf("%u: Enqueue event %p/%u on queue %p (%u used)\n", TS->tidx, ev[i], ev[i]->number, q, (uint32_t)(new_write - old.read));
+#endif
+        q->queue.ring[index] = ev[i];
+        index = (index + 1) & q->queue.mask;
+    }
+
+    //Wait for our turn to signal consumers
+    while (__atomic_load_n(&q->queue.cons.write, __ATOMIC_RELAXED) != old.write)
+    {
+        doze();
+    }
+
+    //Update the event counter, optionally take a ticket
+    union
+    {
+        struct sharedstate ss;
+        uint64_t ui;
+    } oss, nss;
+    uint32_t ticket;
+#ifndef USE_LLSC
+    __atomic_load(&q->queue.shared, &oss, __ATOMIC_RELAXED);
+#endif
+    do
+    {
+        ticket = TICKET_INVALID;
+#ifdef USE_LLSC
+        oss.ui = ll64((uint64_t *)&q->queue.shared, __ATOMIC_RELAXED);
+#endif
+        nss = oss;
+        nss.ss.numevts += actual;
+        if (oss.ss.numevts == 0)//Empty -> non-empty transition
+        {
+            //Parallel or ordered queue: always take ticket
+            //Atomic queue: only take ticket if one is immediately available
+            //Otherwise ticket already taken => queue processed by some thread
+            if (q->type != atomic_q || oss.ss.cur_ticket == oss.ss.nxt_ticket)
+            {
+                ticket = nss.ss.nxt_ticket++;
+            }
+        }
+        //Else queue already was non-empty
+    }
+    //Attempt to update numevts counter and optionally take ticket
+#ifdef USE_LLSC
+    while (sc64((uint64_t *)&q->queue.shared, nss.ui, __ATOMIC_RELAXED));
+#else
+    while (!__atomic_compare_exchange(&q->queue.shared,
+                                      &oss,
+                                      &nss,
+                                      CAS_WEAK,
+                                      __ATOMIC_RELAXED,
+                                      __ATOMIC_RELAXED));
+#endif
+
+    //Signal consumers that events are available (release events)
+    //Enable other producers to continue
+    far_atomic_store(&q->queue.cons.write, new_write, __ATOMIC_RELEASE);
+
+    if (ticket != TICKET_INVALID)
+    {
+        assert(oss.ss.numevts == 0);
+        //Wait for our turn to update schedq
+        while (__atomic_load_n(&q->queue.shared.cur_ticket, __ATOMIC_ACQUIRE) !=
+               ticket)
+        {
+            doze();
+        }
+
+        //Enqueue at end of scheduler queue
+        schedq_push(q->schedq, q);
+#ifdef LOG
+if (VERBOSE) printf("%u: Push queue %p on schedq %p\n", TS->tidx, q, q->schedq);
+#endif
+        //Pass the turn on to the holder of the next ticket
+        far_atomic_store(&q->queue.shared.cur_ticket,
+                         ticket + 1,
+                         __ATOMIC_RELEASE_AFTER_CAS);
+    }
+    //Else queue was not empty or atomic queue already busy
+
+    return actual;
+}
+
+//Dequeue up to 'num' events from queue q into ev[].
+//Returns the number of events dequeued (0 if the queue is empty or
+//num <= 0).
+//threadsafe: true => several threads may dequeue from q concurrently, the
+//consumer read index is claimed with LL/SC or CAS. false => the caller is
+//the only consumer, plain loads and stores are used.
+//atomic: true => only the event counter is decremented. false => if the
+//dequeue empties the queue, a ticket is taken and the queue is conditionally
+//popped from its scheduler queue.
+//We want _odp_queue_deq() to be inlined so that unexecuted paths caused by
+//threadsafe and atomic parameters are removed
+static int _odp_queue_deq(odp_queue_t q, odp_event_t ev[], int num,
+                          bool threadsafe, bool atomic)
+    __attribute__((always_inline));
+static int _odp_queue_deq(odp_queue_t q,
+                          odp_event_t ev[],
+                          int num,
+                          bool threadsafe,
+                          bool atomic)
+{
+    uint32_t actual;
+    struct ringstate old;
+    ringidx_t new_read;
+
+    //A negative 'num' would be converted to a huge unsigned count by MIN()
+    //and make us write beyond the end of ev[]
+    if (unlikely(num <= 0))
+    {
+        return 0;
+    }
+
+    //Load consumer ring state (read & write index)
+    if (!threadsafe)
+    {
+        old.read = __atomic_load_n(&q->queue.cons.read, __ATOMIC_ACQUIRE);
+        old.write = __atomic_load_n(&q->queue.cons.write, __ATOMIC_RELAXED);
+        actual = MIN(num, ringstate_num_used(old));
+        if (unlikely(actual == 0))
+        {
+            return 0;
+        }
+        new_read = old.read + actual;
+        //Sole consumer, a plain store is enough
+        q->queue.cons.read = new_read;
+    }
+    else
+    {
+#ifndef USE_LLSC
+        old.read = __atomic_load_n(&q->queue.cons.read, __ATOMIC_RELAXED);
+#endif
+        do
+        {
+#ifdef USE_LLSC
+            old.read = ll32(&q->queue.cons.read, __ATOMIC_ACQUIRE);
+#endif
+            old.write = __atomic_load_n(&q->queue.cons.write, __ATOMIC_RELAXED);
+
+            actual = MIN(num, ringstate_num_used(old));
+            if (unlikely(actual == 0))
+            {
+                return 0;
+            }
+
+            //Prefetch queue context for use by application
+            //__builtin_prefetch(q->user_ctx, 0, 0);
+
+            //Attempt to free ring slot(s)
+            new_read = old.read + actual;
+        }
+#ifdef USE_LLSC
+        while (unlikely(sc32(&q->queue.cons.read, new_read, __ATOMIC_RELAXED)));
+#else
+        while (!__atomic_compare_exchange_n(&q->queue.cons.read,
+                                            &old.read,//Updated on failure
+                                            new_read,
+                                            CAS_WEAK,
+                                            __ATOMIC_ACQUIRE,
+                                            __ATOMIC_ACQUIRE));
+#endif
+    }
+
+    //Copy our event(s) out of the ring
+    uint32_t index = old.read & q->queue.mask;
+    uint32_t i;
+    for (i = 0; i < actual; i++)
+    {
+        //TODO Prefetch event data
+        ev[i] = q->queue.ring[index];
+        assert(ev[i] != ODP_EVENT_INVALID);
+#ifndef NDEBUG
+        //Lets the enqueue side assert that a slot is free before using it
+        q->queue.ring[index] = ODP_EVENT_INVALID;
+#endif
+        index = (index + 1) & q->queue.mask;
+#ifdef LOG
+if (VERBOSE) printf("%u: Dequeue event %p/%u from queue %p (%u used)\n", TS->tidx, ev[i], ev[i]->number, q, (uint32_t)(old.write - new_read));
+#endif
+    }
+
+    if (threadsafe)
+    {
+        //Several consumers may have claimed slots concurrently. prod.read
+        //must be advanced in the order the slots were claimed, otherwise it
+        //could move backwards or expose slots that another consumer has not
+        //read yet. A sole consumer (!threadsafe) is always next in turn.
+        //Wait for our turn to signal producers
+        while (__atomic_load_n(&q->queue.prod.read, __ATOMIC_RELAXED) !=
+               old.read)
+        {
+            doze();
+        }
+    }
+
+    if (atomic)
+    {
+        (void)__atomic_fetch_sub(&q->queue.shared.numevts,
+                                 actual,
+                                 __ATOMIC_RELAXED);
+
+        //Signal producers that empty slots are available (release ring slots)
+        //Enable other consumers to continue
+        far_atomic_store(&q->queue.prod.read, new_read, __ATOMIC_RELEASE);
+    }
+    else
+    {
+        union
+        {
+            struct sharedstate ss;
+            uint64_t ui;
+        } oss, nss;
+        uint32_t ticket = TICKET_INVALID;
+#ifndef USE_LLSC
+        __atomic_load(&q->queue.shared, &oss, __ATOMIC_RELAXED);
+#endif
+        do
+        {
+#ifdef USE_LLSC
+            oss.ui = ll64((uint64_t *)&q->queue.shared, __ATOMIC_RELAXED);
+#endif
+            nss = oss;
+            nss.ss.numevts -= actual;
+            if (nss.ss.numevts == 0)
+            {
+                //If we emptied parallel/ordered queue, we need a ticket for a
+                //later pop
+                ticket = nss.ss.nxt_ticket++;
+            }
+        }
+        //Attempt update numevts and optionally take ticket
+#ifdef USE_LLSC
+        while (sc64((uint64_t *)&q->queue.shared, nss.ui, __ATOMIC_RELAXED));
+#else
+        while (!__atomic_compare_exchange(&q->queue.shared,
+                                          &oss,//Updated on failure
+                                          &nss,
+                                          CAS_WEAK,
+                                          __ATOMIC_RELAXED,
+                                          __ATOMIC_RELAXED));
+#endif
+
+        //Signal producers that empty slots are available (release ring slots)
+        //Enable other consumers to continue
+        far_atomic_store(&q->queue.prod.read, new_read, __ATOMIC_RELEASE);
+
+        if (nss.ss.numevts == 0)
+        {
+            assert(q->type != atomic_q);
+            //Wait for our turn update schedq
+            while (__atomic_load_n(&q->queue.shared.cur_ticket,
+                                   __ATOMIC_ACQUIRE) != ticket)
+            {
+                doze();
+            }
+
+            //The result is only used for logging
+            bool b = schedq_cond_pop(q->schedq, q);
+            (void)b;
+#ifdef LOG
+            if (VERBOSE) printf("%u: Pop queue %p from schedq %p %s\n", TS->tidx, q, q->schedq, b ? "success" : "failure");
+#endif
+            //Pass the turn on to the holder of the next ticket
+            far_atomic_store(&q->queue.shared.cur_ticket,
+                             ticket + 1,
+                             __ATOMIC_RELEASE_AFTER_CAS);
+        }
+    }
+
+    return actual;
+}
+
+/******************************************************************************
+ * Behold, the scheduler!
+ *****************************************************************************/
+
+//Release the reorder context currently held by the thread (if any) and clear
+//the thread's out-of-order flag. Does nothing when no context is held.
+static inline void _odp_schedule_release_ordered(struct thread_state *ts)
+{
+    struct reorder_context *held = ts->rctx;
+    if (held == NULL)
+    {
+        return;//No reorder context, nothing to release
+    }
+#ifdef LOG
+    if (VERBOSE) printf("%u: Release rctx %p\n", ts->tidx, held);
+#endif
+    ts->out_of_order = false;
+    rctx_release(held);
+    ts->rctx = NULL;
+}
+
+//Public API: release the calling thread's reorder context.
+//Calling this without holding a reorder context is a usage error: a message
+//is written to stderr and the program aborts.
+void odp_schedule_release_ordered(void)
+{
+    struct thread_state *self = TS;
+    if (likely(self->rctx != NULL))
+    {
+        _odp_schedule_release_ordered(self);
+        return;
+    }
+    fprintf(stderr, "odp_schedule_release_ordered: unexpected call\n");
+    fflush(NULL);
+    abort();
+}
+
+//Give up the atomic queue owned by the thread: if the queue still holds
+//events it is pushed back (once) onto its schedq, then cur_ticket is advanced
+//past our ticket with a CAS on the queue's shared state. Finally the thread's
+//atomq and ticket are reset to their invalid values.
+static inline void _odp_schedule_release_atomic(struct thread_state *ts)
+{
+    struct sched_obj *q = ts->atomq;
+    struct sharedstate seen, wanted;
+    bool requeued = false;
+    assert(ts->atomq != ODP_QUEUE_INVALID);
+    assert(ts->ticket != TICKET_INVALID);
+    //Only we have this queue, only we can dequeue but others can enqueue so
+    //numevts can increase but not decrease
+    __atomic_load(&q->queue.shared, &seen, __ATOMIC_ACQUIRE);
+    for (;;)
+    {
+        assert(seen.cur_ticket == ts->ticket);
+        if (!requeued && seen.numevts != 0)
+        {
+            schedq_push(q->schedq, q);
+#ifdef LOG
+            if (VERBOSE) printf("%u: Push queue %p on schedq %p\n", TS->tidx, q, q->schedq);
+#endif
+            requeued = true;//Only push once
+        }
+        //Release ticket
+        wanted = seen;
+        wanted.cur_ticket = ts->ticket + 1;
+        //Attempt to release ticket expecting our view of numevts to be
+        //correct; on failure 'seen' is refreshed and we try again
+        if (__atomic_compare_exchange(&q->queue.shared,
+                                      &seen,
+                                      &wanted,
+                                      CAS_WEAK,
+                                      __ATOMIC_RELEASE,
+                                      __ATOMIC_ACQUIRE))
+        {
+            break;
+        }
+    }
+    //CAS succeed => if (numevts != 0) then queue pushed to schedq
+    ts->ticket = TICKET_INVALID;
+    ts->atomq = ODP_QUEUE_INVALID;
+}
+
+//Public API: release the atomic queue currently owned by the calling thread.
+//If the thread owns no atomic queue (or holds no ticket) a message is written
+//to stderr and the program aborts.
+void odp_schedule_release_atomic(void)
+{
+    struct thread_state *self = TS;
+    bool owns_queue = self->atomq != ODP_QUEUE_INVALID &&
+                      self->ticket != TICKET_INVALID;
+    if (unlikely(!owns_queue))
+    {
+        fprintf(stderr, "odp_schedule_release_atomic: unexpected call\n");
+        fflush(NULL);
+        abort();
+    }
+    _odp_schedule_release_atomic(self);
+}
+
+static void update_sg_membership(struct thread_state *ts);
+
+//Scheduler core: try to obtain up to 'num' events from a single queue on
+//behalf of the calling thread.
+//  from: out, set to the queue the returned events were dequeued from
+//  wait: currently ignored (timeouts are not implemented)
+//  ev:   out, receives the dequeued events
+//  num:  maximum number of events to store in ev[]
+//Returns the number of events stored in ev[]; 0 when the thread is paused or
+//when nothing could be dequeued from any of the thread's scheduler queues.
+static int odp_schedule_multi(odp_queue_t *from, uint64_t wait,
+                              odp_event_t ev[], int num)
+    __attribute__((noinline));
+static int odp_schedule_multi(odp_queue_t *from, uint64_t wait,
+                              odp_event_t ev[], int num)
+{
+    (void)wait;//TODO implement timeout
+    //Get pointer to our per-thread state
+    struct thread_state *ts = TS;
+    if (unlikely(ts->pause))
+    {
+        return 0;
+    }
+    odp_queue_t atomq = ts->atomq;
+    //Check if we are currently processing an atomic queue
+    if (atomq != ODP_QUEUE_INVALID)
+    {
+        //Yes, continue to process this queue (optimise for throughput)
+        int ret;
+        assert(ts->ticket != TICKET_INVALID);
+dequeue_atomic: //No side effects before this label!
+        //Also entered by goto from the schedq loop below once an atomic queue
+        //has been popped and our ticket has come up
+        //Atomic queues can be dequeued without lock since this thread has the
+        //only reference to the atomic queue being processed
+        //We are the only thread that can dequeue but other threads can enqueue
+        if (likely((ret = _odp_queue_deq(atomq,
+                                         ev,
+                                         num,
+                                         /*threadsafe=*/false,
+                                         /*atomic=*/true)) != 0))
+        {
+            *from = atomq;
+            //This thread must continue to "own" this atomic queue until all
+            //events processed and the thread re-invokes the scheduler
+            return ret;
+        }
+        //Atomic queue was empty, release it
+        _odp_schedule_release_atomic(ts);
+    }
+    //Reached both when no atomic queue was held and after an emptied atomic
+    //queue has been released: release any previous reorder context
+    _odp_schedule_release_ordered(ts);
+
+    //Pick up pending sched group membership changes signalled via sg_sem
+    if (unlikely(__atomic_load_n(&ts->sg_sem, __ATOMIC_RELAXED) != 0))
+    {
+        (void)__atomic_load_n(&ts->sg_sem, __ATOMIC_ACQUIRE);
+        __atomic_store_n(&ts->sg_sem, 0, __ATOMIC_RELAXED);//FIXME?
+        update_sg_membership(ts);
+    }
+
+    //Iterate through our list of scheduler queues which are sorted with
+    //higher priority first
+    for (uint32_t i = 0; i < ts->num_schedq; i++)
+    {
+        //__builtin_prefetch(ts->schedq_list[i + 1], 0, 0);
+        sched_queue *schedq = ts->schedq_list[i];
+        struct sched_obj *obj;
+restart_same:
+        //Peek at the head of the scheduler queue
+        obj = schedq_peek(schedq);
+        if (likely(obj == NULL))
+        {
+            //schedq empty
+            continue;//Look at next schedq
+        }
+        if (obj->type == atomic_q)
+        {
+            //Dequeue object only if it is still at head of schedq
+            bool b = schedq_cond_pop(schedq, obj);
+#ifdef LOG
+            if (VERBOSE) printf("%u: Pop atomic queue %p from schedq %p %s\n", ts->tidx, obj, obj->schedq, b ? "success" : "failure");
+#endif
+            if (unlikely(!b))
+            {
+                //atomq not at head of schedq anymore, some other thread
+                //stole it
+                goto restart_same;//Restart at the same schedq
+            }
+            ts->atomq = atomq = obj;
+            //Dequeued atomic queue from the schedq, only we can process it
+            //Take a ticket and wait until it is our turn
+            ts->ticket = __atomic_fetch_add(&atomq->queue.shared.nxt_ticket,
+                                            1,
+                                            __ATOMIC_RELAXED);
+            while (__atomic_load_n(&atomq->queue.shared.cur_ticket,
+                                   __ATOMIC_RELAXED) != ts->ticket)
+            {
+                doze();
+            }
+            goto dequeue_atomic;
+        }
+        else if (obj->type == ordered_q)
+        {
+            odp_queue_t ordq = obj;
+            assert(queue_rwin_get(ordq) != NULL);
+            //The scheduler object (probably an ordered queue) has a
+            //reorder window so requires order restoration
+            //We must use a reorder context to collect all outgoing events
+            //Find and initialise an unused reorder context
+            uint32_t rfree = __atomic_load_n(&ts->rvec_free, __ATOMIC_RELAXED);
+            if (unlikely(rfree == 0))
+            {
+                //No free reorder contexts for this thread
+#ifdef LOG
+                if (VERBOSE) printf("%u: Out of reorder contexts, queue ignored\n", ts->tidx);
+#endif
+                continue;//Look at next schedq, hope we find non-ordered queue
+            }
+            //Get first bit set (starting from 0)
+            //A separate name is used so the schedq loop index is not shadowed
+            uint32_t ridx = __builtin_ffs(rfree) - 1;
+            struct reorder_context *rctx = ts->rctx = &ts->rvec[ridx];
+            rctx_init(rctx, &ts->rvec_free, ridx, queue_rwin_get(ordq));
+#ifdef LOG
+            if (VERBOSE) printf("%u: Using rctx %p\n", ts->tidx, rctx);
+#endif
+            //rwin_reserve and odp_queue_deq must be atomic or we will
+            //have a potential race condition
+            //Allocate a slot in the reorder window
+            if (unlikely(!rwin_reserve(rctx->rwin, &rctx->sn)))
+            {
+                //Reorder window full
+#ifdef LOG
+                if (VERBOSE) printf("%u: Reorder window full, queue ignored\n", ts->tidx);
+#endif
+//bp();
+                rctx_free(rctx);
+                ts->rctx = NULL;
+                ts->out_of_order = false;
+                continue;//Look at next schedq, find other queue
+            }
+            //Are we in-order or out-of-order?
+            ts->out_of_order = rctx->sn != rctx->rwin->hc.head;
+#ifdef LOG
+            if (VERBOSE) printf("%u: Reserved pos %u in rwin %p\n", ts->tidx, rctx->sn, rctx->rwin);
+#endif
+            //Wait for our turn to dequeue
+            while (__atomic_load_n(&rctx->rwin->turn, __ATOMIC_RELAXED) !=
+                   rctx->sn)
+            {
+                doze();
+            }
+            int ret = _odp_queue_deq(ordq,
+                                     ev,
+                                     num,
+                                     /*threadsafe=*/false,
+                                     /*atomic=*/false);
+            //Someone else's turn
+            far_atomic_store(&rctx->rwin->turn,
+                             rctx->sn + 1,
+                             __ATOMIC_RELEASE_AFTER_CAS);
+            if (likely(ret != 0))
+            {
+                *from = ordq;
+                return ret;
+            }
+#ifdef LOG
+            if (VERBOSE) printf("%u: Queue %p seems empty, ignoring\n", ts->tidx, ordq);
+            //Queue will either become non-empty or will be removed by thread
+            //which made it empty
+            if (VERBOSE) printf("%u: Release unused rctx %p\n", ts->tidx, ts->rctx);
+#endif
+            ts->out_of_order = false;
+            rctx_release(ts->rctx);
+#ifdef LOG
+            //NOTE(review): this reads ts->rctx->rwin after rctx_release();
+            //confirm the context is still valid to dereference here
+            if (VERBOSE) printf("%u: Release unused rctx %p rwin %p\n", ts->tidx, ts->rctx, ts->rctx->rwin);
+#endif
+            ts->rctx = NULL;
+            continue;//Look at next schedq
+        }
+        else if (obj->type == parallel_q)
+        {
+            odp_queue_t pq = obj;
+            int ret = _odp_queue_deq(pq,
+                                     ev,
+                                     num,
+                                     /*threadsafe=*/true,
+                                     /*atomic=*/false);
+            if (likely(ret != 0))
+            {
+                *from = pq;
+                return ret;
+            }
+#ifdef LOG
+            if (VERBOSE) printf("%u: Queue %p seems empty, ignoring\n", ts->tidx, pq);
+            //Queue will either become non-empty or will be removed by thread
+            //which made it empty
+#endif
+            continue;//Look at next schedq
+        }
+        else if (obj->type == pktio)
+        {
+            //Nothing is done for pktio objects here; move on to next schedq
+        }
+    }
+    return 0;
+}
+
+//Single-event front end to odp_schedule_multi().
+//Returns the scheduled event (with *from set by odp_schedule_multi()), or
+//ODP_EVENT_INVALID when exactly one event was not obtained.
+static odp_event_t odp_schedule(odp_queue_t *from, uint64_t wait)
+{
+    odp_event_t evt;
+    int got = odp_schedule_multi(from, wait, &evt, 1);
+    return likely(got == 1) ? evt : ODP_EVENT_INVALID;
+}
+
+//Report the number of scheduling priority levels (NUM_PRIO).
+int odp_schedule_num_prio(void)
+{
+    const int levels = NUM_PRIO;
+    return levels;
+}
+
+//Set the calling thread's pause flag; while it is set odp_schedule_multi()
+//returns 0 without looking at any queue.
+void odp_schedule_pause(void)
+{
+    TS->pause = true;
+}
+
+//Clear the calling thread's pause flag so that scheduling proceeds again.
+void odp_schedule_resume(void)
+{
+    TS->pause = false;
+}
+
+//Prefetch hint; accepted and ignored.
+void odp_schedule_prefetch(int num)
+{
+    //Intentionally empty: the SW scheduler is only driven by the application
+    //threads themselves, so this is a no-op
+    (void)num;
+}
+
+/******************************************************************************
+ * Scheduler groups
+ *****************************************************************************/
+
+//A scheduler group: the threads serving it plus its scheduler queues
+//(xfactor schedq's for each of the NUM_PRIO priority levels)
+struct sched_group
+{
+    //Threads currently associated with the sched group, per priority
+    odp_thrmask_t thr_actual[NUM_PRIO];
+    //Threads requested to be members (updated by group join/leave)
+    odp_thrmask_t thr_wanted;
+    uint32_t xcount[NUM_PRIO];//Used to spread queues over schedq's
+    uint32_t xfactor;//Number of schedq's per prio
+    //Over-allocated by odp_schedule_group_create() to hold all schedq's
+    sched_queue schedq[1];//NUMPRIO * xfactor
+};
+
+static sched_group_mask_t sg_used;//Bit i set <=> slot sg_vec[i] is allocated
+static struct sched_group *sg_vec[MAX_SCHED_GROUP];//Indexed by group handle - 1
+
+//Select the scheduler queue a new ODP queue of priority 'prio' in group
+//'grp' should use. Successive calls rotate over the group's xfactor schedq's
+//for that priority.
+//Side effect: the first call for a given priority flags every thread in the
+//group's wanted mask (sg_wanted[prio] bit + sg_sem) so that it joins.
+//  grp:  group handle, 1..MAX_SCHED_GROUP, must be allocated
+//  prio: priority level, must be < NUM_PRIO
+static sched_queue *schedq_from_sched_group(odp_schedule_group_t grp,
+                                            uint32_t prio)
+{
+    assert(grp > 0 && grp <= MAX_SCHED_GROUP);
+    assert((sg_used & (1ULL << (grp - 1))) != 0);
+    assert(prio < NUM_PRIO);//prio is unsigned, no lower bound to check
+    uint32_t sgi = grp - 1;
+    struct sched_group *sg = sg_vec[sgi];
+    //Use xcount to spread queues over the xfactor schedq's per priority
+    uint32_t x = __atomic_fetch_add(&sg->xcount[prio], 1, __ATOMIC_RELAXED);
+    if (x == 0)
+    {
+        //First ODP queue for this priority
+        //Notify all threads in sg->thr_wanted that they should join
+        sched_group_mask_t thrds = sg->thr_wanted;
+        while (thrds != 0)
+        {
+            //64-bit find-first-set to match the 1ULL masks used below
+            //(__builtin_ffsl only covers 32 bits where long is 32 bits)
+            uint32_t thr = __builtin_ffsll(thrds) - 1;
+            thrds &= ~(1ULL << thr);
+            //Notify the thread about membership in this group/priority
+            (void)__atomic_fetch_or(&thread_state[thr].sg_wanted[prio],
+                                    1ULL << sgi,
+                                    __ATOMIC_RELEASE);
+            __atomic_store_n(&thread_state[thr].sg_sem, 1, __ATOMIC_RELEASE);
+        }
+    }
+    return &sg->schedq[prio * sg->xfactor + x % sg->xfactor];
+}
+
+//Bring the thread's list of scheduler queues in line with the sched group
+//membership requested in ts->sg_wanted[]: for every priority, schedq's of
+//newly wanted groups are inserted, schedq's of groups no longer wanted are
+//removed, the groups' thr_actual masks are updated and ts->sg_actual[] is
+//set to the new membership.
+static void update_sg_membership(struct thread_state *ts)
+{
+    for (uint32_t p = 0; p < NUM_PRIO; p++)
+    {
+        sched_group_mask_t sg_wanted = __atomic_load_n(&ts->sg_wanted[p],
+                                                       __ATOMIC_ACQUIRE);
+        if (ts->sg_actual[p] == sg_wanted)
+        {
+            continue;//No change for this priority
+        }
+        //Our sched_group membership has changed
+        sched_group_mask_t added = sg_wanted & ~ts->sg_actual[p];
+        while (added != 0)
+        {
+            //64-bit find-first-set to match the 1ULL masks used below
+            uint32_t sgi = __builtin_ffsll(added) - 1;
+            struct sched_group *sg = sg_vec[sgi];
+            for (uint32_t x = 0; x < sg->xfactor; x++)
+            {
+                //Include our thread index to shift (rotate) the order of
+                //schedq's
+                insert_schedq_in_list(ts,
+                                      &sg->schedq[p * sg->xfactor +
+                                      (x + ts->tidx) % sg->xfactor]);
+            }
+            (void)__atomic_fetch_or(&sg->thr_actual[p],
+                                    1ULL << ts->tidx,
+                                    __ATOMIC_RELAXED);
+            added &= ~(1ULL << sgi);
+        }
+        sched_group_mask_t removed = ~sg_wanted & ts->sg_actual[p];
+        while (removed != 0)
+        {
+            uint32_t sgi = __builtin_ffsll(removed) - 1;
+            struct sched_group *sg = sg_vec[sgi];
+            for (uint32_t x = 0; x < sg->xfactor; x++)
+            {
+                remove_schedq_from_list(ts,
+                                        &sg->schedq[p * sg->xfactor + x]);
+            }
+            (void)__atomic_fetch_and(&sg->thr_actual[p],
+                                     ~(1ULL << ts->tidx),
+                                     __ATOMIC_RELAXED);
+            removed &= ~(1ULL << sgi);
+        }
+        ts->sg_actual[p] = sg_wanted;
+    }
+}
+
+int odp_schedule_group_join(odp_schedule_group_t group,
+                           const odp_thrmask_t *mask);
+
+//Create a scheduler group.
+//  name: not stored by this implementation
+//  mask: initial set of threads; the number of threads in the mask is also
+//        used as the spread factor (schedq's per priority), minimum 1
+//Returns the group handle (slot index + 1), or -1 if all group slots are in
+//use or memory could not be allocated.
+odp_schedule_group_t odp_schedule_group_create(const char *name,
+                                               const odp_thrmask_t *mask)
+{
+    (void)name;
+    uint32_t sgi;
+    sched_group_mask_t used = __atomic_load_n(&sg_used, __ATOMIC_ACQUIRE);
+    //Claim the first free slot in sg_used
+    do
+    {
+        if (~used == 0)
+            return -1;//All sched_groups in use
+        //64-bit find-first-set to match the 1ULL mask used in the CAS
+        sgi = __builtin_ffsll(~used) - 1;
+        if (sgi >= MAX_SCHED_GROUP)
+            return -1;//All sched_groups in use
+    } while (!__atomic_compare_exchange_n(&sg_used,
+                                          &used,
+                                          used | (1ULL << sgi),
+                                          CAS_WEAK,
+                                          __ATOMIC_ACQUIRE,
+                                          __ATOMIC_ACQUIRE));
+    //Compute xfactor (spread factor) from the number of threads present in the
+    //thread mask
+    //Preferable this would be an explicit parameter
+    uint32_t nthreads = __builtin_popcountll(*mask);
+    uint32_t xfactor = nthreads < 1 ? 1 : nthreads;
+    //struct sched_group already contains one sched_queue
+    size_t size = sizeof(struct sched_group) +
+                  (NUM_PRIO * xfactor - 1) * sizeof(sched_queue);
+    //aligned_alloc() wants the size to be a multiple of the alignment
+    size = (size + CACHE_LINE - 1) / CACHE_LINE * CACHE_LINE;
+    struct sched_group *sg = aligned_alloc(CACHE_LINE, size);
+    if (sg == NULL)
+    {
+        //Give the slot back, otherwise it would be lost forever
+        (void)__atomic_fetch_and(&sg_used,
+                                 ~(1ULL << sgi),
+                                 __ATOMIC_RELEASE);
+        return -1;
+    }
+    sg_vec[sgi] = sg;
+    memset(sg->thr_actual, 0, sizeof sg->thr_actual);
+    sg->thr_wanted = 0;
+    sg->xfactor = xfactor;
+    for (uint32_t p = 0; p < NUM_PRIO; p++)
+    {
+        sg->xcount[p] = 0;
+        for (uint32_t x = 0; x < xfactor; x++)
+        {
+            schedq_init(&sg->schedq[p * xfactor + x], p);
+        }
+    }
+    if (nthreads != 0)
+    {
+        odp_schedule_group_join(sgi + 1, mask);
+    }
+    return sgi + 1;
+}
+
+//Add the threads in 'mask' to scheduler group 'group'.
+//The threads are added to the group's wanted mask and, for every priority
+//that already has ODP queues, each thread is flagged (sg_wanted[] bit and
+//sg_sem) so that it updates its membership.
+//Returns 0 on success, -1 if 'group' is out of range or not allocated.
+int odp_schedule_group_join(odp_schedule_group_t group,
+                            const odp_thrmask_t *mask)
+{
+    //Either bound being violated makes the handle invalid ('&&' could never
+    //be true and let invalid handles through to the shift below)
+    if (group < 1 || group > MAX_SCHED_GROUP)
+        return -1;
+    uint32_t sgi = group - 1;
+    if ((sg_used & (1ULL << sgi)) == 0)
+        return -1;
+    struct sched_group *sg = sg_vec[sgi];
+    odp_thrmask_t toadd = *mask;
+    //Add threads to scheduler group wanted thread mask
+    (void)__atomic_fetch_or(&sg->thr_wanted, toadd, __ATOMIC_RELAXED);
+    //Notify relevant threads about the change
+    while (toadd != 0)
+    {
+        //64-bit find-first-set to match the 1ULL masks used below
+        uint32_t thr = __builtin_ffsll(toadd) - 1;
+        toadd &= ~(1ULL << thr);
+        for (uint32_t p = 0; p < NUM_PRIO; p++)
+        {
+            if (sg->xcount[p] != 0)
+            {
+                //This priority level has ODP queues
+                //Notify the thread about membership in this group/priority
+                (void)__atomic_fetch_or(&thread_state[thr].sg_wanted[p],
+                                        1ULL << sgi,
+                                        __ATOMIC_RELEASE);
+                __atomic_store_n(&thread_state[thr].sg_sem,
+                                 1,
+                                 __ATOMIC_RELEASE);
+            }
+        }
+    }
+    return 0;
+}
+
+//Remove the threads in 'mask' from scheduler group 'group'.
+//The threads are cleared from the group's wanted mask and, for every
+//priority that has ODP queues, each thread's sg_wanted[] bit for this group
+//is cleared and its sg_sem is set so that it updates its membership.
+//Returns 0 on success, -1 if 'group' is out of range or not allocated.
+int odp_schedule_group_leave(odp_schedule_group_t group,
+                             const odp_thrmask_t *mask)
+{
+    //Either bound being violated makes the handle invalid ('&&' could never
+    //be true and let invalid handles through to the shift below)
+    if (group < 1 || group > MAX_SCHED_GROUP)
+        return -1;
+    uint32_t sgi = group - 1;
+    if ((sg_used & (1ULL << sgi)) == 0)
+        return -1;
+    struct sched_group *sg = sg_vec[sgi];
+    odp_thrmask_t torem = *mask;
+    //Remove threads from scheduler group wanted thread mask
+    (void)__atomic_fetch_and(&sg->thr_wanted, ~torem, __ATOMIC_RELAXED);
+    //Notify relevant threads about the change
+    while (torem != 0)
+    {
+        //64-bit find-first-set to match the 1ULL masks used below
+        uint32_t thr = __builtin_ffsll(torem) - 1;
+        torem &= ~(1ULL << thr);
+        for (uint32_t p = 0; p < NUM_PRIO; p++)
+        {
+            if (sg->xcount[p] != 0)
+            {
+                //Clear bit which specifies membership in this sched_group/prio
+                (void)__atomic_fetch_and(&thread_state[thr].sg_wanted[p],
+                                         ~(1ULL << sgi),
+                                         __ATOMIC_RELEASE);
+                __atomic_store_n(&thread_state[thr].sg_sem,
+                                 1,
+                                 __ATOMIC_RELEASE);
+            }
+        }
+    }
+    return 0;
+}
+
+//Copy the wanted thread mask of scheduler group 'group' into *thrmask.
+//Returns 0 on success, -1 if 'group' is out of range or not allocated.
+int odp_schedule_group_thrmask(odp_schedule_group_t group,
+                               odp_thrmask_t *thrmask)
+{
+    //Either bound being violated makes the handle invalid ('&&' could never
+    //be true and let invalid handles through to the shift below)
+    if (group < 1 || group > MAX_SCHED_GROUP)
+        return -1;
+    uint32_t sgi = group - 1;
+    if ((sg_used & (1ULL << sgi)) == 0)
+        return -1;
+    struct sched_group *sg = sg_vec[sgi];
+    *thrmask = sg->thr_wanted;
+    return 0;
+}
+
+/******************************************************************************
+ * Ordered locks
+ *****************************************************************************/
+
+//Acquire ordered lock 'lock_index' of the reorder window that belongs to the
+//calling thread's current reorder context. Spins (doze()) until the lock
+//value equals the context's sequence number.
+//Unless NDEBUG is defined, the program aborts when the thread holds no
+//reorder context or when lock_index is not below the window's lock_count.
+void odp_schedule_order_lock(unsigned lock_index)
+{
+    struct thread_state *self = TS;
+#ifndef NDEBUG
+    if (unlikely(self->rctx == NULL))
+    {
+        fprintf(stderr, "odp_schedule_order_lock: unexpected call\n");
+        abort();
+    }
+#endif
+    struct reorder_context *ctx = self->rctx;
+    struct reorder_window *win = ctx->rwin;
+#ifndef NDEBUG
+    if (unlikely(lock_index >= win->lock_count))
+    {
+        fprintf(stderr, "odp_schedule_order_lock: invalid lock index %u\n",
+                lock_index);
+        abort();
+    }
+#endif
+#ifdef LOG
+    if (VERBOSE) printf("%u: lock acquire sn=%u %p->olock[0]=%u\n", TS->tidx, ctx->sn, win, win->olock[0]);
+#endif
+    //Our turn comes when the lock value reaches our sequence number
+    for (;;)
+    {
+        if (__atomic_load_n(&win->olock[lock_index], __ATOMIC_ACQUIRE) ==
+            ctx->sn)
+        {
+            break;
+        }
+        doze();
+    }
+#ifdef LOG
+    if (VERBOSE) printf("%u: lock taken sn=%u %p->olock[0]=%u\n", TS->tidx, ctx->sn, win, win->olock[0]);
+#endif
+}
+
+//Release ordered lock 'lock_index': the lock value is advanced to the
+//context's sequence number + 1 and the lock is recorded in the context's
+//olock_flags.
+//Unless NDEBUG is defined, the program aborts when the thread holds no
+//reorder context or when lock_index is not below the window's lock_count; a
+//lock whose value is not our sequence number is only reported on stderr.
+void odp_schedule_order_unlock(unsigned lock_index)
+{
+    struct thread_state *self = TS;
+#ifndef NDEBUG
+    if (unlikely(self->rctx == NULL))
+    {
+        fprintf(stderr, "odp_schedule_order_unlock: unexpected call\n");
+        abort();
+    }
+#endif
+    struct reorder_context *ctx = self->rctx;
+    struct reorder_window *win = ctx->rwin;
+#ifndef NDEBUG
+    if (unlikely(lock_index >= win->lock_count))
+    {
+        fprintf(stderr, "odp_schedule_order_unlock: invalid lock index %u\n",
+                lock_index);
+        abort();
+    }
+    if (unlikely(win->olock[lock_index] != ctx->sn))
+    {
+        fprintf(stderr, "odp_schedule_order_unlock: mismatched call\n");
+    }
+#endif
+#ifdef LOG
+    if (VERBOSE) printf("%u: lock released %p->olock[0]=%u\n", TS->tidx, win, ctx->sn + 1);
+#endif
+    //Hand the lock over to the next sequence number
+    __atomic_store_n(&win->olock[lock_index], ctx->sn + 1, __ATOMIC_RELEASE);
+    ctx->olock_flags |= 1U << lock_index;
+}
+
+/******************************************************************************
+ * Benchmark harness
+ *****************************************************************************/
+
+static pthread_t tid[MAXTHREADS];//Thread handles filled in by create_threads()
+static unsigned long CPUFREQ;
+static bool AFFINITY = false;//When true, initialize_attr() sets CPU affinity
+static bool PARALLEL = false;//When true, entrypoint() skips the order check
+static bool ORDERED = false;//When true, entrypoint() takes ordered lock 0
+static pthread_barrier_t BAR;//Waited on by every thread in entrypoint()
+#define MAXQUEUES 256 //Capacity of ODPQ[]
+static uint32_t NUMQUEUES = 20;//Number of queues each event passes through
+static odp_queue_t ODPQ[MAXQUEUES];
+#define MAXEVENTS 100000 //Capacity of EVENTS[]
+static uint32_t NUMEVENTS = 2048;//Number of events enqueued by thread 0
+static odp_event_t EVENTS[MAXEVENTS];
+static uint32_t NUMCOMPLETED ALIGNED(CACHE_LINE);//Events that passed all queues
+
+//Thread body of the benchmark.
+//  arg: the thread index, passed as an integer cast to a pointer
+//After all threads have met at the barrier, thread 0 enqueues the NUMEVENTS
+//events from EVENTS[] on queue 0. Every thread then keeps scheduling events,
+//moving each event from queue N to queue N+1 until it has passed NUMQUEUES
+//queues, at which point NUMCOMPLETED is incremented. The loop ends when
+//NUMCOMPLETED equals NUMEVENTS.
+//Always returns NULL.
+static void *entrypoint(void *arg)
+{
+    unsigned tidx = (unsigned)(long)arg;
+    thread_state_init(tidx);
+
+    //pthread_barrier_wait() returns 0 or PTHREAD_BARRIER_SERIAL_THREAD on
+    //success and an error number otherwise; it does not set errno
+    int brc = pthread_barrier_wait(&BAR);
+    if (brc != 0 && brc != PTHREAD_BARRIER_SERIAL_THREAD)
+    {
+        errno = brc;
+        perror("pthread_barrier_wait"), abort();
+    }
+
+    if (tidx == 0)
+    {
+        //Enqueue events from events array into queue 0
+        for (unsigned i = 0; i < NUMEVENTS; i++)
+        {
+            odp_event_t evt = EVENTS[i];
+            evt->fromqidx = 0;
+            unsigned j;
+            for (j = 0; j < 100000; j++)
+            {
+                int rc = odp_queue_enq(ODPQ[0], &evt, 1);
+                if (rc == 1)
+                    break;
+                doze();
+                //NOTE(review): the abort below fires on the first failed
+                //enqueue, so the retry loop never actually retries; confirm
+                //whether this is left-over debug code
+                fprintf(stderr, "i=%u, read=%u, write=%u\n", i,
+                        ODPQ[0]->queue.prod.read, ODPQ[0]->queue.prod.write);
+                fflush(NULL); abort();
+            }
+            if (j == 100000)
+                fprintf(stderr, "Failed initial enqueue\n"), fflush(NULL),
+                    abort();
+        }
+    }
+
+    //Move events from queue N to queue N+1
+    uint32_t fails = 0;
+    while (__atomic_load_n(&NUMCOMPLETED, __ATOMIC_RELAXED) != NUMEVENTS)
+    {
+        odp_queue_t q;
+        odp_event_t evt = odp_schedule(&q, 0);
+        if (evt != ODP_EVENT_INVALID)
+        {
+            evt->fromqidx++;
+            if (evt->fromqidx < NUMQUEUES)
+            {
+                int rc = odp_queue_enq(ODPQ[evt->fromqidx], &evt, 1);
+                if (rc != 1)
+                {
+                    fprintf(stderr, "Queue full\n");
+                    fflush(NULL); abort();
+                }
+            }
+            else//Event has passed through all queues
+            {
+                if (ORDERED)
+                {
+                    odp_schedule_order_lock(0);
+                }
+
+                uint32_t expected = __atomic_fetch_add(&NUMCOMPLETED,
+                                                       1, __ATOMIC_RELAXED);
+#ifdef LOG
+                if (VERBOSE) printf("%u: Event %u completed\n", TS->tidx, evt->number);
+#endif
+                if (!PARALLEL && evt->number != expected)
+                {
+                    //Ordered or atomic queues
+                    fprintf(stderr, "%u: Event %u wrong order, expected %u\n",
+                            TS->tidx, evt->number, expected);
+                }
+                //Else parallel queues, order not preserved
+                if (ORDERED)
+                {
+                    odp_schedule_order_unlock(0);
+                }
+            }
+            fails = 0;
+        }
+        else
+        {
+            //Nothing scheduled: back off before trying again
+            doze(); doze(); doze(); doze();
+            doze(); doze(); doze(); doze();
+            doze(); doze(); doze(); doze();
+            doze(); doze(); doze(); doze();
+            doze(); doze(); doze(); doze();
+            doze(); doze(); doze(); doze();
+            if (++fails == 10000000)
+            {
+                fprintf(stderr, "%u: Deadlock suspected\n", TS->tidx);
+                fflush(NULL);
+                bp();//break;
+            }
+        }
+    }
+
+#ifdef LOG
+    if (VERBOSE)
+    {
+        printf("NUMCOMPLETED %u\n", NUMCOMPLETED);
+        static int THREADEXIT = 0;
+        //Only the first thread to get here dumps the queue state
+        if (__atomic_fetch_add(&THREADEXIT, 1, __ATOMIC_ACQUIRE) == 0)
+        {
+            for (uint32_t i = 0; i < NUMQUEUES; i++)
+            {
+                printf("queue %p: numevts %u, cur_ticket %u, nxt_ticket %u\n",
+                       ODPQ[i], ODPQ[i]->queue.shared.numevts,
+                       ODPQ[i]->queue.shared.cur_ticket,
+                       ODPQ[i]->queue.shared.nxt_ticket);
+                struct ringstate rs;
+                rs = ODPQ[i]->queue.cons;
+                if (ringstate_num_used(rs) != 0)
+                {
+                    printf("queue %p.cons has %u elements\n", ODPQ[i],
+                           ringstate_num_used(rs));
+                }
+                rs = ODPQ[i]->queue.prod;
+                if (ringstate_num_used(rs) != 0)
+                {
+                    printf("queue %p.prod has %u elements\n", ODPQ[i],
+                           ringstate_num_used(rs));
+                }
+            }
+        }
+    }
+#endif
+
+    return NULL;
+}
+
+//Initialise a pthread attribute object for a benchmark thread.
+//  attr:  attribute object to initialise
+//  sched: scheduling policy to set on the attribute
+//  prio:  scheduling priority, only applied for SCHED_FIFO and SCHED_RR
+//  cpu:   selects the CPU to pin to when AFFINITY is set
+//  name:  currently unused
+//Any failure is reported with perror() and the program aborts. The pthread
+//calls return their error number instead of setting errno, so errno is
+//assigned from the return value before perror() is called.
+static void
+initialize_attr(pthread_attr_t *attr, int sched, int prio, unsigned cpu,
+                const char *name)
+{
+    int err;
+    (void)name;
+    if ((err = pthread_attr_init(attr)) != 0)
+    {
+        errno = err;
+        perror("pthread_attr_init"), abort();
+    }
+    if (AFFINITY)
+    {
+        cpu_set_t cpuset;
+        CPU_ZERO(&cpuset);
+        //NOTE(review): thread N is pinned to CPU N+1, i.e. CPU 0 is never
+        //used; confirm this offset is intended
+        CPU_SET(cpu + 1, &cpuset);
+        if ((err = pthread_attr_setaffinity_np(attr, sizeof cpuset,
+                                               &cpuset)) != 0)
+        {
+            errno = err;
+            perror("pthread_attr_setaffinity_np"), abort();
+        }
+    }
+    if ((err = pthread_attr_setschedpolicy(attr, sched)) != 0)
+    {
+        errno = err;
+        perror("pthread_attr_setschedpolicy"), abort();
+    }
+    //Get scheduling policy from attr
+    if ((err = pthread_attr_setinheritsched(attr,
+                                            PTHREAD_EXPLICIT_SCHED)) != 0)
+    {
+        errno = err;
+        perror("pthread_attr_setinheritsched"), abort();
+    }
+    if (sched == SCHED_FIFO || sched == SCHED_RR)
+    {
+        struct sched_param schedp;
+        memset(&schedp, 0, sizeof schedp);
+        schedp.sched_priority = prio;
+        if ((err = pthread_attr_setschedparam(attr, &schedp)) != 0)
+        {
+            errno = err;
+            perror("pthread_attr_setschedparam"), abort();
+        }
+    }
+}
+
+//Start NUMTHREADS worker threads running entrypoint()
+//Each thread receives its index (0..NUMTHREADS-1) as its argument and its
+//handle is stored in tid[]
+//If thread creation with the SCHED policy fails with EPERM, the thread is
+//created with SCHED_OTHER instead
+//Any other failure is reported and terminates the process with exit(20)
+static void create_threads(void)
+{
+    void *(*ep)(void *) = entrypoint;
+    for (unsigned thr = 0; thr < NUMTHREADS; thr++)
+    {
+        pthread_attr_t pt_attr;
+        void *arg = (void *)(long)thr;
+        int err;
+
+        initialize_attr(&pt_attr, SCHED, PRIO, /*cpu=*/thr, "task");
+        err = pthread_create(&tid[thr], &pt_attr, ep, arg);
+        //The attribute object is not needed after pthread_create() and it
+        //must be destroyed before it may be initialised again below
+        (void)pthread_attr_destroy(&pt_attr);
+        if (err == EPERM)
+        {
+            //Work-around for some platforms that do not support/allow
+            //SCHED_FIFO/SCHED_RR
+            initialize_attr(&pt_attr, SCHED_OTHER, PRIO, /*cpu=*/thr, "task");
+            err = pthread_create(&tid[thr], &pt_attr, ep, arg);
+            (void)pthread_attr_destroy(&pt_attr);
+        }
+        if (err != 0)
+        {
+            errno = err;
+            perror("pthread_create");
+            exit(20);
+        }
+    }
+}
+
+#if 0
+//Express rel as a share of tot in thousandths
+//The product is formed in 64 bits and the division truncates
+static unsigned permille(uint32_t rel, uint32_t tot)
+{
+    const unsigned long long scaled = 1000ULL * rel;
+    return (unsigned)(scaled / tot);
+}
+#endif
+
+//Print the command line synopsis on stderr and terminate with EXIT_FAILURE
+static void usage_and_exit(void)
+{
+    fprintf(stderr, "Usage: scheduler <options>\n"
+                    "-a              Make threads CPU affine\n"
+                    "-e <numevents>  Number of events\n"
+                    "-f <cpufreq>    CPU frequency in kHz\n"
+                    "-o              Use ordered queues\n"
+                    "-p              Use parallel queues\n"
+                    "-q <numqueues>  Number of queues\n"
+                    "-t <numthr>     Number of threads\n"
+                    "-v              Verbose\n"
+                    );
+    exit(EXIT_FAILURE);
+}
+
+//Read the current frequency that sysfs reports for the specified CPU
+//Returns 0 if the sysfs file cannot be opened or read
+static unsigned long read_cpu_cur_freq(unsigned cpu)
+{
+    char path[200];
+    char buf[40];
+    unsigned long freq = 0;
+    snprintf(path, sizeof path,
+             "/sys/devices/system/cpu/cpu%u/cpufreq/cpuinfo_cur_freq", cpu);
+    int fd = open(path, O_RDONLY);
+    if (fd != -1)
+    {
+        //Leave room for the terminator, read() does not add one
+        ssize_t len = read(fd, buf, sizeof buf - 1);
+        if (len > 0)
+        {
+            buf[len] = '\0';
+            freq = atol(buf);
+        }
+        close(fd);
+    }
+    return freq;
+}
+
+//Benchmark driver
+//Parses the options, creates the scheduler group, the queues and the events,
+//starts the worker threads, releases them through the barrier, waits for
+//them to terminate and prints timing statistics
+//Returns 0, invalid options terminate the process with EXIT_FAILURE
+int main(int argc, char *argv[])
+{
+    unsigned thr;
+    int c;
+
+    while ((c = getopt(argc, argv, "ae:f:opq:t:v")) != -1)
+    {
+        switch (c)
+        {
+            case 'a' :
+                AFFINITY = true;
+                break;
+            case 'e' :
+            {
+                int numevents = atoi(optarg);
+                if (numevents < 1 || numevents > MAXEVENTS)
+                {
+                    fprintf(stderr, "Invalid number of events %d\n",
+                            numevents);
+                    exit(EXIT_FAILURE);
+                }
+                NUMEVENTS = (unsigned)numevents;
+                break;
+            }
+            case 'f' :
+                CPUFREQ = atol(optarg);
+                break;
+            case 'o' :
+                ORDERED = true;
+                break;
+            case 'p' :
+                PARALLEL = true;
+                break;
+            case 'q' :
+            {
+                int numqueues = atoi(optarg);
+                if (numqueues < 1 || numqueues > MAXQUEUES)
+                {
+                    fprintf(stderr, "Invalid number of queues %d\n",
+                            numqueues);
+                    exit(EXIT_FAILURE);
+                }
+                NUMQUEUES = (unsigned)numqueues;
+                break;
+            }
+            case 't' :
+            {
+                int numthreads = atoi(optarg);
+                if (numthreads < 1 || numthreads > MAXTHREADS)
+                {
+                    fprintf(stderr, "Invalid number of threads %d\n",
+                            numthreads);
+                    exit(EXIT_FAILURE);
+                }
+                NUMTHREADS = (unsigned)numthreads;
+                break;
+            }
+            case 'v' :
+                VERBOSE = true;
+                break;
+            default :
+                usage_and_exit();
+        }
+    }
+    //NOTE(review): getopt() does not advance optind past argc, so only the
+    //-o/-p conflict can trigger here and extra non-option arguments are
+    //accepted - confirm whether "optind != argc" was intended
+    if (optind > argc || (PARALLEL && ORDERED))
+    {
+        usage_and_exit();
+    }
+
+    printf("%u events, %u %s queue%s, %u thread%s\n",
+           NUMEVENTS,
+           NUMQUEUES,
+           PARALLEL ? "parallel" : ORDERED ? "ordered" : "atomic",
+           NUMQUEUES != 1 ? "s" : "",
+           NUMTHREADS,
+           NUMTHREADS != 1 ? "s" : "");
+
+    //One extra participant for main() which releases the threads
+    //pthread_barrier_init() returns the error number, errno is not set
+    int err = pthread_barrier_init(&BAR, NULL, NUMTHREADS + 1);
+    if (err != 0)
+    {
+        errno = err;
+        perror("pthread_barrier_init"), abort();
+    }
+
+    //Create scheduler group with thread mask = all threads (0..NUMTHREADS-1)
+    //so the scheduler knows how many schedq's are needed for best spread
+    //Shifting a 64-bit value by 64 or more is undefined so saturate instead
+    odp_thrmask_t all = NUMTHREADS < 64 ? (1ULL << NUMTHREADS) - 1 : ~0ULL;
+    odp_schedule_group_t grp_all = odp_schedule_group_create("ALL", &all);
+
+    //Create all our ODP queues
+    for (unsigned i = 0; i < NUMQUEUES; i++)
+    {
+        //All queues get the sync type selected on the command line
+        //In ordered mode only the last queue is created with an ordered lock
+        //(lock_count 1), all other queues have lock_count 0
+        odp_queue_t q = _odp_queue_create(/*prio=*/PRIO_MED,
+                                          /*sync=*/PARALLEL ? parallel_q :
+                                                   ORDERED ? ordered_q :
+                                                             atomic_q,
+                                          /*group=*/grp_all,
+                                          /*lock_count=*/ORDERED &&
+                                                         (i == NUMQUEUES - 1),
+                                          /*user_ctx=*/NULL);
+        if (q == ODP_QUEUE_INVALID)
+            perror("_odp_queue_create"), abort();
+        if (VERBOSE)
+        {
+            printf("ODPQ[%u]=%p, type=%s, schedq %p\n",
+                   i, (void *)q, qtype2str(q), (void *)q->schedq);
+        }
+        ODPQ[i] = q;
+    }
+
+    //Allocate the events and number them in allocation order
+    for (unsigned i = 0; i < NUMEVENTS; i++)
+    {
+        odp_event_t evt = odp_event_alloc();
+        if (evt == ODP_EVENT_INVALID)
+            abort();
+        evt->number = i;
+        EVENTS[i] = evt;
+    }
+    NUMCOMPLETED = 0;
+
+    //Create threads
+    create_threads();
+
+    struct timespec ts;
+    clock_gettime(CLOCK_MONOTONIC, &ts);
+    uint64_t start = ts.tv_sec * 1000000000ULL + ts.tv_nsec;
+
+    //Release threads by joining the barrier
+    pthread_barrier_wait(&BAR);
+
+    //Wait for threads to terminate
+    for (thr = 0; thr < NUMTHREADS; thr++)
+    {
+        pthread_join(tid[thr], NULL);
+    }
+
+    clock_gettime(CLOCK_MONOTONIC, &ts);
+    if (AFFINITY && CPUFREQ == 0)
+    {
+        //No frequency specified, average the frequencies reported for the
+        //CPU's the threads were bound to (thread N runs on CPU N+1)
+        unsigned long cpufreq[MAXTHREADS];
+        for (thr = 0; thr < NUMTHREADS; thr++)
+        {
+            cpufreq[thr] = read_cpu_cur_freq(thr + 1);
+        }
+        CPUFREQ = 0;
+        for (thr = 0; thr < NUMTHREADS; thr++)
+        {
+            printf("Thread %u current CPU frequency %lukHz\n",
+                   thr, cpufreq[thr]);
+            CPUFREQ += cpufreq[thr] / NUMTHREADS;
+        }
+        printf("Average CPU frequency %lukHz\n", CPUFREQ);
+    }
+    //Widen before multiplying so that the product cannot wrap in 32 bits
+    uint64_t numops = (uint64_t)NUMEVENTS * NUMQUEUES;
+    uint64_t elapsed = ts.tv_sec * 1000000000ULL + ts.tv_nsec - start;
+    printf("%llu.%03llu seconds, ",
+           elapsed / 1000000000LLU,
+           (elapsed % 1000000000LLU) / 1000000LLU);
+    //Rate is computed per millisecond, skip it for runs shorter than 1ms
+    if (elapsed / 1000000 != 0)
+    {
+        printf("%"PRIu32" ops/second",
+               (uint32_t)((numops / (elapsed / 1000000)) * 1000));
+    }
+    printf("\n");
+    printf("%"PRIu32" nanoseconds/update\n", (uint32_t)(elapsed / numops));
+    if (CPUFREQ != 0)
+    {
+        //elapsed is in nanoseconds and CPUFREQ in kHz
+        uint64_t cycles = NUMTHREADS * elapsed * CPUFREQ / 1000000ULL;
+        printf("%"PRIu32" cycles/update\n", (uint32_t)(cycles / numops));
+    }
+
+    if (VERBOSE)
+    {
+        for (uint32_t t = 0; t < NUMTHREADS; t++)
+        {
+            for (uint32_t j = 0; j < thread_state[t].num_schedq; j++)
+            {
+                sched_queue *schedq = thread_state[t].schedq_list[j];
+                printf("%u: schedq[%u]=%p (prio=%u)\n",
+                       t, j, (void *)schedq, schedq->prio);
+            }
+        }
+        uint32_t numpushpop = 0;
+        for (uint32_t i = 0; i < NUMQUEUES; i++)
+        {
+            numpushpop += ODPQ[i]->queue.shared.nxt_ticket;
+        }
+        printf("%u push/pop operations\n", numpushpop);
+    }
+
+    return 0;
+}
-- 
2.1.4

Reply via email to