This commit introduces a capability for disabling the "implicit" release
functionality for a port, which prevents the eventdev PMD from issuing
outstanding releases for previously dequeued events when dequeuing a new
batch of events.

If a PMD does not support this capability, the application will receive an
error if it attempts to setup a port with implicit releases disabled.
Otherwise, if the port is configured with implicit releases disabled, the
application must release each dequeued event by invoking
rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE or
RTE_EVENT_OP_FORWARD.

Signed-off-by: Gage Eads <gage.e...@intel.com>
Acked-by: Harry van Haaren <harry.van.haa...@intel.com>
---
v2:
 - Merged two lines into one in dpaa2_eventdev_port_def_conf().
 - Used '0' instead of 'false' in skeleton_eventdev_port_def_conf().
 - Replaced instances of 'bool' with 'uint8_t' in
   eventdev_pipeline_sw_pmd/main.c.
 - Put both worker_loopback() invocations in the same if block in
   test_sw_eventdev().

 drivers/event/dpaa2/dpaa2_eventdev.c       |  1 +
 drivers/event/octeontx/ssovf_evdev.c       |  1 +
 drivers/event/skeleton/skeleton_eventdev.c |  1 +
 drivers/event/sw/sw_evdev.c                | 10 +++++++---
 drivers/event/sw/sw_evdev.h                |  1 +
 drivers/event/sw/sw_evdev_worker.c         | 16 ++++++++--------
 examples/eventdev_pipeline_sw_pmd/main.c   | 24 +++++++++++++++++++++++-
 lib/librte_eventdev/rte_eventdev.c         |  9 +++++++++
 lib/librte_eventdev/rte_eventdev.h         | 23 ++++++++++++++++++++---
 test/test/test_eventdev.c                  |  9 +++++++++
 test/test/test_eventdev_sw.c               | 20 ++++++++++++++++----
 11 files changed, 96 insertions(+), 19 deletions(-)

diff --git a/drivers/event/dpaa2/dpaa2_eventdev.c 
b/drivers/event/dpaa2/dpaa2_eventdev.c
index eeeb231..13e7122 100644
--- a/drivers/event/dpaa2/dpaa2_eventdev.c
+++ b/drivers/event/dpaa2/dpaa2_eventdev.c
@@ -440,6 +440,7 @@ dpaa2_eventdev_port_def_conf(struct rte_eventdev *dev, 
uint8_t port_id,
                DPAA2_EVENT_MAX_PORT_DEQUEUE_DEPTH;
        port_conf->enqueue_depth =
                DPAA2_EVENT_MAX_PORT_ENQUEUE_DEPTH;
+       port_conf->disable_implicit_release = 0;
 }
 
 static void
diff --git a/drivers/event/octeontx/ssovf_evdev.c 
b/drivers/event/octeontx/ssovf_evdev.c
index 117b145..b80a6c0 100644
--- a/drivers/event/octeontx/ssovf_evdev.c
+++ b/drivers/event/octeontx/ssovf_evdev.c
@@ -252,6 +252,7 @@ ssovf_port_def_conf(struct rte_eventdev *dev, uint8_t 
port_id,
        port_conf->new_event_threshold = edev->max_num_events;
        port_conf->dequeue_depth = 1;
        port_conf->enqueue_depth = 1;
+       port_conf->disable_implicit_release = 0;
 }
 
 static void
diff --git a/drivers/event/skeleton/skeleton_eventdev.c 
b/drivers/event/skeleton/skeleton_eventdev.c
index bb554c3..3b448b2 100644
--- a/drivers/event/skeleton/skeleton_eventdev.c
+++ b/drivers/event/skeleton/skeleton_eventdev.c
@@ -237,6 +237,7 @@ skeleton_eventdev_port_def_conf(struct rte_eventdev *dev, 
uint8_t port_id,
        port_conf->new_event_threshold = 32 * 1024;
        port_conf->dequeue_depth = 16;
        port_conf->enqueue_depth = 16;
+       port_conf->disable_implicit_release = 0;
 }
 
 static void
