From: Pavan Nikhilesh <pbhagavat...@marvell.com>

Add event vector support in pipeline tests. By default this mode
is disabled; it can be enabled by using the option --enable_vector.
example:
        dpdk-test-eventdev -l 7-23 -s 0xff00 -- --prod_type_ethdev
        --nb_pkts=0 --verbose 2 --test=pipeline_atq --stlist=a
        --wlcores=20-23  --enable_vector

Additional options to configure vector size and vector timeout are
also implemented and can be used by specifying --vector_size and
--vector_tmo_ns.

This patch also adds a new option to set the number of Rx queues
configured per event eth rx adapter.
example:
        dpdk-test-eventdev -l 7-23 -s 0xff00 -- --prod_type_ethdev
        --nb_pkts=0 --verbose 2 --test=pipeline_atq --stlist=a
        --wlcores=20-23  --nb_eth_queues 4

Signed-off-by: Pavan Nikhilesh <pbhagavat...@marvell.com>
---
 app/test-eventdev/evt_common.h           |   4 +
 app/test-eventdev/evt_options.c          |  52 ++++
 app/test-eventdev/evt_options.h          |   4 +
 app/test-eventdev/test_pipeline_atq.c    | 310 ++++++++++++++++++++--
 app/test-eventdev/test_pipeline_common.c | 113 +++++++-
 app/test-eventdev/test_pipeline_common.h |  18 ++
 app/test-eventdev/test_pipeline_queue.c  | 320 +++++++++++++++++++++--
 doc/guides/tools/testeventdev.rst        |  28 ++
 8 files changed, 795 insertions(+), 54 deletions(-)

diff --git a/app/test-eventdev/evt_common.h b/app/test-eventdev/evt_common.h
index a1da1cf11..0e228258e 100644
--- a/app/test-eventdev/evt_common.h
+++ b/app/test-eventdev/evt_common.h
@@ -58,16 +58,20 @@ struct evt_options {
        uint8_t sched_type_list[EVT_MAX_STAGES];
        uint16_t mbuf_sz;
        uint16_t wkr_deq_dep;
+       uint16_t vector_size;
+       uint16_t eth_queues;
        uint32_t nb_flows;
        uint32_t tx_first;
        uint32_t max_pkt_sz;
        uint32_t deq_tmo_nsec;
        uint32_t q_priority:1;
        uint32_t fwd_latency:1;
+       uint32_t ena_vector : 1;
        uint64_t nb_pkts;
        uint64_t nb_timers;
        uint64_t expiry_nsec;
        uint64_t max_tmo_nsec;
+       uint64_t vector_tmo_nsec;
        uint64_t timer_tick_nsec;
        uint64_t optm_timer_tick_nsec;
        enum evt_prod_type prod_type;
diff --git a/app/test-eventdev/evt_options.c b/app/test-eventdev/evt_options.c
index 0d04ea9f8..0d5540574 100644
--- a/app/test-eventdev/evt_options.c
+++ b/app/test-eventdev/evt_options.c
@@ -34,6 +34,9 @@ evt_options_default(struct evt_options *opt)
        opt->max_tmo_nsec = 1E5;  /* 100000ns ~100us */
        opt->expiry_nsec = 1E4;   /* 10000ns ~10us */
        opt->prod_type = EVT_PROD_TYPE_SYNT;
+       opt->eth_queues = 1;
+       opt->vector_size = 64;
+       opt->vector_tmo_nsec = 100E3;
 }
 
 typedef int (*option_parser_t)(struct evt_options *opt,
@@ -257,6 +260,43 @@ evt_parse_max_pkt_sz(struct evt_options *opt, const char 
*arg)
        return ret;
 }
 
+/*
+ * Handler for the --enable_vector option.
+ *
+ * The option is registered without an argument, so arg is ignored; being
+ * invoked at all is what switches event vector mode on. Always returns 0.
+ */
+static int
+evt_parse_ena_vector(struct evt_options *opt, const char *arg __rte_unused)
+{
+       opt->ena_vector = 1;
+
+       return 0;
+}
+
+/*
+ * Handler for the --vector_size option: parse arg into opt->vector_size.
+ *
+ * A size of zero is rejected here. The Rx adapter setup divides the pool
+ * size by opt->vector_size when sizing the vector pool, and it does so
+ * before validating the size against the adapter limits, so a zero would
+ * trigger a division by zero there.
+ *
+ * Returns 0 on success, the non-zero parser_read_uint16() status when
+ * the argument cannot be parsed, or -1 when the parsed size is zero.
+ * opt->vector_size is only updated on success.
+ */
+static int
+evt_parse_vector_size(struct evt_options *opt, const char *arg)
+{
+       uint16_t size = 0;
+       int ret;
+
+       ret = parser_read_uint16(&size, arg);
+       if (ret)
+               return ret;
+
+       if (size == 0)
+               return -1;
+
+       opt->vector_size = size;
+
+       return 0;
+}
+
+/*
+ * Handler for the --vector_tmo_ns option: parse arg as a 64-bit unsigned
+ * value into opt->vector_tmo_nsec and hand back the parser's status.
+ */
+static int
+evt_parse_vector_tmo_ns(struct evt_options *opt, const char *arg)
+{
+       return parser_read_uint64(&opt->vector_tmo_nsec, arg);
+}
+
+/*
+ * Handler for the --nb_eth_queues option: parse arg as a 16-bit unsigned
+ * value into opt->eth_queues and hand back the parser's status.
+ */
+static int
+evt_parse_eth_queues(struct evt_options *opt, const char *arg)
+{
+       return parser_read_uint16(&opt->eth_queues, arg);
+}
+
 static void
 usage(char *program)
 {
@@ -289,6 +329,10 @@ usage(char *program)
                "\t--expiry_nsec      : event timer expiry ns.\n"
                "\t--mbuf_sz          : packet mbuf size.\n"
                "\t--max_pkt_sz       : max packet size.\n"
+               "\t--nb_eth_queues    : number of ethernet Rx queues.\n"
+               "\t--enable_vector    : enable event vectorization.\n"
+               "\t--vector_size      : Max vector size.\n"
+               "\t--vector_tmo_ns    : Max vector timeout in nanoseconds\n"
                );
        printf("available tests:\n");
        evt_test_dump_names();
