Signed-off-by: Brian Brooks <brian.bro...@arm.com> Signed-off-by: Kevin Wang <kevin.w...@arm.com> Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com> Signed-off-by: Ola Liljedahl <ola.liljed...@arm.com> --- platform/linux-generic/Makefile.am | 7 + .../include/odp/api/plat/schedule_types.h | 4 +- .../linux-generic/include/odp_config_internal.h | 17 +- .../include/odp_queue_scalable_internal.h | 102 + platform/linux-generic/include/odp_schedule_if.h | 2 +- .../linux-generic/include/odp_schedule_scalable.h | 137 ++ .../include/odp_schedule_scalable_config.h | 55 + .../include/odp_schedule_scalable_ordered.h | 132 ++ platform/linux-generic/m4/odp_schedule.m4 | 55 +- platform/linux-generic/odp_queue_if.c | 8 + platform/linux-generic/odp_queue_scalable.c | 1020 ++++++++++ platform/linux-generic/odp_schedule_if.c | 6 + platform/linux-generic/odp_schedule_scalable.c | 1978 ++++++++++++++++++++ .../linux-generic/odp_schedule_scalable_ordered.c | 347 ++++ 14 files changed, 3848 insertions(+), 22 deletions(-) create mode 100644 platform/linux-generic/include/odp_queue_scalable_internal.h create mode 100644 platform/linux-generic/include/odp_schedule_scalable.h create mode 100644 platform/linux-generic/include/odp_schedule_scalable_config.h create mode 100644 platform/linux-generic/include/odp_schedule_scalable_ordered.h create mode 100644 platform/linux-generic/odp_queue_scalable.c create mode 100644 platform/linux-generic/odp_schedule_scalable.c create mode 100644 platform/linux-generic/odp_schedule_scalable_ordered.c
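The scheduler is selected at build time (--enable-schedule-scalable, see the m4 changes below) and is driven entirely through the existing public API. A minimal usage sketch, assuming pool/pktio/thread setup already exists ('running' is a hypothetical termination flag, not part of this patch):

  odp_queue_param_t qp;
  odp_queue_t q;
  odp_event_t ev;

  odp_queue_param_init(&qp);
  qp.type = ODP_QUEUE_TYPE_SCHED;
  qp.sched.prio = ODP_SCHED_PRIO_DEFAULT;
  qp.sched.sync = ODP_SCHED_SYNC_ATOMIC; /* or PARALLEL/ORDERED */
  qp.sched.group = ODP_SCHED_GROUP_WORKER;
  q = odp_queue_create("inq", &qp);

  /* Per-worker event loop, identical for all ODP schedulers */
  while (running) {
      ev = odp_schedule(NULL, ODP_SCHED_WAIT);
      /* ... process ev, enqueue downstream or free it ... */
  }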
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 3cb7511b..570760ba 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -171,9 +171,13 @@ noinst_HEADERS = \ ${srcdir}/include/odp_pool_internal.h \ ${srcdir}/include/odp_posix_extensions.h \ ${srcdir}/include/odp_queue_internal.h \ + ${srcdir}/include/odp_queue_scalable_internal.h \ ${srcdir}/include/odp_ring_internal.h \ ${srcdir}/include/odp_queue_if.h \ ${srcdir}/include/odp_schedule_if.h \ + ${srcdir}/include/odp_schedule_scalable.h \ + ${srcdir}/include/odp_schedule_scalable_config.h \ + ${srcdir}/include/odp_schedule_scalable_ordered.h \ ${srcdir}/include/odp_sorted_list_internal.h \ ${srcdir}/include/odp_shm_internal.h \ ${srcdir}/include/odp_time_internal.h \ @@ -230,12 +234,15 @@ __LIB__libodp_linux_la_SOURCES = \ odp_pool.c \ odp_queue.c \ odp_queue_if.c \ + odp_queue_scalable.c \ odp_rwlock.c \ odp_rwlock_recursive.c \ odp_schedule.c \ odp_schedule_if.c \ odp_schedule_sp.c \ odp_schedule_iquery.c \ + odp_schedule_scalable.c \ + odp_schedule_scalable_ordered.c \ odp_shared_memory.c \ odp_sorted_list.c \ odp_spinlock.c \ diff --git a/platform/linux-generic/include/odp/api/plat/schedule_types.h b/platform/linux-generic/include/odp/api/plat/schedule_types.h index 535fd6d0..4e75f9ee 100644 --- a/platform/linux-generic/include/odp/api/plat/schedule_types.h +++ b/platform/linux-generic/include/odp/api/plat/schedule_types.h @@ -18,6 +18,8 @@ extern "C" { #endif +#include <odp/api/std_types.h> + /** @addtogroup odp_scheduler * @{ */ @@ -44,7 +46,7 @@ typedef int odp_schedule_sync_t; typedef int odp_schedule_group_t; /* These must be kept in sync with thread_globals_t in odp_thread.c */ -#define ODP_SCHED_GROUP_INVALID -1 +#define ODP_SCHED_GROUP_INVALID ((odp_schedule_group_t)-1) #define ODP_SCHED_GROUP_ALL 0 #define ODP_SCHED_GROUP_WORKER 1 #define ODP_SCHED_GROUP_CONTROL 2 diff --git a/platform/linux-generic/include/odp_config_internal.h b/platform/linux-generic/include/odp_config_internal.h index dadd59e7..6cc844f3 100644 --- a/platform/linux-generic/include/odp_config_internal.h +++ b/platform/linux-generic/include/odp_config_internal.h @@ -7,9 +7,7 @@ #ifndef ODP_CONFIG_INTERNAL_H_ #define ODP_CONFIG_INTERNAL_H_ -#ifdef __cplusplus -extern "C" { -#endif +#include <odp_schedule_scalable_config.h> /* * Maximum number of pools @@ -22,6 +20,13 @@ extern "C" { #define ODP_CONFIG_QUEUES 1024 /* + * Maximum queue depth. Maximum number of elements that can be stored in a + * queue. This value is used only when the size is not explicitly provided + * during queue creation. + */ +#define CONFIG_QUEUE_SIZE 4096 + +/* * Maximum number of ordered locks per queue */ #define CONFIG_QUEUE_MAX_ORD_LOCKS 4 @@ -120,7 +125,7 @@ extern "C" { * * This the the number of separate SHM areas that can be reserved concurrently */ -#define ODPDRV_CONFIG_SHM_BLOCKS 48 +#define ODPDRV_CONFIG_SHM_BLOCKS ODP_CONFIG_SHM_BLOCKS /* Maximum event burst size * @@ -139,8 +144,4 @@ extern "C" { */ #define CONFIG_POOL_CACHE_SIZE 256 -#ifdef __cplusplus -} -#endif - #endif diff --git a/platform/linux-generic/include/odp_queue_scalable_internal.h b/platform/linux-generic/include/odp_queue_scalable_internal.h new file mode 100644 index 00000000..00abcaac --- /dev/null +++ b/platform/linux-generic/include/odp_queue_scalable_internal.h @@ -0,0 +1,102 @@ +/* Copyright (c) 2017, ARM Limited + * All rights reserved. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_QUEUE_SCALABLE_INTERNAL_H_ +#define ODP_QUEUE_SCALABLE_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <odp/api/queue.h> +#include <odp_forward_typedefs_internal.h> +#include <odp_queue_if.h> +#include <odp_buffer_internal.h> +#include <odp_align_internal.h> +#include <odp/api/packet_io.h> +#include <odp/api/align.h> +#include <odp/api/hints.h> +#include <odp/api/ticketlock.h> +#include <odp_config_internal.h> +#include <odp_schedule_scalable.h> +#include <odp_schedule_scalable_ordered.h> + +#define QUEUE_STATUS_FREE 0 +#define QUEUE_STATUS_DESTROYED 1 +#define QUEUE_STATUS_READY 2 + +struct queue_entry_s { + sched_elem_t sched_elem; + + odp_ticketlock_t lock ODP_ALIGNED_CACHE; + int status; + + queue_enq_fn_t enqueue ODP_ALIGNED_CACHE; + queue_deq_fn_t dequeue; + queue_enq_multi_fn_t enqueue_multi; + queue_deq_multi_fn_t dequeue_multi; + + uint32_t index; + odp_queue_t handle; + odp_queue_type_t type; + odp_queue_param_t param; + odp_pktin_queue_t pktin; + odp_pktout_queue_t pktout; + char name[ODP_QUEUE_NAME_LEN]; +}; + +union queue_entry_u { + struct queue_entry_s s; + uint8_t pad[ROUNDUP_CACHE_LINE(sizeof(struct queue_entry_s))]; +}; + +int _odp_queue_deq(sched_elem_t *q, odp_buffer_hdr_t *buf_hdr[], int num); +int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num); +int _odp_queue_deq_mc(sched_elem_t *q, odp_event_t *evp, int num); + +/* Round up memory size to next cache line size to + * align all memory addresses on cache line boundary. + */ +static inline void *shm_pool_alloc_align(_odp_ishm_pool_t *pool, uint32_t size) +{ + void *addr; + + addr = _odp_ishm_pool_alloc(pool, ROUNDUP_CACHE_LINE(size)); + ODP_ASSERT(((uintptr_t)addr & (ODP_CACHE_LINE_SIZE - 1)) == 0); + + return addr; +} + +static inline uint32_t queue_to_id(odp_queue_t handle) +{ + return _odp_typeval(handle) - 1; +} + +static inline queue_entry_t *qentry_from_int(queue_t handle) +{ + return (queue_entry_t *)(void *)(handle); +} + +static inline queue_t qentry_to_int(queue_entry_t *qentry) +{ + return (queue_t)(qentry); +} + +static inline odp_queue_t queue_get_handle(queue_entry_t *queue) +{ + return queue->s.handle; +} + +static inline reorder_window_t *queue_get_rwin(queue_entry_t *queue) +{ + return queue->s.sched_elem.rwin; +} + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h index 9adacef7..66ccad3f 100644 --- a/platform/linux-generic/include/odp_schedule_if.h +++ b/platform/linux-generic/include/odp_schedule_if.h @@ -12,7 +12,7 @@ extern "C" { #endif #include <odp/api/queue.h> -#include <odp_queue_internal.h> +#include <odp_queue_if.h> #include <odp/api/schedule.h> typedef void (*schedule_pktio_start_fn_t)(int pktio_index, int num_in_queue, diff --git a/platform/linux-generic/include/odp_schedule_scalable.h b/platform/linux-generic/include/odp_schedule_scalable.h new file mode 100644 index 00000000..4afb0878 --- /dev/null +++ b/platform/linux-generic/include/odp_schedule_scalable.h @@ -0,0 +1,137 @@ +/* Copyright (c) 2017, ARM Limited + * All rights reserved. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_SCHEDULE_SCALABLE_H +#define ODP_SCHEDULE_SCALABLE_H + +#include <odp/api/align.h> +#include <odp/api/schedule.h> +#include <odp/api/ticketlock.h> + +#include <odp_schedule_scalable_config.h> +#include <odp_schedule_scalable_ordered.h> +#include <odp_llqueue.h> + +/* + * ODP_SCHED_PRIO_HIGHEST/NORMAL/LOWEST/DEFAULT are compile time + * constants, but not ODP_SCHED_PRIO_NUM. The current API for this + * is odp_schedule_num_prio(). The other schedulers also define + * this internally as NUM_PRIO. + */ +#define ODP_SCHED_PRIO_NUM 8 + +typedef struct { + union { + struct { + struct llqueue llq; + uint32_t prio; + }; + char line[ODP_CACHE_LINE_SIZE]; + }; +} sched_queue_t ODP_ALIGNED_CACHE; + +#define TICKET_INVALID (uint16_t)(~0U) + +typedef struct { + int32_t numevts; + uint16_t wrr_budget; + uint8_t cur_ticket; + uint8_t nxt_ticket; +} qschedstate_t ODP_ALIGNED(sizeof(uint64_t)); + +typedef uint32_t ringidx_t; + +#ifdef CONFIG_SPLIT_PRODCONS +#define SPLIT_PC ODP_ALIGNED_CACHE +#else +#define SPLIT_PC +#endif + +#define ODP_NO_SCHED_QUEUE (ODP_SCHED_SYNC_ORDERED + 1) + +typedef struct { + struct llnode node; /* must be first */ + sched_queue_t *schedq; +#ifdef CONFIG_QSCHST_LOCK + odp_ticketlock_t qschlock; +#endif + qschedstate_t qschst; + uint16_t pop_deficit; + uint16_t qschst_type; + ringidx_t prod_read SPLIT_PC; + ringidx_t prod_write; + ringidx_t prod_mask; + odp_buffer_hdr_t **prod_ring; + ringidx_t cons_write SPLIT_PC; + ringidx_t cons_read; + reorder_window_t *rwin; + void *user_ctx; +#ifdef CONFIG_SPLIT_PRODCONS + odp_buffer_hdr_t **cons_ring; + ringidx_t cons_mask; + uint16_t cons_type; +#else +#define cons_mask prod_mask +#define cons_ring prod_ring +#define cons_type qschst_type +#endif +} sched_elem_t ODP_ALIGNED_CACHE; + +/* Number of scheduling groups */ +#define MAX_SCHED_GROUP (sizeof(sched_group_mask_t) * CHAR_BIT) + +typedef bitset_t sched_group_mask_t; + +typedef struct { + /* Threads currently associated with the sched group */ + bitset_t thr_actual[ODP_SCHED_PRIO_NUM] ODP_ALIGNED_CACHE; + bitset_t thr_wanted; + /* Used to spread queues over schedq's */ + uint32_t xcount[ODP_SCHED_PRIO_NUM]; + /* Number of schedq's per prio */ + uint32_t xfactor; + char name[ODP_SCHED_GROUP_NAME_LEN]; + /* ODP_SCHED_PRIO_NUM * xfactor. Must be last. */ + sched_queue_t schedq[1] ODP_ALIGNED_CACHE; +} sched_group_t; + +/* Number of reorder contexts per thread */ +#define TS_RVEC_SIZE 16 + +typedef struct { + /* Atomic queue currently being processed or NULL */ + sched_elem_t *atomq; + /* Current reorder context or NULL */ + reorder_context_t *rctx; + uint8_t pause; + uint8_t out_of_order; + uint8_t tidx; + uint8_t pad; + uint32_t dequeued; /* Number of events dequeued from atomic queue */ + uint16_t pktin_next; /* Next pktin tag to poll */ + uint16_t pktin_poll_cnts; + uint16_t ticket; /* Ticket for atomic queue or TICKET_INVALID */ + uint16_t num_schedq; + uint16_t sg_sem; /* Set when sg_wanted is modified by other thread */ +#define SCHEDQ_PER_THREAD (MAX_SCHED_GROUP * ODP_SCHED_PRIO_NUM) + sched_queue_t *schedq_list[SCHEDQ_PER_THREAD]; + /* Current sched_group membership */ + sched_group_mask_t sg_actual[ODP_SCHED_PRIO_NUM]; + /* Future sched_group membership. 
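+ * Written by other threads (see schedq_from_sched_group() in
+ * odp_schedule_scalable.c), which then raise sg_sem so the owning
+ * thread knows to re-read it.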
*/ + sched_group_mask_t sg_wanted[ODP_SCHED_PRIO_NUM]; + bitset_t priv_rvec_free; + /* Bitset of free entries in rvec[] */ + bitset_t rvec_free ODP_ALIGNED_CACHE; + /* Reordering contexts to allocate from */ + reorder_context_t rvec[TS_RVEC_SIZE] ODP_ALIGNED_CACHE; +} sched_scalable_thread_state_t ODP_ALIGNED_CACHE; + +void sched_update_enq(sched_elem_t *q, uint32_t actual); +void sched_update_enq_sp(sched_elem_t *q, uint32_t actual); +sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t prio); +void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio); + +#endif /* ODP_SCHEDULE_SCALABLE_H */ diff --git a/platform/linux-generic/include/odp_schedule_scalable_config.h b/platform/linux-generic/include/odp_schedule_scalable_config.h new file mode 100644 index 00000000..febf379b --- /dev/null +++ b/platform/linux-generic/include/odp_schedule_scalable_config.h @@ -0,0 +1,55 @@ +/* Copyright (c) 2017, ARM Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_SCHEDULE_SCALABLE_CONFIG_H_ +#define ODP_SCHEDULE_SCALABLE_CONFIG_H_ + +/* + * Default scaling factor for the scheduler group + * + * This scaling factor is used when the application creates a scheduler + * group with no worker threads. + */ +#define CONFIG_DEFAULT_XFACTOR 4 + +/* + * Default weight (in events) for WRR in scalable scheduler + * + * This controls the per-queue weight for WRR between queues of the same + * priority in the scalable scheduler + * A higher value improves throughput while a lower value increases fairness + * and thus likely decreases latency + * + * If WRR is undesired, set the value to ~0 which will use the largest possible + * weight + * + * Note: an API for specifying this on a per-queue basis would be useful but is + * not yet available + */ +#define CONFIG_WRR_WEIGHT 64 + +/* + * Split queue producer/consumer metadata into separate cache lines. + * This is beneficial on e.g. Cortex-A57 but not so much on A53. + */ +#define CONFIG_SPLIT_PRODCONS + +/* + * Use locks to protect queue (ring buffer) and scheduler state updates + * On x86, this decreases overhead noticeably. + */ +#ifndef __ARM_ARCH +#define CONFIG_QSCHST_LOCK +/* Keep all ring buffer/qschst data together when using locks */ +#undef CONFIG_SPLIT_PRODCONS +#endif + +/* + * Maximum number of ordered locks per queue. + */ +#define CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE 2 + +#endif /* ODP_SCHEDULE_SCALABLE_CONFIG_H_ */ diff --git a/platform/linux-generic/include/odp_schedule_scalable_ordered.h b/platform/linux-generic/include/odp_schedule_scalable_ordered.h new file mode 100644 index 00000000..9f3acf7a --- /dev/null +++ b/platform/linux-generic/include/odp_schedule_scalable_ordered.h @@ -0,0 +1,132 @@ +/* Copyright (c) 2017, ARM Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_SCHEDULE_SCALABLE_ORDERED_H +#define ODP_SCHEDULE_SCALABLE_ORDERED_H + +#include <odp/api/shared_memory.h> + +#include <odp_internal.h> +#include <odp_align_internal.h> +#include <odp_bitset.h> +#include <_ishmpool_internal.h> + +/* High level functioning of reordering + * Datastructures - + * Reorder Window - Every ordered queue is associated with a reorder window. + * Reorder window stores reorder contexts from threads that + * have completed processing out-of-order. + * Reorder Context - Reorder context consists of events that a thread + * wants to enqueue while processing a batch of events + * from an ordered queue. 
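+ *
+ * For example: threads A and B each dequeue a batch from the same
+ * ordered queue, A reserving reorder window slot 0 and B slot 1. If B
+ * finishes first, B's events are parked in its reorder context at
+ * slot 1; when A, the window head, releases its context, A's events
+ * are enqueued first and B's parked context is retired right after,
+ * preserving the original queue order.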
+ * + * Algorithm - + * 1) Thread identifies the ordered queue. + * 2) It 'reserves a slot in the reorder window and dequeues the + * events' atomically. Atomicity is achieved by using a ticket-lock + * like design where the reorder window slot is the ticket. + * 3a) Upon order-release/next schedule call, the thread + * checks if its slot (ticket) equals the head of the reorder window. + * If yes, enqueues the events to the destination queue until + * i) the reorder window is empty or + * ii) there is a gap in the reorder window + * If no, the reorder context is stored in the reorder window at + * the reserved slot. + * 3b) Upon the first enqueue, the thread checks if its slot (ticket) + * equals the head of the reorder window. + * If yes, enqueues the events immediately to the destination queue + * If no, these (and subsequent) events are stored in the reorder context + * (in the application-given order) + */ + +/* Head and change indicator variables are used to synchronise between + * concurrent insert operations in the reorder window. A thread performing + * an in-order insertion must be notified about the newly inserted + * reorder contexts so that it doesn't halt the retire process too early. + * A thread performing an out-of-order insertion must correspondingly + * notify the thread doing in-order insertion of the new waiting reorder + * context, which may need to be handled by that thread. + * + * Also, an out-of-order insertion may become an in-order insertion if the + * thread doing an in-order insertion completes before this thread completes. + * We need a point of synchronisation where this knowledge and potential state + * change can be transferred between threads. + */ +typedef struct hc { + /* First missing context */ + uint32_t head; + /* Change indicator */ + uint32_t chgi; +} hc_t ODP_ALIGNED(sizeof(uint64_t)); + +/* Number of reorder contexts in the reorder window. + * Should be at least one per CPU. + */ +#define RWIN_SIZE 32 +ODP_STATIC_ASSERT(CHECK_IS_POWER2(RWIN_SIZE), "RWIN_SIZE is not a power of 2"); + +#define NUM_OLOCKS 2 + +typedef struct reorder_context reorder_context_t; + +typedef struct reorder_window { + /* head and change indicator */ + hc_t hc; + uint32_t winmask; + uint32_t tail; + uint32_t turn; + uint32_t olock[NUM_OLOCKS]; + uint16_t lock_count; + /* Reorder contexts in this window */ + reorder_context_t *ring[RWIN_SIZE]; +} reorder_window_t; + +/* Number of events that can be stored in a reorder context. + * This size is chosen so that there is no space left unused at the end + * of the last cache line (for 64b architectures and 64b handles).
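+ * As a quick check (assuming 64-byte cache lines): the fixed fields
+ * take 8 (rwin) + 8 (rvec_free) + 4 (sn) + 4 (the four uint8_t
+ * fields) + 1 (numevts) = 25 bytes, padded to 32 for pointer
+ * alignment; events[18] and destq[18] add 2 * 18 * 8 = 288 bytes,
+ * for 320 bytes in total, i.e. exactly five cache lines.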
+ */ +#define RC_EVT_SIZE 18 + +struct reorder_context { + /* Reorder window to which this context belongs */ + reorder_window_t *rwin; + /* Pointer to TS->rvec_free */ + bitset_t *rvec_free; + /* Our slot number in the reorder window */ + uint32_t sn; + uint8_t olock_flags; + /* Our index in thread_state rvec array */ + uint8_t idx; + /* Use to link reorder contexts together */ + uint8_t next_idx; + /* Current reorder context to save events in */ + uint8_t cur_idx; + /* Number of events stored in this reorder context */ + uint8_t numevts; + /* Events stored in this context */ + odp_buffer_hdr_t *events[RC_EVT_SIZE]; + queue_entry_t *destq[RC_EVT_SIZE]; +} ODP_ALIGNED_CACHE; + +reorder_window_t *rwin_alloc(_odp_ishm_pool_t *pool, + unsigned lock_count); +int rwin_free(_odp_ishm_pool_t *pool, reorder_window_t *rwin); +bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn); +void rwin_insert(reorder_window_t *rwin, + reorder_context_t *rctx, + uint32_t sn, + void (*callback)(reorder_context_t *)); +void rctx_init(reorder_context_t *rctx, uint16_t idx, + reorder_window_t *rwin, uint32_t sn); +void rctx_free(const reorder_context_t *rctx); +void olock_unlock(const reorder_context_t *rctx, reorder_window_t *rwin, + uint32_t lock_index); +void olock_release(const reorder_context_t *rctx); +void rctx_retire(reorder_context_t *first); +void rctx_release(reorder_context_t *rctx); +int rctx_save(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); + +#endif /* ODP_SCHEDULE_SCALABLE_ORDERED_H */ diff --git a/platform/linux-generic/m4/odp_schedule.m4 b/platform/linux-generic/m4/odp_schedule.m4 index 91c19f21..d862b8b2 100644 --- a/platform/linux-generic/m4/odp_schedule.m4 +++ b/platform/linux-generic/m4/odp_schedule.m4 @@ -1,13 +1,44 @@ -AC_ARG_ENABLE([schedule-sp], - [ --enable-schedule-sp enable strict priority scheduler], - [if test x$enableval = xyes; then - schedule_sp_enabled=yes - ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP" - fi]) +# Checks for --enable-schedule-sp and defines ODP_SCHEDULE_SP and adds +# -DODP_SCHEDULE_SP to CFLAGS. +AC_ARG_ENABLE( + [schedule_sp], + [AC_HELP_STRING([--enable-schedule-sp], + [enable strict priority scheduler])], + [if test "x$enableval" = xyes; then + schedule_sp=true + ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SP" + else + schedule_sp=false + fi], + [schedule_sp=false]) +AM_CONDITIONAL([ODP_SCHEDULE_SP], [test x$schedule_sp = xtrue]) -AC_ARG_ENABLE([schedule-iquery], - [ --enable-schedule-iquery enable interests query (sparse bitmap) scheduler], - [if test x$enableval = xyes; then - schedule_iquery_enabled=yes - ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY" - fi]) +# Checks for --enable-schedule-iquery and defines ODP_SCHEDULE_IQUERY and adds +# -DODP_SCHEDULE_IQUERY to CFLAGS. +AC_ARG_ENABLE( + [schedule_iquery], + [AC_HELP_STRING([--enable-schedule-iquery], + [enable interests query (sparse bitmap) scheduler])], + [if test "x$enableval" = xyes; then + schedule_iquery=true + ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_IQUERY" + else + schedule_iquery=false + fi], + [schedule_iquery=false]) +AM_CONDITIONAL([ODP_SCHEDULE_IQUERY], [test x$schedule_iquery = xtrue]) + +# Checks for --enable-schedule-scalable and defines ODP_SCHEDULE_SCALABLE and +# adds -DODP_SCHEDULE_SCALABLE to CFLAGS. 
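+# Note: the schedulers are mutually exclusive; odp_schedule_if.c and
+# odp_queue_if.c select the scalable implementations when
+# ODP_SCHEDULE_SCALABLE is defined and fall back to the default ones
+# otherwise.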
+AC_ARG_ENABLE( + [schedule_scalable], + [AC_HELP_STRING([--enable-schedule-scalable], + [enable scalable scheduler])], + [if test "x$enableval" = xyes; then + schedule_scalable=true + ODP_CFLAGS="$ODP_CFLAGS -DODP_SCHEDULE_SCALABLE" + else + schedule_scalable=false + fi], + [schedule_scalable=false]) +AM_CONDITIONAL([ODP_SCHEDULE_SCALABLE], [test x$schedule_scalable = xtrue]) diff --git a/platform/linux-generic/odp_queue_if.c b/platform/linux-generic/odp_queue_if.c index c91f00eb..d7471dfc 100644 --- a/platform/linux-generic/odp_queue_if.c +++ b/platform/linux-generic/odp_queue_if.c @@ -6,11 +6,19 @@ #include <odp_queue_if.h> +extern const queue_api_t queue_scalable_api; +extern const queue_fn_t queue_scalable_fn; + extern const queue_api_t queue_default_api; extern const queue_fn_t queue_default_fn; +#ifdef ODP_SCHEDULE_SCALABLE +const queue_api_t *queue_api = &queue_scalable_api; +const queue_fn_t *queue_fn = &queue_scalable_fn; +#else const queue_api_t *queue_api = &queue_default_api; const queue_fn_t *queue_fn = &queue_default_fn; +#endif odp_queue_t odp_queue_create(const char *name, const odp_queue_param_t *param) { diff --git a/platform/linux-generic/odp_queue_scalable.c b/platform/linux-generic/odp_queue_scalable.c new file mode 100644 index 00000000..d5c6d0ae --- /dev/null +++ b/platform/linux-generic/odp_queue_scalable.c @@ -0,0 +1,1020 @@ +/* Copyright (c) 2017, ARM Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include <odp/api/hints.h> +#include <odp/api/plat/ticketlock_inlines.h> +#include <odp/api/queue.h> +#include <odp/api/schedule.h> +#include <odp/api/shared_memory.h> +#include <odp/api/sync.h> +#include <odp/api/traffic_mngr.h> + +#include <odp_internal.h> +#include <odp_config_internal.h> +#include <odp_debug_internal.h> + +#include <odp_buffer_inlines.h> +#include <odp_packet_io_internal.h> +#include <odp_packet_io_queue.h> +#include <odp_pool_internal.h> +#include <odp_queue_scalable_internal.h> +#include <odp_schedule_if.h> +#include <_ishm_internal.h> +#include <_ishmpool_internal.h> + +#include <string.h> +#include <inttypes.h> + +#define NUM_INTERNAL_QUEUES 64 + +#define MIN(a, b) \ + ({ \ + __typeof__(a) tmp_a = (a); \ + __typeof__(b) tmp_b = (b); \ + tmp_a < tmp_b ? 
tmp_a : tmp_b; \ + }) + +#define LOCK(a) _odp_ticketlock_lock(a) +#define UNLOCK(a) _odp_ticketlock_unlock(a) +#define LOCK_INIT(a) odp_ticketlock_init(a) + +extern __thread sched_scalable_thread_state_t *sched_ts; + +typedef struct queue_table_t { + queue_entry_t queue[ODP_CONFIG_QUEUES]; +} queue_table_t; + +static queue_table_t *queue_tbl; +_odp_ishm_pool_t *queue_shm_pool; + +static inline odp_queue_t queue_from_id(uint32_t queue_id) +{ + return _odp_cast_scalar(odp_queue_t, queue_id + 1); +} + +static queue_t queue_from_ext(odp_queue_t handle); +static int _queue_enq(queue_t handle, odp_buffer_hdr_t *buf_hdr); +static odp_buffer_hdr_t *_queue_deq(queue_t handle); +static int _queue_enq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[], + int num); +static int _queue_deq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[], + int num); + +static queue_entry_t *get_qentry(uint32_t queue_id) +{ + return &queue_tbl->queue[queue_id]; +} + +static int _odp_queue_disable_enq(sched_elem_t *q) +{ + ringidx_t old_read, old_write, new_write; + uint32_t size; + + old_write = q->prod_write; + size = q->prod_mask + 1; + do { + /* Need __atomic_load to avoid compiler reordering */ + old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE); + if (old_write != old_read) { + /* Queue is not empty, cannot claim all elements + * Cannot disable enqueue. + */ + return -1; + } + /* Claim all elements in ring */ + new_write = old_write + size; + } while (!__atomic_compare_exchange_n(&q->prod_write, + &old_write, /* Updated on failure */ + new_write, + true, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); + /* All remaining elements claimed, no one else can enqueue */ + return 0; +} + +static int queue_init(queue_entry_t *queue, const char *name, + const odp_queue_param_t *param) +{ + ringidx_t ring_idx; + sched_elem_t *sched_elem; + uint32_t ring_size; + odp_buffer_hdr_t **ring; + uint32_t size; + + sched_elem = &queue->s.sched_elem; + ring_size = param->size > 0 ? + ROUNDUP_POWER2_U32(param->size) : CONFIG_QUEUE_SIZE; + strncpy(queue->s.name, name ? 
name : "", ODP_QUEUE_NAME_LEN - 1); + queue->s.name[ODP_QUEUE_NAME_LEN - 1] = 0; + memcpy(&queue->s.param, param, sizeof(odp_queue_param_t)); + + size = ring_size * sizeof(odp_buffer_hdr_t *); + ring = (odp_buffer_hdr_t **)shm_pool_alloc_align(queue_shm_pool, size); + if (NULL == ring) + return -1; + + for (ring_idx = 0; ring_idx < ring_size; ring_idx++) + ring[ring_idx] = NULL; + + queue->s.type = queue->s.param.type; + queue->s.enqueue = _queue_enq; + queue->s.dequeue = _queue_deq; + queue->s.enqueue_multi = _queue_enq_multi; + queue->s.dequeue_multi = _queue_deq_multi; + queue->s.pktin = PKTIN_INVALID; + + sched_elem->node.next = NULL; +#ifdef CONFIG_QSCHST_LOCK + LOCK_INIT(&sched_elem->qschlock); +#endif + sched_elem->qschst.numevts = 0; + sched_elem->qschst.wrr_budget = CONFIG_WRR_WEIGHT; + sched_elem->qschst.cur_ticket = 0; + sched_elem->qschst.nxt_ticket = 0; + sched_elem->pop_deficit = 0; + if (queue->s.type == ODP_QUEUE_TYPE_SCHED) + sched_elem->qschst_type = queue->s.param.sched.sync; + else + sched_elem->qschst_type = ODP_NO_SCHED_QUEUE; + /* 2nd cache line - enqueue */ + sched_elem->prod_read = 0; + sched_elem->prod_write = 0; + sched_elem->prod_ring = ring; + sched_elem->prod_mask = ring_size - 1; + /* 3rd cache line - dequeue */ + sched_elem->cons_read = 0; + sched_elem->cons_write = 0; + sched_elem->rwin = NULL; + sched_elem->schedq = NULL; + sched_elem->user_ctx = queue->s.param.context; +#ifdef CONFIG_SPLIT_PRODCONS + sched_elem->cons_ring = ring; + sched_elem->cons_mask = ring_size - 1; + sched_elem->cons_type = sched_elem->qschst_type; +#endif + + /* Queue initialized successfully, add it to the sched group */ + if (queue->s.type == ODP_QUEUE_TYPE_SCHED) { + if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) { + sched_elem->rwin = + rwin_alloc(queue_shm_pool, + queue->s.param.sched.lock_count); + if (sched_elem->rwin == NULL) { + ODP_ERR("Reorder window not created\n"); + goto rwin_create_failed; + } + } + sched_elem->schedq = + schedq_from_sched_group(param->sched.group, + param->sched.prio); + } + + return 0; + +rwin_create_failed: + _odp_ishm_pool_free(queue_shm_pool, ring); + + return -1; +} + +static int queue_init_global(void) +{ + uint32_t i; + uint64_t pool_size; + uint64_t min_alloc; + uint64_t max_alloc; + + ODP_DBG("Queue init ... "); + + /* Attach to the pool if it exists */ + queue_shm_pool = _odp_ishm_pool_lookup("queue_shm_pool"); + if (queue_shm_pool == NULL) { + /* Create shared memory pool to allocate shared memory for the + * queues. Use the default queue size. + */ + /* Add size of the array holding the queues */ + pool_size = sizeof(queue_table_t); + /* Add storage required for queues */ + pool_size += (CONFIG_QUEUE_SIZE * sizeof(odp_buffer_hdr_t *)) * + ODP_CONFIG_QUEUES; + /* Add the reorder window size */ + pool_size += sizeof(reorder_window_t) * ODP_CONFIG_QUEUES; + /* Choose min_alloc and max_alloc such that the buddy allocator + * is selected.
+ */ + min_alloc = 0; + max_alloc = CONFIG_QUEUE_SIZE * sizeof(odp_buffer_hdr_t *); + queue_shm_pool = _odp_ishm_pool_create("queue_shm_pool", + pool_size, + min_alloc, max_alloc, + _ODP_ISHM_SINGLE_VA); + if (queue_shm_pool == NULL) { + ODP_ERR("Failed to allocate shared memory pool for" + " queues\n"); + goto queue_shm_pool_create_failed; + } + } + + queue_tbl = (queue_table_t *) + shm_pool_alloc_align(queue_shm_pool, + sizeof(queue_table_t)); + if (queue_tbl == NULL) { + ODP_ERR("Failed to reserve shared memory for queue table\n"); + goto queue_tbl_ishm_alloc_failed; + } + + memset(queue_tbl, 0, sizeof(queue_table_t)); + + for (i = 0; i < ODP_CONFIG_QUEUES; i++) { + /* init locks */ + queue_entry_t *queue; + + queue = get_qentry(i); + LOCK_INIT(&queue->s.lock); + queue->s.index = i; + queue->s.handle = queue_from_id(i); + } + + ODP_DBG("done\n"); + ODP_DBG("Queue init global\n"); + ODP_DBG(" struct queue_entry_s size %zu\n", + sizeof(struct queue_entry_s)); + ODP_DBG(" queue_entry_t size %zu\n", + sizeof(queue_entry_t)); + ODP_DBG("\n"); + + return 0; + +queue_shm_pool_create_failed: + +queue_tbl_ishm_alloc_failed: + _odp_ishm_pool_destroy(queue_shm_pool); + + return -1; +} + +static int queue_term_global(void) +{ + int ret = 0; + int rc = 0; + queue_entry_t *queue; + int i; + + for (i = 0; i < ODP_CONFIG_QUEUES; i++) { + queue = &queue_tbl->queue[i]; + if (__atomic_load_n(&queue->s.status, + __ATOMIC_RELAXED) != QUEUE_STATUS_FREE) { + ODP_ERR("Not destroyed queue: %s\n", queue->s.name); + rc = -1; + } + } + + _odp_ishm_pool_free(queue_shm_pool, queue_tbl); + + ret = _odp_ishm_pool_destroy(queue_shm_pool); + if (ret < 0) { + ODP_ERR("Failed to destroy shared memory pool for queues\n"); + rc = -1; + } + + return rc; +} + +static int queue_init_local(void) +{ + return 0; +} + +static int queue_term_local(void) +{ + return 0; +} + +static int queue_capability(odp_queue_capability_t *capa) +{ + memset(capa, 0, sizeof(odp_queue_capability_t)); + + /* Reserve some queues for internal use */ + capa->max_queues = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES; + capa->max_ordered_locks = sched_fn->max_ordered_locks(); + capa->max_sched_groups = sched_fn->num_grps(); + capa->sched_prios = odp_schedule_num_prio(); + capa->plain.max_num = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES; + capa->plain.max_size = 0; + capa->sched.max_num = ODP_CONFIG_QUEUES - NUM_INTERNAL_QUEUES; + capa->sched.max_size = 0; + + return 0; +} + +static odp_queue_type_t queue_type(odp_queue_t handle) +{ + return qentry_from_int(queue_from_ext(handle))->s.type; +} + +static odp_schedule_sync_t queue_sched_type(odp_queue_t handle) +{ + return qentry_from_int(queue_from_ext(handle))->s.param.sched.sync; +} + +static odp_schedule_prio_t queue_sched_prio(odp_queue_t handle) +{ + return qentry_from_int(queue_from_ext(handle))->s.param.sched.prio; +} + +static odp_schedule_group_t queue_sched_group(odp_queue_t handle) +{ + return qentry_from_int(queue_from_ext(handle))->s.param.sched.group; +} + +static int queue_lock_count(odp_queue_t handle) +{ + queue_entry_t *queue = qentry_from_int(queue_from_ext(handle)); + + return queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED ? 
+ (int)queue->s.param.sched.lock_count : -1; +} + +static odp_queue_t queue_create(const char *name, + const odp_queue_param_t *param) +{ + int queue_idx; + odp_queue_t handle = ODP_QUEUE_INVALID; + queue_entry_t *queue; + odp_queue_param_t default_param; + + if (param == NULL) { + odp_queue_param_init(&default_param); + param = &default_param; + } + + for (queue_idx = 0; queue_idx < ODP_CONFIG_QUEUES; queue_idx++) { + queue = &queue_tbl->queue[queue_idx]; + + if (queue->s.status != QUEUE_STATUS_FREE) + continue; + + LOCK(&queue->s.lock); + if (queue->s.status == QUEUE_STATUS_FREE) { + if (queue_init(queue, name, param)) { + UNLOCK(&queue->s.lock); + return handle; + } + queue->s.status = QUEUE_STATUS_READY; + handle = queue->s.handle; + UNLOCK(&queue->s.lock); + break; + } + UNLOCK(&queue->s.lock); + } + return handle; +} + +static int queue_destroy(odp_queue_t handle) +{ + queue_entry_t *queue; + sched_elem_t *q; + + if (handle == ODP_QUEUE_INVALID) + return -1; + + queue = qentry_from_int(queue_from_ext(handle)); + LOCK(&queue->s.lock); + if (queue->s.status != QUEUE_STATUS_READY) { + UNLOCK(&queue->s.lock); + return -1; + } + q = &queue->s.sched_elem; + +#ifdef CONFIG_QSCHST_LOCK + LOCK(&q->qschlock); +#endif + if (_odp_queue_disable_enq(q)) { + /* Producer side not empty */ +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&q->qschlock); +#endif + UNLOCK(&queue->s.lock); + return -1; + } + /* Enqueue is now disabled */ + if (q->cons_read != q->cons_write) { + /* Consumer side is not empty + * Roll back previous change, enable enqueue again. + */ + uint32_t size; + + size = q->prod_mask + 1; + __atomic_fetch_sub(&q->prod_write, size, __ATOMIC_RELAXED); +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&q->qschlock); +#endif + UNLOCK(&queue->s.lock); + return -1; + } +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&q->qschlock); +#endif + /* Producer and consumer sides empty, enqueue disabled + * Now wait until schedq state is empty and no outstanding tickets + */ + while (__atomic_load_n(&q->qschst.numevts, __ATOMIC_RELAXED) != 0 || + __atomic_load_n(&q->qschst.cur_ticket, __ATOMIC_RELAXED) != + __atomic_load_n(&q->qschst.nxt_ticket, __ATOMIC_RELAXED)) { + sevl(); + while (wfe() && monitor32((uint32_t *)&q->qschst.numevts, + __ATOMIC_RELAXED) != 0) + doze(); + } + + /* Adjust the spread factor for the queues in the schedule group */ + if (queue->s.type == ODP_QUEUE_TYPE_SCHED) + sched_group_xcount_dec(queue->s.param.sched.group, + queue->s.param.sched.prio); + + _odp_ishm_pool_free(queue_shm_pool, q->prod_ring); + + if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) { + if (rwin_free(queue_shm_pool, q->rwin) < 0) { + ODP_ERR("Failed to free reorder window\n"); + UNLOCK(&queue->s.lock); + return -1; + } + } + queue->s.status = QUEUE_STATUS_FREE; + UNLOCK(&queue->s.lock); + return 0; +} + +static int queue_context_set(odp_queue_t handle, void *context, + uint32_t len ODP_UNUSED) +{ + odp_mb_full(); + qentry_from_int(queue_from_ext(handle))->s.param.context = context; + odp_mb_full(); + return 0; +} + +static void *queue_context(odp_queue_t handle) +{ + return qentry_from_int(queue_from_ext(handle))->s.param.context; +} + +static odp_queue_t queue_lookup(const char *name) +{ + uint32_t i; + + for (i = 0; i < ODP_CONFIG_QUEUES; i++) { + queue_entry_t *queue = &queue_tbl->queue[i]; + + if (queue->s.status == QUEUE_STATUS_FREE || + queue->s.status == QUEUE_STATUS_DESTROYED) + continue; + + LOCK(&queue->s.lock); + if (strcmp(name, queue->s.name) == 0) { + /* found it */ + UNLOCK(&queue->s.lock); + return queue->s.handle; + } 
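+			/* Not this queue; drop the lock and keep scanning. */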
+ UNLOCK(&queue->s.lock); + } + + return ODP_QUEUE_INVALID; +} + +#ifndef CONFIG_QSCHST_LOCK +static inline int _odp_queue_enq(sched_elem_t *q, + odp_buffer_hdr_t *buf_hdr[], + int num) +{ + ringidx_t old_read; + ringidx_t old_write; + ringidx_t new_write; + int actual; + uint32_t mask; + odp_buffer_hdr_t **ring; + + mask = q->prod_mask; + ring = q->prod_ring; + + /* Load producer ring state (read & write index) */ + old_write = __atomic_load_n(&q->prod_write, __ATOMIC_RELAXED); + do { + /* Consumer does store-release prod_read, we need + * load-acquire. + */ + old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE); + + actual = MIN(num, (int)((mask + 1) - (old_write - old_read))); + if (odp_unlikely(actual <= 0)) + return 0; + + new_write = old_write + actual; + } while (!__atomic_compare_exchange_n(&q->prod_write, + &old_write, /* Updated on failure */ + new_write, + true, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); + +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->cons_write, 0, 0); +#endif + /* Store our event(s) in the ring */ + do { + ring[old_write & mask] = *buf_hdr++; + } while (++old_write != new_write); + old_write -= actual; + +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->node, 1, 0); +#endif + /* Wait for our turn to signal consumers */ + if (odp_unlikely(__atomic_load_n(&q->cons_write, + __ATOMIC_RELAXED) != old_write)) { + sevl(); + while (wfe() && monitor32(&q->cons_write, + __ATOMIC_RELAXED) != old_write) + doze(); + } + + /* Signal consumers that events are available (release events) + * Enable other producers to continue + */ + /* Wait for writes (to ring slots) to complete */ + atomic_store_release(&q->cons_write, new_write, /*readonly=*/false); + + return actual; +} + +#else + +static inline int _odp_queue_enq_sp(sched_elem_t *q, + odp_buffer_hdr_t *buf_hdr[], + int num) +{ + ringidx_t old_read; + ringidx_t old_write; + ringidx_t new_write; + int actual; + uint32_t mask; + odp_buffer_hdr_t **ring; + + mask = q->prod_mask; + ring = q->prod_ring; + + /* Load producer ring state (read & write index) */ + old_write = q->prod_write; + /* Consumer does store-release prod_read, we need load-acquire */ + old_read = __atomic_load_n(&q->prod_read, __ATOMIC_ACQUIRE); + actual = MIN(num, (int)((mask + 1) - (old_write - old_read))); + if (odp_unlikely(actual <= 0)) + return 0; + + new_write = old_write + actual; + q->prod_write = new_write; + + /* Store our event(s) in the ring */ + do { + ring[old_write & mask] = *buf_hdr++; + } while (++old_write != new_write); + old_write -= actual; + +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->node, 1, 0); +#endif + + /* Signal consumers that events are available (release events) + * Enable other producers to continue + */ +#ifdef CONFIG_QSCHST_LOCK + q->cons_write = new_write; +#else + atomic_store_release(&q->cons_write, new_write, /*readonly=*/false); +#endif + + return actual; +} +#endif + +static int _queue_enq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + int actual; + queue_entry_t *queue; + sched_scalable_thread_state_t *ts; + + queue = qentry_from_int(handle); + ts = sched_ts; + if (ts && odp_unlikely(ts->out_of_order)) { + actual = rctx_save(queue, buf_hdr, num); + return actual; + } + +#ifdef CONFIG_QSCHST_LOCK + LOCK(&queue->s.sched_elem.qschlock); + actual = _odp_queue_enq_sp(&queue->s.sched_elem, buf_hdr, num); +#else + actual = _odp_queue_enq(&queue->s.sched_elem, buf_hdr, num); +#endif + + if (odp_likely(queue->s.sched_elem.schedq != NULL && actual != 0)) { + /* Perform 
scheduler related updates. */ +#ifdef CONFIG_QSCHST_LOCK + sched_update_enq_sp(&queue->s.sched_elem, actual); +#else + sched_update_enq(&queue->s.sched_elem, actual); +#endif + } + +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&queue->s.sched_elem.qschlock); +#endif + return actual; +} + +static int _queue_enq(queue_t handle, odp_buffer_hdr_t *buf_hdr) +{ + return odp_likely( + _queue_enq_multi(handle, &buf_hdr, 1) == 1) ? 0 : -1; +} + +static int queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num) +{ + odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX]; + queue_entry_t *queue; + int i; + + if (num > QUEUE_MULTI_MAX) + num = QUEUE_MULTI_MAX; + + queue = qentry_from_int(queue_from_ext(handle)); + + for (i = 0; i < num; i++) + buf_hdr[i] = buf_hdl_to_hdr(odp_buffer_from_event(ev[i])); + + return queue->s.enqueue_multi(qentry_to_int(queue), buf_hdr, num); +} + +static int queue_enq(odp_queue_t handle, odp_event_t ev) +{ + odp_buffer_hdr_t *buf_hdr; + queue_entry_t *queue; + + queue = qentry_from_int(queue_from_ext(handle)); + buf_hdr = buf_hdl_to_hdr(odp_buffer_from_event(ev)); + + return queue->s.enqueue(qentry_to_int(queue), buf_hdr); +} + +/* Single-consumer dequeue. */ +int _odp_queue_deq_sc(sched_elem_t *q, odp_event_t *evp, int num) +{ + int actual; + ringidx_t old_read; + ringidx_t old_write; + ringidx_t new_read; + uint32_t mask; + odp_buffer_hdr_t **ring; + + /* Load consumer ring state (read & write index). */ + old_read = q->cons_read; + /* Producer does store-release cons_write, we need load-acquire */ + old_write = __atomic_load_n(&q->cons_write, __ATOMIC_ACQUIRE); + actual = MIN(num, (int)(old_write - old_read)); + + if (odp_unlikely(actual <= 0)) + return 0; + +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->node, 1, 0); +#endif + new_read = old_read + actual; + q->cons_read = new_read; + + mask = q->cons_mask; + ring = q->cons_ring; + do { + *evp++ = odp_buffer_to_event( + odp_hdr_to_buf(ring[old_read & mask])); + } while (++old_read != new_read); + + /* Signal producers that empty slots are available + * (release ring slots). Enable other consumers to continue. + */ +#ifdef CONFIG_QSCHST_LOCK + q->prod_read = new_read; +#else + /* Wait for loads (from ring slots) to complete. */ + atomic_store_release(&q->prod_read, new_read, /*readonly=*/true); +#endif + return actual; +} + +inline int _odp_queue_deq(sched_elem_t *q, odp_buffer_hdr_t *buf_hdr[], int num) +{ + int actual; + ringidx_t old_read; + ringidx_t old_write; + ringidx_t new_read; + uint32_t mask; + odp_buffer_hdr_t **ring; + odp_buffer_hdr_t **p_buf_hdr; + + mask = q->cons_mask; + ring = q->cons_ring; + + /* Load consumer ring state (read & write index) */ + old_read = __atomic_load_n(&q->cons_read, __ATOMIC_RELAXED); + do { + /* Need __atomic_load to avoid compiler reordering + * Producer does store-release cons_write, we need + * load-acquire. 
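+	 * In the lock-free configuration this pairs with the
+	 * atomic_store_release() of cons_write in _odp_queue_enq().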
+ */ + old_write = __atomic_load_n(&q->cons_write, __ATOMIC_ACQUIRE); + /* Prefetch ring buffer array */ + __builtin_prefetch(&q->cons_ring[old_read & mask], 0, 0); + + actual = MIN(num, (int)(old_write - old_read)); + if (odp_unlikely(actual <= 0)) + return 0; + + /* Attempt to free ring slot(s) */ + new_read = old_read + actual; + } while (!__atomic_compare_exchange_n(&q->cons_read, + &old_read, /* Updated on failure */ + new_read, + true, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->prod_read, 0, 0); +#endif + p_buf_hdr = buf_hdr; + do { + *p_buf_hdr++ = ring[old_read & mask]; + } while (++old_read != new_read); + old_read -= actual; + +#ifdef CONFIG_SPLIT_PRODCONS + __builtin_prefetch(&q->node, 1, 0); +#endif + /* Wait for our turn to signal producers */ + if (odp_unlikely(__atomic_load_n(&q->prod_read, __ATOMIC_RELAXED) != + old_read)) { + sevl(); + while (wfe() && monitor32(&q->prod_read, + __ATOMIC_RELAXED) != old_read) + doze(); + } + + /* Signal producers that empty slots are available + * (release ring slots) + * Enable other consumers to continue + */ + /* Wait for loads (from ring slots) to complete */ + atomic_store_release(&q->prod_read, new_read, /*readonly=*/true); + + return actual; +} + +inline int _odp_queue_deq_mc(sched_elem_t *q, odp_event_t *evp, int num) +{ + int ret, evt_idx; + odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX]; + + if (num > QUEUE_MULTI_MAX) + num = QUEUE_MULTI_MAX; + + ret = _odp_queue_deq(q, hdr_tbl, num); + if (odp_likely(ret != 0)) { + for (evt_idx = 0; evt_idx < num; evt_idx++) + evp[evt_idx] = odp_buffer_to_event( + odp_hdr_to_buf(hdr_tbl[evt_idx])); + } + + return ret; +} + +static int _queue_deq_multi(queue_t handle, odp_buffer_hdr_t *buf_hdr[], + int num) +{ + sched_elem_t *q; + queue_entry_t *queue; + + queue = qentry_from_int(handle); + q = &queue->s.sched_elem; + return _odp_queue_deq(q, buf_hdr, num); +} + +static odp_buffer_hdr_t *_queue_deq(queue_t handle) +{ + sched_elem_t *q; + odp_buffer_hdr_t *buf_hdr; + queue_entry_t *queue; + + queue = qentry_from_int(handle); + q = &queue->s.sched_elem; + if (_odp_queue_deq(q, &buf_hdr, 1) == 1) + return buf_hdr; + else + return NULL; +} + +static int queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num) +{ + queue_entry_t *queue; + odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX]; + int i, ret; + + if (num > QUEUE_MULTI_MAX) + num = QUEUE_MULTI_MAX; + + queue = qentry_from_int(queue_from_ext(handle)); + + ret = queue->s.dequeue_multi(qentry_to_int(queue), buf_hdr, num); + + for (i = 0; i < ret; i++) + events[i] = odp_buffer_to_event(buf_hdr[i]->handle.handle); + + return ret; +} + +static odp_event_t queue_deq(odp_queue_t handle) +{ + queue_entry_t *queue; + odp_buffer_hdr_t *buf_hdr; + + queue = qentry_from_int(queue_from_ext(handle)); + buf_hdr = queue->s.dequeue(qentry_to_int(queue)); + + if (buf_hdr) + return odp_buffer_to_event(buf_hdr->handle.handle); + + return ODP_EVENT_INVALID; +} + +static void queue_param_init(odp_queue_param_t *params) +{ + memset(params, 0, sizeof(odp_queue_param_t)); + params->type = ODP_QUEUE_TYPE_PLAIN; + params->enq_mode = ODP_QUEUE_OP_MT; + params->deq_mode = ODP_QUEUE_OP_MT; + params->sched.prio = ODP_SCHED_PRIO_DEFAULT; + params->sched.sync = ODP_SCHED_SYNC_PARALLEL; + params->sched.group = ODP_SCHED_GROUP_ALL; +} + +static int queue_info(odp_queue_t handle, odp_queue_info_t *info) +{ + uint32_t queue_id; + queue_entry_t *queue; + int status; + + if (odp_unlikely(info == NULL)) { + ODP_ERR("Unable to store 
info, NULL ptr given\n"); + return -1; + } + + queue_id = queue_to_id(handle); + + if (odp_unlikely(queue_id >= ODP_CONFIG_QUEUES)) { + ODP_ERR("Invalid queue handle:%" PRIu64 "\n", + odp_queue_to_u64(handle)); + return -1; + } + + queue = get_qentry(queue_id); + + LOCK(&queue->s.lock); + status = queue->s.status; + + if (odp_unlikely(status == QUEUE_STATUS_FREE || + status == QUEUE_STATUS_DESTROYED)) { + UNLOCK(&queue->s.lock); + ODP_ERR("Invalid queue status:%d\n", status); + return -1; + } + + info->name = queue->s.name; + info->param = queue->s.param; + + UNLOCK(&queue->s.lock); + + return 0; +} + +static uint64_t queue_to_u64(odp_queue_t hdl) +{ + return _odp_pri(hdl); +} + +static odp_pktout_queue_t queue_get_pktout(queue_t handle) +{ + return qentry_from_int(handle)->s.pktout; +} + +static void queue_set_pktout(queue_t handle, odp_pktio_t pktio, int index) +{ + qentry_from_int(handle)->s.pktout.pktio = pktio; + qentry_from_int(handle)->s.pktout.index = index; +} + +static odp_pktin_queue_t queue_get_pktin(queue_t handle) +{ + return qentry_from_int(handle)->s.pktin; +} + +static void queue_set_pktin(queue_t handle, odp_pktio_t pktio, int index) +{ + qentry_from_int(handle)->s.pktin.pktio = pktio; + qentry_from_int(handle)->s.pktin.index = index; +} + +static void queue_set_enq_func(queue_t handle, queue_enq_fn_t func) +{ + qentry_from_int(handle)->s.enqueue = func; +} + +static void queue_set_enq_multi_func(queue_t handle, queue_enq_multi_fn_t func) +{ + qentry_from_int(handle)->s.enqueue_multi = func; +} + +static void queue_set_deq_func(queue_t handle, queue_deq_fn_t func) +{ + qentry_from_int(handle)->s.dequeue = func; +} + +static void queue_set_deq_multi_func(queue_t handle, queue_deq_multi_fn_t func) +{ + qentry_from_int(handle)->s.dequeue_multi = func; +} + +static void queue_set_type(queue_t handle, odp_queue_type_t type) +{ + qentry_from_int(handle)->s.type = type; +} + +static queue_t queue_from_ext(odp_queue_t handle) +{ + uint32_t queue_id; + + queue_id = queue_to_id(handle); + return qentry_to_int(get_qentry(queue_id)); +} + +static odp_queue_t queue_to_ext(queue_t handle) +{ + return qentry_from_int(handle)->s.handle; +} + +/* API functions */ +queue_api_t queue_scalable_api = { + .queue_create = queue_create, + .queue_destroy = queue_destroy, + .queue_lookup = queue_lookup, + .queue_capability = queue_capability, + .queue_context_set = queue_context_set, + .queue_context = queue_context, + .queue_enq = queue_enq, + .queue_enq_multi = queue_enq_multi, + .queue_deq = queue_deq, + .queue_deq_multi = queue_deq_multi, + .queue_type = queue_type, + .queue_sched_type = queue_sched_type, + .queue_sched_prio = queue_sched_prio, + .queue_sched_group = queue_sched_group, + .queue_lock_count = queue_lock_count, + .queue_to_u64 = queue_to_u64, + .queue_param_init = queue_param_init, + .queue_info = queue_info +}; + +/* Functions towards internal components */ +queue_fn_t queue_scalable_fn = { + .init_global = queue_init_global, + .term_global = queue_term_global, + .init_local = queue_init_local, + .term_local = queue_term_local, + .from_ext = queue_from_ext, + .to_ext = queue_to_ext, + .enq = _queue_enq, + .enq_multi = _queue_enq_multi, + .deq = _queue_deq, + .deq_multi = _queue_deq_multi, + .get_pktout = queue_get_pktout, + .set_pktout = queue_set_pktout, + .get_pktin = queue_get_pktin, + .set_pktin = queue_set_pktin, + .set_enq_fn = queue_set_enq_func, + .set_enq_multi_fn = queue_set_enq_multi_func, + .set_deq_fn = queue_set_deq_func, + .set_deq_multi_fn = 
queue_set_deq_multi_func, .set_type = queue_set_type }; diff --git a/platform/linux-generic/odp_schedule_if.c b/platform/linux-generic/odp_schedule_if.c index a9ede98d..2f07aafe 100644 --- a/platform/linux-generic/odp_schedule_if.c +++ b/platform/linux-generic/odp_schedule_if.c @@ -15,12 +15,18 @@ extern const schedule_api_t schedule_default_api; extern const schedule_fn_t schedule_iquery_fn; extern const schedule_api_t schedule_iquery_api; +extern const schedule_fn_t schedule_scalable_fn; +extern const schedule_api_t schedule_scalable_api; + #ifdef ODP_SCHEDULE_SP const schedule_fn_t *sched_fn = &schedule_sp_fn; const schedule_api_t *sched_api = &schedule_sp_api; #elif defined(ODP_SCHEDULE_IQUERY) const schedule_fn_t *sched_fn = &schedule_iquery_fn; const schedule_api_t *sched_api = &schedule_iquery_api; +#elif defined(ODP_SCHEDULE_SCALABLE) +const schedule_fn_t *sched_fn = &schedule_scalable_fn; +const schedule_api_t *sched_api = &schedule_scalable_api; #else const schedule_fn_t *sched_fn = &schedule_default_fn; const schedule_api_t *sched_api = &schedule_default_api; diff --git a/platform/linux-generic/odp_schedule_scalable.c b/platform/linux-generic/odp_schedule_scalable.c new file mode 100644 index 00000000..089b8713 --- /dev/null +++ b/platform/linux-generic/odp_schedule_scalable.c @@ -0,0 +1,1978 @@ +/* Copyright (c) 2017, ARM Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include <odp/api/align.h> +#include <odp/api/atomic.h> +#include <odp/api/cpu.h> +#include <odp/api/hints.h> +#include <odp/api/schedule.h> +#include <odp/api/shared_memory.h> +#include <odp/api/sync.h> +#include <odp/api/thread.h> +#include <odp/api/thrmask.h> +#include <odp/api/time.h> + +#include <odp_internal.h> +#include <odp_config_internal.h> +#include <odp_debug_internal.h> +#include <_ishm_internal.h> +#include <_ishmpool_internal.h> + +#include <odp_align_internal.h> +#include <odp_buffer_inlines.h> +#include <odp_llqueue.h> +#include <odp_queue_scalable_internal.h> +#include <odp_schedule_if.h> +#include <odp_bitset.h> +#include <odp_packet_io_internal.h> + +#include <limits.h> +#include <stdbool.h> +#include <string.h> + +#include <odp/api/plat/ticketlock_inlines.h> +#define LOCK(a) _odp_ticketlock_lock((a)) +#define UNLOCK(a) _odp_ticketlock_unlock((a)) + +#define TAG_EMPTY 0U +#define TAG_USED (1U << 15) +#define TAG_BUSY (1U << 31) +#define PKTIO_QUEUE_2_TAG(p, q) ((p) << 16 | (q) | TAG_USED) +#define TAG_2_PKTIO(t) (((t) >> 16) & 0x7FFF) +#define TAG_2_QUEUE(t) ((t) & 0x7FFF) +#define TAG_IS_READY(t) (((t) & (TAG_USED | TAG_BUSY)) == TAG_USED) +#define PKTIN_MAX (ODP_CONFIG_PKTIO_ENTRIES * PKTIO_MAX_QUEUES) +#define MAXTHREADS ATOM_BITSET_SIZE + +static _odp_ishm_pool_t *sched_shm_pool; +static uint32_t pktin_num; +static uint32_t pktin_hi; +static uint16_t pktin_count[ODP_CONFIG_PKTIO_ENTRIES]; +static uint32_t pktin_tags[PKTIN_MAX] ODP_ALIGNED_CACHE; + +#define __atomic_fetch_max(var, v, mo) do { \ + /* Evaluate 'v' once */ \ + __typeof__(v) tmp_v = (v); \ + __typeof__(*var) old_var = \ + __atomic_load_n((var), __ATOMIC_RELAXED); \ + while (tmp_v > old_var) { \ + /* Attempt to store 'v' in '*var' */ \ + if (__atomic_compare_exchange_n((var), &old_var, \ + tmp_v, true, (mo), \ + (mo))) \ + break; \ + } \ + /* v <= old_var, nothing to do */ \ + } while (0) + +ODP_STATIC_ASSERT(ODP_SCHED_PRIO_LOWEST == (ODP_SCHED_PRIO_NUM - 1), + "lowest_prio_does_not_match_with_num_prios"); + +ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && + (ODP_SCHED_PRIO_NORMAL <
(ODP_SCHED_PRIO_NUM - 1)), + "normal_prio_is_not_between_highest_and_lowest"); + +ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES), + "Number_of_queues_is_not_power_of_two"); + +/* + * Scheduler group related variables. + */ +/* Currently used scheduler groups */ +static sched_group_mask_t sg_free; +static sched_group_t *sg_vec[MAX_SCHED_GROUP]; +/* Group lock for MT-safe APIs */ +odp_spinlock_t sched_grp_lock; + +#define SCHED_GROUP_JOIN 0 +#define SCHED_GROUP_LEAVE 1 + +/* + * Per thread state + */ +static sched_scalable_thread_state_t thread_state[MAXTHREADS]; +__thread sched_scalable_thread_state_t *sched_ts; + +/* + * Forward declarations. + */ +static int thread_state_init(int tidx) +{ + sched_scalable_thread_state_t *ts; + uint32_t i; + + ODP_ASSERT(tidx < MAXTHREADS); + ts = &thread_state[tidx]; + ts->atomq = NULL; + ts->rctx = NULL; + ts->pause = false; + ts->out_of_order = false; + ts->tidx = tidx; + ts->dequeued = 0; + ts->pktin_next = 0; + ts->pktin_poll_cnts = 0; + ts->ticket = TICKET_INVALID; + ts->priv_rvec_free = 0; + ts->rvec_free = (1ULL << TS_RVEC_SIZE) - 1; + ts->num_schedq = 0; + ts->sg_sem = 1; /* Start with sched group semaphore changed */ + memset(ts->sg_actual, 0, sizeof(ts->sg_actual)); + for (i = 0; i < TS_RVEC_SIZE; i++) { + ts->rvec[i].rvec_free = &ts->rvec_free; + ts->rvec[i].idx = i; + } + sched_ts = ts; + + return 0; +} + +static void insert_schedq_in_list(sched_scalable_thread_state_t *ts, + sched_queue_t *schedq) +{ + /* Find slot for schedq */ + for (uint32_t i = 0; i < ts->num_schedq; i++) { + /* Lower value is higher priority and closer to start of list */ + if (schedq->prio <= ts->schedq_list[i]->prio) { + /* This is the slot! */ + sched_queue_t *tmp; + + tmp = ts->schedq_list[i]; + ts->schedq_list[i] = schedq; + schedq = tmp; + /* Continue the insertion procedure with the + * new schedq. 
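+			 * In effect the displaced entry bubbles further
+			 * down, keeping schedq_list[] sorted by ascending
+			 * prio value (highest priority first).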
+ */ + } + } + if (ts->num_schedq == SCHEDQ_PER_THREAD) + ODP_ABORT("Too many schedqs\n"); + ts->schedq_list[ts->num_schedq++] = schedq; +} + +static void remove_schedq_from_list(sched_scalable_thread_state_t *ts, + sched_queue_t *schedq) +{ + /* Find schedq */ + for (uint32_t i = 0; i < ts->num_schedq; i++) + if (ts->schedq_list[i] == schedq) { + /* Move remaining schedqs */ + for (uint32_t j = i + 1; j < ts->num_schedq; j++) + ts->schedq_list[j - 1] = ts->schedq_list[j]; + ts->num_schedq--; + return; + } + ODP_ABORT("Cannot find schedq\n"); +} + +/******************************************************************************* + * Scheduler queues + ******************************************************************************/ +#ifndef odp_container_of +#define odp_container_of(pointer, type, member) \ + ((type *)(void *)(((char *)pointer) - offsetof(type, member))) +#endif + +static inline void schedq_init(sched_queue_t *schedq, uint32_t prio) +{ + llqueue_init(&schedq->llq); + schedq->prio = prio; +} + +static inline sched_elem_t *schedq_peek(sched_queue_t *schedq) +{ + struct llnode *ptr; + + ptr = llq_head(&schedq->llq); + return odp_container_of(ptr, sched_elem_t, node); +} + +static inline odp_bool_t schedq_cond_pop(sched_queue_t *schedq, + sched_elem_t *elem) +{ + return llq_dequeue_cond(&schedq->llq, &elem->node); +} + +static inline void schedq_push(sched_queue_t *schedq, sched_elem_t *elem) +{ + llq_enqueue(&schedq->llq, &elem->node); +} + +static inline odp_bool_t schedq_cond_rotate(sched_queue_t *schedq, + sched_elem_t *elem) +{ + return llq_cond_rotate(&schedq->llq, &elem->node); +} + +static inline bool schedq_elem_on_queue(sched_elem_t *elem) +{ + return llq_on_queue(&elem->node); +} + +/******************************************************************************* + * Shared metadata btwn scheduler and queue + ******************************************************************************/ + +void sched_update_enq(sched_elem_t *q, uint32_t actual) +{ + qschedstate_t oss, nss; + uint32_t ticket; + + oss = q->qschst; + /* Update event counter, optionally taking a ticket. */ + do { + ticket = TICKET_INVALID; + nss = oss; + nss.numevts += actual; + if (odp_unlikely(oss.numevts <= 0 && nss.numevts > 0)) + /* E -> NE transition */ + if (q->qschst_type != ODP_SCHED_SYNC_ATOMIC || + oss.cur_ticket == oss.nxt_ticket) + /* Parallel or ordered queues: always take + * ticket. + * Atomic queue: only take ticket if one is + * immediately available. + * Otherwise ticket already taken => queue + * processed by some thread. + */ + ticket = nss.nxt_ticket++; + /* Else queue already was non-empty. */ + /* Attempt to update numevts counter and optionally take ticket. */ + } while (!__atomic_compare_exchange( + &q->qschst, &oss, &nss, + true, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + + if (odp_unlikely(ticket != TICKET_INVALID)) { + /* Wait for our turn to update schedq. 
*/ + if (odp_unlikely( + __atomic_load_n(&q->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ticket)) { + sevl(); + while (wfe() && + monitor8(&q->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ticket) + doze(); + } + /* Enqueue at end of scheduler queue */ + /* We are here because of empty-to-non-empty transition + * This means queue must be pushed to schedq if possible + * but we can't do that if it already is on the schedq + */ + if (odp_likely(!schedq_elem_on_queue(q) && + q->pop_deficit == 0)) { + /* Queue not already on schedq and no pop deficit means + * we can push queue to schedq */ + schedq_push(q->schedq, q); + } else { + /* Missed push => cancels one missed pop */ + q->pop_deficit--; + } + atomic_store_release(&q->qschst.cur_ticket, ticket + 1, + /*readonly=*/false); + } + /* Else queue was not empty or atomic queue already busy. */ +} + +void sched_update_enq_sp(sched_elem_t *q, uint32_t actual) +{ + qschedstate_t oss, nss; + uint32_t ticket; + + oss = q->qschst; + /* Update event counter, optionally taking a ticket. */ + ticket = TICKET_INVALID; + nss = oss; + nss.numevts += actual; + if (odp_unlikely(oss.numevts <= 0 && nss.numevts > 0)) { + /* E -> NE transition */ + if (q->qschst_type != ODP_SCHED_SYNC_ATOMIC || + oss.cur_ticket == oss.nxt_ticket) { + /* Parallel or ordered queues: always take + * ticket. + * Atomic queue: only take ticket if one is + * immediately available. Otherwise ticket already + * taken => queue owned/processed by some thread + */ + ticket = nss.nxt_ticket++; + } + } + /* Else queue already was non-empty. */ + /* Attempt to update numevts counter and optionally take ticket. */ + q->qschst = nss; + + if (odp_unlikely(ticket != TICKET_INVALID)) { + /* Enqueue at end of scheduler queue */ + /* We are here because of empty-to-non-empty transition + * This means queue must be pushed to schedq if possible + * but we can't do that if it already is on the schedq + */ + if (odp_likely(!schedq_elem_on_queue(q) && + q->pop_deficit == 0)) { + /* Queue not already on schedq and no pop deficit means + * we can push queue to schedq */ + schedq_push(q->schedq, q); + } else { + /* Missed push => cancels one missed pop */ + q->pop_deficit--; + } + q->qschst.cur_ticket = ticket + 1; + } + /* Else queue was not empty or atomic queue already busy. */ +} + +#ifndef CONFIG_QSCHST_LOCK +/* The scheduler is the only entity that performs the dequeue from a queue. */ +static void +sched_update_deq(sched_elem_t *q, + uint32_t actual, + bool atomic) __attribute__((always_inline)); +static inline void +sched_update_deq(sched_elem_t *q, + uint32_t actual, bool atomic) +{ + qschedstate_t oss, nss; + uint32_t ticket; + + if (atomic) { + bool pushed = false; + + /* We own this atomic queue, only we can dequeue from it and + * thus decrease numevts. Other threads may enqueue and thus + * increase numevts. 
+		 * This means that numevts can't unexpectedly become 0 and
+		 * invalidate a push operation already performed
+		 */
+		oss = q->qschst;
+		do {
+			ODP_ASSERT(oss.cur_ticket == sched_ts->ticket);
+			nss = oss;
+			nss.numevts -= actual;
+			if (nss.numevts > 0 && !pushed) {
+				schedq_push(q->schedq, q);
+				pushed = true;
+			}
+			/* Attempt to release ticket expecting our view of
+			 * numevts to be correct
+			 * Unfortunately nxt_ticket will also be included in
+			 * the CAS operation
+			 */
+			nss.cur_ticket = sched_ts->ticket + 1;
+		} while (odp_unlikely(!__atomic_compare_exchange(
+						&q->qschst,
+						&oss, &nss,
+						true,
+						__ATOMIC_RELEASE,
+						__ATOMIC_RELAXED)));
+		return;
+	}
+
+	oss = q->qschst;
+	do {
+		ticket = TICKET_INVALID;
+		nss = oss;
+		nss.numevts -= actual;
+		nss.wrr_budget -= actual;
+		if ((oss.numevts > 0 && nss.numevts <= 0) ||
+		    oss.wrr_budget <= actual) {
+			/* If we have emptied a parallel/ordered queue or
+			 * exhausted its WRR budget, we need a ticket
+			 * for a later pop.
+			 */
+			ticket = nss.nxt_ticket++;
+			/* Reset wrr_budget as we might also push the
+			 * queue to the schedq.
+			 */
+			nss.wrr_budget = CONFIG_WRR_WEIGHT;
+		}
+		/* Attempt to update numevts and optionally take ticket. */
+	} while (!__atomic_compare_exchange(
+			 &q->qschst, &oss, &nss,
+			 true, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+	if (odp_unlikely(ticket != TICKET_INVALID)) {
+		ODP_ASSERT(q->qschst_type != ODP_SCHED_SYNC_ATOMIC);
+		/* Wait for our turn to update schedq. */
+		if (odp_unlikely(
+			    __atomic_load_n(&q->qschst.cur_ticket,
+					    __ATOMIC_ACQUIRE) != ticket)) {
+			sevl();
+			while (wfe() &&
+			       monitor8(&q->qschst.cur_ticket,
+					__ATOMIC_ACQUIRE) != ticket)
+				doze();
+		}
+		/* We are here because of non-empty-to-empty transition or
+		 * WRR budget exhausted
+		 * This means the queue must be popped from the schedq, now or
+		 * later
+		 * If there was no NE->E transition but instead the WRR budget
+		 * was exhausted, the queue needs to be moved (popped and
+		 * pushed) to the tail of the schedq
+		 */
+		if (oss.numevts > 0 && nss.numevts <= 0) {
+			/* NE->E transition, need to pop */
+			if (!schedq_elem_on_queue(q) ||
+			    !schedq_cond_pop(q->schedq, q)) {
+				/* Queue not at head, failed to dequeue
+				 * Missed a pop.
+				 */
+				q->pop_deficit++;
+			}
+		} else {
+			/* WRR budget exhausted
+			 * Need to move queue to tail of schedq if possible
+			 */
+			if (odp_likely(schedq_elem_on_queue(q))) {
+				/* Queue is on schedq, try to move it to
+				 * the tail
+				 */
+				(void)schedq_cond_rotate(q->schedq, q);
+			}
+			/* Else queue not on schedq or not at head of schedq
+			 * No pop => no push
+			 */
+		}
+		atomic_store_release(&q->qschst.cur_ticket, ticket + 1,
+				     /*readonly=*/false);
+	}
+}
+#endif
+
+#ifdef CONFIG_QSCHST_LOCK
+static void
+sched_update_deq_sc(sched_elem_t *q,
+		    uint32_t actual,
+		    bool atomic) __attribute__((always_inline));
+static inline void
+sched_update_deq_sc(sched_elem_t *q,
+		    uint32_t actual, bool atomic)
+{
+	qschedstate_t oss, nss;
+	uint32_t ticket;
+
+	if (atomic) {
+		ODP_ASSERT(q->qschst.cur_ticket == sched_ts->ticket);
+		ODP_ASSERT(q->qschst.cur_ticket != q->qschst.nxt_ticket);
+		q->qschst.numevts -= actual;
+		q->qschst.cur_ticket = sched_ts->ticket + 1;
+		if (q->qschst.numevts > 0)
+			schedq_push(q->schedq, q);
+		return;
+	}
+
+	oss = q->qschst;
+	ticket = TICKET_INVALID;
+	nss = oss;
+	nss.numevts -= actual;
+	nss.wrr_budget -= actual;
+	if ((oss.numevts > 0 && nss.numevts <= 0) || oss.wrr_budget <= actual) {
+		/* If we emptied the queue or
+		 * if we have served the maximum number of events
+		 * then we need a ticket for a later pop.
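+		 * (The pop itself may be deferred: if the queue cannot
+		 * be removed from the head of the schedq right away,
+		 * the miss is recorded in pop_deficit.)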
+ */ + ticket = nss.nxt_ticket++; + /* Also reset wrr_budget as we might also push the + * queue to the schedq. + */ + nss.wrr_budget = CONFIG_WRR_WEIGHT; + } + q->qschst = nss; + + if (ticket != TICKET_INVALID) { + if (oss.numevts > 0 && nss.numevts <= 0) { + /* NE->E transition, need to pop */ + if (!schedq_elem_on_queue(q) || + !schedq_cond_pop(q->schedq, q)) { + /* Queue not at head, failed to dequeue. + * Missed a pop. + */ + q->pop_deficit++; + } + } else { + /* WRR budget exhausted + * Need to move queue to tail of schedq if possible + */ + if (odp_likely(schedq_elem_on_queue(q))) { + /* Queue is on schedq, try to move it to + * the tail + */ + (void)schedq_cond_rotate(q->schedq, q); + } + /* Else queue not on schedq or not at head of schedq + * No pop => no push + */ + } + q->qschst.cur_ticket = ticket + 1; + } +} +#endif + +static inline void sched_update_popd_sc(sched_elem_t *elem) +{ + if (elem->pop_deficit != 0 && + schedq_elem_on_queue(elem) && + schedq_cond_pop(elem->schedq, elem)) + elem->pop_deficit--; +} + +#ifndef CONFIG_QSCHST_LOCK +static inline void sched_update_popd(sched_elem_t *elem) +{ + uint32_t ticket = __atomic_fetch_add(&elem->qschst.nxt_ticket, + 1, + __ATOMIC_RELAXED); + if (odp_unlikely(__atomic_load_n(&elem->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ticket)) { + sevl(); + while (wfe() && monitor8(&elem->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ticket) + doze(); + } + sched_update_popd_sc(elem); + atomic_store_release(&elem->qschst.cur_ticket, ticket + 1, + /*readonly=*/false); +} +#endif + +sched_queue_t *schedq_from_sched_group(odp_schedule_group_t grp, uint32_t prio) +{ + uint32_t sgi; + sched_group_t *sg; + uint32_t x; + + ODP_ASSERT(grp >= 0 && grp < (odp_schedule_group_t)MAX_SCHED_GROUP); + ODP_ASSERT((sg_free & (1ULL << grp)) == 0); + ODP_ASSERT(prio < ODP_SCHED_PRIO_NUM); + + sgi = grp; + sg = sg_vec[sgi]; + + /* Use xcount to spread queues over the xfactor schedq's + * per priority. + */ + x = __atomic_fetch_add(&sg->xcount[prio], 1, __ATOMIC_RELAXED); + if (x == 0) { + /* First ODP queue for this priority + * Notify all threads in sg->thr_wanted that they + * should join. + */ + sched_group_mask_t thrds = sg->thr_wanted; + + while (!bitset_is_null(thrds)) { + uint32_t thr; + + thr = bitset_ffs(thrds) - 1; + thrds = bitset_clr(thrds, thr); + /* Notify the thread about membership in this + * group/priority. + */ + atom_bitset_set(&thread_state[thr].sg_wanted[prio], + sgi, __ATOMIC_RELEASE); + __atomic_store_n(&thread_state[thr].sg_sem, 1, + __ATOMIC_RELEASE); + } + } + return &sg->schedq[prio * sg->xfactor + x % sg->xfactor]; +} + +void sched_group_xcount_dec(odp_schedule_group_t grp, uint32_t prio) +{ + uint32_t sgi; + sched_group_t *sg; + uint32_t x; + + ODP_ASSERT(grp >= 0 && grp < (odp_schedule_group_t)MAX_SCHED_GROUP); + ODP_ASSERT((sg_free & (1ULL << grp)) == 0); + ODP_ASSERT(prio < ODP_SCHED_PRIO_NUM); + + sgi = grp; + sg = sg_vec[sgi]; + x = __atomic_sub_fetch(&sg->xcount[prio], 1, __ATOMIC_RELAXED); + + if (x == 0) { + /* Last ODP queue for this priority + * Notify all threads in sg->thr_wanted that they + * should leave. + */ + sched_group_mask_t thrds = sg->thr_wanted; + + while (!bitset_is_null(thrds)) { + uint32_t thr; + + thr = bitset_ffs(thrds) - 1; + thrds = bitset_clr(thrds, thr); + /* Notify the thread about membership in this + * group/priority. 
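+			 * (Here the last queue of this priority is gone,
+			 * so the thread is asked to leave the
+			 * corresponding schedq's.)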
+ */ + atom_bitset_clr(&thread_state[thr].sg_wanted[prio], + sgi, __ATOMIC_RELEASE); + __atomic_store_n(&thread_state[thr].sg_sem, 1, + __ATOMIC_RELEASE); + } + } +} + +static void update_sg_membership(sched_scalable_thread_state_t *ts) +{ + uint32_t p; + sched_group_mask_t sg_wanted; + sched_group_mask_t added; + sched_group_mask_t removed; + uint32_t sgi; + sched_group_t *sg; + uint32_t x; + + for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) { + sg_wanted = atom_bitset_load(&ts->sg_wanted[p], + __ATOMIC_ACQUIRE); + if (!bitset_is_eql(ts->sg_actual[p], sg_wanted)) { + /* Our sched_group membership has changed */ + added = bitset_andn(sg_wanted, ts->sg_actual[p]); + while (!bitset_is_null(added)) { + sgi = bitset_ffs(added) - 1; + sg = sg_vec[sgi]; + for (x = 0; x < sg->xfactor; x++) { + /* Include our thread index to shift + * (rotate) the order of schedq's + */ + insert_schedq_in_list + (ts, + &sg->schedq[p * sg->xfactor + + (x + ts->tidx) % sg->xfactor]); + } + atom_bitset_set(&sg->thr_actual[p], ts->tidx, + __ATOMIC_RELAXED); + added = bitset_clr(added, sgi); + } + removed = bitset_andn(ts->sg_actual[p], sg_wanted); + while (!bitset_is_null(removed)) { + sgi = bitset_ffs(removed) - 1; + sg = sg_vec[sgi]; + for (x = 0; x < sg->xfactor; x++) { + remove_schedq_from_list + (ts, + &sg->schedq[p * + sg->xfactor + x]); + } + atom_bitset_clr(&sg->thr_actual[p], ts->tidx, + __ATOMIC_RELAXED); + removed = bitset_clr(removed, sgi); + } + ts->sg_actual[p] = sg_wanted; + } + } +} + +/******************************************************************************* + * Scheduler + ******************************************************************************/ + +static inline void _schedule_release_atomic(sched_scalable_thread_state_t *ts) +{ +#ifdef CONFIG_QSCHST_LOCK + sched_update_deq_sc(ts->atomq, ts->dequeued, true); + ODP_ASSERT(ts->atomq->qschst.cur_ticket != ts->ticket); + ODP_ASSERT(ts->atomq->qschst.cur_ticket == + ts->atomq->qschst.nxt_ticket); +#else + sched_update_deq(ts->atomq, ts->dequeued, true); +#endif + ts->atomq = NULL; + ts->ticket = TICKET_INVALID; +} + +static inline void _schedule_release_ordered(sched_scalable_thread_state_t *ts) +{ + ts->out_of_order = false; + rctx_release(ts->rctx); + ts->rctx = NULL; +} + +static void pktin_poll(sched_scalable_thread_state_t *ts) +{ + uint32_t i, tag, hi, npolls = 0; + int pktio_index, queue_index; + + hi = __atomic_load_n(&pktin_hi, __ATOMIC_RELAXED); + if (hi == 0) + return; + + for (i = ts->pktin_next; npolls != hi; i = (i + 1) % hi, npolls++) { + tag = __atomic_load_n(&pktin_tags[i], __ATOMIC_RELAXED); + if (!TAG_IS_READY(tag)) + continue; + if (!__atomic_compare_exchange_n(&pktin_tags[i], &tag, + tag | TAG_BUSY, + true, + __ATOMIC_ACQUIRE, + __ATOMIC_RELAXED)) + continue; + /* Tag grabbed */ + pktio_index = TAG_2_PKTIO(tag); + queue_index = TAG_2_QUEUE(tag); + if (odp_unlikely(sched_cb_pktin_poll(pktio_index, + 1, &queue_index))) { + /* Pktio stopped or closed + * Remove tag from pktin_tags + */ + __atomic_store_n(&pktin_tags[i], + TAG_EMPTY, __ATOMIC_RELAXED); + __atomic_fetch_sub(&pktin_num, + 1, __ATOMIC_RELEASE); + /* Call stop_finalize when all queues + * of the pktio have been removed + */ + if (__atomic_sub_fetch(&pktin_count[pktio_index], 1, + __ATOMIC_RELAXED) == 0) + sched_cb_pktio_stop_finalize(pktio_index); + } else { + /* We don't know whether any packets were found and enqueued + * Write back original tag value to release pktin queue + */ + __atomic_store_n(&pktin_tags[i], tag, __ATOMIC_RELAXED); + /* Do not iterate through all 
pktin queues every time */ + if ((ts->pktin_poll_cnts & 0xf) != 0) + break; + } + } + ODP_ASSERT(i < hi); + ts->pktin_poll_cnts++; + ts->pktin_next = i; +} + +static int _schedule(odp_queue_t *from, odp_event_t ev[], int num_evts) +{ + sched_scalable_thread_state_t *ts; + sched_elem_t *atomq; + int num; + uint32_t i; + + ts = sched_ts; + atomq = ts->atomq; + + /* Once an atomic queue has been scheduled to a thread, it will stay + * on that thread until empty or 'rotated' by WRR + */ + if (atomq != NULL) { + ODP_ASSERT(ts->ticket != TICKET_INVALID); +#ifdef CONFIG_QSCHST_LOCK + LOCK(&atomq->qschlock); +#endif +dequeue_atomic: + ODP_ASSERT(ts->ticket == atomq->qschst.cur_ticket); + ODP_ASSERT(ts->ticket != atomq->qschst.nxt_ticket); + /* Atomic queues can be dequeued without lock since this thread + * has the only reference to the atomic queue being processed. + */ + if (ts->dequeued < atomq->qschst.wrr_budget) { + num = _odp_queue_deq_sc(atomq, ev, num_evts); + if (odp_likely(num != 0)) { +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&atomq->qschlock); +#endif + ts->dequeued += num; + /* Allow this thread to continue to 'own' this + * atomic queue until all events have been + * processed and the thread re-invokes the + * scheduler. + */ + if (from) + *from = queue_get_handle( + (queue_entry_t *)atomq); + return num; + } + } + /* Atomic queue was empty or interrupted by WRR, release it. */ + _schedule_release_atomic(ts); +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&atomq->qschlock); +#endif + } + + /* Release any previous reorder context. */ + if (ts->rctx != NULL) + _schedule_release_ordered(ts); + + /* Check for and perform any scheduler group updates. */ + if (odp_unlikely(__atomic_load_n(&ts->sg_sem, __ATOMIC_RELAXED) != 0)) { + (void)__atomic_load_n(&ts->sg_sem, __ATOMIC_ACQUIRE); + ts->sg_sem = 0; + update_sg_membership(ts); + } + + /* Scan our schedq list from beginning to end */ + for (i = 0; i < ts->num_schedq; i++) { + sched_queue_t *schedq = ts->schedq_list[i]; + sched_elem_t *elem; +restart_same: + elem = schedq_peek(schedq); + if (odp_unlikely(elem == NULL)) { + /* Schedq empty, look at next one. */ + continue; + } + + if (elem->cons_type == ODP_SCHED_SYNC_ATOMIC) { + /* Dequeue element only if it is still at head + * of schedq. + */ + if (odp_unlikely(!schedq_cond_pop(schedq, elem))) { + /* Queue not at head of schedq anymore, some + * other thread popped it. + */ + goto restart_same; + } + ts->atomq = elem; + atomq = elem; + ts->dequeued = 0; +#ifdef CONFIG_QSCHST_LOCK + LOCK(&atomq->qschlock); + ts->ticket = atomq->qschst.nxt_ticket++; + ODP_ASSERT(atomq->qschst.cur_ticket == ts->ticket); +#else + /* Dequeued atomic queue from the schedq, only we + * can process it and any qschst updates are our + * responsibility. + */ + /* The ticket taken below will signal producers */ + ts->ticket = __atomic_fetch_add( + &atomq->qschst.nxt_ticket, 1, __ATOMIC_RELAXED); + while (__atomic_load_n( + &atomq->qschst.cur_ticket, + __ATOMIC_ACQUIRE) != ts->ticket) { + /* No need to use WFE, spinning here seems + * very infrequent. 
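+				 * (WFE would save some power but adds
+				 * wake-up latency; the previous ticket
+				 * holder only has a short qschst update
+				 * left to finish.)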
+				 */
+				odp_cpu_pause();
+			}
+#endif
+			goto dequeue_atomic;
+		} else if (elem->cons_type == ODP_SCHED_SYNC_PARALLEL) {
+#ifdef CONFIG_QSCHST_LOCK
+			LOCK(&elem->qschlock);
+			num = _odp_queue_deq_sc(elem, ev, num_evts);
+			if (odp_likely(num != 0)) {
+				sched_update_deq_sc(elem, num, false);
+				UNLOCK(&elem->qschlock);
+				if (from)
+					*from =
+					queue_get_handle((queue_entry_t *)elem);
+				return num;
+			}
+			UNLOCK(&elem->qschlock);
+#else
+			num = _odp_queue_deq_mc(elem, ev, num_evts);
+			if (odp_likely(num != 0)) {
+				sched_update_deq(elem, num, false);
+				if (from)
+					*from =
+					queue_get_handle((queue_entry_t *)elem);
+				return num;
+			}
+#endif
+		} else if (elem->cons_type == ODP_SCHED_SYNC_ORDERED) {
+			reorder_window_t *rwin;
+			reorder_context_t *rctx;
+			uint32_t sn;
+			uint32_t idx;
+
+			/* The ordered queue has a reorder window so requires
+			 * order restoration. We must use a reorder context to
+			 * collect all outgoing events. Ensure there is at least
+			 * one available reorder context.
+			 */
+			if (odp_unlikely(bitset_is_null(ts->priv_rvec_free))) {
+				ts->priv_rvec_free = atom_bitset_xchg(
+							&ts->rvec_free, 0,
+							__ATOMIC_RELAXED);
+				if (odp_unlikely(bitset_is_null(
+						ts->priv_rvec_free))) {
+					/* No free reorder contexts for
+					 * this thread. Look at next schedq,
+					 * hope we find non-ordered queue.
+					 */
+					continue;
+				}
+			}
+			/* rwin_reserve and odp_queue_deq must be atomic or
+			 * there will be a potential race condition.
+			 * Allocate a slot in the reorder window.
+			 */
+			rwin = queue_get_rwin((queue_entry_t *)elem);
+			ODP_ASSERT(rwin != NULL);
+			if (odp_unlikely(!rwin_reserve(rwin, &sn))) {
+				/* Reorder window full */
+				/* Look at next schedq, find other queue */
+				continue;
+			}
+			/* Wait for our turn to dequeue */
+			if (odp_unlikely(__atomic_load_n(&rwin->turn,
+							 __ATOMIC_ACQUIRE)
+			    != sn)) {
+				sevl();
+				while (wfe() &&
+				       monitor32(&rwin->turn, __ATOMIC_ACQUIRE)
+				       != sn)
+					doze();
+			}
+#ifdef CONFIG_QSCHST_LOCK
+			LOCK(&elem->qschlock);
+#endif
+			num = _odp_queue_deq_sc(elem, ev, num_evts);
+			/* Wait for prod_read write in _odp_queue_deq_sc()
+			 * to complete before we signal the next consumer
+			 */
+			atomic_store_release(&rwin->turn, sn + 1,
+					     /*readonly=*/false);
+			/* Find and initialise an unused reorder context. */
+			idx = bitset_ffs(ts->priv_rvec_free) - 1;
+			ts->priv_rvec_free =
+				bitset_clr(ts->priv_rvec_free, idx);
+			rctx = &ts->rvec[idx];
+			/* Need to initialise reorder context or we can't
+			 * release it later.
+			 */
+			rctx_init(rctx, idx, rwin, sn);
+
+			/* Was dequeue successful? */
+			if (odp_likely(num != 0)) {
+				/* Perform scheduler related updates */
+#ifdef CONFIG_QSCHST_LOCK
+				sched_update_deq_sc(elem, num,
+						    /*atomic=*/false);
+				UNLOCK(&elem->qschlock);
+#else
+				sched_update_deq(elem, num, /*atomic=*/false);
+#endif
+
+				/* Are we in-order or out-of-order? */
+				ts->out_of_order = sn != rwin->hc.head;
+
+				ts->rctx = rctx;
+				if (from)
+					*from = queue_get_handle(
+						(queue_entry_t *)elem);
+				return num;
+			}
+#ifdef CONFIG_QSCHST_LOCK
+			UNLOCK(&elem->qschlock);
+#endif
+			/* Since a slot was reserved in the reorder window,
+			 * the reorder context needs to be released and
+			 * inserted into the reorder window.
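+			 * Even an empty context must pass through the
+			 * reorder window, otherwise the slot reserved
+			 * above would block later contexts from retiring.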
+ */ + rctx_release(rctx); + ODP_ASSERT(ts->rctx == NULL); + } + /* Dequeue from parallel/ordered queue failed + * Check if we have a queue at the head of the schedq that needs + * to be popped + */ + if (odp_unlikely(__atomic_load_n(&elem->pop_deficit, + __ATOMIC_RELAXED) != 0)) { +#ifdef CONFIG_QSCHST_LOCK + LOCK(&elem->qschlock); + sched_update_popd_sc(elem); + UNLOCK(&elem->qschlock); +#else + sched_update_popd(elem); +#endif + } + } + + pktin_poll(ts); + return 0; +} + +/******************************************************************************/ + +static void schedule_order_lock(unsigned lock_index) +{ + struct reorder_context *rctx = sched_ts->rctx; + + if (odp_unlikely(rctx == NULL || + rctx->rwin == NULL || + lock_index >= rctx->rwin->lock_count)) { + ODP_ERR("Invalid call to odp_schedule_order_lock\n"); + return; + } + if (odp_unlikely(__atomic_load_n(&rctx->rwin->olock[lock_index], + __ATOMIC_ACQUIRE) != rctx->sn)) { + sevl(); + while (wfe() && + monitor32(&rctx->rwin->olock[lock_index], + __ATOMIC_ACQUIRE) != rctx->sn) + doze(); + } +} + +static void schedule_order_unlock(unsigned lock_index) +{ + struct reorder_context *rctx; + + rctx = sched_ts->rctx; + if (odp_unlikely(rctx == NULL || + rctx->rwin == NULL || + lock_index >= rctx->rwin->lock_count || + rctx->rwin->olock[lock_index] != rctx->sn)) { + ODP_ERR("Invalid call to odp_schedule_order_unlock\n"); + return; + } + atomic_store_release(&rctx->rwin->olock[lock_index], + rctx->sn + 1, + /*readonly=*/false); + rctx->olock_flags |= 1U << lock_index; +} + +static void schedule_release_atomic(void) +{ + sched_scalable_thread_state_t *ts; + + ts = sched_ts; + if (odp_likely(ts->atomq != NULL)) { +#ifdef CONFIG_QSCHST_LOCK + sched_elem_t *atomq; + + atomq = ts->atomq; + LOCK(&atomq->qschlock); +#endif + _schedule_release_atomic(ts); +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&atomq->qschlock); +#endif + } +} + +static void schedule_release_ordered(void) +{ + sched_scalable_thread_state_t *ts; + + ts = sched_ts; + if (ts->rctx != NULL) + _schedule_release_ordered(ts); +} + +static int schedule_multi(odp_queue_t *from, uint64_t wait, odp_event_t ev[], + int num) +{ + sched_scalable_thread_state_t *ts; + int n; + odp_time_t start; + odp_time_t delta; + odp_time_t deadline; + + ts = sched_ts; + if (odp_unlikely(ts->pause)) { + if (ts->atomq != NULL) { +#ifdef CONFIG_QSCHST_LOCK + sched_elem_t *atomq; + + atomq = ts->atomq; + LOCK(&atomq->qschlock); +#endif + _schedule_release_atomic(ts); +#ifdef CONFIG_QSCHST_LOCK + UNLOCK(&atomq->qschlock); +#endif + } else if (ts->rctx != NULL) { + _schedule_release_ordered(ts); + } + return 0; + } + + if (wait == ODP_SCHED_NO_WAIT) + return _schedule(from, ev, num); + + if (wait == ODP_SCHED_WAIT) { + for (;;) { + n = _schedule(from, ev, num); + if (odp_likely(n > 0)) + return n; + } + } + + start = odp_time_local(); + + n = _schedule(from, ev, num); + if (odp_likely(n > 0)) + return n; + + delta = odp_time_local_from_ns(wait); + deadline = odp_time_sum(start, delta); + + while (odp_time_cmp(deadline, odp_time_local()) > 0) { + n = _schedule(from, ev, num); + if (odp_likely(n > 0)) + return n; + } + + return 0; +} + +static odp_event_t schedule(odp_queue_t *from, uint64_t wait) +{ + odp_event_t ev = ODP_EVENT_INVALID; + const int num = 1; + sched_scalable_thread_state_t *ts; + int n; + odp_time_t start; + odp_time_t delta; + odp_time_t deadline; + + ts = sched_ts; + if (odp_unlikely(ts->pause)) { + if (ts->atomq != NULL) { +#ifdef CONFIG_QSCHST_LOCK + sched_elem_t *atomq; + + atomq = ts->atomq; + 
LOCK(&atomq->qschlock);
+#endif
+			_schedule_release_atomic(ts);
+#ifdef CONFIG_QSCHST_LOCK
+			UNLOCK(&atomq->qschlock);
+#endif
+		} else if (ts->rctx != NULL) {
+			_schedule_release_ordered(ts);
+		}
+		return ev;
+	}
+
+	if (wait == ODP_SCHED_NO_WAIT) {
+		(void)_schedule(from, &ev, num);
+		return ev;
+	}
+
+	if (wait == ODP_SCHED_WAIT) {
+		for (;;) {
+			n = _schedule(from, &ev, num);
+			if (odp_likely(n > 0))
+				return ev;
+		}
+	}
+
+	start = odp_time_local();
+
+	n = _schedule(from, &ev, num);
+	if (odp_likely(n > 0))
+		return ev;
+
+	delta = odp_time_local_from_ns(wait);
+	deadline = odp_time_sum(start, delta);
+
+	while (odp_time_cmp(deadline, odp_time_local()) > 0) {
+		n = _schedule(from, &ev, num);
+		if (odp_likely(n > 0))
+			return ev;
+	}
+
+	return ev;
+}
+
+static void schedule_pause(void)
+{
+	sched_ts->pause = true;
+}
+
+static void schedule_resume(void)
+{
+	sched_ts->pause = false;
+}
+
+static uint64_t schedule_wait_time(uint64_t ns)
+{
+	return ns;
+}
+
+static int schedule_num_prio(void)
+{
+	return ODP_SCHED_PRIO_NUM;
+}
+
+static int schedule_group_update(sched_group_t *sg,
+				 uint32_t sgi,
+				 const odp_thrmask_t *mask,
+				 int join_leave)
+{
+	int thr;
+	uint32_t p;
+
+	/* Internal function, do not validate inputs */
+
+	/* Notify relevant threads about the change */
+	thr = odp_thrmask_first(mask);
+	while (0 <= thr) {
+		/* Add thread to scheduler group's wanted thread mask */
+		if (join_leave == SCHED_GROUP_JOIN)
+			atom_bitset_set(&sg->thr_wanted, thr, __ATOMIC_RELAXED);
+		else
+			atom_bitset_clr(&sg->thr_wanted, thr, __ATOMIC_RELAXED);
+		for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
+			if (sg->xcount[p] != 0) {
+				/* This priority level has ODP queues
+				 * Notify the thread about membership in
+				 * this group/priority
+				 */
+				if (join_leave == SCHED_GROUP_JOIN)
+					atom_bitset_set(
+						&thread_state[thr].sg_wanted[p],
+						sgi,
+						__ATOMIC_RELEASE);
+				else
+					atom_bitset_clr(
+						&thread_state[thr].sg_wanted[p],
+						sgi,
+						__ATOMIC_RELEASE);
+				__atomic_store_n(&thread_state[thr].sg_sem,
+						 1,
+						 __ATOMIC_RELEASE);
+			}
+		}
+		thr = odp_thrmask_next(mask, thr);
+	}
+
+	return 0;
+}
+
+static int _schedule_group_thrmask(sched_group_t *sg, odp_thrmask_t *mask)
+{
+	bitset_t bs;
+	uint32_t bit;
+
+	/* Internal function, do not validate inputs */
+
+	odp_thrmask_zero(mask);
+	bs = sg->thr_wanted;
+	while (!bitset_is_null(bs)) {
+		bit = bitset_ffs(bs) - 1;
+		bs = bitset_clr(bs, bit);
+		odp_thrmask_set(mask, bit);
+	}
+
+	return 0;
+}
+
+static odp_schedule_group_t schedule_group_create(const char *name,
+						  const odp_thrmask_t *mask)
+{
+	uint32_t sgi;
+	sched_group_mask_t free;
+	uint32_t xfactor;
+	sched_group_t *sg;
+	uint32_t p;
+	uint32_t x;
+	uint32_t size;
+
+	/* Validate inputs */
+	if (mask == NULL)
+		ODP_ABORT("mask is NULL\n");
+
+	odp_spinlock_lock(&sched_grp_lock);
+
+	/* Allocate a scheduler group */
+	free = atom_bitset_load(&sg_free, __ATOMIC_RELAXED);
+	do {
+		/* All sched_groups in use */
+		if (bitset_is_null(free))
+			goto no_free_sched_group;
+
+		sgi = bitset_ffs(free) - 1;
+		/* All sched_groups in use */
+		if (sgi >= MAX_SCHED_GROUP)
+			goto no_free_sched_group;
+	} while (!atom_bitset_cmpxchg(&sg_free,
+				      &free,
+				      bitset_clr(free, sgi),
+				      true,
+				      __ATOMIC_ACQUIRE,
+				      __ATOMIC_ACQUIRE));
+
+	/* Compute xfactor (spread factor) from the number of threads
+	 * present in the thread mask. Preferably this would be an
+	 * explicit parameter.
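+	 * E.g. a mask with eight threads yields xfactor 8, i.e. eight
+	 * schedq's per priority, spreading head-of-list contention
+	 * across eight scheduler queues.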
+ */ + xfactor = odp_thrmask_count(mask); + if (xfactor < 1) + xfactor = CONFIG_DEFAULT_XFACTOR; + + size = sizeof(sched_group_t) + + (ODP_SCHED_PRIO_NUM * xfactor - 1) * sizeof(sched_queue_t); + sg = (sched_group_t *)shm_pool_alloc_align(sched_shm_pool, size); + if (sg == NULL) + goto shm_pool_alloc_failed; + + strncpy(sg->name, name ? name : "", ODP_SCHED_GROUP_NAME_LEN - 1); + sg_vec[sgi] = sg; + memset(sg->thr_actual, 0, sizeof(sg->thr_actual)); + sg->thr_wanted = bitset_null(); + sg->xfactor = xfactor; + for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) { + sg->xcount[p] = 0; + for (x = 0; x < xfactor; x++) + schedq_init(&sg->schedq[p * xfactor + x], p); + } + if (odp_thrmask_count(mask) != 0) + schedule_group_update(sg, sgi, mask, SCHED_GROUP_JOIN); + + odp_spinlock_unlock(&sched_grp_lock); + + return (odp_schedule_group_t)(sgi); + +shm_pool_alloc_failed: + /* Free the allocated group index */ + atom_bitset_set(&sg_free, sgi, __ATOMIC_RELAXED); + +no_free_sched_group: + odp_spinlock_unlock(&sched_grp_lock); + + return ODP_SCHED_GROUP_INVALID; +} + +static int schedule_group_destroy(odp_schedule_group_t group) +{ + uint32_t sgi; + sched_group_t *sg; + uint32_t p; + int ret = 0; + + /* Validate inputs */ + if (group < 0 || group >= (odp_schedule_group_t)MAX_SCHED_GROUP) { + ret = -1; + goto invalid_group; + } + + if (sched_ts && + odp_unlikely(__atomic_load_n(&sched_ts->sg_sem, + __ATOMIC_RELAXED) != 0)) { + (void)__atomic_load_n(&sched_ts->sg_sem, + __ATOMIC_ACQUIRE); + sched_ts->sg_sem = 0; + update_sg_membership(sched_ts); + } + odp_spinlock_lock(&sched_grp_lock); + + sgi = (uint32_t)group; + if (bitset_is_set(sg_free, sgi)) { + ret = -1; + goto group_not_found; + } + + sg = sg_vec[sgi]; + /* First ensure all threads have processed group_join/group_leave + * requests. 
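+	 * We wait (using WFE) until thr_actual matches thr_wanted for
+	 * every priority level that still has queues.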
+	 */
+	for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
+		if (sg->xcount[p] != 0) {
+			bitset_t wanted = atom_bitset_load(
+				&sg->thr_wanted, __ATOMIC_RELAXED);
+
+			sevl();
+			while (wfe() &&
+			       !bitset_is_eql(wanted,
+					      bitset_monitor(&sg->thr_actual[p],
+							     __ATOMIC_RELAXED)))
+				doze();
+		}
+		/* Else ignore because no ODP queues on this prio */
+	}
+
+	/* Check if all threads/queues have left the group */
+	for (p = 0; p < ODP_SCHED_PRIO_NUM; p++) {
+		if (!bitset_is_null(sg->thr_actual[p])) {
+			ODP_ERR("Group has threads\n");
+			ret = -1;
+			goto thrd_q_present_in_group;
+		}
+		if (sg->xcount[p] != 0) {
+			ODP_ERR("Group has queues\n");
+			ret = -1;
+			goto thrd_q_present_in_group;
+		}
+	}
+
+	_odp_ishm_pool_free(sched_shm_pool, sg);
+	sg_vec[sgi] = NULL;
+	atom_bitset_set(&sg_free, sgi, __ATOMIC_RELEASE);
+
+	odp_spinlock_unlock(&sched_grp_lock);
+
+	return ret;
+
+thrd_q_present_in_group:
+
+group_not_found:
+	odp_spinlock_unlock(&sched_grp_lock);
+
+invalid_group:
+
+	return ret;
+}
+
+static odp_schedule_group_t schedule_group_lookup(const char *name)
+{
+	uint32_t sgi;
+	odp_schedule_group_t group;
+
+	/* Validate inputs */
+	if (name == NULL)
+		ODP_ABORT("name is NULL\n");
+
+	group = ODP_SCHED_GROUP_INVALID;
+
+	odp_spinlock_lock(&sched_grp_lock);
+
+	/* Scan through the schedule group array */
+	for (sgi = 0; sgi < MAX_SCHED_GROUP; sgi++) {
+		if ((sg_vec[sgi] != NULL) &&
+		    (strncmp(name, sg_vec[sgi]->name,
+			     ODP_SCHED_GROUP_NAME_LEN) == 0)) {
+			group = (odp_schedule_group_t)sgi;
+			break;
+		}
+	}
+
+	odp_spinlock_unlock(&sched_grp_lock);
+
+	return group;
+}
+
+static int schedule_group_join(odp_schedule_group_t group,
+			       const odp_thrmask_t *mask)
+{
+	uint32_t sgi;
+	sched_group_t *sg;
+	int ret;
+
+	/* Validate inputs */
+	if (group < 0 || group >= ((odp_schedule_group_t)MAX_SCHED_GROUP))
+		return -1;
+
+	if (mask == NULL)
+		ODP_ABORT("mask is NULL\n");
+
+	odp_spinlock_lock(&sched_grp_lock);
+
+	sgi = (uint32_t)group;
+	if (bitset_is_set(sg_free, sgi)) {
+		odp_spinlock_unlock(&sched_grp_lock);
+		return -1;
+	}
+
+	sg = sg_vec[sgi];
+	ret = schedule_group_update(sg, sgi, mask, SCHED_GROUP_JOIN);
+
+	odp_spinlock_unlock(&sched_grp_lock);
+
+	return ret;
+}
+
+static int schedule_group_leave(odp_schedule_group_t group,
+				const odp_thrmask_t *mask)
+{
+	uint32_t sgi;
+	sched_group_t *sg;
+	int ret = 0;
+
+	/* Validate inputs */
+	if (group < 0 || group >= (odp_schedule_group_t)MAX_SCHED_GROUP) {
+		ret = -1;
+		goto invalid_group;
+	}
+
+	if (mask == NULL)
+		ODP_ABORT("mask is NULL\n");
+
+	odp_spinlock_lock(&sched_grp_lock);
+
+	sgi = (uint32_t)group;
+	if (bitset_is_set(sg_free, sgi)) {
+		ret = -1;
+		goto group_not_found;
+	}
+
+	sg = sg_vec[sgi];
+
+	ret = schedule_group_update(sg, sgi, mask, SCHED_GROUP_LEAVE);
+
+	odp_spinlock_unlock(&sched_grp_lock);
+
+	return ret;
+
+group_not_found:
+	odp_spinlock_unlock(&sched_grp_lock);
+
+invalid_group:
+	return ret;
+}
+
+static int schedule_group_thrmask(odp_schedule_group_t group,
+				  odp_thrmask_t *mask)
+{
+	uint32_t sgi;
+	sched_group_t *sg;
+	int ret = 0;
+
+	/* Validate inputs */
+	if (group < 0 || group >= ((odp_schedule_group_t)MAX_SCHED_GROUP)) {
+		ret = -1;
+		goto invalid_group;
+	}
+
+	if (mask == NULL)
+		ODP_ABORT("mask is NULL\n");
+
+	odp_spinlock_lock(&sched_grp_lock);
+
+	sgi = (uint32_t)group;
+	if (bitset_is_set(sg_free, sgi)) {
+		ret = -1;
+		goto group_not_found;
+	}
+
+	sg = sg_vec[sgi];
+	ret = _schedule_group_thrmask(sg, mask);
+
+	odp_spinlock_unlock(&sched_grp_lock);
+
+	return ret;
+
+group_not_found:
+	odp_spinlock_unlock(&sched_grp_lock);
+
+invalid_group:
+	return ret;
+}
+
+static int schedule_group_info(odp_schedule_group_t group,
+			       odp_schedule_group_info_t *info)
+{
+	uint32_t sgi;
+	sched_group_t *sg;
+	int ret = 0;
+
+	/* Validate inputs */
+	if (group < 0 || group >= ((odp_schedule_group_t)MAX_SCHED_GROUP)) {
+		ret = -1;
+		goto invalid_group;
+	}
+
+	if (info == NULL)
+		ODP_ABORT("info is NULL\n");
+
+	odp_spinlock_lock(&sched_grp_lock);
+
+	sgi = (uint32_t)group;
+	if (bitset_is_set(sg_free, sgi)) {
+		ret = -1;
+		goto group_not_found;
+	}
+
+	sg = sg_vec[sgi];
+
+	ret = _schedule_group_thrmask(sg, &info->thrmask);
+
+	info->name = sg->name;
+
+	odp_spinlock_unlock(&sched_grp_lock);
+
+	return ret;
+
+group_not_found:
+	odp_spinlock_unlock(&sched_grp_lock);
+
+invalid_group:
+	return ret;
+}
+
+static int schedule_init_global(void)
+{
+	odp_thrmask_t mask;
+	odp_schedule_group_t tmp_all;
+	odp_schedule_group_t tmp_wrkr;
+	odp_schedule_group_t tmp_ctrl;
+	uint32_t bits;
+	uint32_t pool_size;
+	uint64_t min_alloc;
+	uint64_t max_alloc;
+
+	/* Attach to the pool if it exists */
+	sched_shm_pool = _odp_ishm_pool_lookup("sched_shm_pool");
+	if (sched_shm_pool == NULL) {
+		/* Add storage required for sched groups. Assume worst case
+		 * xfactor of MAXTHREADS.
+		 */
+		pool_size = (sizeof(sched_group_t) +
+			     (ODP_SCHED_PRIO_NUM * MAXTHREADS - 1) *
+			     sizeof(sched_queue_t)) * MAX_SCHED_GROUP;
+		/* Choose min_alloc and max_alloc such that slab allocator
+		 * is selected.
+		 */
+		min_alloc = sizeof(sched_group_t) +
+			    (ODP_SCHED_PRIO_NUM * MAXTHREADS - 1) *
+			    sizeof(sched_queue_t);
+		max_alloc = min_alloc;
+		sched_shm_pool = _odp_ishm_pool_create("sched_shm_pool",
+						       pool_size,
+						       min_alloc, max_alloc,
+						       _ODP_ISHM_SINGLE_VA);
+		if (sched_shm_pool == NULL) {
+			ODP_ERR("Failed to allocate shared memory pool "
+				"for sched\n");
+			goto failed_sched_shm_pool_create;
+		}
+	}
+
+	odp_spinlock_init(&sched_grp_lock);
+
+	bits = MAX_SCHED_GROUP;
+	if (MAX_SCHED_GROUP == sizeof(sg_free) * CHAR_BIT)
+		sg_free = ~0;
+	else
+		sg_free = (1ULL << bits) - 1;
+
+	for (uint32_t i = 0; i < MAX_SCHED_GROUP; i++)
+		sg_vec[i] = NULL;
+	for (uint32_t i = 0; i < MAXTHREADS; i++) {
+		thread_state[i].sg_sem = 0;
+		for (uint32_t j = 0; j < ODP_SCHED_PRIO_NUM; j++)
+			thread_state[i].sg_wanted[j] = bitset_null();
+	}
+
+	/* Create sched groups for default GROUP_ALL, GROUP_WORKER and
+	 * GROUP_CONTROL groups.
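+	 * Creation order matters: the indices allocated here must match
+	 * the fixed handles ODP_SCHED_GROUP_ALL (0), _WORKER (1) and
+	 * _CONTROL (2), which is verified below.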
+	 */
+	odp_thrmask_zero(&mask);
+	tmp_all = odp_schedule_group_create("__group_all", &mask);
+	if (tmp_all != ODP_SCHED_GROUP_ALL) {
+		ODP_ERR("Could not create ODP_SCHED_GROUP_ALL\n");
+		goto failed_create_group_all;
+	}
+
+	tmp_wrkr = odp_schedule_group_create("__group_worker", &mask);
+	if (tmp_wrkr != ODP_SCHED_GROUP_WORKER) {
+		ODP_ERR("Could not create ODP_SCHED_GROUP_WORKER\n");
+		goto failed_create_group_worker;
+	}
+
+	tmp_ctrl = odp_schedule_group_create("__group_control", &mask);
+	if (tmp_ctrl != ODP_SCHED_GROUP_CONTROL) {
+		ODP_ERR("Could not create ODP_SCHED_GROUP_CONTROL\n");
+		goto failed_create_group_control;
+	}
+
+	return 0;
+
+failed_create_group_control:
+	if (tmp_ctrl != ODP_SCHED_GROUP_INVALID)
+		odp_schedule_group_destroy(ODP_SCHED_GROUP_CONTROL);
+
+failed_create_group_worker:
+	if (tmp_wrkr != ODP_SCHED_GROUP_INVALID)
+		odp_schedule_group_destroy(ODP_SCHED_GROUP_WORKER);
+
+failed_create_group_all:
+	if (tmp_all != ODP_SCHED_GROUP_INVALID)
+		odp_schedule_group_destroy(ODP_SCHED_GROUP_ALL);
+
+failed_sched_shm_pool_create:
+
+	return -1;
+}
+
+static int schedule_term_global(void)
+{
+	/* Destroy sched groups for default GROUP_ALL, GROUP_WORKER and
+	 * GROUP_CONTROL groups.
+	 */
+	if (odp_schedule_group_destroy(ODP_SCHED_GROUP_ALL) != 0)
+		ODP_ERR("Failed to destroy ODP_SCHED_GROUP_ALL\n");
+	if (odp_schedule_group_destroy(ODP_SCHED_GROUP_WORKER) != 0)
+		ODP_ERR("Failed to destroy ODP_SCHED_GROUP_WORKER\n");
+	if (odp_schedule_group_destroy(ODP_SCHED_GROUP_CONTROL) != 0)
+		ODP_ERR("Failed to destroy ODP_SCHED_GROUP_CONTROL\n");
+
+	_odp_ishm_pool_destroy(sched_shm_pool);
+
+	return 0;
+}
+
+static int schedule_init_local(void)
+{
+	int thr_id;
+	odp_thread_type_t thr_type;
+	odp_thrmask_t mask;
+
+	thr_id = odp_thread_id();
+	if (thread_state_init(thr_id))
+		goto failed_to_init_ts;
+
+	/* Add this thread to default schedule groups */
+	thr_type = odp_thread_type();
+	odp_thrmask_zero(&mask);
+	odp_thrmask_set(&mask, thr_id);
+
+	if (odp_schedule_group_join(ODP_SCHED_GROUP_ALL, &mask) != 0) {
+		ODP_ERR("Failed to join ODP_SCHED_GROUP_ALL\n");
+		goto failed_to_join_grp_all;
+	}
+	if (thr_type == ODP_THREAD_CONTROL) {
+		if (odp_schedule_group_join(ODP_SCHED_GROUP_CONTROL,
+					    &mask) != 0) {
+			ODP_ERR("Failed to join ODP_SCHED_GROUP_CONTROL\n");
+			goto failed_to_join_grp_ctrl;
+		}
+	} else {
+		if (odp_schedule_group_join(ODP_SCHED_GROUP_WORKER,
+					    &mask) != 0) {
+			ODP_ERR("Failed to join ODP_SCHED_GROUP_WORKER\n");
+			goto failed_to_join_grp_wrkr;
+		}
+	}
+
+	return 0;
+
+failed_to_join_grp_wrkr:
+
+failed_to_join_grp_ctrl:
+	odp_schedule_group_leave(ODP_SCHED_GROUP_ALL, &mask);
+
+failed_to_join_grp_all:
+failed_to_init_ts:
+
+	return -1;
+}
+
+static int schedule_term_local(void)
+{
+	int thr_id;
+	odp_thread_type_t thr_type;
+	odp_thrmask_t mask;
+	int rc = 0;
+
+	/* Remove this thread from default schedule groups */
+	thr_id = odp_thread_id();
+	thr_type = odp_thread_type();
+	odp_thrmask_zero(&mask);
+	odp_thrmask_set(&mask, thr_id);
+
+	if (odp_schedule_group_leave(ODP_SCHED_GROUP_ALL, &mask) != 0)
+		ODP_ERR("Failed to leave ODP_SCHED_GROUP_ALL\n");
+	if (thr_type == ODP_THREAD_CONTROL) {
+		if (odp_schedule_group_leave(ODP_SCHED_GROUP_CONTROL,
+					     &mask) != 0)
+			ODP_ERR("Failed to leave ODP_SCHED_GROUP_CONTROL\n");
+	} else {
+		if (odp_schedule_group_leave(ODP_SCHED_GROUP_WORKER,
+					     &mask) != 0)
+			ODP_ERR("Failed to leave ODP_SCHED_GROUP_WORKER\n");
+	}
+
+	update_sg_membership(sched_ts);
+
+	/* Check if the thread is still part of any groups */
+	if (sched_ts->num_schedq != 0) {
+		ODP_ERR("Thread %d still part of scheduler group(s)\n",
+			sched_ts->tidx);
+		rc = -1;
+	}
+
+	return rc;
+}
+
+static void pktio_start(int pktio_index, int num_in_queue, int in_queue_idx[])
+{
+	int i;
+	uint32_t old, tag, j;
+
+	for (i = 0; i < num_in_queue; i++) {
+		/* Try to reserve a slot */
+		if (__atomic_fetch_add(&pktin_num,
+				       1, __ATOMIC_RELAXED) >= PKTIN_MAX) {
+			__atomic_fetch_sub(&pktin_num, 1, __ATOMIC_RELAXED);
+			ODP_ABORT("Too many pktio in queues for scheduler\n");
+		}
+		/* A slot has been reserved, now we need to find an empty one */
+		for (j = 0; ; j = (j + 1) % PKTIN_MAX) {
+			if (__atomic_load_n(&pktin_tags[j],
+					    __ATOMIC_RELAXED) != TAG_EMPTY)
+				/* Slot used, continue with next */
+				continue;
+			/* Empty slot found */
+			old = TAG_EMPTY;
+			tag = PKTIO_QUEUE_2_TAG(pktio_index, in_queue_idx[i]);
+			if (__atomic_compare_exchange_n(&pktin_tags[j],
+							&old,
+							tag,
+							true,
+							__ATOMIC_RELEASE,
+							__ATOMIC_RELAXED)) {
+				/* Success grabbing slot, update high
+				 * watermark
+				 */
+				__atomic_fetch_max(&pktin_hi,
+						   j + 1, __ATOMIC_RELAXED);
+				/* One more tag (queue) for this pktio
+				 * instance
+				 */
+				__atomic_fetch_add(&pktin_count[pktio_index],
+						   1, __ATOMIC_RELAXED);
+				/* Continue with next RX queue */
+				break;
+			}
+			/* Failed to grab slot */
+		}
+	}
+}
+
+static int num_grps(void)
+{
+	return MAX_SCHED_GROUP;
+}
+
+/*
+ * Stubs for internal scheduler abstraction layer due to absence of NULL
+ * checking before calling the function pointer.
+ */
+
+static int thr_add(odp_schedule_group_t group, int thr)
+{
+	/* This function is a schedule_init_local duplicate. */
+	(void)group;
+	(void)thr;
+	return 0;
+}
+
+static int thr_rem(odp_schedule_group_t group, int thr)
+{
+	/* This function is a schedule_term_local duplicate. */
+	(void)group;
+	(void)thr;
+	return 0;
+}
+
+static int init_queue(uint32_t queue_index,
+		      const odp_schedule_param_t *sched_param)
+{
+	/* Not used in scalable scheduler. */
+	(void)queue_index;
+	(void)sched_param;
+	return 0;
+}
+
+static void destroy_queue(uint32_t queue_index)
+{
+	/* Not used in scalable scheduler. */
+	(void)queue_index;
+}
+
+static int sched_queue(uint32_t queue_index)
+{
+	/* Not used in scalable scheduler. */
+	(void)queue_index;
+	return 0;
+}
+
+static int ord_enq_multi(queue_t handle, void *buf_hdr[], int num,
+			 int *ret)
+{
+	queue_entry_t *queue;
+	sched_scalable_thread_state_t *ts;
+	int actual;
+
+	ts = sched_ts;
+	if (ts && odp_unlikely(ts->out_of_order)) {
+		queue = qentry_from_int(handle);
+		actual = rctx_save(queue, (odp_buffer_hdr_t **)buf_hdr, num);
+		*ret = actual;
+		return 1;
+	}
+	return 0;
+}
+
+static void schedule_prefetch(int num)
+{
+	(void)num;
+}
+
+/* Wait until we are in-order (when processing an ordered queue)
+ * Note: this function may be called also when processing other queue types
+ */
+static void order_lock(void)
+{
+	sched_scalable_thread_state_t *ts;
+	reorder_window_t *rwin;
+	uint32_t sn;
+
+	ts = sched_ts;
+	if (odp_unlikely(ts->out_of_order)) {
+		/* We are processing ordered queue and are currently
+		 * out-of-order.
+		 * We are in-order when our reorder window slot number (sn)
+		 * equals the head of the reorder window.
+		 */
+		ODP_ASSERT(ts->rctx != NULL);
+		rwin = ts->rctx->rwin;
+		sn = ts->rctx->sn;
+		sevl();
+		/* Use acquire ordering to be on the safe side even if
+		 * this isn't an acquire/release situation (aka lock).
+		 */
+		while (wfe() &&
+		       monitor32(&rwin->hc.head, __ATOMIC_ACQUIRE) != sn)
+			doze();
+	}
+}
+
+/* This function is unnecessary.
+ * The next thread becomes in-order when we release our reorder context
+ * (i.e. when odp_schedule() is called again).
+ */
+static void order_unlock(void)
+{
+}
+
+static unsigned schedule_max_ordered_locks(void)
+{
+	return CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE;
+}
+
+const schedule_fn_t schedule_scalable_fn = {
+	.pktio_start = pktio_start,
+	.thr_add = thr_add,
+	.thr_rem = thr_rem,
+	.num_grps = num_grps,
+	.init_queue = init_queue,
+	.destroy_queue = destroy_queue,
+	.sched_queue = sched_queue,
+	.ord_enq_multi = ord_enq_multi,
+	.init_global = schedule_init_global,
+	.term_global = schedule_term_global,
+	.init_local = schedule_init_local,
+	.term_local = schedule_term_local,
+	.order_lock = order_lock,
+	.order_unlock = order_unlock,
+	.max_ordered_locks = schedule_max_ordered_locks,
+};
+
+const schedule_api_t schedule_scalable_api = {
+	.schedule_wait_time = schedule_wait_time,
+	.schedule = schedule,
+	.schedule_multi = schedule_multi,
+	.schedule_pause = schedule_pause,
+	.schedule_resume = schedule_resume,
+	.schedule_release_atomic = schedule_release_atomic,
+	.schedule_release_ordered = schedule_release_ordered,
+	.schedule_prefetch = schedule_prefetch,
+	.schedule_num_prio = schedule_num_prio,
+	.schedule_group_create = schedule_group_create,
+	.schedule_group_destroy = schedule_group_destroy,
+	.schedule_group_lookup = schedule_group_lookup,
+	.schedule_group_join = schedule_group_join,
+	.schedule_group_leave = schedule_group_leave,
+	.schedule_group_thrmask = schedule_group_thrmask,
+	.schedule_group_info = schedule_group_info,
+	.schedule_order_lock = schedule_order_lock,
+	.schedule_order_unlock = schedule_order_unlock,
+};
diff --git a/platform/linux-generic/odp_schedule_scalable_ordered.c b/platform/linux-generic/odp_schedule_scalable_ordered.c
new file mode 100644
index 00000000..a4b3bb22
--- /dev/null
+++ b/platform/linux-generic/odp_schedule_scalable_ordered.c
@@ -0,0 +1,347 @@
+/* Copyright (c) 2017, ARM Limited
+ * All rights reserved.
+ * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include <odp/api/shared_memory.h> +#include <odp_queue_scalable_internal.h> +#include <odp_schedule_if.h> +#include <odp_bitset.h> + +#include <string.h> + +extern __thread sched_scalable_thread_state_t *sched_ts; + +reorder_window_t *rwin_alloc(_odp_ishm_pool_t *pool, unsigned lock_count) +{ + reorder_window_t *rwin; + uint32_t i; + + rwin = (reorder_window_t *) + shm_pool_alloc_align(pool, sizeof(reorder_window_t)); + if (rwin == NULL) + return NULL; + + rwin->hc.head = 0; + rwin->hc.chgi = 0; + rwin->winmask = RWIN_SIZE - 1; + rwin->tail = 0; + rwin->turn = 0; + rwin->lock_count = (uint16_t)lock_count; + memset(rwin->olock, 0, sizeof(rwin->olock)); + for (i = 0; i < RWIN_SIZE; i++) + rwin->ring[i] = NULL; + + return rwin; +} + +int rwin_free(_odp_ishm_pool_t *pool, reorder_window_t *rwin) +{ + return _odp_ishm_pool_free(pool, rwin); +} + +bool rwin_reserve(reorder_window_t *rwin, uint32_t *sn) +{ + uint32_t head; + uint32_t oldt; + uint32_t newt; + uint32_t winmask; + + /* Read head and tail separately */ + oldt = rwin->tail; + winmask = rwin->winmask; + do { + /* Need __atomic_load to avoid compiler reordering */ + head = __atomic_load_n(&rwin->hc.head, __ATOMIC_RELAXED); + if (odp_unlikely(oldt - head >= winmask)) + return false; + + newt = oldt + 1; + } while (!__atomic_compare_exchange(&rwin->tail, + &oldt, + &newt, + true, + __ATOMIC_RELAXED, + __ATOMIC_RELAXED)); + *sn = oldt; + + return true; +} + +void rwin_insert(reorder_window_t *rwin, + reorder_context_t *rctx, + uint32_t sn, + void (*callback)(reorder_context_t *)) +{ + /* Initialise to silence scan-build */ + hc_t old = {0, 0}; + hc_t new; + uint32_t winmask; + + __atomic_load(&rwin->hc, &old, __ATOMIC_ACQUIRE); + winmask = rwin->winmask; + if (old.head != sn) { + /* We are out-of-order. Store context in reorder window, + * releasing its content. + */ + ODP_ASSERT(rwin->ring[sn & winmask] == NULL); + atomic_store_release(&rwin->ring[sn & winmask], + rctx, + /*readonly=*/false); + rctx = NULL; + do { + hc_t new; + + new.head = old.head; + new.chgi = old.chgi + 1; /* Changed value */ + /* Update head & chgi, fail if any has changed */ + if (__atomic_compare_exchange(&rwin->hc, + /* Updated on fail */ + &old, + &new, + true, + /* Rel our ring update */ + __ATOMIC_RELEASE, + __ATOMIC_ACQUIRE)) + /* CAS succeeded => head same (we are not + * in-order), chgi updated. + */ + return; + /* CAS failed => head and/or chgi changed. + * We might not be out-of-order anymore. + */ + } while (old.head != sn); + } + + /* old.head == sn => we are now in-order! 
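+	 * From here on no other thread can advance the head, so it is
+	 * safe to retire our context and any consecutive successors
+	 * already stored in the ring.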
*/
+	ODP_ASSERT(old.head == sn);
+	/* We are in-order so it is our responsibility to retire contexts */
+	new.head = old.head;
+	new.chgi = old.chgi + 1;
+
+	/* Retire our in-order context (if we still have it) */
+	if (rctx != NULL) {
+		callback(rctx);
+		new.head++;
+	}
+
+	/* Retire in-order contexts in the ring
+	 * The first context might actually be ours (if we were originally
+	 * out-of-order)
+	 */
+	do {
+		for (;;) {
+			rctx = __atomic_load_n(&rwin->ring[new.head & winmask],
+					       __ATOMIC_ACQUIRE);
+			if (rctx == NULL)
+				break;
+			/* We are the only thread that is in-order
+			 * (until head updated) so we don't have to use
+			 * atomic load-and-clear (exchange)
+			 */
+			rwin->ring[new.head & winmask] = NULL;
+			callback(rctx);
+			new.head++;
+		}
+		/* Update head & chgi, fail if chgi has changed (head cannot change) */
+	} while (!__atomic_compare_exchange(&rwin->hc,
+					    &old, /* Updated on failure */
+					    &new,
+					    false, /* weak */
+					    __ATOMIC_RELEASE, /* Release our ring updates */
+					    __ATOMIC_ACQUIRE));
+}
+
+void rctx_init(reorder_context_t *rctx, uint16_t idx,
+	       reorder_window_t *rwin, uint32_t sn)
+{
+	/* rctx->rvec_free and rctx->idx already initialised in
+	 * thread_state_init function.
+	 */
+	ODP_ASSERT(rctx->idx == idx);
+	rctx->rwin = rwin;
+	rctx->sn = sn;
+	rctx->olock_flags = 0;
+	/* First => no next reorder context */
+	rctx->next_idx = idx;
+	/* Where to store next event */
+	rctx->cur_idx = idx;
+	rctx->numevts = 0;
+}
+
+inline void rctx_free(const reorder_context_t *rctx)
+{
+	const reorder_context_t *const base = &rctx[-(int)rctx->idx];
+	const uint32_t first = rctx->idx;
+	uint32_t next_idx;
+
+	next_idx = rctx->next_idx;
+
+	ODP_ASSERT(rctx->rwin != NULL);
+	/* Set free bit */
+	if (rctx->rvec_free == &sched_ts->rvec_free)
+		/* Since it is our own reorder context, we can instead
+		 * perform a non-atomic and relaxed update on our private
+		 * rvec_free.
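+		 * (priv_rvec_free is only ever touched by its owning
+		 * thread; bits freed by other threads go through the
+		 * atomic rvec_free and are reclaimed with
+		 * atom_bitset_xchg when the private set runs empty.)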
+		 */
+		sched_ts->priv_rvec_free =
+			bitset_set(sched_ts->priv_rvec_free, rctx->idx);
+	else
+		atom_bitset_set(rctx->rvec_free, rctx->idx, __ATOMIC_RELEASE);
+
+	/* Can't dereference rctx after the corresponding free bit is set */
+	while (next_idx != first) {
+		rctx = &base[next_idx];
+		next_idx = rctx->next_idx;
+		/* Set free bit */
+		if (rctx->rvec_free == &sched_ts->rvec_free)
+			sched_ts->priv_rvec_free =
+				bitset_set(sched_ts->priv_rvec_free, rctx->idx);
+		else
+			atom_bitset_set(rctx->rvec_free, rctx->idx,
+					__ATOMIC_RELEASE);
+	}
+}
+
+inline void olock_unlock(const reorder_context_t *rctx, reorder_window_t *rwin,
+			 uint32_t lock_index)
+{
+	if ((rctx->olock_flags & (1U << lock_index)) == 0) {
+		/* Use relaxed ordering, we are not releasing any updates */
+		rwin->olock[lock_index] = rctx->sn + 1;
+	}
+}
+
+void olock_release(const reorder_context_t *rctx)
+{
+	reorder_window_t *rwin;
+
+	rwin = rctx->rwin;
+
+	switch (rwin->lock_count) {
+	case 2:
+		olock_unlock(rctx, rwin, 1);
+		/* Fall through */
+	case 1:
+		olock_unlock(rctx, rwin, 0);
+	}
+	ODP_STATIC_ASSERT(NUM_OLOCKS == 2, "Number of ordered locks != 2");
+}
+
+static void blocking_enqueue(queue_entry_t *q, odp_buffer_hdr_t **evts, int num)
+{
+	int actual;
+
+	/* Iterate until all events have been successfully enqueued */
+	for (;;) {
+		/* Attempt to enqueue remaining events */
+		actual = q->s.enqueue_multi(qentry_to_int(q), evts, num);
+		if (odp_unlikely(actual < 0)) {
+			ODP_ERR("Failed to enqueue deferred events\n");
+			/* Treat failure as zero events enqueued and retry */
+			actual = 0;
+		}
+		/* Update for potential partial success */
+		evts += actual;
+		num -= actual;
+		if (num == 0)
+			break;
+		/* Back-off to decrease load on the system */
+		odp_cpu_pause();
+	}
+}
+
+void rctx_retire(reorder_context_t *first)
+{
+	reorder_context_t *rctx;
+	queue_entry_t *q;
+	uint32_t i;
+	uint32_t j;
+	uint32_t num;
+
+	rctx = first;
+	do {
+		/* Process all events in this reorder context */
+		for (i = 0; i < rctx->numevts;) {
+			q = rctx->destq[i];
+			/* Find index of next different destq */
+			j = i + 1;
+			while (j < rctx->numevts && rctx->destq[j] == q)
+				j++;
+			num = j - i;
+			/* Blocking enqueue of events to this destq */
+			blocking_enqueue(q, &rctx->events[i], num);
+			i += num;
+		}
+		/* Update rctx pointer to point to 'next_idx' element */
+		rctx += (int)rctx->next_idx - (int)rctx->idx;
+	} while (rctx != first);
+	olock_release(first);
+	rctx_free(first);
+}
+
+void rctx_release(reorder_context_t *rctx)
+{
+	/* Insert reorder context into reorder window, potentially calling the
+	 * rctx_retire function for all pending reorder_contexts.
+	 */
+	rwin_insert(rctx->rwin, rctx, rctx->sn, rctx_retire);
+}
+
+/* Save destination queue and events in the reorder context for deferred
+ * enqueue.
+ */
+int rctx_save(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
+{
+	int i;
+	sched_scalable_thread_state_t *ts;
+	reorder_context_t *first;
+	reorder_context_t *cur;
+	bitset_t next_idx;
+
+	ts = sched_ts;
+	first = ts->rctx;
+	ODP_ASSERT(ts->rctx != NULL);
+	cur = &first[(int)first->cur_idx - (int)first->idx];
+	for (i = 0; i < num; i++) {
+		if (odp_unlikely(cur->numevts == RC_EVT_SIZE)) {
+			/* No more space in current reorder context
+			 * Try to allocate another.
+			 */
+			if (odp_unlikely(
+				    bitset_is_null(ts->priv_rvec_free))) {
+				ts->priv_rvec_free =
+					atom_bitset_xchg(
+						&ts->rvec_free,
+						0,
+						__ATOMIC_RELAXED);
+				if (odp_unlikely(bitset_is_null(
+						ts->priv_rvec_free)))
+					/* Out of reorder contexts.
+					 * Return the number of events
+					 * stored so far.
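+					 * The caller observes a short
+					 * count and can retry the
+					 * remaining events later.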
+ */ + return i; + } + next_idx = bitset_ffs(ts->priv_rvec_free) - 1; + ts->priv_rvec_free = + bitset_clr(ts->priv_rvec_free, + next_idx); + /* Link current to next (for eventual + * retiring) + */ + cur->next_idx = next_idx; + /* Link first to next (for next call to + * queue_enq_multi()) + */ + first->cur_idx = next_idx; + /* Update current to next */ + cur = &ts->rvec[next_idx]; + rctx_init(cur, next_idx, NULL, 0); + /* The last rctx (so far) */ + cur->next_idx = first->idx; + } + cur->events[cur->numevts] = buf_hdr[i]; + cur->destq[cur->numevts] = queue; + cur->numevts++; + } + /* All events stored. */ + return num; +} -- 2.13.1