diff --git a/drivers/event/sw/sw_evdev.c b/drivers/event/sw/sw_evdev.c
index 5b4a208..1ef6340 100644
--- a/drivers/event/sw/sw_evdev.c
+++ b/drivers/event/sw/sw_evdev.c
@@ -191,6 +191,7 @@ sw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
        }
 
        p->inflight_max = conf->new_event_threshold;
+       p->implicit_release = !conf->disable_implicit_release;
 
        /* check if ring exists, same as rx_worker above */
        snprintf(buf, sizeof(buf), "sw%d_p%u, %s", dev->data->dev_id,
@@ -413,6 +414,7 @@ sw_port_def_conf(struct rte_eventdev *dev, uint8_t port_id,
        port_conf->new_event_threshold = 1024;
        port_conf->dequeue_depth = 16;
        port_conf->enqueue_depth = 16;
+       port_conf->disable_implicit_release = 0;
 }
 
 static int
@@ -482,9 +484,11 @@ sw_info_get(struct rte_eventdev *dev, struct 
rte_event_dev_info *info)
                        .max_event_port_dequeue_depth = MAX_SW_CONS_Q_DEPTH,
                        .max_event_port_enqueue_depth = MAX_SW_PROD_Q_DEPTH,
                        .max_num_events = SW_INFLIGHT_EVENTS_TOTAL,
-                       .event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
-                                       RTE_EVENT_DEV_CAP_BURST_MODE |
-                                       RTE_EVENT_DEV_CAP_EVENT_QOS),
+                       .event_dev_cap = (
+                               RTE_EVENT_DEV_CAP_QUEUE_QOS |
+                               RTE_EVENT_DEV_CAP_BURST_MODE |
+                               RTE_EVENT_DEV_CAP_EVENT_QOS |
+                               RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE),
        };
 
        *info = evdev_sw_info;
diff --git a/drivers/event/sw/sw_evdev.h b/drivers/event/sw/sw_evdev.h
index 0859388..d08f7d0 100644
--- a/drivers/event/sw/sw_evdev.h
+++ b/drivers/event/sw/sw_evdev.h
@@ -213,6 +213,7 @@ struct sw_port {
        uint16_t outstanding_releases __rte_cache_aligned;
        uint16_t inflight_max; /* app requested max inflights for this port */
        uint16_t inflight_credits; /* num credits this port has right now */
+       uint8_t implicit_release; /* release events before dequeueing */
 
        uint16_t last_dequeue_burst_sz; /* how big the burst was */
        uint64_t last_dequeue_ticks; /* used to track burst processing time */
diff --git a/drivers/event/sw/sw_evdev_worker.c 
b/drivers/event/sw/sw_evdev_worker.c
index b3b3b17..93cd29b 100644
--- a/drivers/event/sw/sw_evdev_worker.c
+++ b/drivers/event/sw/sw_evdev_worker.c
@@ -109,7 +109,7 @@ sw_event_enqueue_burst(void *port, const struct rte_event 
ev[], uint16_t num)
                        return 0;
        }
 
-       uint32_t forwards = 0;
+       uint32_t completions = 0;
        for (i = 0; i < num; i++) {
                int op = ev[i].op;
                int outstanding = p->outstanding_releases > 0;
@@ -118,7 +118,6 @@ sw_event_enqueue_burst(void *port, const struct rte_event 
ev[], uint16_t num)
                p->inflight_credits -= (op == RTE_EVENT_OP_NEW);
                p->inflight_credits += (op == RTE_EVENT_OP_RELEASE) *
                                        outstanding;
-               forwards += (op == RTE_EVENT_OP_FORWARD);
 
                new_ops[i] = sw_qe_flag_map[op];
                new_ops[i] &= ~(invalid_qid << QE_FLAG_VALID_SHIFT);
@@ -127,8 +126,10 @@ sw_event_enqueue_burst(void *port, const struct rte_event 
ev[], uint16_t num)
                 * correct usage of the API), providing very high correct
                 * prediction rate.
                 */