@@ -360,6 +404,10 @@ static struct option lgopts[] = {
        { EVT_EXPIRY_NSEC,         1, 0, 0 },
        { EVT_MBUF_SZ,             1, 0, 0 },
        { EVT_MAX_PKT_SZ,          1, 0, 0 },
+       { EVT_NB_ETH_QUEUES,       1, 0, 0 },
+       { EVT_ENA_VECTOR,          0, 0, 0 },
+       { EVT_VECTOR_SZ,           1, 0, 0 },
+       { EVT_VECTOR_TMO,          1, 0, 0 },
        { EVT_HELP,                0, 0, 0 },
        { NULL,                    0, 0, 0 }
 };
@@ -394,6 +442,10 @@ evt_opts_parse_long(int opt_idx, struct evt_options *opt)
                { EVT_EXPIRY_NSEC, evt_parse_expiry_nsec},
                { EVT_MBUF_SZ, evt_parse_mbuf_sz},
                { EVT_MAX_PKT_SZ, evt_parse_max_pkt_sz},
+               { EVT_NB_ETH_QUEUES, evt_parse_eth_queues},
+               { EVT_ENA_VECTOR, evt_parse_ena_vector},
+               { EVT_VECTOR_SZ, evt_parse_vector_size},
+               { EVT_VECTOR_TMO, evt_parse_vector_tmo_ns},
        };
 
        for (i = 0; i < RTE_DIM(parsermap); i++) {
diff --git a/app/test-eventdev/evt_options.h b/app/test-eventdev/evt_options.h
index 748e54fae..1cea2a3e1 100644
--- a/app/test-eventdev/evt_options.h
+++ b/app/test-eventdev/evt_options.h
@@ -42,6 +42,10 @@
 #define EVT_EXPIRY_NSEC          ("expiry_nsec")
 #define EVT_MBUF_SZ              ("mbuf_sz")
 #define EVT_MAX_PKT_SZ           ("max_pkt_sz")
+#define EVT_NB_ETH_QUEUES        ("nb_eth_queues")
+#define EVT_ENA_VECTOR           ("enable_vector")
+#define EVT_VECTOR_SZ            ("vector_size")
+#define EVT_VECTOR_TMO           ("vector_tmo_ns")
 #define EVT_HELP                 ("help")
 
 void evt_options_default(struct evt_options *opt);
diff --git a/app/test-eventdev/test_pipeline_atq.c 
b/app/test-eventdev/test_pipeline_atq.c
index 0872b25b5..84dd4f44e 100644
--- a/app/test-eventdev/test_pipeline_atq.c
+++ b/app/test-eventdev/test_pipeline_atq.c
@@ -15,6 +15,8 @@ pipeline_atq_nb_event_queues(struct evt_options *opt)
        return rte_eth_dev_count_avail();
 }
 
+typedef int (*pipeline_atq_worker_t)(void *arg);
+
 static __rte_noinline int
 pipeline_atq_worker_single_stage_tx(void *arg)
 {
@@ -113,6 +115,112 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
        return 0;
 }
 
