This commit adds an intermediate queue for vHost-user ports. It
improves the throughput in deployments with multiple virtual machines
and also in cases where a VM does packet forwarding in its kernel stack.

This patch is aligned with intermediate queue implementation for dpdk
ports that can be found here: https://patchwork.ozlabs.org/patch/723309/

Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodire...@intel.com>
Signed-off-by: Antonio Fischetti <antonio.fische...@intel.com>
Co-authored-by: Antonio Fischetti <antonio.fische...@intel.com>
---
- Limited testing is done with this RFC patch; test scenarios include a
  VM doing ip forwarding (Linux stack) and running testpmd in the guest.
- Sanity testing is done with multiple VMs to check for any locking/crashes.
- Much of the testing is done with single queue, and very basic testing with MQ.
- No throughput/latency tests are done at this point.

TODO:
- Retry logic in 'netdev_dpdk_vhost_tx_burst' should be handled appropriately
  to lessen the throughput impact when multiple vHost-user ports are serviced
  by the same PMD. An option could be to allow a configurable 'retries'
  setting, with the default being no retries. During testing it was found
  that the second retry couldn't add a single packet to the RX queue most of
  the time with ip forwarding in the kernel stack.

 lib/dpif-netdev.c     |  51 +++++++++++++++++++++-
 lib/netdev-dpdk.c     | 117 ++++++++++++++++++++++++++++++++++++++------------
 lib/netdev-dummy.c    |   1 +
 lib/netdev-linux.c    |   1 +
 lib/netdev-provider.h |   3 ++
 lib/netdev-vport.c    |   3 +-
 lib/netdev.c          |   9 ++++
 lib/netdev.h          |   1 +
 8 files changed, 156 insertions(+), 30 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index a14a2eb..4710985 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -344,6 +344,8 @@ struct dp_netdev_rxq {
     struct dp_netdev_pmd_thread *pmd;  /* pmd thread that will poll this 
queue. */
 };
 
+#define LAST_USED_QID_NONE -1
+
 /* A port in a netdev-based datapath. */
 struct dp_netdev_port {
     odp_port_t port_no;
@@ -494,6 +496,8 @@ struct tx_port {
     int qid;
     long long last_used;
     struct hmap_node node;
+    int last_used_qid;    /* Last queue id where packets could be
+                             enqueued. */
 };
 
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
@@ -3033,6 +3037,26 @@ cycles_count_end(struct dp_netdev_pmd_thread *pmd,
 }
 
 static void
+dp_netdev_drain_txq_ports(struct dp_netdev_pmd_thread *pmd)
+{
+    struct tx_port *cached_tx_port;
+    int tx_qid;
+
+    HMAP_FOR_EACH (cached_tx_port, node, &pmd->send_port_cache) {
+        tx_qid = cached_tx_port->last_used_qid;
+
+        if (tx_qid != LAST_USED_QID_NONE) {
+            netdev_txq_drain(cached_tx_port->port->netdev, tx_qid,
+                         cached_tx_port->port->dynamic_txqs);
+
+            /* Queue drained; mark it as empty. */
+            cached_tx_port->last_used_qid = LAST_USED_QID_NONE;
+        }
+    }
+}
+
+
+static void
 dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct netdev_rxq *rx,
                            odp_port_t port_no)
@@ -3647,15 +3671,18 @@ pmd_load_queues_and_ports(struct dp_netdev_pmd_thread 
*pmd,
     return i;
 }
 
+enum { DRAIN_TSC = 20000ULL };
+
 static void *
 pmd_thread_main(void *f_)
 {
     struct dp_netdev_pmd_thread *pmd = f_;
-    unsigned int lc = 0;
+    unsigned int lc = 0, lc_drain = 0;
     struct polled_queue *poll_list;
     bool exiting;
     int poll_cnt;
     int i;
+    uint64_t prev = 0, now = 0;
 
     poll_list = NULL;
 
@@ -3688,6 +3715,17 @@ reload:
                                        poll_list[i].port_no);
         }
 