-               if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding)
+               if ((new_ops[i] & QE_FLAG_COMPLETE) && outstanding) {
                        p->outstanding_releases--;
+                       completions++;
+               }
 
                /* error case: branch to avoid touching p->stats */
                if (unlikely(invalid_qid)) {
@@ -137,8 +138,8 @@ sw_event_enqueue_burst(void *port, const struct rte_event 
ev[], uint16_t num)
                }
        }
 
-       /* handle directed port forward credits */
-       p->inflight_credits -= forwards * p->is_directed;
+       /* handle directed port forward and release credits */
+       p->inflight_credits -= completions * p->is_directed;
 
        /* returns number of events actually enqueued */
        uint32_t enq = enqueue_burst_with_ops(p->rx_worker_ring, ev, i,
@@ -172,7 +173,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, 
uint16_t num,
        uint32_t credit_update_quanta = sw->credit_update_quanta;
 
        /* check that all previous dequeues have been released */
-       if (!p->is_directed) {
+       if (p->implicit_release && !p->is_directed) {
                uint16_t out_rels = p->outstanding_releases;
                uint16_t i;
                for (i = 0; i < out_rels; i++)
@@ -182,7 +183,6 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, 
uint16_t num,
        /* returns number of events actually dequeued */
        uint16_t ndeq = rte_event_ring_dequeue_burst(ring, ev, num, NULL);
        if (unlikely(ndeq == 0)) {
-               p->outstanding_releases = 0;
                p->zero_polls++;
                p->total_polls++;
                goto end;
@@ -190,7 +190,7 @@ sw_event_dequeue_burst(void *port, struct rte_event *ev, 
uint16_t num,
 
        /* only add credits for directed ports - LB ports send RELEASEs */
        p->inflight_credits += ndeq * p->is_directed;
-       p->outstanding_releases = ndeq;
+       p->outstanding_releases += ndeq;
        p->last_dequeue_burst_sz = ndeq;
        p->last_dequeue_ticks = rte_get_timer_cycles();
        p->poll_buckets[(ndeq - 1) >> SW_DEQ_STAT_BUCKET_SHIFT]++;
diff --git a/examples/eventdev_pipeline_sw_pmd/main.c 
b/examples/eventdev_pipeline_sw_pmd/main.c
index 5f431d8..2e9a6d2 100644
--- a/examples/eventdev_pipeline_sw_pmd/main.c
+++ b/examples/eventdev_pipeline_sw_pmd/main.c
@@ -62,6 +62,7 @@ struct prod_data {
 struct cons_data {
        uint8_t dev_id;
        uint8_t port_id;
+       uint8_t release;
 } __rte_cache_aligned;
 
 static struct prod_data prod_data;
@@ -167,6 +168,18 @@ consumer(void)
                uint8_t outport = packets[i].mbuf->port;
                rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
                                packets[i].mbuf);
+
+               packets[i].op = RTE_EVENT_OP_RELEASE;
+       }
+
+       if (cons_data.release) {
+               uint16_t nb_tx;
+
+               nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
+               while (nb_tx < n)
+                       nb_tx += rte_event_enqueue_burst(dev_id, port_id,
+                                                        packets + nb_tx,
+                                                        n - nb_tx);
        }
 
        /* Print out mpps every 1<22 packets */
@@ -702,6 +715,7 @@ setup_eventdev(struct prod_data *prod_data,
        };
 
        struct port_link worker_queues[MAX_NUM_STAGES];
+       uint8_t disable_implicit_release;
        struct port_link tx_queue;
        unsigned int i;
 
@@ -715,6 +729,12 @@ setup_eventdev(struct prod_data *prod_data,
        ret = rte_event_dev_info_get(dev_id, &dev_info);
        printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
 
+       disable_implicit_release = (dev_info.event_dev_cap &
+                                   RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
+
+       wkr_p_conf.disable_implicit_release = disable_implicit_release;
+       tx_p_conf.disable_implicit_release = disable_implicit_release;
+
        if (dev_info.max_event_port_dequeue_depth <
                        config.nb_event_port_dequeue_depth)
                config.nb_event_port_dequeue_depth =
@@ -823,6 +843,7 @@ setup_eventdev(struct prod_data *prod_data,
                        .dequeue_depth = 8,
                        .enqueue_depth = 8,
                        .new_event_threshold = 1200,
+                       .disable_implicit_release = disable_implicit_release,
        };
 
        if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
@@ -839,7 +860,8 @@ setup_eventdev(struct prod_data *prod_data,
                                        .port_id = i + 1,
                                        .qid = cdata.qid[0] };
        *cons_data = (struct cons_data){.dev_id = dev_id,
-                                       .port_id = i };
+                                       .port_id = i,
+                                       .release = disable_implicit_release };
 
        ret = rte_event_dev_service_id_get(dev_id,
                                &fdata->evdev_service_id);
diff --git a/lib/librte_eventdev/rte_eventdev.c 
b/lib/librte_eventdev/rte_eventdev.c
index e17f8fc..3e93c03 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -686,6 +686,15 @@ rte_event_port_setup(uint8_t dev_id, uint8_t port_id,
                return -EINVAL;
        }
 
+       if (port_conf && port_conf->disable_implicit_release &&
+           !(dev->data->event_dev_cap &
+             RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+               RTE_EDEV_LOG_ERR(
+                  "dev%d port%d Implicit release disable not supported",
+                       dev_id, port_id);
+               return -EINVAL;
+       }
+
        if (dev->data->dev_started) {
                RTE_EDEV_LOG_ERR(
                    "device %d must be stopped to allow port setup", dev_id);
diff --git a/lib/librte_eventdev/rte_eventdev.h 
b/lib/librte_eventdev/rte_eventdev.h
index f1949ff..8377b3f 100644
--- a/lib/librte_eventdev/rte_eventdev.h
+++ b/lib/librte_eventdev/rte_eventdev.h
@@ -282,6 +282,16 @@ struct rte_mbuf; /* we just use mbuf pointers; no need to 
include rte_mbuf.h */
  *
  * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
  */
+#define RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE    (1ULL << 5)
+/**< Event device ports support disabling the implicit release feature, in
+ * which the port will release all unreleased events in its dequeue operation.
+ * If this capability is set and the port is configured with implicit release
+ * disabled, the application is responsible for explicitly releasing events
+ * using either the RTE_EVENT_OP_FORWARD or the RTE_EVENT_OP_RELEASE event
+ * enqueue operations.
+ *
+ * @see rte_event_dequeue_burst() rte_event_enqueue_burst()
+ */
 
 /* Event device priority levels */
 #define RTE_EVENT_DEV_PRIORITY_HIGHEST   0
@@ -686,6 +696,13 @@ struct rte_event_port_conf {
         * which previously supplied to rte_event_dev_configure().
         * Ignored when device is not RTE_EVENT_DEV_CAP_BURST_MODE capable.
         */
+       uint8_t disable_implicit_release;
+       /**< Configure the port not to release outstanding events in
+        * rte_event_dequeue_burst(). If true, all events received through
+        * the port must be explicitly released with RTE_EVENT_OP_RELEASE or
+        * RTE_EVENT_OP_FORWARD. Must be false when the device is not
+        * RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE capable.
+        */
 };
 
 /**
@@ -1381,9 +1398,9 @@ rte_event_dequeue_timeout_ticks(uint8_t dev_id, uint64_t 
ns,
  *
  * The number of events dequeued is the number of scheduler contexts held by
  * this port. These contexts are automatically released in the next
- * rte_event_dequeue_burst() invocation, or invoking rte_event_enqueue_burst()
- * with RTE_EVENT_OP_RELEASE operation can be used to release the
- * contexts early.
+ * rte_event_dequeue_burst() invocation if the port supports implicit
+ * releases, or invoking rte_event_enqueue_burst() with RTE_EVENT_OP_RELEASE
+ * operation can be used to release the contexts early.
  *
  * Event operations RTE_EVENT_OP_FORWARD and RTE_EVENT_OP_RELEASE must only be
  * enqueued to the same port that their associated events were dequeued from.
diff --git a/test/test/test_eventdev.c b/test/test/test_eventdev.c
index 1ed2a1d..f8ee1be 100644
--- a/test/test/test_eventdev.c
+++ b/test/test/test_eventdev.c
@@ -581,6 +581,15 @@ test_eventdev_port_setup(void)
        ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
        TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
 
+       if (!(info.event_dev_cap &
+             RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE)) {
+               pconf.enqueue_depth = info.max_event_port_enqueue_depth;
+               pconf.disable_implicit_release = 1;
+               ret = rte_event_port_setup(TEST_DEV_ID, 0, &pconf);
+               TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
+               pconf.disable_implicit_release = 0;
+       }
+
        ret = rte_event_port_setup(TEST_DEV_ID, info.max_event_ports,
                                        &pconf);
        TEST_ASSERT(ret == -EINVAL, "Expected -EINVAL, %d", ret);
diff --git a/test/test/test_eventdev_sw.c b/test/test/test_eventdev_sw.c
index 96ed920..4108b00 100644
--- a/test/test/test_eventdev_sw.c
+++ b/test/test/test_eventdev_sw.c
@@ -200,6 +200,7 @@ create_ports(struct test *t, int num_ports)
                        .new_event_threshold = 1024,
                        .dequeue_depth = 32,
                        .enqueue_depth = 64,
+                       .disable_implicit_release = 0,
        };
        if (num_ports > MAX_PORTS)
                return -1;
@@ -1254,6 +1255,7 @@ port_reconfig_credits(struct test *t)
                                .new_event_threshold = 128,
                                .dequeue_depth = 32,
                                .enqueue_depth = 64,
+                               .disable_implicit_release = 0,
                };
                if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
                        printf("%d Error setting up port\n", __LINE__);
@@ -1343,6 +1345,7 @@ port_single_lb_reconfig(struct test *t)
                .new_event_threshold = 128,
                .dequeue_depth = 32,
                .enqueue_depth = 64,
+               .disable_implicit_release = 0,
        };
        if (rte_event_port_setup(evdev, 0, &port_conf) < 0) {
                printf("%d Error setting up port\n", __LINE__);
@@ -2935,7 +2938,7 @@ worker_loopback_producer_fn(void *arg)
 }
 
 static int
-worker_loopback(struct test *t)
+worker_loopback(struct test *t, uint8_t disable_implicit_release)
 {
        /* use a single producer core, and a worker core to see what happens
         * if the worker loops packets back multiple times
@@ -2961,6 +2964,7 @@ worker_loopback(struct test *t)
         * only be initialized once - and this needs to be set for multiple runs
         */
        conf.new_event_threshold = 512;
+       conf.disable_implicit_release = disable_implicit_release;
 
        if (rte_event_port_setup(evdev, 0, &conf) < 0) {
                printf("Error setting up RX port\n");
@@ -3235,15 +3239,23 @@ test_sw_eventdev(void)
        }
        if (rte_lcore_count() >= 3) {
                printf("*** Running Worker loopback test...\n");
-               ret = worker_loopback(t);
+               ret = worker_loopback(t, 0);
+               if (ret != 0) {
+                       printf("ERROR - Worker loopback test FAILED.\n");
+                       return ret;
+               }
+
+               printf("*** Running Worker loopback test (implicit release disabled)...\n");
+               ret = worker_loopback(t, 1);
                if (ret != 0) {
                        printf("ERROR - Worker loopback test FAILED.\n");
                        return ret;
                }
        } else {
-               printf("### Not enough cores for worker loopback test.\n");
-               printf("### Need at least 3 cores for test.\n");
+               printf("### Not enough cores for worker loopback tests.\n");
+               printf("### Need at least 3 cores for the tests.\n");
        }
+
        /*
         * Free test instance, leaving mempool initialized, and a pointer to it
         * in static eventdev_func_mempool, as it is re-used on re-runs
-- 
2.7.4

Reply via email to