Concurrent enqueue is an important performance optimization when the number
of cores used for switching is different than the number of vhost queues.
I've observed a 20% performance improvement compared to a strategy that
binds queues to cores.

The atomic cmpset is only executed when the application calls
rte_vhost_enqueue_burst_mp. Benchmarks show no performance impact
when not using concurrent enqueue.

Mergeable RX buffers aren't supported by concurrent enqueue to minimize
code complexity.

Partially reverts 39449e74 ("vhost: remove concurrent enqueue") and
includes a fix from "vhost: avoid reordering of used->idx and last_used_idx
updating".

Signed-off-by: Rich Lane <rich.lane at bigswitch.com>
---
 lib/librte_vhost/rte_vhost_version.map |  6 +++
 lib/librte_vhost/rte_virtio_net.h      | 19 +++++++++
 lib/librte_vhost/vhost-net.h           |  2 +
 lib/librte_vhost/vhost_rxtx.c          | 77 ++++++++++++++++++++++++++++++----
 lib/librte_vhost/virtio-net.c          |  2 +
 5 files changed, 97 insertions(+), 9 deletions(-)

diff --git a/lib/librte_vhost/rte_vhost_version.map 
b/lib/librte_vhost/rte_vhost_version.map
index 5ceaa8a..ca9d49e 100644
--- a/lib/librte_vhost/rte_vhost_version.map
+++ b/lib/librte_vhost/rte_vhost_version.map
@@ -30,3 +30,9 @@ DPDK_16.07 {
        rte_vhost_get_queue_num;

 } DPDK_2.1;
+
+DPDK_16.11 {
+       global:
+
+       rte_vhost_enqueue_burst_mp
+} DPDK_16.07;
diff --git a/lib/librte_vhost/rte_virtio_net.h 
b/lib/librte_vhost/rte_virtio_net.h
index 9caa622..0f05917 100644
--- a/lib/librte_vhost/rte_virtio_net.h
+++ b/lib/librte_vhost/rte_virtio_net.h
@@ -175,6 +175,25 @@ uint16_t rte_vhost_enqueue_burst(int vid, uint16_t 
queue_id,
        struct rte_mbuf **pkts, uint16_t count);

 /**
+ * This function adds buffers to the virtio devices RX virtqueue. Buffers can
+ * be received from the physical port or from another virtual device. A packet
+ * count is returned to indicate the number of packets that were successfully
+ * added to the RX queue. This version is multi-producer safe.
+ * @param vid
+ *  virtio-net device ID
+ * @param queue_id
+ *  virtio queue index in mq case
+ * @param pkts
+ *  array to contain packets to be enqueued
+ * @param count
+ *  packets num to be enqueued
+ * @return
+ *  num of packets enqueued
+ */
+uint16_t rte_vhost_enqueue_burst_mp(int vid, uint16_t queue_id,
+       struct rte_mbuf **pkts, uint16_t count);
+
+/**
  * This function gets guest buffers from the virtio device TX virtqueue,
  * construct host mbufs, copies guest buffer content to host mbufs and
  * store them in pkts to be processed.
diff --git a/lib/librte_vhost/vhost-net.h b/lib/librte_vhost/vhost-net.h
index 38593a2..ea9c377 100644
--- a/lib/librte_vhost/vhost-net.h
+++ b/lib/librte_vhost/vhost-net.h
@@ -72,6 +72,8 @@ struct vhost_virtqueue {

        /* Last index used on the available ring */
        volatile uint16_t       last_used_idx;
+       /* Used for multiple devices reserving buffers */
+       volatile uint16_t       last_used_idx_res;
 #define VIRTIO_INVALID_EVENTFD         (-1)
 #define VIRTIO_UNINITIALIZED_EVENTFD   (-2)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index 08a73fd..eea810e 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -212,6 +212,47 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct 
vhost_virtqueue *vq,
        return 0;
 }

