From: Pavan Nikhilesh <pbhagavat...@marvell.com>

Add event vector support for event eth Rx adapter, the implementation
creates vector flows based on port and queue identifier of the received
mbufs.

Signed-off-by: Pavan Nikhilesh <pbhagavat...@marvell.com>
---
 lib/librte_eventdev/eventdev_pmd.h            |   7 +-
 .../rte_event_eth_rx_adapter.c                | 257 ++++++++++++++++--
 lib/librte_eventdev/rte_eventdev.c            |   6 +-
 3 files changed, 250 insertions(+), 20 deletions(-)

diff --git a/lib/librte_eventdev/eventdev_pmd.h 
b/lib/librte_eventdev/eventdev_pmd.h
index 9297f1433..0f724ac85 100644
--- a/lib/librte_eventdev/eventdev_pmd.h
+++ b/lib/librte_eventdev/eventdev_pmd.h
@@ -69,9 +69,10 @@ extern "C" {
        } \
 } while (0)
 
-#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
-               ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
-                       (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
+#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP                                        
\
+       ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |                     \
+        (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |                         \
+        (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
 
 #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
                RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c 
b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
index ac8ba5bf0..c71990078 100644
--- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
@@ -26,6 +26,10 @@
 #define BATCH_SIZE             32
 #define BLOCK_CNT_THRESHOLD    10
 #define ETH_EVENT_BUFFER_SIZE  (4*BATCH_SIZE)
+#define MAX_VECTOR_SIZE                1024
+#define MIN_VECTOR_SIZE                4
+#define MAX_VECTOR_NS          1E9
+#define MIN_VECTOR_NS          1E5
 
 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN        32
 #define ETH_RX_ADAPTER_MEM_NAME_LEN    32
@@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
        uint16_t eth_rx_qid;
 };
 
+struct eth_rx_vector_data {
+       TAILQ_ENTRY(eth_rx_vector_data) next;
+       uint16_t port;
+       uint16_t queue;
+       uint16_t max_vector_count;
+       uint64_t event;
+       uint64_t ts;
+       uint64_t vector_timeout_ticks;
+       struct rte_mempool *vector_pool;
+       struct rte_event_vector *vector_ev;
+} __rte_cache_aligned;
+
+TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
+
 /* Instance per adapter */
 struct rte_eth_event_enqueue_buffer {
        /* Count of events in this buffer */
@@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
        uint32_t wrr_pos;
        /* Event burst buffer */
        struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
+       /* Vector enable flag */
+       uint8_t ena_vector;
+       /* Timestamp of previous vector expiry list traversal */
+       uint64_t prev_expiry_ts;
+       /* Minimum ticks to wait before traversing expiry list */
+       uint64_t vector_tmo_ticks;
+       /* vector list */
+       struct eth_rx_vector_data_list vector_list;
        /* Per adapter stats */
        struct rte_event_eth_rx_adapter_stats stats;
        /* Block count, counts up to BLOCK_CNT_THRESHOLD */
@@ -198,9 +224,11 @@ struct eth_device_info {
 struct eth_rx_queue_info {
        int queue_enabled;      /* True if added */
        int intr_enabled;
+       uint8_t ena_vector;
        uint16_t wt;            /* Polling weight */
        uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
        uint64_t event;
+       struct eth_rx_vector_data vector_data;
 };
 
 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
@@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter 
*rx_adapter)
            &rx_adapter->event_enqueue_buffer;
        struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
 
+       if (!buf->count)
+               return 0;
+
        uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
                                        rx_adapter->event_port_id,
                                        buf->events,
@@ -742,6 +773,72 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter 
*rx_adapter)
        return n;
 }
 
+static inline uint16_t
+rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
+                       struct eth_rx_queue_info *queue_info,
+                       struct rte_eth_event_enqueue_buffer *buf,
+                       struct rte_mbuf **mbufs, uint16_t num)
+{
+       struct rte_event *ev = &buf->events[buf->count];
+       struct eth_rx_vector_data *vec;
+       uint16_t filled, space, sz;
+
+       filled = 0;
+       vec = &queue_info->vector_data;
+       while (num) {
+               if (vec->vector_ev == NULL) {
+                       if (rte_mempool_get(vec->vector_pool,
+                                           (void **)&vec->vector_ev) < 0) {
+                               rte_pktmbuf_free_bulk(mbufs, num);
+                               return 0;
+                       }
+                       vec->vector_ev->nb_elem = 0;
+                       vec->vector_ev->port = vec->port;
+                       vec->vector_ev->queue = vec->queue;
+                       vec->vector_ev->attr_valid = true;
+                       TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
+               } else if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+                       /* Event ready. */
+                       ev->event = vec->event;
+                       ev->vec = vec->vector_ev;
+                       ev++;
+                       filled++;
+                       vec->vector_ev = NULL;
+                       TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+                       if (rte_mempool_get(vec->vector_pool,
+                                           (void **)&vec->vector_ev) < 0) {
+                               rte_pktmbuf_free_bulk(mbufs, num);
+                               return 0;
+                       }
+                       vec->vector_ev->nb_elem = 0;
+                       vec->vector_ev->port = vec->port;
+                       vec->vector_ev->queue = vec->queue;
+                       vec->vector_ev->attr_valid = true;
+                       TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
+               }
+
+               space = vec->max_vector_count - vec->vector_ev->nb_elem;
+               sz = num > space ? space : num;
+               memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
+                      sizeof(void *) * sz);
+               vec->vector_ev->nb_elem += sz;
+               num -= sz;
+               mbufs += sz;
+               vec->ts = rte_rdtsc();
+       }
+
+       if (vec->vector_ev->nb_elem == vec->max_vector_count) {
+               ev->event = vec->event;
+               ev->vec = vec->vector_ev;
+               ev++;
+               filled++;
+               vec->vector_ev = NULL;
+               TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
+       }
+
+       return filled;
+}
+
 static inline void
 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
                uint16_t eth_dev_id,
