ixgbe_pmd has a minimum supported RX burst size of 4. If fewer than 4
packets are requested from dpdk_recv_queue(), rte_eth_rx_burst() is
called with nb_pkts=4 and any extra packets received are cached for
the next dpdk_recv_queue() call to use.

Reviewed-by: Petri Savolainen <petri.savolai...@nokia.com>
Signed-off-by: Matias Elo <matias....@nokia.com>
---
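Illustration only, not part of the patch: a minimal standalone sketch
of the drain-or-fill scheme dpdk_recv_queue() implements below. Names
are simplified and hw_rx() stands in for rte_eth_rx_burst().

#define MIN_BURST 4

struct cache {
	void *pkt[MIN_BURST];   /* surplus packets from a previous read */
	unsigned idx;           /* next cached packet to hand out */
	unsigned count;         /* packets still cached */
};

static int recv_burst(struct cache *c, void *out[], int num,
		      int (*hw_rx)(void *buf[], int n))
{
	void *tmp[MIN_BURST];
	int i, nb;

	if (c->count > 0) {
		/* Drain cached packets only, don't mix in new ones */
		for (i = 0; i < num && c->count; i++) {
			out[i] = c->pkt[c->idx++];
			c->count--;
		}
		return i;
	}

	if (num >= MIN_BURST)   /* normal path, no caching needed */
		return hw_rx(out, num);

	/* Request smaller than the HW minimum: read MIN_BURST packets,
	 * hand out at most 'num' and cache the surplus */
	nb = hw_rx(tmp, MIN_BURST);
	c->idx = 0;
	for (i = 0; i < nb; i++) {
		if (i < num)
			out[i] = tmp[i];
		else
			c->pkt[c->count++] = tmp[i];
	}
	return nb < num ? nb : num;
}
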
 platform/linux-generic/include/odp_packet_dpdk.h | 18 +++++++++
 platform/linux-generic/pktio/dpdk.c              | 47 ++++++++++++++++++++++--
 2 files changed, 62 insertions(+), 3 deletions(-)
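
The per-queue cache entries are padded to a full cache line so that RX
queues polled from different cores never share a line (false sharing
would hurt the lockless RX path). Below is a generic sketch of the same
padding pattern, assuming 64-byte cache lines and GCC-style attributes;
the patch uses ODP's ODP_CACHE_LINE_SIZE_ROUNDUP and ODP_ALIGNED_CACHE
macros for the same effect.

#include <stdint.h>

#define CACHE_LINE_SIZE 64  /* assumed; actual size is platform dependent */
#define CACHE_LINE_ROUNDUP(x) \
	(((x) + CACHE_LINE_SIZE - 1) / CACHE_LINE_SIZE * CACHE_LINE_SIZE)

struct state {
	uint32_t head;
	uint32_t count;
};

/* The union rounds sizeof() up to a cache line multiple and the
 * alignment attribute keeps each array element on its own line(s) */
typedef union {
	struct state s;
	uint8_t pad[CACHE_LINE_ROUNDUP(sizeof(struct state))];
} padded_state_t __attribute__((aligned(CACHE_LINE_SIZE)));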

diff --git a/platform/linux-generic/include/odp_packet_dpdk.h b/platform/linux-generic/include/odp_packet_dpdk.h
index 3244175..d1e152f 100644
--- a/platform/linux-generic/include/odp_packet_dpdk.h
+++ b/platform/linux-generic/include/odp_packet_dpdk.h
@@ -30,6 +30,21 @@ _ODP_STATIC_ASSERT(DPDK_NB_MBUF % DPDK_MEMPOOL_CACHE_SIZE == 0 &&
                   , "DPDK mempool cache size failure");
 #endif
 
+#define DPDK_IXGBE_MIN_RX_BURST 4
+
+/** Cache for storing packets */
+struct pkt_cache_t {
+       /** array for storing extra RX packets */
+       struct rte_mbuf *pkt[DPDK_IXGBE_MIN_RX_BURST];
+       unsigned idx;                     /**< head of cache */
+       unsigned count;                   /**< packets in cache */
+};
+
+typedef union {
+       struct pkt_cache_t s;
+       uint8_t pad[ODP_CACHE_LINE_SIZE_ROUNDUP(sizeof(struct pkt_cache_t))];
+} pkt_cache_t ODP_ALIGNED_CACHE;
+
 /** Packet IO using DPDK interface */
 typedef struct {
        odp_pool_t pool;                  /**< pool to alloc packets from */
@@ -40,11 +55,14 @@ typedef struct {
        /** DPDK packet pool name (pktpool_<ifname>) */
        char pool_name[IF_NAMESIZE + 8];
        uint8_t port_id;                  /**< DPDK port identifier */
+       unsigned min_rx_burst;            /**< minimum RX burst size */
        odp_pktin_hash_proto_t hash;      /**< Packet input hash protocol */
        odp_bool_t lockless_rx;           /**< no locking for rx */
        odp_bool_t lockless_tx;           /**< no locking for tx */
        odp_ticketlock_t rx_lock[PKTIO_MAX_QUEUES];  /**< RX queue locks */
        odp_ticketlock_t tx_lock[PKTIO_MAX_QUEUES];  /**< TX queue locks */
+       /** cache for storing extra RX packets */
+       pkt_cache_t rx_cache[PKTIO_MAX_QUEUES];
 } pkt_dpdk_t;
 
 #endif
diff --git a/platform/linux-generic/pktio/dpdk.c b/platform/linux-generic/pktio/dpdk.c
index 10600db..d35896e 100644
--- a/platform/linux-generic/pktio/dpdk.c
+++ b/platform/linux-generic/pktio/dpdk.c
@@ -423,6 +423,11 @@ static int dpdk_open(odp_pktio_t id ODP_UNUSED,
        }
        pkt_dpdk->mtu = mtu;
 
+       if (!strcmp(dev_info.driver_name, "rte_ixgbe_pmd"))
+               pkt_dpdk->min_rx_burst = DPDK_IXGBE_MIN_RX_BURST;
+       else
+               pkt_dpdk->min_rx_burst = 0;
+
        /* Look for previously opened packet pool */
        pkt_pool = rte_mempool_lookup(pkt_dpdk->pool_name);
        if (pkt_pool == NULL)
@@ -615,15 +620,51 @@ static int dpdk_recv_queue(pktio_entry_t *pktio_entry,
                           int num)
 {
        pkt_dpdk_t *pkt_dpdk = &pktio_entry->s.pkt_dpdk;
+       pkt_cache_t *rx_cache = &pkt_dpdk->rx_cache[index];
        uint16_t nb_rx;
-
        struct rte_mbuf *rx_mbufs[num];
+       int i;
+       unsigned cache_idx;
 
        if (!pkt_dpdk->lockless_rx)
                odp_ticketlock_lock(&pkt_dpdk->rx_lock[index]);
+       /*
+        * ixgbe_pmd has a minimum supported RX burst size ('min_rx_burst'). If
+        * 'num' < 'min_rx_burst', 'min_rx_burst' is used as rte_eth_rx_burst()
+        * argument and the possibly received extra packets are cached for the
+        * next dpdk_recv_queue() call to use.
+        *
+        * Either use cached packets or receive new ones. Not both during the
+        * same call. */
+       if (rx_cache->s.count > 0) {
+               for (i = 0; i < num && rx_cache->s.count; i++) {
+                       rx_mbufs[i] = rx_cache->s.pkt[rx_cache->s.idx];
+                       rx_cache->s.idx++;
+                       rx_cache->s.count--;
+               }
+               nb_rx = i;
+       } else if ((unsigned)num < pkt_dpdk->min_rx_burst) {
+               struct rte_mbuf *new_mbufs[pkt_dpdk->min_rx_burst];
 
-       nb_rx = rte_eth_rx_burst(pktio_entry->s.pkt_dpdk.port_id, index,
-                                rx_mbufs, num);
+               nb_rx = rte_eth_rx_burst(pktio_entry->s.pkt_dpdk.port_id, index,
+                                        new_mbufs, pkt_dpdk->min_rx_burst);
+
+               rx_cache->s.idx = 0;
+               for (i = 0; i < nb_rx; i++) {
+                       if (i < num) {
+                               rx_mbufs[i] = new_mbufs[i];
+                       } else {
+                               cache_idx = rx_cache->s.count;
+                               rx_cache->s.pkt[cache_idx] = new_mbufs[i];
+                               rx_cache->s.count++;
+                       }
+               }
+               nb_rx = RTE_MIN(num, nb_rx);
+
+       } else {
+               nb_rx = rte_eth_rx_burst(pktio_entry->s.pkt_dpdk.port_id, index,
+                                        rx_mbufs, num);
+       }
 
        if (nb_rx > 0)
                nb_rx = mbuf_to_pkt(pktio_entry, pkt_table, rx_mbufs, nb_rx);
-- 
1.9.1