+/*
+ * ATQ worker: single stage, Tx adapter enqueue, event vector mode.
+ *
+ * Dequeues one event at a time, passes each vector to
+ * pipeline_event_tx_vector() and credits the vector's element count to
+ * this worker's processed_pkts. Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_atq_worker_single_stage_tx_vector(void *arg)
+{
+       PIPELINE_WORKER_SINGLE_STAGE_INIT;
+
+       while (!t->done) {
+               uint16_t nb_pkts;
+
+               if (rte_event_dequeue_burst(dev, port, &ev, 1, 0) == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               /* Read the count before the vector is handed to Tx. */
+               nb_pkts = ev.vec->nb_elem;
+               pipeline_event_tx_vector(dev, port, &ev);
+               w->processed_pkts += nb_pkts;
+       }
+
+       return 0;
+}
+
+/*
+ * ATQ worker: single stage, forward to Tx event queue, event vector mode.
+ *
+ * Each dequeued vector is steered to the event queue that
+ * t->tx_evqueue_id[] lists for the vector's port, marked as an atomic
+ * FORWARD and enqueued back to the event device. The vector's element
+ * count is added to processed_pkts. Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_atq_worker_single_stage_fwd_vector(void *arg)
+{
+       PIPELINE_WORKER_SINGLE_STAGE_INIT;
+       const uint8_t *txq_of_port = t->tx_evqueue_id;
+
+       while (!t->done) {
+               uint16_t nb_pkts;
+
+               if (rte_event_dequeue_burst(dev, port, &ev, 1, 0) == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               /* Read the count before the event leaves this worker. */
+               nb_pkts = ev.vec->nb_elem;
+               ev.queue_id = txq_of_port[ev.vec->port];
+               ev.vec->queue = 0;
+               pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
+               pipeline_event_enqueue(dev, port, &ev);
+               w->processed_pkts += nb_pkts;
+       }
+
+       return 0;
+}
+
+/*
+ * ATQ worker: single stage, burst Tx adapter enqueue, event vector mode.
+ *
+ * Dequeues up to BURST_SIZE vector events, zeroes each vector's queue
+ * field and sends the whole burst with pipeline_event_tx_burst().
+ *
+ * The per-burst packet total is kept in a 64-bit accumulator: element
+ * counts summed over a burst of large vectors can exceed what a uint16_t
+ * holds, which would silently under-count processed_pkts.
+ *
+ * Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
+{
+       PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+       uint64_t nb_pkts;
+
+       while (!t->done) {
+               uint16_t nb_rx =
+                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+               if (!nb_rx) {
+                       rte_pause();
+                       continue;
+               }
+
+               /* Total the elements before the burst is handed to Tx. */
+               nb_pkts = 0;
+               for (i = 0; i < nb_rx; i++) {
+                       nb_pkts += ev[i].vec->nb_elem;
+                       ev[i].vec->queue = 0;
+               }
+
+               pipeline_event_tx_burst(dev, port, ev, nb_rx);
+               w->processed_pkts += nb_pkts;
+       }
+
+       return 0;
+}
+
+/*
+ * ATQ worker: single stage, burst forward to Tx event queue, event
+ * vector mode.
+ *
+ * Dequeues up to BURST_SIZE vector events, steers each one to the event
+ * queue that t->tx_evqueue_id[] lists for the vector's port, marks it as
+ * an atomic FORWARD and enqueues the burst back to the event device.
+ *
+ * The per-burst packet total is kept in a 64-bit accumulator: element
+ * counts summed over a burst of large vectors can exceed what a uint16_t
+ * holds, which would silently under-count processed_pkts.
+ *
+ * Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
+{
+       PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+       const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint64_t nb_pkts;
+
+       while (!t->done) {
+               uint16_t nb_rx =
+                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+               if (!nb_rx) {
+                       rte_pause();
+                       continue;
+               }
+
+               nb_pkts = 0;
+               for (i = 0; i < nb_rx; i++) {
+                       ev[i].queue_id = tx_queue[ev[i].vec->port];
+                       ev[i].vec->queue = 0;
+                       nb_pkts += ev[i].vec->nb_elem;
+                       pipeline_fwd_event_vector(&ev[i],
+                                                 RTE_SCHED_TYPE_ATOMIC);
+               }
+
+               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               w->processed_pkts += nb_pkts;
+       }
+
+       return 0;
+}
+
 static __rte_noinline int
 pipeline_atq_worker_multi_stage_tx(void *arg)
 {
@@ -245,6 +353,147 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
        return 0;
 }
 
+/*
+ * ATQ worker: multi stage, Tx adapter enqueue, event vector mode.
+ *
+ * The current stage is derived from the event's sub_event_type. Vectors
+ * at the last stage are transmitted via pipeline_event_tx_vector() and
+ * counted in processed_pkts; all others have sub_event_type advanced and
+ * are forwarded with that stage's schedule type.
+ * Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_atq_worker_multi_stage_tx_vector(void *arg)
+{
+       PIPELINE_WORKER_MULTI_STAGE_INIT;
+
+       while (!t->done) {
+               if (rte_event_dequeue_burst(dev, port, &ev, 1, 0) == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               cq_id = ev.sub_event_type % nb_stages;
+
+               if (cq_id != last_queue) {
+                       /* Intermediate stage: advance and re-inject. */
+                       ev.sub_event_type++;
+                       pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
+                       pipeline_event_enqueue(dev, port, &ev);
+               } else {
+                       /* Read the count before the vector goes to Tx. */
+                       const uint16_t nb_pkts = ev.vec->nb_elem;
+
+                       pipeline_event_tx_vector(dev, port, &ev);
+                       w->processed_pkts += nb_pkts;
+               }
+       }
+
+       return 0;
+}
+
+/*
+ * ATQ worker: multi stage, forward to Tx event queue, event vector mode.
+ *
+ * The current stage is derived from the event's sub_event_type.
+ * Intermediate stages advance sub_event_type and forward with that
+ * stage's schedule type. The last stage steers the vector to the event
+ * queue that t->tx_evqueue_id[] lists for its port, forwards it as
+ * atomic and adds its element count to processed_pkts.
+ * Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
+{
+       PIPELINE_WORKER_MULTI_STAGE_INIT;
+       const uint8_t *txq_of_port = t->tx_evqueue_id;
+
+       while (!t->done) {
+               uint16_t nb_pkts;
+
+               if (rte_event_dequeue_burst(dev, port, &ev, 1, 0) == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               cq_id = ev.sub_event_type % nb_stages;
+
+               if (cq_id != last_queue) {
+                       ev.sub_event_type++;
+                       pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
+                       pipeline_event_enqueue(dev, port, &ev);
+                       continue;
+               }
+
+               /* Last stage: hand the vector to the Tx event queue. */
+               ev.queue_id = txq_of_port[ev.vec->port];
+               ev.vec->queue = 0;
+               nb_pkts = ev.vec->nb_elem;
+               pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
+               pipeline_event_enqueue(dev, port, &ev);
+               w->processed_pkts += nb_pkts;
+       }
+
+       return 0;
+}
+
+/*
+ * ATQ worker: multi stage, burst dequeue, Tx adapter enqueue, event
+ * vector mode.
+ *
+ * For every event of a dequeued burst the stage is derived from
+ * sub_event_type. Last-stage vectors are transmitted one by one via
+ * pipeline_event_tx_vector(), counted, and their slot is turned into a
+ * RELEASE operation; the remaining events are advanced one stage and
+ * marked FORWARD. The whole burst is then enqueued back to the event
+ * device. Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
+{
+       PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+
+       while (!t->done) {
+               const uint16_t nb_deq =
+                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+               if (nb_deq == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               for (i = 0; i < nb_deq; i++) {
+                       cq_id = ev[i].sub_event_type % nb_stages;
+
+                       if (cq_id != last_queue) {
+                               ev[i].sub_event_type++;
+                               pipeline_fwd_event_vector(
+                                       &ev[i], sched_type_list[cq_id]);
+                       } else {
+                               const uint16_t nb_pkts = ev[i].vec->nb_elem;
+
+                               pipeline_event_tx_vector(dev, port, &ev[i]);
+                               ev[i].op = RTE_EVENT_OP_RELEASE;
+                               w->processed_pkts += nb_pkts;
+                       }
+               }
+
+               pipeline_event_enqueue_burst(dev, port, ev, nb_deq);
+       }
+
+       return 0;
+}
+
+/*
+ * ATQ worker: multi stage, burst dequeue, forward to Tx event queue,
+ * event vector mode.
+ *
+ * For every event of a dequeued burst the stage is derived from
+ * sub_event_type. Intermediate stages are advanced and forwarded with
+ * that stage's schedule type; last-stage vectors are steered to the
+ * event queue that t->tx_evqueue_id[] lists for their port, forwarded as
+ * atomic and counted in processed_pkts. The burst is then enqueued back
+ * to the event device. Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
+{
+       PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+       const uint8_t *txq_of_port = t->tx_evqueue_id;
+
+       while (!t->done) {
+               const uint16_t nb_deq =
+                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+               if (nb_deq == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               for (i = 0; i < nb_deq; i++) {
+                       uint16_t nb_pkts;
+
+                       cq_id = ev[i].sub_event_type % nb_stages;
+
+                       if (cq_id != last_queue) {
+                               ev[i].sub_event_type++;
+                               pipeline_fwd_event_vector(
+                                       &ev[i], sched_type_list[cq_id]);
+                               continue;
+                       }
+
+                       nb_pkts = ev[i].vec->nb_elem;
+                       ev[i].queue_id = txq_of_port[ev[i].vec->port];
+                       ev[i].vec->queue = 0;
+                       pipeline_fwd_event_vector(&ev[i],
+                                                 RTE_SCHED_TYPE_ATOMIC);
+                       w->processed_pkts += nb_pkts;
+               }
+
+               pipeline_event_enqueue_burst(dev, port, ev, nb_deq);
+       }
+
+       return 0;
+}
+
 static int
 worker_wrapper(void *arg)
 {
@@ -253,27 +502,36 @@ worker_wrapper(void *arg)
        const bool burst = evt_has_burst_mode(w->dev_id);
        const bool internal_port = w->t->internal_port;
        const uint8_t nb_stages = opt->nb_stages;
-       RTE_SET_USED(opt);
+       /*vector/burst/internal_port*/
+       const pipeline_atq_worker_t
+       pipeline_atq_worker_single_stage[2][2][2] = {
+               [0][0][0] = pipeline_atq_worker_single_stage_fwd,
+               [0][0][1] = pipeline_atq_worker_single_stage_tx,
+               [0][1][0] = pipeline_atq_worker_single_stage_burst_fwd,
+               [0][1][1] = pipeline_atq_worker_single_stage_burst_tx,
+               [1][0][0] = pipeline_atq_worker_single_stage_fwd_vector,
+               [1][0][1] = pipeline_atq_worker_single_stage_tx_vector,
+               [1][1][0] = pipeline_atq_worker_single_stage_burst_fwd_vector,
+               [1][1][1] = pipeline_atq_worker_single_stage_burst_tx_vector,
+       };
+       const pipeline_atq_worker_t
+       pipeline_atq_worker_multi_stage[2][2][2] = {
+               [0][0][0] = pipeline_atq_worker_multi_stage_fwd,
+               [0][0][1] = pipeline_atq_worker_multi_stage_tx,
+               [0][1][0] = pipeline_atq_worker_multi_stage_burst_fwd,
+               [0][1][1] = pipeline_atq_worker_multi_stage_burst_tx,
+               [1][0][0] = pipeline_atq_worker_multi_stage_fwd_vector,
+               [1][0][1] = pipeline_atq_worker_multi_stage_tx_vector,
+               [1][1][0] = pipeline_atq_worker_multi_stage_burst_fwd_vector,
+               [1][1][1] = pipeline_atq_worker_multi_stage_burst_tx_vector,
+       };
 