@@ -770,25 +867,30 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter 
*rx_adapter,
        rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
        do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
 
-       for (i = 0; i < num; i++) {
-               m = mbufs[i];
-
-               rss = do_rss ?
-                       rxa_do_softrss(m, rx_adapter->rss_key_be) :
-                       m->hash.rss;
-               ev->event = event;
-               ev->flow_id = (rss & ~flow_id_mask) |
-                               (ev->flow_id & flow_id_mask);
-               ev->mbuf = m;
-               ev++;
+       if (!eth_rx_queue_info->ena_vector) {
+               for (i = 0; i < num; i++) {
+                       m = mbufs[i];
+
+                       rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
+                                    : m->hash.rss;
+                       ev->event = event;
+                       ev->flow_id = (rss & ~flow_id_mask) |
+                                     (ev->flow_id & flow_id_mask);
+                       ev->mbuf = m;
+                       ev++;
+               }
+       } else {
+               num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
+                                             buf, mbufs, num);
        }
 
-       if (dev_info->cb_fn) {
+       if (num && dev_info->cb_fn) {
 
                dropped = 0;
                nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
-                                       ETH_EVENT_BUFFER_SIZE, buf->count, ev,
-                                       num, dev_info->cb_arg, &dropped);
+                                       ETH_EVENT_BUFFER_SIZE, buf->count,
+                                       &buf->events[buf->count], num,
+                                       dev_info->cb_arg, &dropped);
                if (unlikely(nb_cb > num))
                        RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
                                nb_cb, num);
@@ -1124,6 +1226,30 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
        return nb_rx;
 }
 
+static void
+rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
+{
+       struct rte_event_eth_rx_adapter *rx_adapter = arg;
+       struct rte_eth_event_enqueue_buffer *buf =
+               &rx_adapter->event_enqueue_buffer;
+       struct rte_event *ev;
+
+       if (buf->count)
+               rxa_flush_event_buffer(rx_adapter);
+
+       if (vec->vector_ev->nb_elem == 0)
+               return;
+       ev = &buf->events[buf->count];
+
+       /* Event ready. */
+       ev->event = vec->event;
+       ev->vec = vec->vector_ev;
+       buf->count++;
+
+       vec->vector_ev = NULL;
+       vec->ts = 0;
+}
+
 static int
 rxa_service_func(void *args)
 {
@@ -1137,6 +1263,24 @@ rxa_service_func(void *args)
                return 0;
        }
 
+       if (rx_adapter->ena_vector) {
+               if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
+                   rx_adapter->vector_tmo_ticks) {
+                       struct eth_rx_vector_data *vec;
+
+                       TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
+                               uint64_t elapsed_time = rte_rdtsc() - vec->ts;
+
+                               if (elapsed_time >= vec->vector_timeout_ticks) {
+                                       rxa_vector_expire(vec, rx_adapter);
+                                       TAILQ_REMOVE(&rx_adapter->vector_list,
+                                                    vec, next);
+                               }
+                       }
+                       rx_adapter->prev_expiry_ts = rte_rdtsc();
+               }
+       }
+
        stats = &rx_adapter->stats;
        stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
        stats->rx_packets += rxa_poll(rx_adapter);
@@ -1640,6 +1784,28 @@ rxa_update_queue(struct rte_event_eth_rx_adapter 
*rx_adapter,
        }
 }
 
