This patch removes the internal lockless enqueue implementation.
DPDK does not support concurrent receiving/transmitting packets
from/to the same queue by multiple threads. The vhost PMD wraps a
vhost device as a normal DPDK port, and DPDK applications normally
have their own locking when enqueueing packets to the same queue of
a port.

The atomic cmpset is a costly operation; removing it should help
performance a bit.
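
For illustration only (not part of this patch), a minimal sketch of the
kind of per-queue locking an application would typically do on its own
when several lcores may enqueue to the same queue of a vhost port. The
tx_lock variable and the app_tx_burst() wrapper are hypothetical names:

    #include <rte_ethdev.h>
    #include <rte_mbuf.h>
    #include <rte_spinlock.h>

    /* Hypothetical application-side lock: with this patch, serialization
     * is the application's responsibility, not the vhost library's. */
    static rte_spinlock_t tx_lock = RTE_SPINLOCK_INITIALIZER;

    static uint16_t
    app_tx_burst(uint16_t port_id, uint16_t queue_id,
                 struct rte_mbuf **pkts, uint16_t nb_pkts)
    {
        uint16_t sent;

        /* Ensure only one lcore enqueues to this queue at a time. */
        rte_spinlock_lock(&tx_lock);
        sent = rte_eth_tx_burst(port_id, queue_id, pkts, nb_pkts);
        rte_spinlock_unlock(&tx_lock);

        return sent;
    }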

Signed-off-by: Huawei Xie <huawei.xie at intel.com>
---
 lib/librte_vhost/vhost_rxtx.c | 86 +++++++++++++------------------------------
 1 file changed, 25 insertions(+), 61 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index bbf3fac..26a1b9c 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -69,10 +69,8 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
        uint64_t buff_hdr_addr = 0;
        uint32_t head[MAX_PKT_BURST];
        uint32_t head_idx, packet_success = 0;
-       uint16_t avail_idx, res_cur_idx;
-       uint16_t res_base_idx, res_end_idx;
+       uint16_t avail_idx, res_cur_idx, res_end_idx;
        uint16_t free_entries;
-       uint8_t success = 0;

        LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_rx()\n", dev->device_fh);
        if (unlikely(!is_valid_virt_queue_idx(queue_id, 0, dev->virt_qp_nb))) {
@@ -88,29 +86,18 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,

        count = (count > MAX_PKT_BURST) ? MAX_PKT_BURST : count;

-       /*
-        * As many data cores may want access to available buffers,
-        * they need to be reserved.
-        */
-       do {
-               res_base_idx = vq->last_used_idx_res;
-               avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-
-               free_entries = (avail_idx - res_base_idx);
-               /*check that we have enough buffers*/
-               if (unlikely(count > free_entries))
-                       count = free_entries;
-
-               if (count == 0)
-                       return 0;
-
-               res_end_idx = res_base_idx + count;
-               /* vq->last_used_idx_res is atomically updated. */
-               /* TODO: Allow to disable cmpset if no concurrency in application. */
-               success = rte_atomic16_cmpset(&vq->last_used_idx_res,
-                               res_base_idx, res_end_idx);
-       } while (unlikely(success == 0));
-       res_cur_idx = res_base_idx;
+       avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+       free_entries = (avail_idx - vq->last_used_idx_res);
+       /*check that we have enough buffers*/
+       if (unlikely(count > free_entries))
+               count = free_entries;
+       if (count == 0)
+               return 0;
+
+       res_cur_idx = vq->last_used_idx_res;
+       res_end_idx = res_cur_idx + count;
+       vq->last_used_idx_res = res_end_idx;
+
        LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| End Index %d\n",
                        dev->device_fh, res_cur_idx, res_end_idx);

@@ -230,10 +217,6 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,

        rte_compiler_barrier();

-       /* Wait until it's our turn to add our buffer to the used ring. */
-       while (unlikely(vq->last_used_idx != res_base_idx))
-               rte_pause();
-
        *(volatile uint16_t *)&vq->used->idx += count;
        vq->last_used_idx = res_end_idx;

@@ -474,7 +457,6 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
        uint32_t pkt_idx = 0, entry_success = 0;
        uint16_t avail_idx;
        uint16_t res_base_idx, res_cur_idx;
-       uint8_t success = 0;

        LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_rx()\n",
                dev->device_fh);
@@ -496,46 +478,28 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,

        for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
                uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;
+               uint32_t secure_len = 0;
+               uint32_t vec_idx = 0;

-               do {
-                       /*
-                        * As many data cores may want access to available
-                        * buffers, they need to be reserved.
-                        */
-                       uint32_t secure_len = 0;
-                       uint32_t vec_idx = 0;
-
-                       res_base_idx = vq->last_used_idx_res;
-                       res_cur_idx = res_base_idx;
+               res_base_idx = res_cur_idx = vq->last_used_idx_res;

-                       do {
-                               avail_idx = *((volatile uint16_t *)&vq->avail->idx);
-                               if (unlikely(res_cur_idx == avail_idx))
-                                       goto merge_rx_exit;
+               do {
+                       avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+                       if (unlikely(res_cur_idx == avail_idx))
+                               goto merge_rx_exit;

-                               update_secure_len(vq, res_cur_idx,
-                                                 &secure_len, &vec_idx);
-                               res_cur_idx++;
-                       } while (pkt_len > secure_len);
+                       update_secure_len(vq, res_cur_idx,
+                                       &secure_len, &vec_idx);
+                       res_cur_idx++;
+               } while (pkt_len > secure_len);

-                       /* vq->last_used_idx_res is atomically updated. */
-                       success = rte_atomic16_cmpset(&vq->last_used_idx_res,
-                                                       res_base_idx,
-                                                       res_cur_idx);
-               } while (success == 0);
+               vq->last_used_idx_res = res_cur_idx;

                entry_success = copy_from_mbuf_to_vring(dev, queue_id,
                        res_base_idx, res_cur_idx, pkts[pkt_idx]);

                rte_compiler_barrier();

-               /*
-                * Wait until it's our turn to add our buffer
-                * to the used ring.
-                */
-               while (unlikely(vq->last_used_idx != res_base_idx))
-                       rte_pause();
-
                *(volatile uint16_t *)&vq->used->idx += entry_success;
                vq->last_used_idx = res_cur_idx;
        }
-- 
1.8.1.4