On 2018-12-14 16:45, Erik Gabriel Carrillo wrote:
This patch introduces a new version of the event timer adapter software
PMD. In the original design, timer event producer lcores in the primary
and secondary processes enqueued event timers into a ring, and a
service core in the primary process dequeued them and processed them
further.  To improve performance, this version does away with the ring
and lets lcores in both primary and secondary processes insert timers
directly into timer skiplist data structures; the service core directly
accesses the lists as well, when looking for timers that have expired.

Signed-off-by: Erik Gabriel Carrillo <erik.g.carri...@intel.com>
---
  lib/librte_eventdev/rte_event_timer_adapter.c | 688 +++++++++++---------------
  1 file changed, 276 insertions(+), 412 deletions(-)

diff --git a/lib/librte_eventdev/rte_event_timer_adapter.c 
b/lib/librte_eventdev/rte_event_timer_adapter.c
index 79070d4..029a45a 100644
--- a/lib/librte_eventdev/rte_event_timer_adapter.c
+++ b/lib/librte_eventdev/rte_event_timer_adapter.c
@@ -19,6 +19,7 @@
  #include <rte_timer.h>
  #include <rte_service_component.h>
  #include <rte_cycles.h>
+#include <rte_random.h>

You aren't using anything from rte_random.h.

/../

-static __rte_always_inline uint16_t
-__sw_event_timer_arm_burst(const struct rte_event_timer_adapter *adapter,
-                         struct rte_event_timer **evtims,
-                         uint16_t nb_evtims)
+static uint16_t
+__swtim_arm_burst(const struct rte_event_timer_adapter *adapter,
+               struct rte_event_timer **evtims,
+               uint16_t nb_evtims)
  {
-       uint16_t i;
-       int ret;
-       struct rte_event_timer_adapter_sw_data *sw_data;
-       struct msg *msgs[nb_evtims];
+       int i, ret;
+       struct swtim *sw = swtim_pmd_priv(adapter);
+       uint32_t lcore_id = rte_lcore_id();
+       struct rte_timer *tim, *tims[nb_evtims];
+       uint64_t cycles;
#ifdef RTE_LIBRTE_EVENTDEV_DEBUG
        /* Check that the service is running. */
@@ -1101,101 +979,104 @@ __sw_event_timer_arm_burst(const struct 
rte_event_timer_adapter *adapter,
        }
  #endif
- sw_data = adapter->data->adapter_priv;
+       /* Adjust lcore_id if non-EAL thread. Arbitrarily pick the timer list of
+        * the highest lcore to insert such timers into
+        */
+       if (lcore_id == LCORE_ID_ANY)
+               lcore_id = RTE_MAX_LCORE - 1;
+
+       /* If this is the first time we're arming an event timer on this lcore,
+        * mark this lcore as "in use"; this will cause the service
+        * function to process the timer list that corresponds to this lcore.
+        */
+       if (unlikely(rte_atomic16_test_and_set(&sw->in_use[lcore_id].v))) {
+               rte_spinlock_lock(&sw->poll_lcores_sl);
+               EVTIM_LOG_DBG("Adding lcore id = %u to list of lcores to poll",
+                             lcore_id);
+               sw->poll_lcores[sw->n_poll_lcores++] = lcore_id;
+               rte_spinlock_unlock(&sw->poll_lcores_sl);
+       }
- ret = rte_mempool_get_bulk(sw_data->msg_pool, (void **)msgs, nb_evtims);
+       ret = rte_mempool_get_bulk(sw->tim_pool, (void **)tims,
+                                  nb_evtims);
        if (ret < 0) {
                rte_errno = ENOSPC;
                return 0;
        }
- /* Let the service know we're producing messages for it to process */
-       rte_atomic16_inc(&sw_data->message_producer_count);
-
-       /* If the service is managing timers, wait for it to finish */
-       while (sw_data->service_phase == 2)
-               rte_pause();
-
-       rte_smp_rmb();
-
        for (i = 0; i < nb_evtims; i++) {
                /* Don't modify the event timer state in these cases */
                if (evtims[i]->state == RTE_EVENT_TIMER_ARMED) {
                        rte_errno = EALREADY;
                        break;
                } else if (!(evtims[i]->state == RTE_EVENT_TIMER_NOT_ARMED ||
-                   evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
+                            evtims[i]->state == RTE_EVENT_TIMER_CANCELED)) {
                        rte_errno = EINVAL;
                        break;
                }
ret = check_timeout(evtims[i], adapter);
-               if (ret == -1) {
+               if (unlikely(ret == -1)) {
                        evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOLATE;
                        rte_errno = EINVAL;
                        break;
-               }
-               if (ret == -2) {
+               } else if (unlikely(ret == -2)) {
                        evtims[i]->state = RTE_EVENT_TIMER_ERROR_TOOEARLY;
                        rte_errno = EINVAL;
                        break;
                }
- if (check_destination_event_queue(evtims[i], adapter) < 0) {
+               if (unlikely(check_destination_event_queue(evtims[i],
+                                                          adapter) < 0)) {
                        evtims[i]->state = RTE_EVENT_TIMER_ERROR;
                        rte_errno = EINVAL;
                        break;
                }
- /* Checks passed, set up a message to enqueue */
-               msgs[i]->type = MSG_TYPE_ARM;
-               msgs[i]->evtim = evtims[i];
+               tim = tims[i];
+               rte_timer_init(tim);
- /* Set the payload pointer if not set. */
-               if (evtims[i]->ev.event_ptr == NULL)
-                       evtims[i]->ev.event_ptr = evtims[i];
+               evtims[i]->impl_opaque[0] = (uintptr_t)tim;
+               evtims[i]->impl_opaque[1] = (uintptr_t)adapter;
- /* msg objects that get enqueued successfully will be freed
-                * either by a future cancel operation or by the timer
-                * expiration callback.
-                */
-               if (rte_ring_enqueue(sw_data->msg_ring, msgs[i]) < 0) {
-                       rte_errno = ENOSPC;
+               cycles = get_timeout_cycles(evtims[i], adapter);
+               ret = rte_timer_alt_reset(sw->timer_data_id, tim, cycles,
+                                         SINGLE, lcore_id, NULL, evtims[i]);
+               if (ret < 0) {
+                       /* tim was in RUNNING or CONFIG state */
+                       evtims[i]->state = RTE_EVENT_TIMER_ERROR;
                        break;
                }
- EVTIM_LOG_DBG("enqueued ARM message to ring");
-
+               rte_smp_wmb();
+               EVTIM_LOG_DBG("armed an event timer");
                evtims[i]->state = RTE_EVENT_TIMER_ARMED;

It looks like you want a reader to observe the impl_opaque[] stores before the state store, which sounds like a good idea.

However, I fail to find the corresponding read barriers on the reader side. Shouldn't swtim_cancel_burst() have such? It loads state and then loads impl_opaque[], but without a read barrier there is no guarantee those memory loads happen in program order.

Reply via email to