+static void
+rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t 
vector_count,
+                   uint64_t vector_ns, struct rte_mempool *mp, int32_t qid,
+                   uint16_t port_id)
+{
+#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
+       struct eth_rx_vector_data *vector_data;
+       uint32_t flow_id;
+
+       vector_data = &queue_info->vector_data;
+       vector_data->max_vector_count = vector_count;
+       vector_data->port = port_id;
+       vector_data->queue = qid;
+       vector_data->vector_pool = mp;
+       vector_data->vector_timeout_ticks =
+               NSEC2TICK(vector_ns, rte_get_timer_hz());
+       vector_data->ts = 0;
+       flow_id = queue_info->event & 0xFFFFF;
+       flow_id = flow_id == 0 ? (qid & 0xFF) | (port_id & 0xFFFF) : flow_id;
+       vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
+}
+
 static void
 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
        struct eth_device_info *dev_info,
@@ -1741,6 +1907,44 @@ rxa_add_queue(struct rte_event_eth_rx_adapter 
*rx_adapter,
        }
 }
 
+static void
+rxa_sw_event_vector_configure(
+       struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
+       int rx_queue_id,
+       const struct rte_event_eth_rx_adapter_event_vector_config *config)
+{
+       struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
+       struct eth_rx_queue_info *queue_info;
+       struct rte_event *qi_ev;
+
+       if (rx_queue_id == -1) {
+               uint16_t nb_rx_queues;
+               uint16_t i;
+
+               nb_rx_queues = dev_info->dev->data->nb_rx_queues;
+               for (i = 0; i < nb_rx_queues; i++)
+                       rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
+                                                     config);
+               return;
+       }
+
+       queue_info = &dev_info->rx_queue[rx_queue_id];
+       qi_ev = (struct rte_event *)&queue_info->event;
+       queue_info->ena_vector = 1;
+       qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
+       rxa_set_vector_data(queue_info, config->vector_sz,
+                           config->vector_timeout_ns, config->vector_mp,
+                           rx_queue_id, dev_info->dev->data->port_id);
+       rx_adapter->ena_vector = 1;
+       rx_adapter->vector_tmo_ticks =
+               rx_adapter->vector_tmo_ticks ?
+                             RTE_MIN(config->vector_timeout_ns << 1,
+                                     rx_adapter->vector_tmo_ticks) :
+                             config->vector_timeout_ns << 1;
+       rx_adapter->prev_expiry_ts = 0;
+       TAILQ_INIT(&rx_adapter->vector_list);
+}
+
 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
                uint16_t eth_dev_id,
                int rx_queue_id,
@@ -2081,6 +2285,15 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
                return -EINVAL;
        }
 
+       if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
+           (queue_conf->rx_queue_flags &
+            RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
+               RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
+                                " eth port: %" PRIu16 " adapter id: %" PRIu8,
+                                eth_dev_id, id);
+               return -EINVAL;
+       }
+
        if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
                (rx_queue_id != -1)) {
                RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
@@ -2143,6 +2356,17 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
        return 0;
 }
 
+static int
+rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
+{
+       limits->max_sz = MAX_VECTOR_SIZE;
+       limits->min_sz = MIN_VECTOR_SIZE;
+       limits->max_timeout_ns = MAX_VECTOR_NS;
+       limits->min_timeout_ns = MIN_VECTOR_NS;
+
+       return 0;
+}
+
 int
 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
                                int32_t rx_queue_id)
@@ -2333,7 +2557,8 @@ rte_event_eth_rx_adapter_queue_event_vector_config(
                ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
                        dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
        } else {
-               ret = -ENOTSUP;
+               rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
+                                             rx_queue_id, config);
        }
 
        return ret;
@@ -2371,7 +2596,7 @@ rte_event_eth_rx_adapter_vector_limits_get(
                ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
                        dev, &rte_eth_devices[eth_port_id], limits);
        } else {
-               ret = -ENOTSUP;
+               ret = rxa_sw_vector_limits(limits);
        }
 
        return ret;
diff --git a/lib/librte_eventdev/rte_eventdev.c 
b/lib/librte_eventdev/rte_eventdev.c
index be0499c52..62824654b 100644
--- a/lib/librte_eventdev/rte_eventdev.c
+++ b/lib/librte_eventdev/rte_eventdev.c
@@ -122,7 +122,11 @@ rte_event_eth_rx_adapter_caps_get(uint8_t dev_id, uint16_t 
eth_port_id,
 
        if (caps == NULL)
                return -EINVAL;
-       *caps = 0;
+
+       if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
+               *caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
+       else
+               *caps = 0;
 
        return dev->dev_ops->eth_rx_adapter_caps_get ?
                                (*dev->dev_ops->eth_rx_adapter_caps_get)(dev,
-- 
2.17.1

Reply via email to