+#define MAX_LOOP_TO_DRAIN 128
+        if (lc_drain++ > MAX_LOOP_TO_DRAIN) {
+            lc_drain = 0;
+            prev = now;
+            now = pmd->last_cycles;
+
+            if ((now - prev) > DRAIN_TSC) {
+                dp_netdev_drain_txq_ports(pmd);
+            }
+        }
+
         if (lc++ > 1024) {
             bool reload;
 
@@ -4330,6 +4368,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread 
*pmd,
 
     tx->port = port;
     tx->qid = -1;
+    tx->last_used_qid = LAST_USED_QID_NONE;
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
     pmd->need_reload = true;
@@ -4892,6 +4931,14 @@ dpif_netdev_xps_get_tx_qid(const struct 
dp_netdev_pmd_thread *pmd,
 
     dpif_netdev_xps_revalidate_pmd(pmd, now, false);
 
+    /* The tx queue can change in the XPS case; make sure packets queued on
+     * the previous queue are drained properly. */
+    if (tx->last_used_qid != LAST_USED_QID_NONE &&
+               tx->qid != tx->last_used_qid) {
+        netdev_txq_drain(port->netdev, tx->last_used_qid, port->dynamic_txqs);
+        tx->last_used_qid = LAST_USED_QID_NONE;
+    }
+
     VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
              pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
     return min_qid;
@@ -4987,6 +5034,8 @@ dp_execute_cb(void *aux_, struct dp_packet_batch 
*packets_,
                 tx_qid = pmd->static_tx_qid;
             }
 
+            p->last_used_qid = tx_qid;
+
             netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
                         dynamic_txqs);
             return;
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index ddc651b..26cfa85 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -286,6 +286,11 @@ struct dpdk_mp {
     struct ovs_list list_node OVS_GUARDED_BY(dpdk_mp_mutex);
 };
 
+/* Queue 'INTERIM_QUEUE_BURST_THRESHOLD' packets before transmitting.
+ * Defaults to 'NETDEV_MAX_BURST' (i.e. 32) for now.
+ */
+
 /* There should be one 'struct dpdk_tx_queue' created for
  * each cpu core. */
 struct dpdk_tx_queue {
@@ -295,6 +300,11 @@ struct dpdk_tx_queue {
                                     * pmd threads (see 'concurrent_txq'). */
     int map;                       /* Mapping of configured vhost-user queues
                                     * to enabled by guest. */
+    struct dp_packet *pkts[INTERIM_QUEUE_BURST_THRESHOLD];
+                                   /* Intermediate queue where packets can
+                                    * be buffered for vhost ports. */
+    int pkt_cnt;                   /* Number of packets waiting to be sent on
+                                    * the vhost port. */
 };
 
 /* dpdk has no way to remove dpdk ring ethernet devices
@@ -1666,6 +1676,61 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_stats 
*stats,
     }
 }
 
+static int
+netdev_dpdk_vhost_tx_burst(struct netdev_dpdk *dev, int qid,
+                           int dropped)
+{
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+    struct rte_mbuf **cur_pkts = (struct rte_mbuf **)txq->pkts;
+
+    int tx_vid = netdev_dpdk_get_vid(dev);
+    int tx_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
+    uint32_t sent = 0;
+    uint32_t retries = 0;
+    uint32_t sum = 0;
+    uint32_t total_pkts = 0;
+
+    total_pkts = sum = txq->pkt_cnt;
+    do {
+        uint32_t ret;
+        ret = rte_vhost_enqueue_burst(tx_vid, tx_qid, &cur_pkts[sent], sum);
+        if (!ret) {
+            /* No packets enqueued - do not retry. */
+            break;
+        } else {
+            /* Packets have been sent. */
+            sent += ret;
+
+            /* 'sum' packets have to be retransmitted. */
+            sum -= ret;
+        }
+    } while (sum && (retries++ < VHOST_ENQ_RETRY_NUM));
+
+    /* 'rte_vhost_enqueue_burst()' copies the mbufs, so every packet queued
+     * in 'txq->pkts' (sent or not) must be freed here; freeing only
+     * 'total_pkts - dropped' would leak the QoS-dropped count worth of
+     * still-queued mbufs. */
+    for (int i = 0; i < total_pkts; i++) {
+        dp_packet_delete(txq->pkts[i]);
+    }
+
+    /* Reset pkt count */
+    txq->pkt_cnt = 0;
+
+    /* 'sum' refers to packets dropped */
+    return sum;
+}
+
+static int
+netdev_dpdk_vhost_txq_drain(struct netdev *netdev, int qid,
+                            bool concurrent_txq OVS_UNUSED)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+
+    if (OVS_LIKELY(txq->pkt_cnt)) {
+        netdev_dpdk_vhost_tx_burst(dev, qid, 0);
+    }
+
+    return 0;
+}
+
 static void
 __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                          struct dp_packet **pkts, int cnt)
