From: Bogdan Pricope <[email protected]>

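Add a direct pktin receive mode to the generator and make it the default.
Direct mode should increase throughput on the RX side. The scheduler-based
receive path remains available through the new -z/--sched option.
Each interface now opens only the directions it uses: RX is disabled in
UDP (send) mode and TX is disabled in RCV mode.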

Signed-off-by: Bogdan Pricope <[email protected]>
---
/** Email created from pull request 343 (bogdanPricope:generator_rx_direct_pr)
 ** https://github.com/Linaro/odp/pull/343
 ** Patch: https://github.com/Linaro/odp/pull/343.patch
 ** Base sha: 6b5cdc77eb9759a2349b10372a964648559bc92c
 ** Merge commit sha: d3f54aff752fe7ae7f51c4caa2f714dc659dd7a5
 **/
 example/generator/odp_generator.c | 257 ++++++++++++++++++++++++++------------
 1 file changed, 174 insertions(+), 83 deletions(-)

diff --git a/example/generator/odp_generator.c b/example/generator/odp_generator.c
index 956161a61..3bd844b2a 100644
--- a/example/generator/odp_generator.c
+++ b/example/generator/odp_generator.c
@@ -54,6 +54,8 @@ typedef struct {
        odp_pktio_config_t config;
        odp_pktout_queue_t pktout[MAX_WORKERS];
        unsigned pktout_count;
+       odp_pktin_queue_t pktin[MAX_WORKERS];
+       unsigned pktin_count;
 } interface_t;
 
 /**
@@ -83,6 +85,7 @@ typedef struct {
        int rx_burst;   /**< number of packets to receive with one
                                      API call */
        odp_bool_t csum;        /**< use platform csum support if available */
+       odp_bool_t sched;       /**< use scheduler API to receive packets */
 } appl_args_t;
 
 /**
@@ -109,6 +112,7 @@ typedef struct {
                        odp_pktout_config_opt_t *pktout_cfg; /**< Packet output 
config*/
                } tx;
                struct {
+                       odp_pktin_queue_t pktin; /**< Packet input queue */
                        interface_t *ifs; /**< Interfaces array */
                        int ifs_count; /**< Interfaces array size */
                } rx;
@@ -517,10 +521,15 @@ static int create_pktio(const char *dev, odp_pool_t pool,
        odp_pktio_param_t pktio_param;
        odp_pktin_queue_param_t pktin_param;
        odp_pktout_queue_param_t pktout_param;
-       odp_pktio_op_mode_t pktout_mode;
+       odp_pktio_op_mode_t pktout_mode, pktin_mode;
+       odp_bool_t sched = args->appl.sched;
 
        odp_pktio_param_init(&pktio_param);
-       pktio_param.in_mode = ODP_PKTIN_MODE_SCHED;
+       pktio_param.in_mode = num_rx_queues ?
+               (sched ? ODP_PKTIN_MODE_SCHED : ODP_PKTIN_MODE_DIRECT) :
+               ODP_PKTIN_MODE_DISABLED;
+       pktio_param.out_mode = num_tx_queues ? ODP_PKTOUT_MODE_DIRECT :
+               ODP_PKTOUT_MODE_DISABLED;
 
        /* Open a packet IO instance */
        itf->pktio = odp_pktio_open(dev, pool, &pktio_param);
@@ -563,31 +572,46 @@ static int create_pktio(const char *dev, odp_pool_t pool,
                return -1;
        }
 
-       if (num_rx_queues > capa.max_input_queues)
-               num_rx_queues = capa.max_input_queues;
+       if (num_rx_queues) {
+               pktin_mode = ODP_PKTIO_OP_MT_UNSAFE;
+               if (num_rx_queues > capa.max_input_queues) {
+                       num_rx_queues = capa.max_input_queues;
+                       pktin_mode = ODP_PKTIO_OP_MT;
+                       EXAMPLE_DBG("Warning: Force RX multithread safe mode "
+                                   "(slower)on %s\n",  dev);
+               }
 
-       odp_pktin_queue_param_init(&pktin_param);
-       pktin_param.num_queues = num_rx_queues;
-       pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_ATOMIC;
+               odp_pktin_queue_param_init(&pktin_param);
+               pktin_param.num_queues = num_rx_queues;
+               pktin_param.op_mode = pktin_mode;
+               if (sched)
+                       pktin_param.queue_param.sched.sync =
+                               ODP_SCHED_SYNC_ATOMIC;
 
-       if (odp_pktin_queue_config(itf->pktio, &pktin_param)) {
-               EXAMPLE_ERR("Error: pktin queue config failed for %s\n", dev);
-               return -1;
+               if (odp_pktin_queue_config(itf->pktio, &pktin_param)) {
+                       EXAMPLE_ERR("Error: Pktin config failed for %s\n", dev);
+                       return -1;
+               }
        }
 
-       pktout_mode = ODP_PKTIO_OP_MT_UNSAFE;
-       if (num_tx_queues > capa.max_output_queues) {
-               num_tx_queues = capa.max_output_queues;
-               pktout_mode = ODP_PKTIO_OP_MT;
-       }
+       if (num_tx_queues) {
+               pktout_mode = ODP_PKTIO_OP_MT_UNSAFE;
+               if (num_tx_queues > capa.max_output_queues) {
+                       num_tx_queues = capa.max_output_queues;
+                       pktout_mode = ODP_PKTIO_OP_MT;
+                       EXAMPLE_DBG("Warning: Force TX multithread safe mode "
+                                   "(slower) on %s\n", dev);
+               }
 
-       odp_pktout_queue_param_init(&pktout_param);
-       pktout_param.num_queues = num_tx_queues;
-       pktout_param.op_mode = pktout_mode;
+               odp_pktout_queue_param_init(&pktout_param);
+               pktout_param.num_queues = num_tx_queues;
+               pktout_param.op_mode = pktout_mode;
 
-       if (odp_pktout_queue_config(itf->pktio, &pktout_param)) {
-               EXAMPLE_ERR("Error: pktout queue config failed for %s\n", dev);
-               return -1;
+               if (odp_pktout_queue_config(itf->pktio, &pktout_param)) {
+                       EXAMPLE_ERR("Error: Pktout config failed for %s\n",
+                                   dev);
+                       return -1;
+               }
        }
 
        ret = odp_pktio_start(itf->pktio);
@@ -595,12 +619,21 @@ static int create_pktio(const char *dev, odp_pool_t pool,
                EXAMPLE_ABORT("Error: unable to start %s\n", dev);
 
        itf->pktout_count = num_tx_queues;
-       if (odp_pktout_queue(itf->pktio, itf->pktout, itf->pktout_count) !=
-                            (int)itf->pktout_count) {
+       if (itf->pktout_count &&
+           odp_pktout_queue(itf->pktio, itf->pktout, itf->pktout_count) !=
+           (int)itf->pktout_count) {
                EXAMPLE_ERR("Error: failed to get output queues for %s\n", dev);
                return -1;
        }
 
+       itf->pktin_count = num_rx_queues;
+       if (!sched && itf->pktin_count &&
+           odp_pktin_queue(itf->pktio, itf->pktin, itf->pktin_count) !=
+           (int)itf->pktin_count) {
+               EXAMPLE_ERR("Error: failed to get input queues for %s\n", dev);
+               return -1;
+       }
+
        printf("  created pktio:%02" PRIu64
               ", dev:%s, queue mode (ATOMIC queues)\n"
               "          default pktio%02" PRIu64 "\n",
@@ -768,14 +801,14 @@ static void process_icmp_pkt(thread_args_t *thr_args,
 }
 
 /**
- * Print odp packets
+ * Process odp packets
  *
  * @param  thr worker id
  * @param  pkt_tbl packets to be print
  * @param  len packet number
  */
-static void print_pkts(int thr, thread_args_t *thr_args,
-                      odp_packet_t pkt_tbl[], unsigned len)
+static void process_pkts(int thr, thread_args_t *thr_args,
+                        odp_packet_t pkt_tbl[], unsigned len)
 {
        odp_packet_t pkt;
        char *buf;
@@ -784,10 +817,33 @@ static void print_pkts(int thr, thread_args_t *thr_args,
        unsigned i;
        size_t offset;
        char msg[1024];
+       interface_t *itfs, *itf;
+
+       itfs = thr_args->rx.ifs;
 
        for (i = 0; i < len; ++i) {
                pkt = pkt_tbl[i];
 
+               itf = &itfs[odp_pktio_index(odp_packet_input(pkt))];
+
+               if (odp_packet_has_ipv4(pkt)) {
+                       if (itf->config.pktin.bit.ipv4_chksum) {
+                               if (odp_packet_has_l3_error(pkt))
+                                       printf("HW detected L3 error\n");
+                       }
+               }
+
+               if (odp_packet_has_udp(pkt)) {
+                       if (itf->config.pktin.bit.udp_chksum) {
+                               if (odp_packet_has_l4_error(pkt))
+                                       printf("HW detected L4 error\n");
+                       }
+               }
+
+               /* Drop packets with errors */
+               if (odp_unlikely(odp_packet_has_error(pkt)))
+                       continue;
+
                /* only ip pkts */
                if (!odp_packet_has_ipv4(pkt))
                        continue;
@@ -820,15 +876,13 @@ static int gen_recv_thread(void *arg)
 {
        int thr;
        thread_args_t *thr_args;
-       odp_packet_t pkts[MAX_RX_BURST], pkt;
-       odp_event_t events[MAX_RX_BURST];
-       int pkt_cnt, ev_cnt, i;
-       int burst_size;
-       interface_t *itfs, *itf;
+       odp_packet_t pkts[MAX_RX_BURST];
+       int pkt_cnt, burst_size;
+       odp_pktin_queue_t pktin;
 
        thr = odp_thread_id();
        thr_args = (thread_args_t *)arg;
-       itfs = thr_args->rx.ifs;
+       pktin = thr_args->rx.pktin;
        burst_size = args->rx_burst_size;
 
        printf("  [%02i] created mode: RECEIVE\n", thr);
@@ -838,39 +892,55 @@ static int gen_recv_thread(void *arg)
                if (thr_args->stop)
                        break;
 
-               /* Use schedule to get buf from any input queue */
-               ev_cnt = odp_schedule_multi(NULL, ODP_SCHED_NO_WAIT,
-                                           events, burst_size);
-               if (ev_cnt == 0)
-                       continue;
-               for (i = 0, pkt_cnt = 0; i < ev_cnt; i++) {
-                       pkt = odp_packet_from_event(events[i]);
-                       itf = &itfs[odp_pktio_index(odp_packet_input(pkt))];
-
-                       if (odp_packet_has_ipv4(pkt)) {
-                               if (itf->config.pktin.bit.ipv4_chksum) {
-                                       if (odp_packet_has_l3_error(pkt))
-                                               printf("HW detected L3 
error\n");
-                               }
-                       }
+               pkt_cnt = odp_pktin_recv_tmo(pktin, pkts, burst_size,
+                                            ODP_PKTIN_NO_WAIT);
 
-                       if (odp_packet_has_udp(pkt)) {
-                               if (itf->config.pktin.bit.udp_chksum) {
-                                       if (odp_packet_has_l4_error(pkt))
-                                               printf("HW detected L4 
error\n");
-                               }
-                       }
+               if (pkt_cnt > 0) {
+                       process_pkts(thr, thr_args, pkts, pkt_cnt);
 
-                       /* Drop packets with errors */
-                       if (odp_unlikely(odp_packet_has_error(pkt))) {
-                               odp_packet_free(pkt);
-                               continue;
-                       }
-                       pkts[pkt_cnt++] = pkt;
+                       odp_packet_free_multi(pkts, pkt_cnt);
+               } else if (pkt_cnt == 0) {
+                       continue;
+               } else {
+                       break;
                }
+       }
+
+       return 0;
+}
 
-               if (pkt_cnt) {
-                       print_pkts(thr, thr_args, pkts, pkt_cnt);
+/**
+ * Scheduler receive function
+ *
+ * @param arg  thread arguments of type 'thread_args_t *'
+ */
+static int gen_recv_sched_thread(void *arg)
+{
+       int thr;
+       thread_args_t *thr_args;
+       odp_packet_t pkts[MAX_RX_BURST];
+       odp_event_t events[MAX_RX_BURST];
+       int pkt_cnt, burst_size, i;
+
+       thr = odp_thread_id();
+       thr_args = (thread_args_t *)arg;
+       burst_size = args->rx_burst_size;
+
+       printf("  [%02i] created mode: RECEIVE SCHEDULER\n", thr);
+       odp_barrier_wait(&barrier);
+
+       for (;;) {
+               if (thr_args->stop)
+                       break;
+
+               pkt_cnt = odp_schedule_multi(NULL, ODP_SCHED_NO_WAIT,
+                                            events, burst_size);
+
+               if (pkt_cnt > 0) {
+                       for (i = 0; i < pkt_cnt; i++)
+                               pkts[i] = odp_packet_from_event(events[i]);
+
+                       process_pkts(thr, thr_args, pkts, pkt_cnt);
 
                        odp_packet_free_multi(pkts, pkt_cnt);
                }
@@ -1133,28 +1203,29 @@ int main(int argc, char *argv[])
 
        ifs = malloc(sizeof(interface_t) * args->appl.if_count);
 
-       if (args->appl.mode == APPL_MODE_PING ||
-           args->appl.mode == APPL_MODE_UDP)
-               num_rx_queues = 1;
-       else
-               num_rx_queues = num_workers;
-
-       if (args->appl.mode == APPL_MODE_PING ||
-           args->appl.mode == APPL_MODE_RCV)
-               num_tx_queues = 1;
-       else {
-               num_tx_queues = num_workers / args->appl.if_count;
-               if (num_workers % args->appl.if_count)
-                       num_tx_queues++;
-       }
+       for (i = 0; i < args->appl.if_count; ++i) {
+               if (args->appl.mode == APPL_MODE_PING) {
+                       num_rx_queues = 1;
+                       num_tx_queues = 1;
+               } else if (args->appl.mode == APPL_MODE_UDP) {
+                       num_rx_queues = 0;
+                       num_tx_queues = num_workers / args->appl.if_count;
+                       if (i < num_workers % args->appl.if_count)
+                               num_tx_queues++;
+               } else { /* APPL_MODE_RCV*/
+                       num_rx_queues = num_workers / args->appl.if_count;
+                       if (i < num_workers % args->appl.if_count)
+                               num_rx_queues++;
+                       num_tx_queues = 0;
+               }
 
-       for (i = 0; i < args->appl.if_count; ++i)
                if (create_pktio(args->appl.if_names[i], pool, num_rx_queues,
                                 num_tx_queues, &ifs[i])) {
                        EXAMPLE_ERR("Error: create interface %s failed.\n",
                                    args->appl.if_names[i]);
                        exit(EXIT_FAILURE);
                }
+       }
 
        /* Create and init worker threads */
        memset(thread_tbl, 0, sizeof(thread_tbl));
@@ -1182,6 +1253,8 @@ int main(int argc, char *argv[])
                        abort();
                }
                thr_args = &args->thread[PING_THR_RX];
+               if (!args->appl.sched)
+                       thr_args->rx.pktin = ifs[0].pktin[0];
                thr_args->rx.ifs = ifs;
                thr_args->rx.ifs_count = args->appl.if_count;
                thr_args->pool = pool;
@@ -1200,7 +1273,10 @@ int main(int argc, char *argv[])
                thr_args->mode = args->appl.mode;
 
                memset(&thr_params, 0, sizeof(thr_params));
-               thr_params.start    = gen_recv_thread;
+               if (args->appl.sched)
+                       thr_params.start = gen_recv_sched_thread;
+               else
+                       thr_params.start = gen_recv_thread;
                thr_params.arg      = thr_args;
                thr_params.thr_type = ODP_THREAD_WORKER;
                thr_params.instance = instance;
@@ -1246,21 +1322,27 @@ int main(int argc, char *argv[])
                for (i = 0; i < num_workers; ++i) {
                        odp_cpumask_t thd_mask;
                        int (*thr_run_func)(void *);
-                       int if_idx, pktout_idx;
+                       int if_idx, pktq_idx;
                        uint64_t start_seq;
 
+                       if_idx = i % args->appl.if_count;
+
                        if (args->appl.mode == APPL_MODE_RCV) {
+                               pktq_idx = (i / args->appl.if_count) %
+                                       ifs[if_idx].pktin_count;
+                               if (!args->appl.sched)
+                                       args->thread[i].rx.pktin =
+                                               ifs[if_idx].pktin[pktq_idx];
                                args->thread[i].rx.ifs = ifs;
                                args->thread[i].rx.ifs_count =
                                        args->appl.if_count;
                        } else {
-                               if_idx = i % args->appl.if_count;
-                               pktout_idx = (i / args->appl.if_count) %
+                               pktq_idx = (i / args->appl.if_count) %
                                        ifs[if_idx].pktout_count;
                                start_seq = i * args->tx_burst_size;
 
                                args->thread[i].tx.pktout =
-                                       ifs[if_idx].pktout[pktout_idx];
+                                       ifs[if_idx].pktout[pktq_idx];
                                args->thread[i].tx.pktout_cfg =
                                        &ifs[if_idx].config.pktout;
                                args->thread[i].counters.ctr_seq = start_seq;
@@ -1288,7 +1370,10 @@ int main(int argc, char *argv[])
                        if (args->appl.mode == APPL_MODE_UDP) {
                                thr_run_func = gen_send_thread;
                        } else if (args->appl.mode == APPL_MODE_RCV) {
-                               thr_run_func = gen_recv_thread;
+                               if (args->appl.sched)
+                                       thr_run_func = gen_recv_sched_thread;
+                               else
+                                       thr_run_func = gen_recv_thread;
                        } else {
                                EXAMPLE_ERR("ERR MODE\n");
                                exit(EXIT_FAILURE);
@@ -1388,10 +1473,11 @@ static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
                {"udp_tx_burst", required_argument, NULL, 'x'},
                {"rx_burst", required_argument, NULL, 'r'},
                {"csum", no_argument, NULL, 'y'},
+               {"sched", no_argument, NULL, 'z'},
                {NULL, 0, NULL, 0}
        };
 
-       static const char *shortopts = "+I:a:b:s:d:p:i:m:n:t:w:c:x:he:f:yr:";
+       static const char *shortopts = "+I:a:b:s:d:p:i:m:n:t:w:c:x:he:f:yr:z";
 
        /* let helper collect its own arguments (e.g. --odph_proc) */
        odph_parse_options(argc, argv, shortopts, longopts);
@@ -1406,6 +1492,7 @@ static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
        appl_args->srcport = 0;
        appl_args->dstport = 0;
        appl_args->csum = 0;
+       appl_args->sched = 0;
 
        opterr = 0; /* do not issue errors on helper options */
 
@@ -1557,6 +1644,9 @@ static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
                case 'y':
                        appl_args->csum = 1;
                        break;
+               case 'z':
+                       appl_args->sched = 1;
+                       break;
                case 'h':
                        usage(argv[0]);
                        exit(EXIT_SUCCESS);
@@ -1646,6 +1736,7 @@ static void usage(char *progname)
               "  -r, --rx_burst size of RX burst\n"
               "  -y, --csum use platform checksum support if available\n"
               "                 default is disabled\n"
+              "  -z, --sched use scheduler API to receive packets\n"
               "\n", NO_PATH(progname), NO_PATH(progname)
              );
 }

Reply via email to