-       if (nb_stages == 1) {
-               if (!burst && internal_port)
-                       return pipeline_atq_worker_single_stage_tx(arg);
-               else if (!burst && !internal_port)
-                       return pipeline_atq_worker_single_stage_fwd(arg);
-               else if (burst && internal_port)
-                       return pipeline_atq_worker_single_stage_burst_tx(arg);
-               else if (burst && !internal_port)
-                       return pipeline_atq_worker_single_stage_burst_fwd(arg);
-       } else {
-               if (!burst && internal_port)
-                       return pipeline_atq_worker_multi_stage_tx(arg);
-               else if (!burst && !internal_port)
-                       return pipeline_atq_worker_multi_stage_fwd(arg);
-               if (burst && internal_port)
-                       return pipeline_atq_worker_multi_stage_burst_tx(arg);
-               else if (burst && !internal_port)
-                       return pipeline_atq_worker_multi_stage_burst_fwd(arg);
-       }
+       if (nb_stages == 1)
+               return (pipeline_atq_worker_single_stage[opt->ena_vector][burst]
+                                                       [internal_port])(arg);
+       else
+               return (pipeline_atq_worker_multi_stage[opt->ena_vector][burst]
+                                                      [internal_port])(arg);
 
        rte_panic("invalid worker\n");
 }
@@ -290,7 +548,7 @@ pipeline_atq_eventdev_setup(struct evt_test *test, struct 
evt_options *opt)
        int ret;
        int nb_ports;
        int nb_queues;
-       uint8_t queue;
+       uint8_t queue, is_prod;
        uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
        uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
        uint8_t nb_worker_queues = 0;
@@ -330,15 +588,19 @@ pipeline_atq_eventdev_setup(struct evt_test *test, struct 
evt_options *opt)
                q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
 
                if (!t->internal_port) {
+                       is_prod = false;
                        RTE_ETH_FOREACH_DEV(prod) {
                                if (queue == tx_evqueue_id[prod]) {
                                        q_conf.event_queue_cfg =
                                                RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
-                               } else {
-                                       queue_arr[nb_worker_queues] = queue;
-                                       nb_worker_queues++;
+                                       is_prod = true;
+                                       break;
                                }
                        }
+                       if (!is_prod) {
+                               queue_arr[nb_worker_queues] = queue;
+                               nb_worker_queues++;
+                       }
                }
 
                ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
diff --git a/app/test-eventdev/test_pipeline_common.c 
b/app/test-eventdev/test_pipeline_common.c
index b47d76743..d5ef90500 100644
--- a/app/test-eventdev/test_pipeline_common.c
+++ b/app/test-eventdev/test_pipeline_common.c
@@ -36,6 +36,12 @@ pipeline_opt_dump(struct evt_options *opt, uint8_t nb_queues)
        evt_dump_queue_priority(opt);
        evt_dump_sched_type_list(opt);
        evt_dump_producer_type(opt);
+       evt_dump("nb_eth_rx_queues", "%d", opt->eth_queues);
+       evt_dump("event_vector", "%d", opt->ena_vector);
+       if (opt->ena_vector) {
+               evt_dump("vector_size", "%d", opt->vector_size);
+               evt_dump("vector_tmo_ns", "%" PRIu64 "", opt->vector_tmo_nsec);
+       }
 }
 
 static inline uint64_t