@@ -1674,16 +1739,20 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
     struct rte_mbuf **cur_pkts = (struct rte_mbuf **) pkts;
     unsigned int total_pkts = cnt;
     unsigned int dropped = 0;
-    int i, retries = 0;
+    int i;
 
     qid = dev->tx_q[qid % netdev->n_txq].map;
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
 
     if (OVS_UNLIKELY(!is_vhost_running(dev) || qid < 0
                      || !(dev->flags & NETDEV_UP))) {
         rte_spinlock_lock(&dev->stats_lock);
         dev->stats.tx_dropped+= cnt;
         rte_spinlock_unlock(&dev->stats_lock);
-        goto out;
+
+        for (i = 0; i < total_pkts; i++) {
+            dp_packet_delete(pkts[i]);
+        }
+        /* The packets are freed and 'qid' may be negative here; don't fall
+         * through to the send path below (the removed 'goto out' previously
+         * skipped it). */
+        return;
     }
 
     rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
@@ -1693,34 +1762,21 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
     cnt = netdev_dpdk_qos_run(dev, cur_pkts, cnt);
     dropped = total_pkts - cnt;
 
-    do {
-        int vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
-        unsigned int tx_pkts;
-
-        tx_pkts = rte_vhost_enqueue_burst(netdev_dpdk_get_vid(dev),
-                                          vhost_qid, cur_pkts, cnt);
-        if (OVS_LIKELY(tx_pkts)) {
-            /* Packets have been sent.*/
-            cnt -= tx_pkts;
-            /* Prepare for possible retry.*/
-            cur_pkts = &cur_pkts[tx_pkts];
-        } else {
-            /* No packets sent - do not retry.*/
-            break;
+    int idx = 0;
+    while (idx < cnt) {
+        txq->pkts[txq->pkt_cnt++] = pkts[idx++];
+
+        if (txq->pkt_cnt >= INTERIM_QUEUE_BURST_THRESHOLD) {
+            dropped += netdev_dpdk_vhost_tx_burst(dev, qid, dropped);
         }
-    } while (cnt && (retries++ <= VHOST_ENQ_RETRY_NUM));
+    }
 
     rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
 
     rte_spinlock_lock(&dev->stats_lock);
     netdev_dpdk_vhost_update_tx_counters(&dev->stats, pkts, total_pkts,
-                                         cnt + dropped);
+                                         dropped);
     rte_spinlock_unlock(&dev->stats_lock);
-
-out:
-    for (i = 0; i < total_pkts - dropped; i++) {
-        dp_packet_delete(pkts[i]);
-    }
 }
 
 /* Tx function. Transmit packets indefinitely */
@@ -3247,7 +3303,7 @@ unlock:
                           SET_CONFIG, SET_TX_MULTIQ, SEND,    \
                           GET_CARRIER, GET_STATS,             \
                           GET_FEATURES, GET_STATUS,           \
-                          RECONFIGURE, RXQ_RECV)              \
+                          RECONFIGURE, RXQ_RECV, TXQ_DRAIN)   \
 {                                                             \
     NAME,                                                     \
     true,                       /* is_pmd */                  \
@@ -3314,6 +3370,7 @@ unlock:
     RXQ_RECV,                                                 \
     NULL,                       /* rx_wait */                 \
     NULL,                       /* rxq_drain */               \
+    TXQ_DRAIN,                  /* txq_drain */               \
 }
 
 static const struct netdev_class dpdk_class =
@@ -3330,7 +3387,8 @@ static const struct netdev_class dpdk_class =
         netdev_dpdk_get_features,
         netdev_dpdk_get_status,
         netdev_dpdk_reconfigure,
-        netdev_dpdk_rxq_recv);
+        netdev_dpdk_rxq_recv,
+        NULL);
 
 static const struct netdev_class dpdk_ring_class =
     NETDEV_DPDK_CLASS(
@@ -3346,7 +3404,8 @@ static const struct netdev_class dpdk_ring_class =
         netdev_dpdk_get_features,
         netdev_dpdk_get_status,
         netdev_dpdk_reconfigure,
-        netdev_dpdk_rxq_recv);
+        netdev_dpdk_rxq_recv,
+        NULL);
 
 static const struct netdev_class dpdk_vhost_class =
     NETDEV_DPDK_CLASS(
@@ -3362,7 +3421,8 @@ static const struct netdev_class dpdk_vhost_class =
         NULL,
         NULL,
         netdev_dpdk_vhost_reconfigure,
-        netdev_dpdk_vhost_rxq_recv);
+        netdev_dpdk_vhost_rxq_recv,
+        netdev_dpdk_vhost_txq_drain);
 static const struct netdev_class dpdk_vhost_client_class =
     NETDEV_DPDK_CLASS(
         "dpdkvhostuserclient",
@@ -3377,7 +3437,8 @@ static const struct netdev_class dpdk_vhost_client_class =
         NULL,
         NULL,
         netdev_dpdk_vhost_client_reconfigure,
-        netdev_dpdk_vhost_rxq_recv);
+        netdev_dpdk_vhost_rxq_recv,
+        netdev_dpdk_vhost_txq_drain);
 
 void
 netdev_dpdk_register(void)
diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
index 0657434..4ef659e 100644
--- a/lib/netdev-dummy.c
+++ b/lib/netdev-dummy.c
@@ -1409,6 +1409,7 @@ netdev_dummy_update_flags(struct netdev *netdev_,
     netdev_dummy_rxq_recv,                                      \
     netdev_dummy_rxq_wait,                                      \
     netdev_dummy_rxq_drain,                                     \
+    NULL,                                                       \
 }
 
 static const struct netdev_class dummy_class =
diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
index 9ff1333..79478ee 100644
--- a/lib/netdev-linux.c
+++ b/lib/netdev-linux.c
@@ -2830,6 +2830,7 @@ netdev_linux_update_flags(struct netdev *netdev_, enum 
netdev_flags off,
     netdev_linux_rxq_recv,                                      \
     netdev_linux_rxq_wait,                                      \
     netdev_linux_rxq_drain,                                     \
+    NULL,                                                       \
 }
 
 const struct netdev_class netdev_linux_class =
diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
index 8346fc4..5dd68db 100644
--- a/lib/netdev-provider.h
+++ b/lib/netdev-provider.h
@@ -769,6 +769,9 @@ struct netdev_class {
 
     /* Discards all packets waiting to be received from 'rx'. */
     int (*rxq_drain)(struct netdev_rxq *rx);
+
+    /* Drain all packets waiting to be sent on queue 'qid'. */
+    int (*txq_drain)(struct netdev *netdev, int qid, bool concurrent_txq);
 };
 
 int netdev_register_provider(const struct netdev_class *);
diff --git a/lib/netdev-vport.c b/lib/netdev-vport.c
index 39093e8..eb4b7d2 100644
--- a/lib/netdev-vport.c
+++ b/lib/netdev-vport.c
@@ -847,7 +847,8 @@ get_stats(const struct netdev *netdev, struct netdev_stats 
*stats)
     NULL,                   /* rx_dealloc */                \
     NULL,                   /* rx_recv */                   \
     NULL,                   /* rx_wait */                   \
-    NULL,                   /* rx_drain */
+    NULL,                   /* rx_drain */                  \
+    NULL,                   /* tx_drain */
 
 
 #define TUNNEL_CLASS(NAME, DPIF_PORT, BUILD_HEADER, PUSH_HEADER, POP_HEADER)   
\
diff --git a/lib/netdev.c b/lib/netdev.c
index a8d8eda..b486b5d 100644
--- a/lib/netdev.c
+++ b/lib/netdev.c
@@ -678,6 +678,15 @@ netdev_rxq_drain(struct netdev_rxq *rx)
             : 0);
 }
 
+/* Flush packets on the queue 'qid'. */
+int
+netdev_txq_drain(struct netdev *netdev, int qid, bool concurrent_txq)
+{
+    return (netdev->netdev_class->txq_drain
+            ? netdev->netdev_class->txq_drain(netdev, qid, concurrent_txq)
+            : EOPNOTSUPP);
+}
+
 /* Configures the number of tx queues of 'netdev'. Returns 0 if successful,
  * otherwise a positive errno value.
  *
diff --git a/lib/netdev.h b/lib/netdev.h
index d6c07c1..7ddd790 100644
--- a/lib/netdev.h
+++ b/lib/netdev.h
@@ -155,6 +155,7 @@ int netdev_rxq_drain(struct netdev_rxq *);
 int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
                 bool may_steal, bool concurrent_txq);
 void netdev_send_wait(struct netdev *, int qid);
+int netdev_txq_drain(struct netdev *, int qid, bool concurrent_txq);
 
 /* native tunnel APIs */
 /* Structure to pass parameters required to build a tunnel header. */
-- 
2.4.11

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to