This commit improves DLB credit handling in scenarios where ports hold
on to credits but cannot release them because too few have accumulated
(less than 2 * credit quanta; e.g. with a credit quanta of 32, a port
caching fewer than 64 credits would never return them to the pool).

Worker ports now release all accumulated credits when the back-to-back
zero-poll count reaches a preset threshold.

Producer ports release all accumulated credits if enqueue fails for a
preset number of consecutive retries.

In a multi-producer system, some producer(s) may exit early while
holding on to credits. These credits are now released during port
unlink, which must be performed by the application.

test-eventdev is modified to call rte_event_port_unlink() to release
any credits accumulated by producer ports.
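
For reference, a minimal sketch of the application-side unlink a
producer core performs once it has finished enqueuing (dev_id, port_id
and queue_id are illustrative placeholders; rte_event_port_unlink() is
the existing eventdev API):

	/* Unlinking the producer port lets the PMD return any credits
	 * the port still caches back to the device credit pool.
	 */
	uint8_t q = queue_id;	/* queue previously linked to this port */
	int nb = rte_event_port_unlink(dev_id, port_id, &q, 1);
	if (nb != 1)
		printf("unlink returned %d (rte_errno %d)\n", nb, rte_errno);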

Signed-off-by: Abdullah Sevincer <abdullah.sevin...@intel.com>
---
 app/test-eventdev/test_perf_common.c |  20 +--
 drivers/event/dlb2/dlb2.c            | 203 +++++++++++++++++++++------
 drivers/event/dlb2/dlb2_priv.h       |   1 +
 drivers/event/dlb2/meson.build       |  12 ++
 drivers/event/dlb2/meson_options.txt |   6 +
 5 files changed, 194 insertions(+), 48 deletions(-)
 create mode 100644 drivers/event/dlb2/meson_options.txt

diff --git a/app/test-eventdev/test_perf_common.c b/app/test-eventdev/test_perf_common.c
index 93e6132de8..b3a12e12ac 100644
--- a/app/test-eventdev/test_perf_common.c
+++ b/app/test-eventdev/test_perf_common.c
@@ -854,6 +854,7 @@ perf_producer_wrapper(void *arg)
        struct rte_event_dev_info dev_info;
        struct prod_data *p  = arg;
        struct test_perf *t = p->t;
+       int ret = 0;
 
        rte_event_dev_info_get(p->dev_id, &dev_info);
        if (!t->opt->prod_enq_burst_sz) {
@@ -870,29 +871,32 @@ perf_producer_wrapper(void *arg)
         */
        if (t->opt->prod_type == EVT_PROD_TYPE_SYNT &&
                        t->opt->prod_enq_burst_sz == 1)
-               return perf_producer(arg);
+               ret = perf_producer(arg);
        else if (t->opt->prod_type == EVT_PROD_TYPE_SYNT &&
                        t->opt->prod_enq_burst_sz > 1) {
                if (dev_info.max_event_port_enqueue_depth == 1)
                        evt_err("This event device does not support burst 
mode");
                else
-                       return perf_producer_burst(arg);
+                       ret = perf_producer_burst(arg);
        }
        else if (t->opt->prod_type == EVT_PROD_TYPE_EVENT_TIMER_ADPTR &&
                        !t->opt->timdev_use_burst)
-               return perf_event_timer_producer(arg);
+               ret = perf_event_timer_producer(arg);
        else if (t->opt->prod_type == EVT_PROD_TYPE_EVENT_TIMER_ADPTR &&
                        t->opt->timdev_use_burst)
-               return perf_event_timer_producer_burst(arg);
+               ret = perf_event_timer_producer_burst(arg);
        else if (t->opt->prod_type == EVT_PROD_TYPE_EVENT_CRYPTO_ADPTR) {
                if (t->opt->prod_enq_burst_sz > 1)
-                       return perf_event_crypto_producer_burst(arg);
+                       ret = perf_event_crypto_producer_burst(arg);
                else
-                       return perf_event_crypto_producer(arg);
+                       ret = perf_event_crypto_producer(arg);
        } else if (t->opt->prod_type == EVT_PROD_TYPE_EVENT_DMA_ADPTR)
-               return perf_event_dma_producer(arg);
+               ret = perf_event_dma_producer(arg);
 
-       return 0;
+       /* Unlink port to release any acquired HW resources */
+       rte_event_port_unlink(p->dev_id, p->port_id, &p->queue_id, 1);
+
+       return ret;
 }
 
 static inline uint64_t
diff --git a/drivers/event/dlb2/dlb2.c b/drivers/event/dlb2/dlb2.c
index 56db9bc937..b4217e2a50 100644
--- a/drivers/event/dlb2/dlb2.c
+++ b/drivers/event/dlb2/dlb2.c
@@ -43,7 +43,47 @@
  * to DLB can go ahead of relevant application writes like updates to buffers
  * being sent with event
  */
+#ifndef DLB2_BYPASS_FENCE_ON_PP
 #define DLB2_BYPASS_FENCE_ON_PP 0  /* 1 == Bypass fence, 0 == do not bypass */
+#endif
+
+/* HW credit checks can only be turned off for a DLB2 device if the
+ * following is true for each created eventdev:
+ * LDB credits <= DIR credits + minimum CQ depth
+ * (CQ depth is the minimum across all ports configured within the eventdev)
+ * This needs to be true for all eventdevs created on any DLB2 device
+ * managed by this driver.
+ * DLB2.5 does not have any such restriction, as it has a single credit pool.
+ */
+#ifndef DLB_HW_CREDITS_CHECKS
+#define DLB_HW_CREDITS_CHECKS 1
+#endif
+
+/*
+ * SW credit checks can only be turned off if the application has a way to
+ * limit input events to the eventdev below the assigned credit limit
+ */
+#ifndef DLB_SW_CREDITS_CHECKS
+#define DLB_SW_CREDITS_CHECKS 1
+#endif
+
+/*
+ * To avoid deadlock situations, by default, the per-port new_event_threshold
+ * check is disabled. nb_events_limit is still checked while allocating
+ * new event credits.
+ */
+#define ENABLE_PORT_THRES_CHECK 1
+/*
+ * To avoid deadlock, ports holding on to credits will release them after
+ * this many consecutive zero dequeues
+ */
+#define DLB2_ZERO_DEQ_CREDIT_RETURN_THRES 16384
+
+/*
+ * To avoid deadlock, ports holding on to credits will release them after
+ * this many consecutive enqueue failures
+ */
+#define DLB2_ENQ_FAIL_CREDIT_RETURN_THRES 100
 
 /*
  * Resources exposed to eventdev. Some values overridden at runtime using
@@ -2488,6 +2528,61 @@ dlb2_event_queue_detach_ldb(struct dlb2_eventdev *dlb2,
        return ret;
 }
 
+static inline void
+dlb2_port_credits_return(struct dlb2_port *qm_port)
+{
+       /* Return all port credits */
+       if (qm_port->dlb2->version == DLB2_HW_V2_5) {
+               if (qm_port->cached_credits) {
+                       __atomic_fetch_add(qm_port->credit_pool[DLB2_COMBINED_POOL],
+                                          qm_port->cached_credits, __ATOMIC_SEQ_CST);
+                       qm_port->cached_credits = 0;
+               }
+       } else {
+               if (qm_port->cached_ldb_credits) {
+                       __atomic_fetch_add(qm_port->credit_pool[DLB2_LDB_QUEUE],
+                                          qm_port->cached_ldb_credits, __ATOMIC_SEQ_CST);
+                       qm_port->cached_ldb_credits = 0;
+               }
+               if (qm_port->cached_dir_credits) {
+                       __atomic_fetch_add(qm_port->credit_pool[DLB2_DIR_QUEUE],
+                                          qm_port->cached_dir_credits, __ATOMIC_SEQ_CST);
+                       qm_port->cached_dir_credits = 0;
+               }
+       }
+}
+
+static inline void
+dlb2_release_sw_credits(struct dlb2_eventdev *dlb2,
+                       struct dlb2_eventdev_port *ev_port, uint16_t val)
+{
+       if (ev_port->inflight_credits) {
+               __atomic_fetch_sub(&dlb2->inflights, val, __ATOMIC_SEQ_CST);
+               ev_port->inflight_credits -= val;
+       }
+}
+
+static void dlb2_check_and_return_credits(struct dlb2_eventdev_port *ev_port,
+                                         bool cond, uint32_t threshold)
+{
+#if DLB_SW_CREDITS_CHECKS || DLB_HW_CREDITS_CHECKS
+       if (cond) {
+               if (++ev_port->credit_return_count > threshold) {
+#if DLB_SW_CREDITS_CHECKS
+                       dlb2_release_sw_credits(ev_port->dlb2, ev_port,
+                                               ev_port->inflight_credits);
+#endif
+#if DLB_HW_CREDITS_CHECKS
+                       dlb2_port_credits_return(&ev_port->qm_port);
+#endif
+                       ev_port->credit_return_count = 0;
+               }
+       } else {
+               ev_port->credit_return_count = 0;
+       }
+#endif
+}
+
 static int
 dlb2_eventdev_port_unlink(struct rte_eventdev *dev, void *event_port,
                          uint8_t queues[], uint16_t nb_unlinks)
@@ -2507,14 +2602,15 @@ dlb2_eventdev_port_unlink(struct rte_eventdev *dev, void *event_port,
 
        if (queues == NULL || nb_unlinks == 0) {
                DLB2_LOG_DBG("dlb2: queues is NULL or nb_unlinks is 0\n");
-               return 0; /* Ignore and return success */
+               nb_unlinks = 0; /* Ignore and return success */
+               goto ret_credits;
        }
 
        if (ev_port->qm_port.is_directed) {
                DLB2_LOG_DBG("dlb2: ignore unlink from dir port %d\n",
                             ev_port->id);
                rte_errno = 0;
-               return nb_unlinks; /* as if success */
+               goto ret_credits;
        }
 
        dlb2 = ev_port->dlb2;
@@ -2553,6 +2649,10 @@ dlb2_eventdev_port_unlink(struct rte_eventdev *dev, void *event_port,
                ev_queue->num_links--;
        }
 
+ret_credits:
+       if (ev_port->inflight_credits)
+               dlb2_check_and_return_credits(ev_port, true, 0);
+
        return nb_unlinks;
 }
 
@@ -2752,8 +2852,7 @@ dlb2_replenish_sw_credits(struct dlb2_eventdev *dlb2,
                /* Replenish credits, saving one quanta for enqueues */
                uint16_t val = ev_port->inflight_credits - quanta;
 
-               __atomic_fetch_sub(&dlb2->inflights, val, __ATOMIC_SEQ_CST);
-               ev_port->inflight_credits -= val;
+               dlb2_release_sw_credits(dlb2, ev_port, val);
        }
 }
 
@@ -2924,7 +3023,9 @@ dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
 {
        struct dlb2_eventdev *dlb2 = ev_port->dlb2;
        struct dlb2_eventdev_queue *ev_queue;
+#if DLB_HW_CREDITS_CHECKS
        uint16_t *cached_credits = NULL;
+#endif
        struct dlb2_queue *qm_queue;
 
        ev_queue = &dlb2->ev_queues[ev->queue_id];
@@ -2936,6 +3037,7 @@ dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
                goto op_check;
 
        if (!qm_queue->is_directed) {
+#if DLB_HW_CREDITS_CHECKS
                /* Load balanced destination queue */
 
                if (dlb2->version == DLB2_HW_V2) {
@@ -2951,6 +3053,7 @@ dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
                        }
                        cached_credits = &qm_port->cached_credits;
                }
+#endif
                switch (ev->sched_type) {
                case RTE_SCHED_TYPE_ORDERED:
                        DLB2_LOG_DBG("dlb2: put_qe: RTE_SCHED_TYPE_ORDERED\n");
@@ -2981,7 +3084,7 @@ dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
                }
        } else {
                /* Directed destination queue */
-
+#if DLB_HW_CREDITS_CHECKS
                if (dlb2->version == DLB2_HW_V2) {
                        if (dlb2_check_enqueue_hw_dir_credits(qm_port)) {
                                rte_errno = -ENOSPC;
@@ -2995,6 +3098,7 @@ dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
                        }
                        cached_credits = &qm_port->cached_credits;
                }
+#endif
                DLB2_LOG_DBG("dlb2: put_qe: RTE_SCHED_TYPE_DIRECTED\n");
 
                *sched_type = DLB2_SCHED_DIRECTED;
@@ -3002,6 +3106,7 @@ dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
 
 op_check:
        switch (ev->op) {
+#if DLB_SW_CREDITS_CHECKS
        case RTE_EVENT_OP_NEW:
                /* Check that a sw credit is available */
                if (dlb2_check_enqueue_sw_credits(dlb2, ev_port)) {
@@ -3009,7 +3114,10 @@ dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
                        return 1;
                }
                ev_port->inflight_credits--;
+#endif
+#if DLB_HW_CREDITS_CHECKS
                (*cached_credits)--;
+#endif
                break;
        case RTE_EVENT_OP_FORWARD:
                /* Check for outstanding_releases underflow. If this occurs,
@@ -3020,10 +3128,14 @@ dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
                RTE_ASSERT(ev_port->outstanding_releases > 0);
                ev_port->outstanding_releases--;
                qm_port->issued_releases++;
+#if DLB_HW_CREDITS_CHECKS
                (*cached_credits)--;
+#endif
                break;
        case RTE_EVENT_OP_RELEASE:
+#if DLB_SW_CREDITS_CHECKS
                ev_port->inflight_credits++;
+#endif
                /* Check for outstanding_releases underflow. If this occurs,
                 * the application is not using the EVENT_OPs correctly; for
                 * example, forwarding or releasing events that were not
@@ -3032,9 +3144,10 @@ dlb2_event_enqueue_prep(struct dlb2_eventdev_port *ev_port,
                RTE_ASSERT(ev_port->outstanding_releases > 0);
                ev_port->outstanding_releases--;
                qm_port->issued_releases++;
-
+#if DLB_SW_CREDITS_CHECKS
                /* Replenish s/w credits if enough are cached */
                dlb2_replenish_sw_credits(dlb2, ev_port);
+#endif
                break;
        }
 
@@ -3145,6 +3258,8 @@ __dlb2_event_enqueue_burst(void *event_port,
                        break;
        }
 
+       dlb2_check_and_return_credits(ev_port, !i,
+                                     DLB2_ENQ_FAIL_CREDIT_RETURN_THRES);
+
        return i;
 }
 
@@ -3283,53 +3398,45 @@ dlb2_event_release(struct dlb2_eventdev *dlb2,
                return;
        }
        ev_port->outstanding_releases -= i;
+#if DLB_SW_CREDITS_CHECKS
        ev_port->inflight_credits += i;
 
        /* Replenish s/w credits if enough releases are performed */
        dlb2_replenish_sw_credits(dlb2, ev_port);
+#endif
 }
 
 static inline void
 dlb2_port_credits_inc(struct dlb2_port *qm_port, int num)
 {
        uint32_t batch_size = qm_port->hw_credit_quanta;
+       int val;
 
        /* increment port credits, and return to pool if exceeds threshold */
-       if (!qm_port->is_directed) {
-               if (qm_port->dlb2->version == DLB2_HW_V2) {
-                       qm_port->cached_ldb_credits += num;
-                       if (qm_port->cached_ldb_credits >= 2 * batch_size) {
-                               __atomic_fetch_add(
-                                       qm_port->credit_pool[DLB2_LDB_QUEUE],
-                                       batch_size, __ATOMIC_SEQ_CST);
-                               qm_port->cached_ldb_credits -= batch_size;
-                       }
-               } else {
-                       qm_port->cached_credits += num;
-                       if (qm_port->cached_credits >= 2 * batch_size) {
-                               __atomic_fetch_add(
-                                     qm_port->credit_pool[DLB2_COMBINED_POOL],
-                                     batch_size, __ATOMIC_SEQ_CST);
-                               qm_port->cached_credits -= batch_size;
-                       }
+       if (qm_port->dlb2->version == DLB2_HW_V2_5) {
+               qm_port->cached_credits += num;
+               if (qm_port->cached_credits >= 2 * batch_size) {
+                       val = qm_port->cached_credits - batch_size;
+                       __atomic_fetch_add(
+                           qm_port->credit_pool[DLB2_COMBINED_POOL], val,
+                           __ATOMIC_SEQ_CST);
+                       qm_port->cached_credits -= val;
+               }
+       } else if (!qm_port->is_directed) {
+               qm_port->cached_ldb_credits += num;
+               if (qm_port->cached_ldb_credits >= 2 * batch_size) {
+                       val = qm_port->cached_ldb_credits - batch_size;
+                       __atomic_fetch_add(qm_port->credit_pool[DLB2_LDB_QUEUE],
+                                          val, __ATOMIC_SEQ_CST);
+                       qm_port->cached_ldb_credits -= val;
                }
        } else {
-               if (qm_port->dlb2->version == DLB2_HW_V2) {
-                       qm_port->cached_dir_credits += num;
-                       if (qm_port->cached_dir_credits >= 2 * batch_size) {
-                               __atomic_fetch_add(
-                                       qm_port->credit_pool[DLB2_DIR_QUEUE],
-                                       batch_size, __ATOMIC_SEQ_CST);
-                               qm_port->cached_dir_credits -= batch_size;
-                       }
-               } else {
-                       qm_port->cached_credits += num;
-                       if (qm_port->cached_credits >= 2 * batch_size) {
-                               __atomic_fetch_add(
-                                     qm_port->credit_pool[DLB2_COMBINED_POOL],
-                                     batch_size, __ATOMIC_SEQ_CST);
-                               qm_port->cached_credits -= batch_size;
-                       }
+               qm_port->cached_dir_credits += num;
+               if (qm_port->cached_dir_credits >= 2 * batch_size) {
+                       val = qm_port->cached_dir_credits - batch_size;
+                       __atomic_fetch_add(qm_port->credit_pool[DLB2_DIR_QUEUE],
+                                          val, __ATOMIC_SEQ_CST);
+                       qm_port->cached_dir_credits -= val;
                }
        }
 }
@@ -3360,6 +3467,15 @@ dlb2_dequeue_wait(struct dlb2_eventdev *dlb2,
 
        /* Wait/poll time expired */
        if (elapsed_ticks >= timeout) {
+
+               /* Return all credits before blocking if remaining credits in
+                * system is less than quanta.
+                */
+               uint32_t sw_inflights = __atomic_load_n(&dlb2->inflights, __ATOMIC_SEQ_CST);
+               uint32_t quanta = ev_port->credit_update_quanta;
+
+               if (dlb2->new_event_limit - sw_inflights < quanta)
+                       dlb2_check_and_return_credits(ev_port, true, 0);
                return 1;
        } else if (dlb2->umwait_allowed) {
                struct rte_power_monitor_cond pmc;
@@ -4222,8 +4338,9 @@ dlb2_hw_dequeue(struct dlb2_eventdev *dlb2,
                        dlb2_consume_qe_immediate(qm_port, num);
 
                ev_port->outstanding_releases += num;
-
+#if DLB_HW_CREDITS_CHECKS
                dlb2_port_credits_inc(qm_port, num);
+#endif
        }
 
        return num;
@@ -4257,6 +4374,9 @@ dlb2_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
        DLB2_INC_STAT(ev_port->stats.traffic.total_polls, 1);
        DLB2_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
 
+       dlb2_check_and_return_credits(ev_port, !cnt,
+                                     DLB2_ZERO_DEQ_CREDIT_RETURN_THRES);
+
        return cnt;
 }
 
@@ -4293,6 +4413,9 @@ dlb2_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
 
        DLB2_INC_STAT(ev_port->stats.traffic.total_polls, 1);
        DLB2_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
+
+       dlb2_check_and_return_credits(ev_port, !cnt,
+                                     DLB2_ZERO_DEQ_CREDIT_RETURN_THRES);
        return cnt;
 }
 
diff --git a/drivers/event/dlb2/dlb2_priv.h b/drivers/event/dlb2/dlb2_priv.h
index dc9f98e142..fd76b5b9fb 100644
--- a/drivers/event/dlb2/dlb2_priv.h
+++ b/drivers/event/dlb2/dlb2_priv.h
@@ -527,6 +527,7 @@ struct __rte_cache_aligned dlb2_eventdev_port {
        struct rte_event_port_conf conf; /* user-supplied configuration */
        uint16_t inflight_credits; /* num credits this port has right now */
        uint16_t credit_update_quanta;
+       uint32_t credit_return_count; /* count till the credit return condition is true */
        struct dlb2_eventdev *dlb2; /* backlink optimization */
        alignas(RTE_CACHE_LINE_SIZE) struct dlb2_port_stats stats;
        struct dlb2_event_queue_link link[DLB2_MAX_NUM_QIDS_PER_LDB_CQ];
diff --git a/drivers/event/dlb2/meson.build b/drivers/event/dlb2/meson.build
index 515d1795fe..77a197e32c 100644
--- a/drivers/event/dlb2/meson.build
+++ b/drivers/event/dlb2/meson.build
@@ -68,3 +68,15 @@ endif
 headers = files('rte_pmd_dlb2.h')
 
 deps += ['mbuf', 'mempool', 'ring', 'pci', 'bus_pci']
+
+if meson.version().version_compare('> 0.58.0')
+fs = import('fs')
+dlb_options = fs.read('meson_options.txt').strip().split('\n')
+
+foreach opt: dlb_options
+       if (opt.strip().startswith('#') or opt.strip() == '')
+               continue
+       endif
+       cflags += '-D' + opt.strip().to_upper().replace(' ','')
+endforeach
+endif
diff --git a/drivers/event/dlb2/meson_options.txt b/drivers/event/dlb2/meson_options.txt
new file mode 100644
index 0000000000..69be6f41c1
--- /dev/null
+++ b/drivers/event/dlb2/meson_options.txt
@@ -0,0 +1,6 @@
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright(c) 2023-2024 Intel Corporation
+
+DLB2_BYPASS_FENCE_ON_PP = 0
+DLB_HW_CREDITS_CHECKS = 0
+DLB_SW_CREDITS_CHECKS = 1
-- 
2.25.1
