> -----Original Message-----
> From: pbhagavat...@marvell.com <pbhagavat...@marvell.com>
> Sent: Tuesday, March 30, 2021 1:52 PM
> To: jer...@marvell.com; Jayatheerthan, Jay <jay.jayatheert...@intel.com>; 
> Carrillo, Erik G <erik.g.carri...@intel.com>; Gujjar,
> Abhinandan S <abhinandan.guj...@intel.com>; McDaniel, Timothy 
> <timothy.mcdan...@intel.com>; hemant.agra...@nxp.com; Van
> Haaren, Harry <harry.van.haa...@intel.com>; mattias.ronnblom 
> <mattias.ronnb...@ericsson.com>; Ma, Liang J
> <liang.j...@intel.com>
> Cc: dev@dpdk.org; Pavan Nikhilesh <pbhagavat...@marvell.com>
> Subject: [dpdk-dev] [PATCH v9 4/8] eventdev: add Rx adapter event vector support
> 
> From: Pavan Nikhilesh <pbhagavat...@marvell.com>
> 
> Add event vector support to the event eth Rx adapter. The implementation
> creates vector flows based on the port and queue identifier of the
> received mbufs.
> For simplicity, when the application does not set a custom flow id, the
> flow id for SW Rx event vectorization uses 12 bits of the queue
> identifier and 8 bits of the port identifier.
> 
> Signed-off-by: Pavan Nikhilesh <pbhagavat...@marvell.com>
> ---
>  .../prog_guide/event_ethernet_rx_adapter.rst  |  11 +
>  lib/librte_eventdev/eventdev_pmd.h            |   7 +-
>  .../rte_event_eth_rx_adapter.c                | 278 ++++++++++++++++--
>  lib/librte_eventdev/rte_eventdev.c            |   6 +-
>  4 files changed, 278 insertions(+), 24 deletions(-)
> 
> diff --git a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
> index 5eefef355..06fa864fa 100644
> --- a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
> +++ b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst
> @@ -224,3 +224,14 @@ A loop processing ``rte_event_vector`` containing mbufs is shown below.
>          case ...
>          ...
>          }
> +
> +Rx event vectorization for SW Rx adapter
> +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> +
> +For SW based event vectorization, i.e., when
> +``RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT`` is not set in the adapter's
> +capability flags for a particular ethernet device, the service function
> +creates a single event vector flow for all the mbufs arriving on the given
> +Rx queue.
> +The 20-bit event flow identifier is set to 12 bits of the Rx queue
> +identifier and 8 bits of the ethernet device identifier.
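
For reference, the default flow id layout described above works out to the
following (illustrative only, it mirrors rxa_set_vector_data() further down):

	/* 20-bit flow id used when the application does not provide one:
	 * Rx queue id in bits [11:0], ethdev port id in bits [19:12].
	 */
	uint32_t flow_id = (qid & 0xFFF) | ((uint32_t)(port_id & 0xFF) << 12);
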
> diff --git a/lib/librte_eventdev/eventdev_pmd.h b/lib/librte_eventdev/eventdev_pmd.h
> index 9297f1433..0f724ac85 100644
> --- a/lib/librte_eventdev/eventdev_pmd.h
> +++ b/lib/librte_eventdev/eventdev_pmd.h
> @@ -69,9 +69,10 @@ extern "C" {
>       } \
>  } while (0)
> 
> -#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP \
> -             ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) | \
> -                     (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ))
> +#define RTE_EVENT_ETH_RX_ADAPTER_SW_CAP                                        \
> +     ((RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) |                     \
> +      (RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) |                         \
> +      (RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR))
> 
>  #define RTE_EVENT_CRYPTO_ADAPTER_SW_CAP \
>               RTE_EVENT_CRYPTO_ADAPTER_CAP_SESSION_PRIVATE_DATA
> diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> index ac8ba5bf0..e273b3acf 100644
> --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c
> @@ -26,6 +26,10 @@
>  #define BATCH_SIZE           32
>  #define BLOCK_CNT_THRESHOLD  10
>  #define ETH_EVENT_BUFFER_SIZE        (4*BATCH_SIZE)
> +#define MAX_VECTOR_SIZE              1024
> +#define MIN_VECTOR_SIZE              4
> +#define MAX_VECTOR_NS                1E9
> +#define MIN_VECTOR_NS                1E5
> 
>  #define ETH_RX_ADAPTER_SERVICE_NAME_LEN      32
>  #define ETH_RX_ADAPTER_MEM_NAME_LEN  32
> @@ -59,6 +63,20 @@ struct eth_rx_poll_entry {
>       uint16_t eth_rx_qid;
>  };
> 
> +struct eth_rx_vector_data {
> +     TAILQ_ENTRY(eth_rx_vector_data) next;
> +     uint16_t port;
> +     uint16_t queue;
> +     uint16_t max_vector_count;
> +     uint64_t event;
> +     uint64_t ts;
> +     uint64_t vector_timeout_ticks;
> +     struct rte_mempool *vector_pool;
> +     struct rte_event_vector *vector_ev;
> +} __rte_cache_aligned;
> +
> +TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
> +
>  /* Instance per adapter */
>  struct rte_eth_event_enqueue_buffer {
>       /* Count of events in this buffer */
> @@ -92,6 +110,14 @@ struct rte_event_eth_rx_adapter {
>       uint32_t wrr_pos;
>       /* Event burst buffer */
>       struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
> +     /* Vector enable flag */
> +     uint8_t ena_vector;
> +     /* Timestamp of previous vector expiry list traversal */
> +     uint64_t prev_expiry_ts;
> +     /* Minimum ticks to wait before traversing expiry list */
> +     uint64_t vector_tmo_ticks;
> +     /* vector list */
> +     struct eth_rx_vector_data_list vector_list;
>       /* Per adapter stats */
>       struct rte_event_eth_rx_adapter_stats stats;
>       /* Block count, counts up to BLOCK_CNT_THRESHOLD */
> @@ -198,9 +224,11 @@ struct eth_device_info {
>  struct eth_rx_queue_info {
>       int queue_enabled;      /* True if added */
>       int intr_enabled;
> +     uint8_t ena_vector;
>       uint16_t wt;            /* Polling weight */
>       uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
>       uint64_t event;
> +     struct eth_rx_vector_data vector_data;
>  };
> 
>  static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
> @@ -722,6 +750,9 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
>           &rx_adapter->event_enqueue_buffer;
>       struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
> 
> +     if (!buf->count)
> +             return 0;
> +
>       uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
>                                       rx_adapter->event_port_id,
>                                       buf->events,
> @@ -742,6 +773,77 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
>       return n;
>  }
> 
> +static inline void
> +rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
> +             struct eth_rx_vector_data *vec)
> +{
> +     vec->vector_ev->nb_elem = 0;
> +     vec->vector_ev->port = vec->port;
> +     vec->vector_ev->queue = vec->queue;
> +     vec->vector_ev->attr_valid = true;
> +     TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
> +}
> +
> +static inline uint16_t
> +rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
> +                     struct eth_rx_queue_info *queue_info,
> +                     struct rte_eth_event_enqueue_buffer *buf,
> +                     struct rte_mbuf **mbufs, uint16_t num)
> +{
> +     struct rte_event *ev = &buf->events[buf->count];
> +     struct eth_rx_vector_data *vec;
> +     uint16_t filled, space, sz;
> +
> +     filled = 0;
> +     vec = &queue_info->vector_data;
> +
> +     if (vec->vector_ev == NULL) {
> +             if (rte_mempool_get(vec->vector_pool,
> +                                 (void **)&vec->vector_ev) < 0) {
> +                     rte_pktmbuf_free_bulk(mbufs, num);
> +                     return 0;
> +             }
> +             rxa_init_vector(rx_adapter, vec);
> +     }
> +     while (num) {
> +             if (vec->vector_ev->nb_elem == vec->max_vector_count) {
> +                     /* Event ready. */
> +                     ev->event = vec->event;
> +                     ev->vec = vec->vector_ev;
> +                     ev++;
> +                     filled++;
> +                     vec->vector_ev = NULL;
> +                     TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
> +                     if (rte_mempool_get(vec->vector_pool,
> +                                         (void **)&vec->vector_ev) < 0) {
> +                             rte_pktmbuf_free_bulk(mbufs, num);
> +                             return 0;
> +                     }
> +                     rxa_init_vector(rx_adapter, vec);
> +             }
> +
> +             space = vec->max_vector_count - vec->vector_ev->nb_elem;
> +             sz = num > space ? space : num;
> +             memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
> +                    sizeof(void *) * sz);
> +             vec->vector_ev->nb_elem += sz;
> +             num -= sz;
> +             mbufs += sz;
> +             vec->ts = rte_rdtsc();
> +     }
> +
> +     if (vec->vector_ev->nb_elem == vec->max_vector_count) {
> +             ev->event = vec->event;
> +             ev->vec = vec->vector_ev;
> +             ev++;
> +             filled++;
> +             vec->vector_ev = NULL;
> +             TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
> +     }
> +
> +     return filled;
> +}
> +
>  static inline void
>  rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>               uint16_t eth_dev_id,
> @@ -766,29 +868,33 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
>       uint16_t nb_cb;
>       uint16_t dropped;
> 
> -     /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
> -     rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
> -     do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
> -
> -     for (i = 0; i < num; i++) {
> -             m = mbufs[i];
> -
> -             rss = do_rss ?
> -                     rxa_do_softrss(m, rx_adapter->rss_key_be) :
> -                     m->hash.rss;
> -             ev->event = event;
> -             ev->flow_id = (rss & ~flow_id_mask) |
> -                             (ev->flow_id & flow_id_mask);
> -             ev->mbuf = m;
> -             ev++;
> +     if (!eth_rx_queue_info->ena_vector) {
> +             /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
> +             rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
> +             do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
> +             for (i = 0; i < num; i++) {
> +                     m = mbufs[i];
> +
> +                     rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
> +                                  : m->hash.rss;
> +                     ev->event = event;
> +                     ev->flow_id = (rss & ~flow_id_mask) |
> +                                   (ev->flow_id & flow_id_mask);
> +                     ev->mbuf = m;
> +                     ev++;
> +             }
> +     } else {
> +             num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
> +                                           buf, mbufs, num);
>       }
> 
> -     if (dev_info->cb_fn) {
> +     if (num && dev_info->cb_fn) {
> 
>               dropped = 0;
>               nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
> -                                     ETH_EVENT_BUFFER_SIZE, buf->count, ev,
> -                                     num, dev_info->cb_arg, &dropped);
> +                                     ETH_EVENT_BUFFER_SIZE, buf->count,
> +                                     &buf->events[buf->count], num,
> +                                     dev_info->cb_arg, &dropped);
>               if (unlikely(nb_cb > num))
>                       RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
>                               nb_cb, num);
> @@ -1124,6 +1230,30 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
>       return nb_rx;
>  }
> 
> +static void
> +rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
> +{
> +     struct rte_event_eth_rx_adapter *rx_adapter = arg;
> +     struct rte_eth_event_enqueue_buffer *buf =
> +             &rx_adapter->event_enqueue_buffer;
> +     struct rte_event *ev;
> +
> +     if (buf->count)
> +             rxa_flush_event_buffer(rx_adapter);
> +
> +     if (vec->vector_ev->nb_elem == 0)
> +             return;
> +     ev = &buf->events[buf->count];
> +
> +     /* Event ready. */
> +     ev->event = vec->event;
> +     ev->vec = vec->vector_ev;
> +     buf->count++;
> +
> +     vec->vector_ev = NULL;
> +     vec->ts = 0;
> +}
> +
>  static int
>  rxa_service_func(void *args)
>  {
> @@ -1137,6 +1267,24 @@ rxa_service_func(void *args)
>               return 0;
>       }
> 
> +     if (rx_adapter->ena_vector) {
> +             if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
> +                 rx_adapter->vector_tmo_ticks) {
> +                     struct eth_rx_vector_data *vec;
> +
> +                     TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
> +                             uint64_t elapsed_time = rte_rdtsc() - vec->ts;
> +
> +                             if (elapsed_time >= vec->vector_timeout_ticks) {
> +                                     rxa_vector_expire(vec, rx_adapter);
> +                                     TAILQ_REMOVE(&rx_adapter->vector_list,
> +                                                  vec, next);
> +                             }
> +                     }
> +                     rx_adapter->prev_expiry_ts = rte_rdtsc();
> +             }
> +     }
> +
>       stats = &rx_adapter->stats;
>       stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
>       stats->rx_packets += rxa_poll(rx_adapter);
> @@ -1640,11 +1788,35 @@ rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
>       }
>  }
> 
> +static void
> +rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
> +                 uint64_t vector_ns, struct rte_mempool *mp, int32_t qid,
> +                 uint16_t port_id)
> +{
> +#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
> +     struct eth_rx_vector_data *vector_data;
> +     uint32_t flow_id;
> +
> +     vector_data = &queue_info->vector_data;
> +     vector_data->max_vector_count = vector_count;
> +     vector_data->port = port_id;
> +     vector_data->queue = qid;
> +     vector_data->vector_pool = mp;
> +     vector_data->vector_timeout_ticks =
> +             NSEC2TICK(vector_ns, rte_get_timer_hz());
> +     vector_data->ts = 0;
> +     flow_id = queue_info->event & 0xFFFFF;
> +     flow_id =
> +             flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
> +     vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
> +}
> +
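
As a quick sanity check of NSEC2TICK above: with rte_get_timer_hz() returning
2 GHz and a configured vector_timeout_ns of 1 ms, vector_timeout_ticks =
(1e6 * 2e9) / 1e9 = 2,000,000 TSC ticks, which is the value the expiry scan in
rxa_service_func() compares the per-vector elapsed time against.
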
>  static void
>  rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
>       struct eth_device_info *dev_info,
>       int32_t rx_queue_id)
>  {
> +     struct eth_rx_vector_data *vec;
>       int pollq;
>       int intrq;
>       int sintrq;
> @@ -1663,6 +1835,14 @@ rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
>               return;
>       }
> 
> +     /* Push all the partial event vectors to event device. */
> +     TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
> +             if (vec->queue != rx_queue_id)
> +                     continue;
> +             rxa_vector_expire(vec, rx_adapter);
> +             TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
> +     }
> +

We are doing packet-related activity (rxa_flush_event_buffer()) outside of
rxa_service_func(), although the service function wouldn't be running here
since the queue-delete code holds the lock. It would also be done in the
context of a control thread. I don't know if there is a precedent for this.
What do you think of just freeing the vector data and mbufs?
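
Roughly what I had in mind, reusing the names from this patch (untested
sketch; note it needs a removal-safe list walk since entries are unlinked
while iterating):

	struct eth_rx_vector_data *vec, *tvec;

	/* Drop partially filled vectors for this queue instead of enqueuing
	 * them from the control-thread context: free the aggregated mbufs
	 * and return the rte_event_vector to its mempool.
	 */
	for (vec = TAILQ_FIRST(&rx_adapter->vector_list); vec != NULL;
	     vec = tvec) {
		tvec = TAILQ_NEXT(vec, next);
		if (vec->queue != rx_queue_id)
			continue;
		if (vec->vector_ev != NULL) {
			rte_pktmbuf_free_bulk(vec->vector_ev->mbufs,
					      vec->vector_ev->nb_elem);
			rte_mempool_put(vec->vector_pool, vec->vector_ev);
			vec->vector_ev = NULL;
		}
		vec->ts = 0;
		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
	}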

>       pollq = rxa_polled_queue(dev_info, rx_queue_id);
>       intrq = rxa_intr_queue(dev_info, rx_queue_id);
>       sintrq = rxa_shared_intr(dev_info, rx_queue_id);
> @@ -1741,6 +1921,42 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
>       }
>  }
> 
> +static void
> +rxa_sw_event_vector_configure(
> +     struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
> +     int rx_queue_id,
> +     const struct rte_event_eth_rx_adapter_event_vector_config *config)
> +{
> +     struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
> +     struct eth_rx_queue_info *queue_info;
> +     struct rte_event *qi_ev;
> +
> +     if (rx_queue_id == -1) {
> +             uint16_t nb_rx_queues;
> +             uint16_t i;
> +
> +             nb_rx_queues = dev_info->dev->data->nb_rx_queues;
> +             for (i = 0; i < nb_rx_queues; i++)
> +                     rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
> +                                                   config);
> +             return;
> +     }
> +
> +     queue_info = &dev_info->rx_queue[rx_queue_id];
> +     qi_ev = (struct rte_event *)&queue_info->event;
> +     queue_info->ena_vector = 1;
> +     qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
> +     rxa_set_vector_data(queue_info, config->vector_sz,
> +                         config->vector_timeout_ns, config->vector_mp,
> +                         rx_queue_id, dev_info->dev->data->port_id);
> +     rx_adapter->ena_vector = 1;
> +     rx_adapter->vector_tmo_ticks =
> +             rx_adapter->vector_tmo_ticks ?
> +                           RTE_MIN(config->vector_timeout_ns >> 1,
> +                             rx_adapter->vector_tmo_ticks) :
> +                           config->vector_timeout_ns >> 1;
> +}
> +
>  static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
>               uint16_t eth_dev_id,
>               int rx_queue_id,
> @@ -1967,6 +2183,7 @@ rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
>       rx_adapter->conf_cb = conf_cb;
>       rx_adapter->conf_arg = conf_arg;
>       rx_adapter->id = id;
> +     TAILQ_INIT(&rx_adapter->vector_list);
>       strcpy(rx_adapter->mem_name, mem_name);
>       rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
>                                       RTE_MAX_ETHPORTS *
> @@ -2081,6 +2298,15 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
>               return -EINVAL;
>       }
> 
> +     if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
> +         (queue_conf->rx_queue_flags &
> +          RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
> +             RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
> +                              " eth port: %" PRIu16 " adapter id: %" PRIu8,
> +                              eth_dev_id, id);
> +             return -EINVAL;
> +     }
> +
>       if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
>               (rx_queue_id != -1)) {
>               RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
> @@ -2143,6 +2369,17 @@ rte_event_eth_rx_adapter_queue_add(uint8_t id,
>       return 0;
>  }
> 
> +static int
> +rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
> +{
> +     limits->max_sz = MAX_VECTOR_SIZE;
> +     limits->min_sz = MIN_VECTOR_SIZE;
> +     limits->max_timeout_ns = MAX_VECTOR_NS;
> +     limits->min_timeout_ns = MIN_VECTOR_NS;
> +
> +     return 0;
> +}
> +
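
These limits are what an application should clamp its request against before
calling the queue vector config API added earlier in this series; a rough
usage sketch (app_vec_sz, app_vec_tmo_ns and vector_pool are placeholder
names):

	struct rte_event_eth_rx_adapter_vector_limits limits;
	struct rte_event_eth_rx_adapter_event_vector_config vec_conf;
	int ret;

	ret = rte_event_eth_rx_adapter_vector_limits_get(dev_id, eth_dev_id,
							 &limits);
	if (ret)
		return ret;

	/* Clamp the application's request to the adapter's supported range. */
	vec_conf.vector_sz =
		RTE_MIN(RTE_MAX(app_vec_sz, limits.min_sz), limits.max_sz);
	vec_conf.vector_timeout_ns =
		RTE_MIN(RTE_MAX(app_vec_tmo_ns, limits.min_timeout_ns),
			limits.max_timeout_ns);
	/* Event vector mempool, e.g. from rte_event_vector_pool_create(). */
	vec_conf.vector_mp = vector_pool;

	/* rx_queue_id of -1 applies the config to all Rx queues. */
	ret = rte_event_eth_rx_adapter_queue_event_vector_config(
		id, eth_dev_id, -1, &vec_conf);
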
>  int
>  rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
>                               int32_t rx_queue_id)
> @@ -2333,7 +2570,8 @@ rte_event_eth_rx_adapter_queue_event_vector_config(
>               ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
>                       dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
>       } else {
> -             ret = -ENOTSUP;
> +             rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
> +                                           rx_queue_id, config);
>       }
> 
>       return ret;
> @@ -2371,7 +2609,7 @@ rte_event_eth_rx_adapter_vector_limits_get(
>               ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
>                       dev, &rte_eth_devices[eth_port_id], limits);
>       } else {
> -             ret = -ENOTSUP;
> +             ret = rxa_sw_vector_limits(limits);
>       }
> 
>       return ret;
> diff --git a/lib/librte_eventdev/rte_eventdev.c b/lib/librte_eventdev/rte_eventdev.c
> index be0499c52..62824654b 100644
> --- a/lib/librte_eventdev/rte_eventdev.c
> +++ b/lib/librte_eventdev/rte_eventdev.c
> @@ -122,7 +122,11 @@ rte_event_eth_rx_adapter_caps_get(uint8_t dev_id, uint16_t eth_port_id,
> 
>       if (caps == NULL)
>               return -EINVAL;
> -     *caps = 0;
> +
> +     if (dev->dev_ops->eth_rx_adapter_caps_get == NULL)
> +             *caps = RTE_EVENT_ETH_RX_ADAPTER_SW_CAP;
> +     else
> +             *caps = 0;
> 
>       return dev->dev_ops->eth_rx_adapter_caps_get ?
>                               (*dev->dev_ops->eth_rx_adapter_caps_get)(dev,
> --
> 2.17.1
