Signed-off-by: Ola Liljedahl <ola.liljed...@arm.com> Reviewed-by: Brian Brooks <brian.bro...@arm.com> --- platform/linux-generic/Makefile.am | 1 + platform/linux-generic/include/odp_llqueue.h | 311 +++++++++++++++++++++++++++ 2 files changed, 312 insertions(+) create mode 100644 platform/linux-generic/include/odp_llqueue.h
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 39322dc1..19e2241b 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -170,6 +170,7 @@ noinst_HEADERS = \
 		  ${srcdir}/include/odp_errno_define.h \
 		  ${srcdir}/include/odp_forward_typedefs_internal.h \
 		  ${srcdir}/include/odp_internal.h \
+		  ${srcdir}/include/odp_llqueue.h \
 		  ${srcdir}/include/odp_name_table_internal.h \
 		  ${srcdir}/include/odp_packet_internal.h \
 		  ${srcdir}/include/odp_packet_io_internal.h \
diff --git a/platform/linux-generic/include/odp_llqueue.h b/platform/linux-generic/include/odp_llqueue.h
new file mode 100644
index 00000000..99b12e66
--- /dev/null
+++ b/platform/linux-generic/include/odp_llqueue.h
@@ -0,0 +1,311 @@
+/* Copyright (c) 2017, ARM Limited. All rights reserved.
+ *
+ * Copyright (c) 2017, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_LLQUEUE_H_
+#define ODP_LLQUEUE_H_
+
+#include <odp/api/cpu.h>
+#include <odp/api/hints.h>
+#include <odp/api/spinlock.h>
+
+#include <odp_config_internal.h>
+#include <odp_debug_internal.h>
+#include <odp_cpu.h>
+
+#include <stdint.h>
+#include <stdlib.h>
+
+/******************************************************************************
+ * Linked list queues
+ *
+ * A FIFO queue that links caller-provided 'struct llnode' objects through
+ * their 'next' field. Nothing in this header allocates or frees memory.
+ *
+ * The 'next' field also encodes the state of a node:
+ *   NULL      - the node is not on a queue (see llq_on_queue())
+ *   SENTINEL  - the node is on a queue and no successor has been linked in
+ *   otherwise - pointer to the following node on the queue
+ *
+ * Two implementations with the same interface follow; CONFIG_LLDSCD selects
+ * the double-word LL/SC one, otherwise the spin lock one is used.
+ *****************************************************************************/
+
+struct llqueue;
+struct llnode;
+
+static struct llnode *llq_head(struct llqueue *llq);
+static void llqueue_init(struct llqueue *llq);
+static void llq_enqueue(struct llqueue *llq, struct llnode *node);
+static struct llnode *llq_dequeue(struct llqueue *llq);
+static odp_bool_t llq_dequeue_cond(struct llqueue *llq, struct llnode *exp);
+static odp_bool_t llq_cond_rotate(struct llqueue *llq, struct llnode *node);
+static odp_bool_t llq_on_queue(struct llnode *node);
+
+/******************************************************************************
+ * The implementation(s)
+ *****************************************************************************/
+/*
+ * All-ones pointer value stored in 'next' of a node that has been enqueued
+ * but has no successor linked in. It is distinct from NULL, which marks a
+ * node that is not on any queue.
+ */
+#define SENTINEL ((void *)~(uintptr_t)0)
+
+#ifdef CONFIG_LLDSCD
+/* Implement queue operations using double-word LL/SC
+ *
+ * Head and tail live side by side in 'union llht' and are read and written
+ * as one unit through lld()/scd() on its 'ui' member.
+ *
+ * NOTE(review): lld() and scd() are not defined in this file (presumably
+ * they come from odp_cpu.h). The retry loops below continue while scd()
+ * returns non-zero, i.e. they assume non-zero means "store failed" -
+ * confirm against the definition.
+ */
+
+/* The scalar equivalent of a double pointer
+ *
+ * An integer type as wide as two pointers, chosen from the size of
+ * ptrdiff_t. No typedef is provided for sizes other than 4 and 8 bytes.
+ */
+#if __SIZEOF_PTRDIFF_T__ == 4
+typedef uint64_t dintptr_t;
+#endif
+#if __SIZEOF_PTRDIFF_T__ == 8
+typedef __int128 dintptr_t;
+#endif
+/*
+ * Queue link. See the table at the top of this file for the meaning of the
+ * values 'next' can hold.
+ */
+struct llnode {
+	struct llnode *next;
+};
+/*
+ * Head/tail pair of a queue. 'st' gives access to the two pointers, 'ui'
+ * exposes the same storage as a single scalar for lld()/scd().
+ */
+union llht {
+	struct {
+		struct llnode *head, *tail;
+	} st;
+	dintptr_t ui;
+};
+/*
+ * The queue itself: just the head/tail pair. Both pointers are NULL when the
+ * queue is empty.
+ */
+struct llqueue {
+	union llht u;
+};
+/*
+ * Return the node at the head of 'llq' without removing it.
+ *
+ * Performs one relaxed atomic load of the head pointer. Returns NULL when
+ * the queue is empty.
+ */
+static inline struct llnode *llq_head(struct llqueue *llq)
+{
+	return __atomic_load_n(&llq->u.st.head, __ATOMIC_RELAXED);
+}
+/*
+ * Put 'llq' into the empty state (head and tail both NULL).
+ *
+ * Uses plain, non-atomic stores; presumably meant to run before the queue
+ * is made visible to other threads.
+ */
+static inline void llqueue_init(struct llqueue *llq)
+{
+	llq->u.st.head = NULL;
+	llq->u.st.tail = NULL;
+}
+/*
+ * Append 'node' to the tail of 'llq'.
+ *
+ * 'node' must not be on a queue: its 'next' field is asserted to be NULL on
+ * entry.
+ *
+ * The head/tail pair is replaced in one lld()/scd() retry loop: tail always
+ * becomes 'node', head becomes 'node' only if the queue was empty. The
+ * previous tail is linked to 'node' after that loop, so for a short while
+ * the previous tail still holds SENTINEL although it is no longer the tail.
+ * The dequeue functions wait for that window to close.
+ */
+static inline void llq_enqueue(struct llqueue *llq, struct llnode *node)
+{
+	union llht old, neu;
+
+	ODP_ASSERT(node->next == NULL);
+	node->next = SENTINEL; /* On queue, no successor linked in */
+	do {
+		old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
+		neu.st.head = old.st.head == NULL ? node : old.st.head;
+		neu.st.tail = node;
+	} while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELEASE)));
+	if (old.st.tail != NULL) {
+		/* List was not empty: link the previous tail to 'node'.
+		 * This is a plain store; until it is visible, a thread
+		 * dequeuing the previous tail reads SENTINEL and spins.
+		 */
+		ODP_ASSERT(old.st.tail->next == SENTINEL);
+		old.st.tail->next = node;
+	}
+}
+/*
+ * Remove and return the node at the head of 'llq'.
+ *
+ * Returns NULL if the queue is empty. On success the 'next' field of the
+ * returned node has been reset to NULL, so llq_on_queue() is false for it.
+ */
+static inline struct llnode *llq_dequeue(struct llqueue *llq)
+{
+	struct llnode *head;
+	union llht old, neu;
+
+	/* llq_dequeue() may be used in a busy-waiting fashion.
+	 * Peek at head with an ordinary atomic load instead of lld() to
+	 * avoid disturbing remote LL/SC while the queue is empty.
+	 */
+	head = __atomic_load_n(&llq->u.st.head, __ATOMIC_ACQUIRE);
+	if (head == NULL)
+		return NULL;
+	/* Read head->next before LL to minimize cache miss latency
+	 * in LL/SC below. The loaded value itself is discarded.
+	 */
+	(void)__atomic_load_n(&head->next, __ATOMIC_RELAXED);
+
+	do {
+		old.ui = lld(&llq->u.ui, __ATOMIC_RELAXED);
+		if (odp_unlikely(old.st.head == NULL)) {
+			/* Empty list (emptied since the peek above) */
+			return NULL;
+		} else if (odp_unlikely(old.st.head == old.st.tail)) {
+			/* Single element in list: queue becomes empty */
+			neu.st.head = NULL;
+			neu.st.tail = NULL;
+		} else {
+			/* Multi-element list, dequeue head */
+			struct llnode *next;
+			/* Wait until llq_enqueue() has written true next
+			 * pointer, i.e. replaced SENTINEL in the head node
+			 */
+			while ((next = __atomic_load_n(&old.st.head->next,
+						       __ATOMIC_RELAXED)) ==
+				SENTINEL)
+				odp_cpu_pause();
+			neu.st.head = next;
+			neu.st.tail = old.st.tail;
+		}
+	} while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)));
+	old.st.head->next = NULL; /* Node is no longer on the queue */
+	return old.st.head;
+}
+/*
+ * Remove the head of 'llq' only if the head is the node 'exp'.
+ *
+ * Returns true if 'exp' was the head and has been removed; its 'next' field
+ * is then NULL. Returns false, without modifying the queue, if the queue is
+ * empty or its head is a different node.
+ */
+static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq,
+					  struct llnode *exp)
+{
+	union llht old, neu;
+
+	do {
+		old.ui = lld(&llq->u.ui, __ATOMIC_ACQUIRE);
+		if (odp_unlikely(old.st.head == NULL || old.st.head != exp)) {
+			/* Empty list or wrong head */
+			return false;
+		} else if (odp_unlikely(old.st.head == old.st.tail)) {
+			/* Single element in list: queue becomes empty */
+			neu.st.head = NULL;
+			neu.st.tail = NULL;
+		} else {
+			/* Multi-element list, dequeue head */
+			struct llnode *next;
+
+			/* Wait until llq_enqueue() has written true next
+			 * pointer, i.e. replaced SENTINEL in the head node
+			 */
+			while ((next = __atomic_load_n(&old.st.head->next,
+						       __ATOMIC_RELAXED)) ==
+				SENTINEL)
+				odp_cpu_pause();
+
+			neu.st.head = next;
+			neu.st.tail = old.st.tail;
+		}
+	} while (odp_unlikely(scd(&llq->u.ui, neu.ui, __ATOMIC_RELAXED)));
+	old.st.head->next = NULL; /* Node is no longer on the queue */
+	return true;
+}
+
+/* If 'node' is the head of llq then move it to the tail
+ *
+ * Returns true if 'node' was the head and has been enqueued again at the
+ * tail. Returns false, leaving the queue unmodified, otherwise.
+ */
+static inline odp_bool_t llq_cond_rotate(struct llqueue *llq,
+					 struct llnode *node)
+{
+	/* Difficult to make this into a single atomic operation.
+	 * Instead use existing primitives. As a consequence 'node' is off
+	 * the queue between the two calls below.
+	 */
+	if (odp_likely(llq_dequeue_cond(llq, node))) {
+		llq_enqueue(llq, node);
+		return true;
+	}
+	return false;
+}
+/*
+ * Tell whether 'node' is currently linked into a queue.
+ *
+ * Returns true when node->next is not NULL, i.e. it holds SENTINEL or a
+ * pointer to a successor. The field is read with a plain load.
+ */
+static inline odp_bool_t llq_on_queue(struct llnode *node)
+{
+	return node->next != NULL;
+}
+
+#else
+/* Implement queue operations protected by a spin lock
+ *
+ * Every modification of head, tail and the nodes' 'next' links, except for
+ * llqueue_init() and the SENTINEL marking at the start of llq_enqueue(),
+ * happens while holding the queue's lock.
+ */
+
+struct llnode {
+	struct llnode *next; /* NULL, SENTINEL or successor (see file top) */
+};
+/*
+ * The queue: head and tail pointers (both NULL when empty) and the spin
+ * lock that guards them. 'head' is additionally read without the lock by
+ * llq_head() and by the early-out check in llq_dequeue().
+ */
+struct llqueue {
+	struct llnode *head, *tail;
+	odp_spinlock_t lock;
+};
+/*
+ * Return the node at the head of 'llq' without removing it.
+ *
+ * Performs one relaxed atomic load of the head pointer and does not take
+ * the lock. Returns NULL when the queue is empty.
+ */
+static inline struct llnode *llq_head(struct llqueue *llq)
+{
+	return __atomic_load_n(&llq->head, __ATOMIC_RELAXED);
+}
+/*
+ * Put 'llq' into the empty state (head and tail both NULL) and initialise
+ * its spin lock.
+ */
+static inline void llqueue_init(struct llqueue *llq)
+{
+	llq->head = NULL;
+	llq->tail = NULL;
+	odp_spinlock_init(&llq->lock);
+}
+/*
+ * Append 'node' to the tail of 'llq'.
+ *
+ * 'node' must not be on a queue: its 'next' field is asserted to be NULL on
+ * entry. It is set to SENTINEL before the lock is taken; the linking itself
+ * is done under the lock.
+ */
+static inline void llq_enqueue(struct llqueue *llq, struct llnode *node)
+{
+	ODP_ASSERT(node->next == NULL);
+	node->next = SENTINEL; /* 'node' becomes the last element */
+
+	odp_spinlock_lock(&llq->lock);
+	if (llq->head == NULL) {
+		llq->head = node; /* Empty queue: 'node' is head and tail */
+		llq->tail = node;
+	} else {
+		llq->tail->next = node; /* Replaces SENTINEL in old tail */
+		llq->tail = node;
+	}
+	odp_spinlock_unlock(&llq->lock);
+}
+/*
+ * Remove and return the node at the head of 'llq'.
+ *
+ * Returns NULL if the queue is empty. On success the 'next' field of the
+ * returned node has been reset to NULL, so llq_on_queue() is false for it.
+ */
+static inline struct llnode *llq_dequeue(struct llqueue *llq)
+{
+	struct llnode *head;
+	struct llnode *node = NULL;
+	/*
+	 * Peek at head without the lock and return early if it is NULL,
+	 * presumably so that polling an empty queue does not contend on the
+	 * lock. Head is checked again under the lock below.
+	 */
+	head = __atomic_load_n(&llq->head, __ATOMIC_RELAXED);
+	if (head == NULL)
+		return NULL;
+
+	odp_spinlock_lock(&llq->lock);
+	if (llq->head != NULL) {
+		node = llq->head;
+		if (llq->head == llq->tail) {
+			/* Single element: queue becomes empty */
+			ODP_ASSERT(node->next == SENTINEL);
+			llq->head = NULL;
+			llq->tail = NULL;
+		} else {
+			/* Several elements: successor becomes head */
+			ODP_ASSERT(node->next != SENTINEL);
+			llq->head = node->next;
+		}
+		node->next = NULL; /* Node is no longer on the queue */
+	}
+	odp_spinlock_unlock(&llq->lock);
+	return node;
+}
+/*
+ * Remove the head of 'llq' only if the head is the node 'node'.
+ *
+ * Returns true if 'node' was the head and has been removed; its 'next'
+ * field is then NULL. Returns false, without modifying the queue, if the
+ * queue is empty or its head is a different node.
+ */
+static inline odp_bool_t llq_dequeue_cond(struct llqueue *llq,
+					  struct llnode *node)
+{
+	odp_bool_t success = false;
+
+	odp_spinlock_lock(&llq->lock);
+	if (odp_likely(llq->head != NULL && llq->head == node)) {
+		success = true;
+		if (llq->head == llq->tail) {
+			/* Single element: queue becomes empty */
+			ODP_ASSERT(node->next == SENTINEL);
+			llq->head = NULL;
+			llq->tail = NULL;
+		} else {
+			/* Several elements: successor becomes head */
+			ODP_ASSERT(node->next != SENTINEL);
+			llq->head = node->next;
+		}
+		node->next = NULL; /* Node is no longer on the queue */
+	}
+	odp_spinlock_unlock(&llq->lock);
+	return success;
+}
+
+/* If 'node' is the head of llq then move it to the tail
+ *
+ * The whole operation runs under the queue's lock. Returns true if 'node'
+ * was the head (it is then the tail, or still the only element), false,
+ * leaving the queue unmodified, otherwise.
+ */
+static inline odp_bool_t llq_cond_rotate(struct llqueue *llq,
+					 struct llnode *node)
+{
+	odp_bool_t success = false;
+
+	odp_spinlock_lock(&llq->lock);
+	if (odp_likely(llq->head == node)) {
+		success = true;
+		if (llq->tail != node) {
+			/* Unlink from the front, relink behind the tail */
+			ODP_ASSERT(node->next != SENTINEL);
+			llq->head = node->next;
+			llq->tail->next = node;
+			llq->tail = node;
+			node->next = SENTINEL;
+		}
+		/* Else 'node' is only element on list => nothing to do */
+	}
+	odp_spinlock_unlock(&llq->lock);
+	return success;
+}
+/*
+ * Tell whether 'node' is currently linked into a queue.
+ *
+ * Returns true when node->next is not NULL, i.e. it holds SENTINEL or a
+ * pointer to a successor. The field is read with a plain load and without
+ * taking any lock.
+ */
+static inline odp_bool_t llq_on_queue(struct llnode *node)
+{
+	return node->next != NULL;
+}
+
+#endif /* CONFIG_LLDSCD */
+
+#endif /* ODP_LLQUEUE_H_ */
-- 
2.13.1