Version 2 changes:
Rebased onto recent code changes.
Made coding style changes based on feedback from Ben Pfaff.

Put in a DPDK ring queue to receive TX output from multiple vSwitch threads
(SMP input) for NIC TX output; see the sketches after these notes.
Eliminated the SMP TX output lock inside the polling loop (the DPDK queue
handles SMP access).
Reused the SMP tx_lock for non-polling operation, to allow TX output by a
non-polling thread when the interface is not being polled. The lock is
accessed only when polling is not enabled.
Added a new netdev subroutine to control the polling lock and the
enable/disable flag.
Packets no longer get discarded between the TX pre-queue and the NIC queue,
so surges are absorbed.
Removed a packet buffer leak in the new code.
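
For illustration only (not part of the patch; names are hypothetical): the
first sketch below shows the multi-producer/single-consumer rte_ring pattern
the TX pre-queue uses, restating the patch's dpdk_queue_pkts() and
dpdk_port_out() logic in isolation. Any thread may enqueue; only the owning
poller dequeues and pushes bursts to the NIC, carrying unsent packets over to
the next call instead of freeing them.

/* Illustrative sketch of the TX pre-queue pattern (hypothetical names). */
#include <string.h>
#include <rte_ring.h>
#include <rte_mbuf.h>
#include <rte_ethdev.h>

#define PRE_Q_BURST 64              /* Pre-queue to NIC transfer burst. */

struct tx_stage {
    struct rte_ring *preq;          /* MP enqueue, SC dequeue. */
    struct rte_mbuf *trans[PRE_Q_BURST];
    int count;                      /* Unsent leftovers from the last burst. */
};

/* Any thread: stage packets on the shared ring; free what does not fit. */
static void
stage_tx(struct tx_stage *s, struct rte_mbuf **pkts, unsigned cnt)
{
    unsigned put = rte_ring_mp_enqueue_burst(s->preq, (void **) pkts, cnt);

    while (put < cnt) {
        rte_pktmbuf_free(pkts[put++]);  /* Drop only on pre-queue overflow. */
    }
}

/* Owning poller only: move one burst from the ring to the NIC TX queue,
 * keeping unsent packets for the next call instead of freeing them. */
static void
drain_tx(struct tx_stage *s, uint8_t port_id, uint16_t qid)
{
    if (s->count == 0) {
        s->count = rte_ring_sc_dequeue_burst(s->preq, (void **) s->trans,
                                             PRE_Q_BURST);
    }
    if (s->count > 0) {
        uint16_t sent = rte_eth_tx_burst(port_id, qid, s->trans, s->count);

        s->count -= sent;
        if (s->count > 0 && sent > 0) {
            memmove(s->trans, &s->trans[sent],
                    s->count * sizeof s->trans[0]);
        }
    }
}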

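The second sketch (same caveats: illustrative only, hypothetical names)
shows the polling-ownership protocol, mirroring the patch's
netdev_dpdk_set_poll() and dpdk_push_tx_to_nic(): the poller holds the TX
spinlock for its entire polling session, so no per-send lock is taken on the
fast path, while a non-polling sender takes the lock only when no poller owns
the queue.

/* Illustrative sketch of polling ownership (hypothetical names). */
#include <stdbool.h>
#include <rte_spinlock.h>

struct tx_owner {
    bool is_polled;             /* True while a poller owns the queue. */
    rte_spinlock_t tx_lock;     /* Taken per send only when not polled. */
};

/* Poller: bracket the RX polling loop with acquire (true), release (false). */
static void
set_polling(struct tx_owner *o, bool enable)
{
    if (enable) {
        o->is_polled = true;
        rte_spinlock_lock(&o->tx_lock);   /* Held for the whole session. */
    } else {
        o->is_polled = false;
        /* A final drain_tx() here would cover a racing non-polling sender. */
        rte_spinlock_unlock(&o->tx_lock);
    }
}

/* Non-polling sender: push output unless a poller is (or becomes) present. */
static void
push_when_unpolled(struct tx_owner *o)
{
    for (;;) {
        if (o->is_polled) {
            return;                       /* The poller will drain. */
        }
        if (rte_spinlock_trylock(&o->tx_lock)) {
            /* drain_tx() from the previous sketch would run here. */
            rte_spinlock_unlock(&o->tx_lock);
            return;
        }
    }
}

Note that the release path in the patch flushes once more after clearing the
flag, so a packet queued by a racing non-polling sender is not stranded.
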
Measured improved port-to-port packet rates.

Measured an improved average PMD port-to-port RFC 2544 zero-loss packet rate
of 299,830 pps for packets of 256 bytes and smaller. This is predicted to
double when using one CPU core per interface.

Observed that 100% of 10 GbE line rate was sustained more persistently for
larger packets with the added DPDK queue. This is consistent with other tests
outside of OVS, in which large surges of larger-sized packets from fast path
interfaces attached to VMs were absorbed in the NIC TX pre-queue and TX
queue, suppressing packet loss.

Requires earlier patch: [PATCH 1/1] High speed PMD physical NIC queue size

Signed-off-by: Mike A. Polehn <mike.a.pol...@intel.com>

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
old mode 100644
new mode 100755
index f490900..2a6d79f
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -1894,6 +1894,11 @@ reload:
     poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
     atomic_read(&f->change_seq, &port_seq);
 
+    /* Get polling ownership of interfaces. */
+    for (i = 0; i < poll_cnt; i++) {
+        netdev_rxq_set_polling(poll_list[i].rx, true);
+    }
+
     for (;;) {
         unsigned int c_port_seq;
         int i;
@@ -1916,6 +1921,11 @@ reload:
         }
     }
 
+    /* Release polling ownership of interfaces. */
+    for (i = 0; i < poll_cnt; i++) {
+        netdev_rxq_set_polling(poll_list[i].rx, false);
+    }
+
     if (!latch_is_set(&f->dp->exit_latch)){
         goto reload;
     }
diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
index 65ae9f9..fc77b6a 100644
--- a/lib/netdev-bsd.c
+++ b/lib/netdev-bsd.c
@@ -1609,6 +1609,7 @@ netdev_bsd_update_flags(struct netdev *netdev_, enum netdev_flags off,
     netdev_bsd_rxq_recv,                             \
     netdev_bsd_rxq_wait,                             \
     netdev_bsd_rxq_drain,                            \
+    NULL, /* rxq_set_polling */                       \
 }
 
 const struct netdev_class netdev_bsd_class =
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index eb06595..e26c6fb 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -73,6 +73,8 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
 
 #define NIC_PORT_RX_Q_SIZE 2048  /* Size of Physical NIC RX Queue (n*32<4096)*/
 #define NIC_PORT_TX_Q_SIZE 2048  /* Size of Physical NIC TX Queue (n*32<4096)*/
+#define NIC_TX_PRE_Q_SIZE  4096  /* Size of Physical NIC TX Pre-Queue (2**n) */
+#define NIC_TX_PRE_Q_TRANS   64  /* Pre-Queue to Physical NIC Queue transfer burst */
 
 /* TODO: Needs per NIC value for these constants. */
 #define RX_PTHRESH 32 /* Default values of RX prefetch threshold reg. */
@@ -124,8 +126,6 @@ static const struct rte_eth_txconf tx_conf = {
 };
 
 enum { MAX_RX_QUEUE_LEN = 64 };
-enum { MAX_TX_QUEUE_LEN = 64 };
-enum { DRAIN_TSC = 200000ULL };
 
 static int rte_eal_init_ret = ENODEV;
 
@@ -147,10 +147,12 @@ struct dpdk_mp {
 };
 
 struct dpdk_tx_queue {
-    rte_spinlock_t tx_lock;
+    bool is_polled;
+    int port_id;
     int count;
-    uint64_t tsc;
-    struct rte_mbuf *burst_pkts[MAX_TX_QUEUE_LEN];
+    struct rte_mbuf *tx_trans[NIC_TX_PRE_Q_TRANS];
+    struct rte_ring *tx_preq;
+    rte_spinlock_t tx_lock;
 };
 
 struct netdev_dpdk {
@@ -363,6 +365,7 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
     struct ether_addr eth_addr;
     int diag;
     int i;
+    char qname[32];
 
     if (dev->port_id < 0 || dev->port_id >= rte_eth_dev_count()) {
         return -ENODEV;
@@ -375,12 +378,21 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
     }
 
     for (i = 0; i < NR_QUEUE; i++) {
+        dev->tx_q[i].port_id = dev->port_id;
         diag = rte_eth_tx_queue_setup(dev->port_id, i, NIC_PORT_TX_Q_SIZE,
                                       dev->socket_id, &tx_conf);
         if (diag) {
             VLOG_ERR("eth dev tx queue setup error %d",diag);
             return diag;
         }
+
+        snprintf(qname, sizeof(qname), "NIC_TX_Pre_Q_%u_%u", dev->port_id, i);
+        dev->tx_q[i].tx_preq = rte_ring_create(qname, NIC_TX_PRE_Q_SIZE,
+                                               dev->socket_id, RING_F_SC_DEQ);
+        if (dev->tx_q[i].tx_preq == NULL) {
+            VLOG_ERR("eth dev tx pre-queue alloc error");
+            return -ENOMEM;
+        }
     }
 
     for (i = 0; i < NR_QUEUE; i++) {
@@ -454,6 +466,7 @@ netdev_dpdk_construct(struct netdev *netdev_)
     port_no = strtol(cport, 0, 0); /* string must be null terminated */
 
     for (i = 0; i < NR_QUEUE; i++) {
+        netdev->tx_q[i].is_polled = false;
         rte_spinlock_init(&netdev->tx_q[i].tx_lock);
     }
 
@@ -571,24 +584,64 @@ netdev_dpdk_rxq_dealloc(struct netdev_rxq *rxq_)
 }
 
 inline static void
-dpdk_queue_flush(struct netdev_dpdk *dev, int qid)
+dpdk_port_out(struct dpdk_tx_queue *tx_q, int qid)
 {
-    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
-    uint32_t nb_tx;
+    /* Get packets off NIC TX staging pre-queue. */
+    if (OVS_LIKELY(tx_q->count == 0)) {
+        tx_q->count = rte_ring_sc_dequeue_burst(tx_q->tx_preq,
+            (void **) &tx_q->tx_trans[0], NIC_TX_PRE_Q_TRANS);
+    }
 
-    if (txq->count == 0) {
-        return;
+    /* Send packets to NIC physical TX queue. */
+    if (OVS_LIKELY(tx_q->count != 0)) {
+        unsigned sent = rte_eth_tx_burst(tx_q->port_id, qid, tx_q->tx_trans,
+                                         tx_q->count);
+        tx_q->count -= sent;
+
+        if (OVS_UNLIKELY((tx_q->count != 0) && (sent > 0))) {
+            /* Move unsent packets to front of list. */
+            memmove(&tx_q->tx_trans[0], &tx_q->tx_trans[sent],
+                    (sizeof(struct rte_mbuf *) * tx_q->count));
+        }
+    }
+}
+
+static void
+netdev_dpdk_set_poll(struct netdev_rxq *rxq_, bool enable)
+{
+    struct netdev_rxq_dpdk *rx = netdev_rxq_dpdk_cast(rxq_);
+    struct netdev *netdev = rx->up.netdev;
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    struct dpdk_tx_queue *tx_q = &dev->tx_q[rxq_->queue_id];
+
+    if (enable) {
+        tx_q->is_polled = true;
+        /* Get polling ownership. */
+        rte_spinlock_lock(&tx_q->tx_lock);
+    } else {
+        tx_q->is_polled = false;
+        /* Flush the queue after clearing the flag, to cover a race with
+         * a non-polling queuer. */
+        dpdk_port_out(tx_q, rxq_->queue_id);
+        rte_spinlock_unlock(&tx_q->tx_lock);
     }
-    rte_spinlock_lock(&txq->tx_lock);
-    nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
-    if (nb_tx != txq->count) {
-        /* free buffers if we couldn't transmit packets */
-        rte_mempool_put_bulk(dev->dpdk_mp->mp,
-                             (void **) &txq->burst_pkts[nb_tx],
-                             (txq->count - nb_tx));
+}
+
+static void
+dpdk_push_tx_to_nic(struct netdev_dpdk *dev, int qid)
+{
+    struct dpdk_tx_queue *tx_q = &dev->tx_q[qid];
+
+    for (;;) {
+        if (tx_q->is_polled) {
+            return;  /* A polling owner now present, owner will output. */
+        }
+
+        if (rte_spinlock_trylock(&tx_q->tx_lock)) {
+            dpdk_port_out(tx_q, qid);
+            rte_spinlock_unlock(&tx_q->tx_lock);
+            return;
+        }
     }
-    txq->count = 0;
-    rte_spinlock_unlock(&txq->tx_lock);
 }
 
 static int
@@ -598,9 +651,13 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
     struct netdev_rxq_dpdk *rx = netdev_rxq_dpdk_cast(rxq_);
     struct netdev *netdev = rx->up.netdev;
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    struct dpdk_tx_queue *tx_q = &dev->tx_q[rxq_->queue_id];
     int nb_rx;
 
-    dpdk_queue_flush(dev, rxq_->queue_id);
+    /* When in polling mode, push TX output. */
+    if (OVS_LIKELY(tx_q->is_polled)) {
+        dpdk_port_out(tx_q, rxq_->queue_id);
+    }
 
     nb_rx = rte_eth_rx_burst(rx->port_id, rxq_->queue_id,
                              (struct rte_mbuf **) packets,
@@ -615,53 +672,37 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
     return 0;
 }
 
+/* Queue packets to the DPDK pre-queue; push packets to the physical NIC
+ * queue if the interface is not in polling mode.
+ */
+
 inline static void
 dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
-               struct rte_mbuf **pkts, int cnt)
+                struct rte_mbuf **pkts, int cnt)
 {
-    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
-    uint64_t diff_tsc;
-    uint64_t cur_tsc;
-    uint32_t nb_tx;
+    struct dpdk_tx_queue *tx_q = &dev->tx_q[qid];
+    int n_qed;
 
-    int i = 0;
+    /* Queuing can occur from any thread. */
+    n_qed = rte_ring_mp_enqueue_burst(tx_q->tx_preq,
+                                      (void **) pkts, cnt);
 
-    rte_spinlock_lock(&txq->tx_lock);
-    while (i < cnt) {
-        int freeslots = MAX_TX_QUEUE_LEN - txq->count;
-        int tocopy = MIN(freeslots, cnt-i);
+    if (OVS_UNLIKELY(n_qed != cnt)) {  /* Discard at TX pre-queue. */
 
-        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
-               tocopy * sizeof (struct rte_mbuf *));
+        /* Free packets when no room in queue. */
+        int dropped = cnt - n_qed;
+        rte_mempool_put_bulk(dev->dpdk_mp->mp, (void **) &pkts[n_qed],
+                             dropped);
 
-        txq->count += tocopy;
-        i += tocopy;
+        ovs_mutex_lock(&dev->mutex);
+        dev->stats.tx_dropped += dropped;
+        ovs_mutex_unlock(&dev->mutex);
+    }
 
-        if (txq->count == MAX_TX_QUEUE_LEN) {
-            goto flush;
-        }
-        cur_tsc = rte_get_timer_cycles();
-        if (txq->count == 1) {
-            txq->tsc = cur_tsc;
-        }
-        diff_tsc = cur_tsc - txq->tsc;
-        if (diff_tsc >= DRAIN_TSC) {
-            goto flush;
-        }
-        continue;
-
-    flush:
-        nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts,
-                                 txq->count);
-        if (nb_tx != txq->count) {
-            /* free buffers if we couldn't transmit packets */
-            rte_mempool_put_bulk(dev->dpdk_mp->mp,
-                                 (void **) &txq->burst_pkts[nb_tx],
-                                 (txq->count - nb_tx));
-        }
-        txq->count = 0;
+    /* If not being polled, do tx output. */
+    if (OVS_UNLIKELY(!tx_q->is_polled)) {
+        dpdk_push_tx_to_nic(dev, qid);
     }
-    rte_spinlock_unlock(&txq->tx_lock);
 }
 
 /* Tx function. Transmit packets indefinitely */
@@ -687,9 +728,14 @@ dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
 
         mbufs[newcnt] = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
 
-        if (!mbufs[newcnt]) {
+        if (OVS_UNLIKELY(!mbufs[newcnt])) {
+            if (newcnt) {
+                dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE,
+                                mbufs, newcnt);
+            }
+
             ovs_mutex_lock(&dev->mutex);
-            dev->stats.tx_dropped++;
+            dev->stats.tx_dropped += cnt - newcnt;
             ovs_mutex_unlock(&dev->mutex);
             return;
         }
@@ -704,7 +750,6 @@ dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
     }
 
     dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt);
-    dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
 }
 
 static int
@@ -1226,6 +1271,7 @@ static struct netdev_class netdev_dpdk_class = {
     netdev_dpdk_rxq_recv,
     NULL,                       /* rxq_wait */
     NULL,                       /* rxq_drain */
+    netdev_dpdk_set_poll,
 };
 
 int
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index 8d1c298..3b0b276 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -1092,6 +1092,7 @@ static const struct netdev_class dummy_class = {
     netdev_dummy_rxq_recv,
     netdev_dummy_rxq_wait,
     netdev_dummy_rxq_drain,
+    NULL,                       /* rxq_set_polling */
 };
 
 static struct ofpbuf *
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index 1780639..efd499b 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -2801,6 +2801,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum netdev_flags off,
     netdev_linux_rxq_recv,                                      \
     netdev_linux_rxq_wait,                                      \
     netdev_linux_rxq_drain,                                     \
+    NULL,                       /* rxq_set_polling */           \
 }
 
 const struct netdev_class netdev_linux_class =
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
old mode 100644
new mode 100755
index 6b8160d..7dd15ef
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -686,6 +686,12 @@ struct netdev_class {
 
     /* Discards all packets waiting to be received from 'rx'. */
     int (*rxq_drain)(struct netdev_rxq *rx);
+
+    /* Gets or releases polling ownership for a PMD: called with true before
+     * entering the RX polling loop and with false after exiting it.  NULL if
+     * not supported.  This can be used to set up state that cooperates with
+     * polling, e.g. taking ownership of the TX interface so output can be
+     * pushed to the NIC before each input. */
+    void (*rxq_set_polling)(struct netdev_rxq *, bool);
 };
 
 int netdev_register_provider(const struct netdev_class *);
diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
index 17adf94..1e59a8a 100644
--- a/lib/netdev-vport.c
+++ b/lib/netdev-vport.c
@@ -823,7 +823,8 @@ get_stats(const struct netdev *netdev, struct netdev_stats *stats)
     NULL,                   /* rx_dealloc */                \
     NULL,                   /* rx_recv */                   \
     NULL,                   /* rx_wait */                   \
-    NULL,                   /* rx_drain */
+    NULL,                   /* rx_drain */                  \
+    NULL,                   /* rxq_set_polling */
 
 #define TUNNEL_CLASS(NAME, DPIF_PORT)                       \
     { DPIF_PORT,                                            \
diff --git a/lib/netdev.c b/lib/netdev.c
old mode 100644
new mode 100755
index 25edc16..a5680c7
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -650,6 +650,19 @@ netdev_rxq_drain(struct netdev_rxq *rx)
             : 0);
 }
 
+/* Called with 'enable' set to true before starting the PMD interface
+ * polling loop, to prepare for polling, and called with false on exit
+ * from the polling loop.  This lets the provider set up for polling,
+ * e.g. take the TX output lock and push output before each input. */
+
+void
+netdev_rxq_set_polling(struct netdev_rxq *rx, bool enable)
+{
+    if (rx->netdev->netdev_class->rxq_set_polling) {
+        rx->netdev->netdev_class->rxq_set_polling(rx, enable);
+    }
+}
+
 /* Sends 'buffers' on 'netdev'.  Returns 0 if successful (for every packet),
  * otherwise a positive errno value.  Returns EAGAIN without blocking if
  * at least one the packets cannot be queued immediately.  Returns EMSGSIZE
diff --git a/lib/netdev.h b/lib/netdev.h
old mode 100644
new mode 100755
index 53415b2..dc0a283
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -43,6 +43,7 @@ extern "C" {
  *    netdev_rxq_recv()
  *    netdev_rxq_wait()
  *    netdev_rxq_drain()
+ *    netdev_rxq_set_polling()
  *
  *      These functions are conditionally thread-safe: they may be called from
 *      different threads only on different netdev_rxq objects.  (The client may
@@ -171,6 +172,7 @@ int netdev_rxq_recv(struct netdev_rxq *rx, struct dpif_packet **buffers,
                     int *cnt);
 void netdev_rxq_wait(struct netdev_rxq *);
 int netdev_rxq_drain(struct netdev_rxq *);
+void netdev_rxq_set_polling(struct netdev_rxq *, bool);
 
 /* Packet transmission. */
 int netdev_send(struct netdev *, struct dpif_packet **, int cnt,