@@ -163,7 +169,7 @@ pipeline_opt_check(struct evt_options *opt, uint64_t 
nb_queues)
 int
 pipeline_ethdev_setup(struct evt_test *test, struct evt_options *opt)
 {
-       uint16_t i;
+       uint16_t i, j;
        int ret;
        uint8_t nb_queues = 1;
        struct test_pipeline *t = evt_test_priv(test);
@@ -210,6 +216,16 @@ pipeline_ethdev_setup(struct evt_test *test, struct 
evt_options *opt)
                if (!(caps & RTE_EVENT_ETH_TX_ADAPTER_CAP_INTERNAL_PORT))
                        t->internal_port = 0;
 
+               ret = rte_event_eth_rx_adapter_caps_get(opt->dev_id, i, &caps);
+               if (ret != 0) {
+                       evt_err("failed to get event tx adapter[%d] caps", i);
+                       return ret;
+               }
+
+               if (!(caps & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT))
+                       local_port_conf.rxmode.offloads |=
+                               DEV_RX_OFFLOAD_RSS_HASH;
+
                ret = rte_eth_dev_info_get(i, &dev_info);
                if (ret != 0) {
                        evt_err("Error during getting device (port %u) info: 
%s\n",
@@ -236,19 +252,22 @@ pipeline_ethdev_setup(struct evt_test *test, struct 
evt_options *opt)
                                local_port_conf.rx_adv_conf.rss_conf.rss_hf);
                }
 
-               if (rte_eth_dev_configure(i, nb_queues, nb_queues,
-                                       &local_port_conf)
-                               < 0) {
+               if (rte_eth_dev_configure(i, opt->eth_queues, nb_queues,
+                                         &local_port_conf) < 0) {
                        evt_err("Failed to configure eth port [%d]", i);
                        return -EINVAL;
                }
 
-               if (rte_eth_rx_queue_setup(i, 0, NB_RX_DESC,
-                               rte_socket_id(), &rx_conf, t->pool) < 0) {
-                       evt_err("Failed to setup eth port [%d] rx_queue: %d.",
+               for (j = 0; j < opt->eth_queues; j++) {
+                       if (rte_eth_rx_queue_setup(i, j, NB_RX_DESC,
+                                                  rte_socket_id(), &rx_conf,
+                                                  t->pool) < 0) {
+                               evt_err("Failed to setup eth port [%d] 
rx_queue: %d.",
                                        i, 0);
-                       return -EINVAL;
+                               return -EINVAL;
+                       }
                }
+
                if (rte_eth_tx_queue_setup(i, 0, NB_TX_DESC,
                                        rte_socket_id(), NULL) < 0) {
                        evt_err("Failed to setup eth port [%d] tx_queue: %d.",
@@ -310,12 +329,27 @@ pipeline_event_rx_adapter_setup(struct evt_options *opt, 
uint8_t stride,
 {
        int ret = 0;
        uint16_t prod;
+       struct rte_mempool *vector_pool = NULL;
        struct rte_event_eth_rx_adapter_queue_conf queue_conf;
+       struct rte_event_eth_rx_adapter_event_vector_config vec_conf;
 
        memset(&queue_conf, 0,
                        sizeof(struct rte_event_eth_rx_adapter_queue_conf));
        queue_conf.ev.sched_type = opt->sched_type_list[0];
+       if (opt->ena_vector) {
+               unsigned int nb_elem = (opt->pool_sz / opt->vector_size) << 1;
+
+               nb_elem = nb_elem ? nb_elem : 1;
+               vector_pool = rte_event_vector_pool_create(
+                       "vector_pool", nb_elem, 0, opt->vector_size,
+                       opt->socket_id);
+               if (vector_pool == NULL) {
+                       evt_err("failed to create event vector pool");
+                       return -ENOMEM;
+               }
+       }
        RTE_ETH_FOREACH_DEV(prod) {
+               struct rte_event_eth_rx_adapter_vector_limits limits;
                uint32_t cap;
 
                ret = rte_event_eth_rx_adapter_caps_get(opt->dev_id,
@@ -326,6 +360,50 @@ pipeline_event_rx_adapter_setup(struct evt_options *opt, 
uint8_t stride,
                                        opt->dev_id);
                        return ret;
                }
+
+               if (opt->ena_vector) {
+                       memset(&limits, 0, sizeof(limits));
+                       ret = rte_event_eth_rx_adapter_vector_limits_get(
+                               opt->dev_id, prod, &limits);
+                       if (ret) {
+                               evt_err("failed to get vector limits");
+                               return ret;
+                       }
+
+                       if (opt->vector_size < limits.min_sz ||
+                           opt->vector_size > limits.max_sz) {
+                               evt_err("Vector size [%d] not within limits 
max[%d] min[%d]",
+                                       opt->vector_size, limits.min_sz,
+                                       limits.max_sz);
+                               return -EINVAL;
+                       }
+
+                       if (limits.log2_sz &&
+                           !rte_is_power_of_2(opt->vector_size)) {
+                               evt_err("Vector size [%d] not power of 2",
+                                       opt->vector_size);
+                               return -EINVAL;
+                       }
+
+                       if (opt->vector_tmo_nsec > limits.max_timeout_ns ||
+                           opt->vector_tmo_nsec < limits.min_timeout_ns) {
+                               evt_err("Vector timeout [%" PRIu64
+                                       "] not within limits max[%" PRIu64
+                                       "] min[%" PRIu64 "]",
+                                       opt->vector_tmo_nsec,
+                                       limits.max_timeout_ns,
+                                       limits.min_timeout_ns);
+                               return -EINVAL;
+                       }
+
+                       if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) {
+                               queue_conf.rx_queue_flags |=
+                               RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
+                       } else {
+                               evt_err("Rx adapter doesn't support event 
vector");
+                               return -EINVAL;
+                       }
+               }
                queue_conf.ev.queue_id = prod * stride;
                ret = rte_event_eth_rx_adapter_create(prod, opt->dev_id,
                                &prod_conf);
@@ -340,6 +418,17 @@ pipeline_event_rx_adapter_setup(struct evt_options *opt, 
uint8_t stride,
                        return ret;
                }
 
+               if (opt->ena_vector) {
+                       vec_conf.vector_sz = opt->vector_size;
+                       vec_conf.vector_timeout_ns = opt->vector_tmo_nsec;
+                       vec_conf.vector_mp = vector_pool;
+                       if (rte_event_eth_rx_adapter_queue_event_vector_config(
+                                   prod, prod, -1, &vec_conf) < 0) {
+                               evt_err("Failed to configure event 
vectorization for Rx adapter");
+                               return -EINVAL;
+                       }
+               }
+
                if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
                        uint32_t service_id = -1U;
 
@@ -378,6 +467,14 @@ pipeline_event_tx_adapter_setup(struct evt_options *opt,
                        return ret;
                }
 
+               if (opt->ena_vector) {
+                       if (!(cap &
+                             RTE_EVENT_ETH_TX_ADAPTER_CAP_EVENT_VECTOR)) {
+                               evt_err("Tx adapter doesn't support event 
vector");
+                               return -EINVAL;
+                       }
+               }
+
                ret = rte_event_eth_tx_adapter_create(consm, opt->dev_id,
                                &port_conf);
                if (ret) {
diff --git a/app/test-eventdev/test_pipeline_common.h 
b/app/test-eventdev/test_pipeline_common.h
index 6e73c6ab2..800a90616 100644
--- a/app/test-eventdev/test_pipeline_common.h
+++ b/app/test-eventdev/test_pipeline_common.h
@@ -101,6 +101,14 @@ pipeline_fwd_event(struct rte_event *ev, uint8_t sched)
        ev->sched_type = sched;
 }
 
+/*
+ * Prepare a vector event for re-injection into the event device: apply
+ * the caller-chosen schedule type, request a FORWARD operation and tag
+ * the event as a CPU-generated vector.
+ */
+static __rte_always_inline void
+pipeline_fwd_event_vector(struct rte_event *ev, uint8_t sched)
+{
+       ev->sched_type = sched;
+       ev->op = RTE_EVENT_OP_FORWARD;
+       ev->event_type = RTE_EVENT_TYPE_CPU_VECTOR;
+}
+
 static __rte_always_inline void
 pipeline_event_tx(const uint8_t dev, const uint8_t port,
                struct rte_event * const ev)
@@ -110,6 +118,16 @@ pipeline_event_tx(const uint8_t dev, const uint8_t port,
                rte_pause();
 }
 
+/*
+ * Transmit one vector event through the event eth Tx adapter.
+ *
+ * The vector's queue field is reset to 0 first, then the enqueue is
+ * retried, pausing between attempts, until the adapter accepts the event.
+ */
+static __rte_always_inline void
+pipeline_event_tx_vector(const uint8_t dev, const uint8_t port,
+                        struct rte_event *const ev)
+{
+       ev->vec->queue = 0;
+
+       for (;;) {
+               if (rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
+                       break;
+               rte_pause();
+       }
+}
+
 static __rte_always_inline void
 pipeline_event_tx_burst(const uint8_t dev, const uint8_t port,
                struct rte_event *ev, const uint16_t nb_rx)
diff --git a/app/test-eventdev/test_pipeline_queue.c 
b/app/test-eventdev/test_pipeline_queue.c
index 9a9febb19..f6cc3e358 100644
--- a/app/test-eventdev/test_pipeline_queue.c
+++ b/app/test-eventdev/test_pipeline_queue.c
@@ -15,6 +15,8 @@ pipeline_queue_nb_event_queues(struct evt_options *opt)
        return (eth_count * opt->nb_stages) + eth_count;
 }
 
+typedef int (*pipeline_queue_worker_t)(void *arg);
+
 static __rte_noinline int
 pipeline_queue_worker_single_stage_tx(void *arg)
 {
@@ -126,6 +128,125 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
        return 0;
 }
 
+/*
+ * Queue worker: single stage, Tx adapter enqueue, event vector mode.
+ *
+ * Vectors that arrive with an atomic schedule type are transmitted via
+ * pipeline_event_tx_vector() and counted in processed_pkts. Any other
+ * vector is moved to the next event queue id and forwarded as atomic.
+ * Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_queue_worker_single_stage_tx_vector(void *arg)
+{
+       PIPELINE_WORKER_SINGLE_STAGE_INIT;
+
+       while (!t->done) {
+               uint16_t nb_pkts;
+
+               if (rte_event_dequeue_burst(dev, port, &ev, 1, 0) == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               if (ev.sched_type != RTE_SCHED_TYPE_ATOMIC) {
+                       ev.queue_id++;
+                       pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
+                       pipeline_event_enqueue(dev, port, &ev);
+                       continue;
+               }
+
+               /* Read the count before the vector is handed to Tx. */
+               nb_pkts = ev.vec->nb_elem;
+               pipeline_event_tx_vector(dev, port, &ev);
+               w->processed_pkts += nb_pkts;
+       }
+
+       return 0;
+}
+
+/*
+ * Queue worker: single stage, forward to Tx event queue, event vector
+ * mode.
+ *
+ * Each dequeued vector is steered to the event queue that
+ * t->tx_evqueue_id[] lists for the vector's port, marked as an atomic
+ * FORWARD and enqueued back to the event device. The vector's element
+ * count is added to processed_pkts. Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_queue_worker_single_stage_fwd_vector(void *arg)
+{
+       PIPELINE_WORKER_SINGLE_STAGE_INIT;
+       const uint8_t *txq_of_port = t->tx_evqueue_id;
+
+       while (!t->done) {
+               uint16_t nb_pkts;
+
+               if (rte_event_dequeue_burst(dev, port, &ev, 1, 0) == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               ev.queue_id = txq_of_port[ev.vec->port];
+               ev.vec->queue = 0;
+               /* Read the count before the event leaves this worker. */
+               nb_pkts = ev.vec->nb_elem;
+               pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
+               pipeline_event_enqueue(dev, port, &ev);
+               w->processed_pkts += nb_pkts;
+       }
+
+       return 0;
+}
+
+/*
+ * Queue worker: single stage, burst dequeue, Tx adapter enqueue, event
+ * vector mode.
+ *
+ * Within a dequeued burst, vectors that arrive with an atomic schedule
+ * type are transmitted one by one via pipeline_event_tx_vector(),
+ * counted, and their slot is turned into a RELEASE operation. The other
+ * vectors are moved to the next event queue id and marked as atomic
+ * FORWARD. The whole burst is then enqueued back to the event device.
+ * Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
+{
+       PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+
+       while (!t->done) {
+               const uint16_t nb_deq =
+                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+               if (nb_deq == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               for (i = 0; i < nb_deq; i++) {
+                       uint16_t nb_pkts;
+
+                       if (ev[i].sched_type != RTE_SCHED_TYPE_ATOMIC) {
+                               ev[i].queue_id++;
+                               pipeline_fwd_event_vector(
+                                       &ev[i], RTE_SCHED_TYPE_ATOMIC);
+                               continue;
+                       }
+
+                       nb_pkts = ev[i].vec->nb_elem;
+                       pipeline_event_tx_vector(dev, port, &ev[i]);
+                       ev[i].op = RTE_EVENT_OP_RELEASE;
+                       w->processed_pkts += nb_pkts;
+               }
+
+               pipeline_event_enqueue_burst(dev, port, ev, nb_deq);
+       }
+
+       return 0;
+}
+
+/*
+ * Queue worker: single stage, burst forward to Tx event queue, event
+ * vector mode.
+ *
+ * Dequeues up to BURST_SIZE vector events, steers each one to the event
+ * queue that t->tx_evqueue_id[] lists for the vector's port, marks it as
+ * an atomic FORWARD and enqueues the burst back to the event device.
+ *
+ * The per-burst packet total is kept in a 64-bit accumulator: element
+ * counts summed over a burst of large vectors can exceed what a uint16_t
+ * holds, which would silently under-count processed_pkts.
+ *
+ * Runs until t->done is set; returns 0.
+ */
+static __rte_noinline int
+pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
+{
+       PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+       const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint64_t nb_pkts;
+
+       while (!t->done) {
+               uint16_t nb_rx =
+                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+               if (!nb_rx) {
+                       rte_pause();
+                       continue;
+               }
+
+               nb_pkts = 0;
+               for (i = 0; i < nb_rx; i++) {
+                       ev[i].queue_id = tx_queue[ev[i].vec->port];
+                       ev[i].vec->queue = 0;
+                       nb_pkts += ev[i].vec->nb_elem;
+                       pipeline_fwd_event_vector(&ev[i],
+                                                 RTE_SCHED_TYPE_ATOMIC);
+               }
+
+               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               w->processed_pkts += nb_pkts;
+       }
+
+       return 0;
+}
 
 static __rte_noinline int
 pipeline_queue_worker_multi_stage_tx(void *arg)
@@ -267,6 +388,151 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
        return 0;
 }
 
+/*
+ * Multi-stage, single-event worker for event vectors, used when the device
+ * has an internal Tx port (the [vector][!burst][internal_port] slot of the
+ * dispatch table in worker_wrapper()).
+ *
+ * A vector that is already on the queue listed in t->tx_evqueue_id[] for
+ * its port is transmitted; any other vector is moved to the next queue.
+ *
+ * arg: per-worker context; it is unpacked by
+ *      PIPELINE_WORKER_MULTI_STAGE_INIT (macro body not visible here).
+ * Returns 0 once t->done is observed.
+ */
+static __rte_noinline int
+pipeline_queue_worker_multi_stage_tx_vector(void *arg)
+{
+       PIPELINE_WORKER_MULTI_STAGE_INIT;
+       const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t vector_sz;
+
+       while (!t->done) {
+               if (rte_event_dequeue_burst(dev, port, &ev, 1, 0) == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               cq_id = ev.queue_id % nb_stages;
+
+               if (ev.queue_id != tx_queue[ev.vec->port]) {
+                       /*
+                        * Not on the Tx queue yet: step to the next queue.
+                        * The hop out of the last stage is always ATOMIC,
+                        * earlier hops use the per-stage schedule type.
+                        */
+                       ev.queue_id++;
+                       pipeline_fwd_event_vector(
+                               &ev, cq_id == last_queue
+                                            ? RTE_SCHED_TYPE_ATOMIC
+                                            : sched_type_list[cq_id]);
+                       pipeline_event_enqueue(dev, port, &ev);
+                       continue;
+               }
+
+               /* Latch the element count before the vector goes to Tx. */
+               vector_sz = ev.vec->nb_elem;
+               pipeline_event_tx_vector(dev, port, &ev);
+               w->processed_pkts += vector_sz;
+       }
+
+       return 0;
+}
+
+/*
+ * Multi-stage, single-event worker for event vectors, used when the device
+ * has no internal Tx port (the [vector][!burst][!internal_port] slot of
+ * the dispatch table in worker_wrapper()).
+ *
+ * Vectors leaving the last stage are redirected to the queue listed in
+ * t->tx_evqueue_id[] for their port and counted as processed; vectors in
+ * earlier stages are moved to the next queue. Either way the event is
+ * enqueued back to the event device.
+ *
+ * arg: per-worker context; it is unpacked by
+ *      PIPELINE_WORKER_MULTI_STAGE_INIT (macro body not visible here).
+ * Returns 0 once t->done is observed.
+ */
+static __rte_noinline int
+pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
+{
+       PIPELINE_WORKER_MULTI_STAGE_INIT;
+       const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t vector_sz;
+
+       while (!t->done) {
+               if (rte_event_dequeue_burst(dev, port, &ev, 1, 0) == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               cq_id = ev.queue_id % nb_stages;
+
+               if (cq_id != last_queue) {
+                       /* Earlier stage: hop to the following queue. */
+                       ev.queue_id++;
+                       pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
+               } else {
+                       /* Last stage: hand over to the Tx queue as ATOMIC. */
+                       ev.queue_id = tx_queue[ev.vec->port];
+                       vector_sz = ev.vec->nb_elem;
+                       pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
+                       w->processed_pkts += vector_sz;
+               }
+
+               pipeline_event_enqueue(dev, port, &ev);
+       }
+
+       return 0;
+}
+
+/*
+ * Multi-stage, burst-mode worker for event vectors, used when the device
+ * has an internal Tx port (the [vector][burst][internal_port] slot of the
+ * dispatch table in worker_wrapper()).
+ *
+ * For every event of a dequeued burst: a vector already on the queue
+ * listed in t->tx_evqueue_id[] for its port is transmitted and its event
+ * slot is marked RTE_EVENT_OP_RELEASE; any other vector is moved to the
+ * next queue. The whole burst is then enqueued back.
+ *
+ * arg: per-worker context; it is unpacked by
+ *      PIPELINE_WORKER_MULTI_STAGE_BURST_INIT (macro body not visible here).
+ * Returns 0 once t->done is observed.
+ */
+static __rte_noinline int
+pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
+{
+       PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+       const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t vector_sz;
+
+       while (!t->done) {
+               uint16_t nb_rx =
+                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+               if (nb_rx == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               for (i = 0; i < nb_rx; i++) {
+                       cq_id = ev[i].queue_id % nb_stages;
+
+                       if (ev[i].queue_id != tx_queue[ev[i].vec->port]) {
+                               /*
+                                * Not on the Tx queue yet: the hop out of
+                                * the last stage is ATOMIC, earlier hops
+                                * use the per-stage schedule type.
+                                */
+                               ev[i].queue_id++;
+                               pipeline_fwd_event_vector(
+                                       &ev[i], cq_id == last_queue
+                                                       ? RTE_SCHED_TYPE_ATOMIC
+                                                       : sched_type_list[cq_id]);
+                       } else {
+                               /* Latch the count before Tx takes the vector. */
+                               vector_sz = ev[i].vec->nb_elem;
+                               pipeline_event_tx_vector(dev, port, &ev[i]);
+                               ev[i].op = RTE_EVENT_OP_RELEASE;
+                               w->processed_pkts += vector_sz;
+                       }
+               }
+
+               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+       }
+
+       return 0;
+}
+
+/*
+ * Multi-stage, burst-mode worker for event vectors, used when the device
+ * has no internal Tx port (the [vector][burst][!internal_port] slot of the
+ * dispatch table in worker_wrapper()).
+ *
+ * For every event of a dequeued burst: a vector leaving the last stage is
+ * redirected to the queue listed in t->tx_evqueue_id[] for its port and
+ * counted as processed; a vector in an earlier stage is moved to the next
+ * queue. The whole burst is then enqueued back.
+ *
+ * arg: per-worker context; it is unpacked by
+ *      PIPELINE_WORKER_MULTI_STAGE_BURST_INIT (macro body not visible here).
+ * Returns 0 once t->done is observed.
+ */
+static __rte_noinline int
+pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
+{
+       PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+       const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t vector_sz;
+
+       while (!t->done) {
+               uint16_t nb_rx =
+                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+
+               if (nb_rx == 0) {
+                       rte_pause();
+                       continue;
+               }
+
+               for (i = 0; i < nb_rx; i++) {
+                       cq_id = ev[i].queue_id % nb_stages;
+
+                       if (cq_id != last_queue) {
+                               /* Earlier stage: hop to the following queue. */
+                               ev[i].queue_id++;
+                               pipeline_fwd_event_vector(
+                                       &ev[i], sched_type_list[cq_id]);
+                               continue;
+                       }
+
+                       /* Last stage: hand over to the Tx queue as ATOMIC. */
+                       ev[i].queue_id = tx_queue[ev[i].vec->port];
+                       vector_sz = ev[i].vec->nb_elem;
+                       pipeline_fwd_event_vector(&ev[i],
+                                                 RTE_SCHED_TYPE_ATOMIC);
+                       w->processed_pkts += vector_sz;
+               }
+
+               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+       }
+
+       return 0;
+}
+
 static int
 worker_wrapper(void *arg)
 {
@@ -275,29 +541,39 @@ worker_wrapper(void *arg)
        const bool burst = evt_has_burst_mode(w->dev_id);
        const bool internal_port = w->t->internal_port;
        const uint8_t nb_stages = opt->nb_stages;
-       RTE_SET_USED(opt);
-
-       if (nb_stages == 1) {
-               if (!burst && internal_port)
-                       return pipeline_queue_worker_single_stage_tx(arg);
-               else if (!burst && !internal_port)
-                       return pipeline_queue_worker_single_stage_fwd(arg);
-               else if (burst && internal_port)
-                       return pipeline_queue_worker_single_stage_burst_tx(arg);
-               else if (burst && !internal_port)
-                       return pipeline_queue_worker_single_stage_burst_fwd(
-                                       arg);
-       } else {
-               if (!burst && internal_port)
-                       return pipeline_queue_worker_multi_stage_tx(arg);
-               else if (!burst && !internal_port)
-                       return pipeline_queue_worker_multi_stage_fwd(arg);
-               else if (burst && internal_port)
-                       return pipeline_queue_worker_multi_stage_burst_tx(arg);
-               else if (burst && !internal_port)
-                       return pipeline_queue_worker_multi_stage_burst_fwd(arg);
+       /*vector/burst/internal_port*/
+       const pipeline_queue_worker_t
+       pipeline_queue_worker_single_stage[2][2][2] = {
+               [0][0][0] = pipeline_queue_worker_single_stage_fwd,
+               [0][0][1] = pipeline_queue_worker_single_stage_tx,
+               [0][1][0] = pipeline_queue_worker_single_stage_burst_fwd,
+               [0][1][1] = pipeline_queue_worker_single_stage_burst_tx,
+               [1][0][0] = pipeline_queue_worker_single_stage_fwd_vector,
+               [1][0][1] = pipeline_queue_worker_single_stage_tx_vector,
+               [1][1][0] = pipeline_queue_worker_single_stage_burst_fwd_vector,
+               [1][1][1] = pipeline_queue_worker_single_stage_burst_tx_vector,
+       };
+       const pipeline_queue_worker_t
+       pipeline_queue_worker_multi_stage[2][2][2] = {
+               [0][0][0] = pipeline_queue_worker_multi_stage_fwd,
+               [0][0][1] = pipeline_queue_worker_multi_stage_tx,
+               [0][1][0] = pipeline_queue_worker_multi_stage_burst_fwd,
+               [0][1][1] = pipeline_queue_worker_multi_stage_burst_tx,
+               [1][0][0] = pipeline_queue_worker_multi_stage_fwd_vector,
+               [1][0][1] = pipeline_queue_worker_multi_stage_tx_vector,
+               [1][1][0] = pipeline_queue_worker_multi_stage_burst_fwd_vector,
+               [1][1][1] = pipeline_queue_worker_multi_stage_burst_tx_vector,
+       };
+
+       if (nb_stages == 1)
+               return (pipeline_queue_worker_single_stage[opt->ena_vector]
+                                                         [burst]
+                                                         [internal_port])(arg);
+       else
+               return (pipeline_queue_worker_multi_stage[opt->ena_vector]
+                                                        [burst]
+                                                        [internal_port])(arg);
 
-       }
        rte_panic("invalid worker\n");
 }
 
diff --git a/doc/guides/tools/testeventdev.rst b/doc/guides/tools/testeventdev.rst
index ad1788a3d..691cf706e 100644
--- a/doc/guides/tools/testeventdev.rst
+++ b/doc/guides/tools/testeventdev.rst
@@ -158,6 +158,26 @@ The following are the application command-line options:
        Set max packet mbuf size. Can be used configure Rx/Tx scatter gather.
        Only applicable for `pipeline_atq` and `pipeline_queue` tests.
 
+* ``--nb_eth_queues``
+
+       Configure multiple Rx queues per ethernet port.
+       Only applicable for `pipeline_atq` and `pipeline_queue` tests.
+
+* ``--enable_vector``
+
+       Enable event vector for Rx/Tx adapters.
+       Only applicable for `pipeline_atq` and `pipeline_queue` tests.
+
+* ``--vector_size``
+
+       Vector size to configure for the Rx adapter.
+       Only applicable for `pipeline_atq` and `pipeline_queue` tests.
+
+* ``--vector_tmo_ns``
+
+       Vector timeout, in nanoseconds, to be configured for the Rx adapter.
+       Only applicable for `pipeline_atq` and `pipeline_queue` tests.
+
 
 Eventdev Tests
 --------------
@@ -607,6 +627,10 @@ Supported application command line options are following::
         --worker_deq_depth
         --prod_type_ethdev
         --deq_tmo_nsec
+        --nb_eth_queues
+        --enable_vector
+        --vector_size
+        --vector_tmo_ns
 
 
 .. Note::
@@ -699,6 +723,10 @@ Supported application command line options are following::
         --worker_deq_depth
         --prod_type_ethdev
         --deq_tmo_nsec
+        --nb_eth_queues
+        --enable_vector
+        --vector_size
+        --vector_tmo_ns
 
 
 .. Note::
-- 
2.17.1

Reply via email to