+/*
+ * As many data cores may want to access available buffers
+ * they need to be reserved.
+ */
+static inline uint32_t
+reserve_avail_buf(struct vhost_virtqueue *vq, uint32_t count, bool mp,
+                 uint16_t *start)
+{
+       uint16_t res_start_idx;
+       uint16_t avail_idx;
+       uint16_t free_entries;
+       int success;
+
+       count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
+
+again:
+       res_start_idx = mp ? vq->last_used_idx_res : vq->last_used_idx;
+       avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+
+       free_entries = avail_idx - res_start_idx;
+       count = RTE_MIN(count, free_entries);
+       if (count == 0)
+               return 0;
+
+       if (mp) {
+               uint16_t res_end_idx = res_start_idx + count;
+
+               /*
+               * update vq->last_used_idx_res atomically; try again if failed.
+               */
+               success = rte_atomic16_cmpset(&vq->last_used_idx_res,
+                                       res_start_idx, res_end_idx);
+               if (unlikely(!success))
+                       goto again;
+       }
+
+       *start = res_start_idx;
+
+       return count;
+}
+
 /**
  * This function adds buffers to the virtio devices RX virtqueue. Buffers can
  * be received from the physical port or from another virtio device. A packet
@@ -221,10 +262,10 @@ copy_mbuf_to_desc(struct virtio_net *dev, struct 
vhost_virtqueue *vq,
  */
 static inline uint32_t __attribute__((always_inline))
 virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
-             struct rte_mbuf **pkts, uint32_t count)
+             struct rte_mbuf **pkts, uint32_t count, bool mp)
 {
        struct vhost_virtqueue *vq;
-       uint16_t avail_idx, free_entries, start_idx;
+       uint16_t start_idx;
        uint16_t desc_indexes[MAX_PKT_BURST];
        uint16_t used_idx;
        uint32_t i;
@@ -240,11 +281,7 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
        if (unlikely(vq->enabled == 0))
                return 0;

-       avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-       start_idx = vq->last_used_idx;
-       free_entries = avail_idx - start_idx;
-       count = RTE_MIN(count, free_entries);
-       count = RTE_MIN(count, (uint32_t)MAX_PKT_BURST);
+       count = reserve_avail_buf(vq, count, mp, &start_idx);
        if (count == 0)
                return 0;

@@ -284,8 +321,16 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,

        rte_smp_wmb();

+       if (mp) {
+               /*
+                * Wait until it's our turn to add our buffer to the
+                * used ring.
+                */
+               while (unlikely(vq->last_used_idx != start_idx))
+                       rte_pause();
+       }
+
        *(volatile uint16_t *)&vq->used->idx += count;
-       vq->last_used_idx += count;
        vhost_log_used_vring(dev, vq,
                offsetof(struct vring_used, idx),
                sizeof(vq->used->idx));
@@ -293,6 +338,8 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
        /* flush used->idx update before we read avail->flags. */
        rte_mb();

+       vq->last_used_idx = start_idx + count;
+
        /* Kick the guest if necessary. */
        if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
                        && (vq->callfd >= 0))
@@ -545,7 +592,19 @@ rte_vhost_enqueue_burst(int vid, uint16_t queue_id,
        if (dev->features & (1 << VIRTIO_NET_F_MRG_RXBUF))
                return virtio_dev_merge_rx(dev, queue_id, pkts, count);
        else
-               return virtio_dev_rx(dev, queue_id, pkts, count);
+               return virtio_dev_rx(dev, queue_id, pkts, count, false);
+}
+
+uint16_t
+rte_vhost_enqueue_burst_mp(int vid, uint16_t queue_id,
+       struct rte_mbuf **pkts, uint16_t count)
+{
+       struct virtio_net *dev = get_device(vid);
+
+       if (!dev)
+               return 0;
+
+       return virtio_dev_rx(dev, queue_id, pkts, count, true);
 }

 static void
diff --git a/lib/librte_vhost/virtio-net.c b/lib/librte_vhost/virtio-net.c
index 1785695..9fc04ff 100644
--- a/lib/librte_vhost/virtio-net.c
+++ b/lib/librte_vhost/virtio-net.c
@@ -567,6 +567,7 @@ vhost_set_vring_addr(int vid, struct vhost_vring_addr *addr)
                        "some packets maybe resent for Tx and dropped for Rx\n",
                        vq->last_used_idx, vq->used->idx);
                vq->last_used_idx     = vq->used->idx;
+               vq->last_used_idx_res = vq->used->idx;
        }

        vq->log_guest_addr = addr->log_guest_addr;
@@ -598,6 +599,7 @@ vhost_set_vring_base(int vid, struct vhost_vring_state 
*state)

        /* State->index refers to the queue index. The txq is 1, rxq is 0. */
        dev->virtqueue[state->index]->last_used_idx = state->num;
+       dev->virtqueue[state->index]->last_used_idx_res = state->num;

        return 0;
 }
-- 
1.9.1

Reply via email to