The current virtio_dev_merge_rx() implementation looks just like the
old rte_vhost_dequeue_burst(): it is full of twisted logic, and the
same code block is repeated in quite a few different places.

However, the logic of virtio_dev_merge_rx() is quite similar to that
of virtio_dev_rx().  The big difference is that the mergeable one
may allocate more than one available entry to hold the data.
Fetching all of the available entries into vec_buf at once makes
the difference a bit bigger.

Anyway, it can be made simpler, just as we did for virtio_dev_rx().
One difference is that we need to update the used ring properly.

The pseudocode looks like this:

        while (1) {
                if (this_desc_has_no_room) {
                        this_desc = fetch_next_from_vec_buf();

                        if (it is the last of a desc chain) {
                                update_used_ring();
                        }
                }

                if (this_mbuf_has_drained_totally) {
                        this_mbuf = fetch_next_mbuf();

                        if (this_mbuf == NULL)
                                break;
                }

                COPY(this_desc, this_mbuf);
        }

This patch removes quite a few lines of code and therefore makes the
function much more readable.

Signed-off-by: Yuanhan Liu <yuanhan.liu at linux.intel.com>
---
 lib/librte_vhost/vhost_rxtx.c | 390 ++++++++++++++++++------------------------
 1 file changed, 163 insertions(+), 227 deletions(-)

diff --git a/lib/librte_vhost/vhost_rxtx.c b/lib/librte_vhost/vhost_rxtx.c
index d3775ad..3909584 100644
--- a/lib/librte_vhost/vhost_rxtx.c
+++ b/lib/librte_vhost/vhost_rxtx.c
@@ -280,237 +280,200 @@ virtio_dev_rx(struct virtio_net *dev, uint16_t queue_id,
        return count;
 }

-static inline uint32_t __attribute__((always_inline))
-copy_from_mbuf_to_vring(struct virtio_net *dev, uint32_t queue_id,
-                       uint16_t res_base_idx, uint16_t res_end_idx,
-                       struct rte_mbuf *pkt)
+static inline int
+fill_vec_buf(struct vhost_virtqueue *vq, uint32_t avail_idx,
+            uint32_t *allocated, uint32_t *vec_idx)
 {
-       uint32_t vec_idx = 0;
-       uint32_t entry_success = 0;
-       struct vhost_virtqueue *vq;
-       /* The virtio_hdr is initialised to 0. */
-       struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {
-               {0, 0, 0, 0, 0, 0}, 0};
-       uint16_t cur_idx = res_base_idx;
-       uint64_t vb_addr = 0;
-       uint64_t vb_hdr_addr = 0;
-       uint32_t seg_offset = 0;
-       uint32_t vb_offset = 0;
-       uint32_t seg_avail;
-       uint32_t vb_avail;
-       uint32_t cpy_len, entry_len;
-
-       if (pkt == NULL)
-               return 0;
+       uint16_t idx = vq->avail->ring[avail_idx & (vq->size - 1)];
+       uint32_t vec_id = *vec_idx;
+       uint32_t len    = *allocated;

-       LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Current Index %d| "
-               "End Index %d\n",
-               dev->device_fh, cur_idx, res_end_idx);
+       while (1) {
+               if (vec_id >= BUF_VECTOR_MAX)
+                       return -1;

-       /*
-        * Convert from gpa to vva
-        * (guest physical addr -> vhost virtual addr)
-        */
-       vq = dev->virtqueue[queue_id];
+               len += vq->desc[idx].len;
+               vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
+               vq->buf_vec[vec_id].buf_len  = vq->desc[idx].len;
+               vq->buf_vec[vec_id].desc_idx = idx;
+               vec_id++;

-       vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
-       vb_hdr_addr = vb_addr;
+               if ((vq->desc[idx].flags & VRING_DESC_F_NEXT) == 0)
+                       break;

-       /* Prefetch buffer address. */
-       rte_prefetch0((void *)(uintptr_t)vb_addr);
+               idx = vq->desc[idx].next;
+       }

-       virtio_hdr.num_buffers = res_end_idx - res_base_idx;
+       *allocated = len;
+       *vec_idx   = vec_id;

-       LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
-               dev->device_fh, virtio_hdr.num_buffers);
+       return 0;
+}

-       virtio_enqueue_offload(pkt, &virtio_hdr.hdr);
+/*
+ * As many data cores may want to access available buffers concurrently,
+ * they need to be reserved.
+ *
+ * Returns -1 on fail, 0 on success
+ */
+static inline int
+reserve_avail_buf_mergeable(struct vhost_virtqueue *vq, uint32_t size,
+                           uint16_t *start, uint16_t *end)
+{
+       uint16_t res_start_idx;
+       uint16_t res_cur_idx;
+       uint16_t avail_idx;
+       uint32_t allocated;
+       uint32_t vec_idx;
+       uint16_t tries;

-       rte_memcpy((void *)(uintptr_t)vb_hdr_addr,
-               (const void *)&virtio_hdr, vq->vhost_hlen);
+again:
+       res_start_idx = vq->last_used_idx_res;
+       res_cur_idx  = res_start_idx;

-       PRINT_PACKET(dev, (uintptr_t)vb_hdr_addr, vq->vhost_hlen, 1);
+       allocated = 0;
+       vec_idx   = 0;
+       tries     = 0;
+       while (1) {
+               avail_idx = *((volatile uint16_t *)&vq->avail->idx);
+               if (unlikely(res_cur_idx == avail_idx)) {
+                       LOG_DEBUG(VHOST_DATA, "(%"PRIu64") Failed "
+                               "to get enough desc from vring\n",
+                               dev->device_fh);
+                       return -1;
+               }

-       seg_avail = rte_pktmbuf_data_len(pkt);
-       vb_offset = vq->vhost_hlen;
-       vb_avail = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+               if (fill_vec_buf(vq, res_cur_idx, &allocated, &vec_idx) < 0)
+                       return -1;

-       entry_len = vq->vhost_hlen;
+               res_cur_idx++;
+               tries++;

-       if (vb_avail == 0) {
-               uint32_t desc_idx =
-                       vq->buf_vec[vec_idx].desc_idx;
+               if (allocated >= size)
+                       break;

-               if ((vq->desc[desc_idx].flags
-                       & VRING_DESC_F_NEXT) == 0) {
-                       /* Update used ring with desc information */
-                       vq->used->ring[cur_idx & (vq->size - 1)].id
-                               = vq->buf_vec[vec_idx].desc_idx;
-                       vq->used->ring[cur_idx & (vq->size - 1)].len
-                               = entry_len;
+               /*
+                * if we tried all available ring items, and still
+                * can't get enough buf, it means something abnormal
+                * happened.
+                */
+               if (tries >= vq->size)
+                       return -1;
+       }

-                       entry_len = 0;
-                       cur_idx++;
-                       entry_success++;
-               }
+       /*
+        * update vq->last_used_idx_res atomically.
+        * retry again if failed.
+        */
+       if (rte_atomic16_cmpset(&vq->last_used_idx_res,
+                               res_start_idx, res_cur_idx) == 0)
+               goto again;

-               vec_idx++;
-               vb_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+       *start = res_start_idx;
+       *end   = res_cur_idx;
+       return 0;
+}

-               /* Prefetch buffer address. */
-               rte_prefetch0((void *)(uintptr_t)vb_addr);
-               vb_offset = 0;
-               vb_avail = vq->buf_vec[vec_idx].buf_len;
-       }
+static inline uint32_t __attribute__((always_inline))
+copy_mbuf_to_desc_mergeable(struct virtio_net *dev, struct vhost_virtqueue *vq,
+                           uint16_t res_start_idx, uint16_t res_end_idx,
+                           struct rte_mbuf *m)
+{
+       struct virtio_net_hdr_mrg_rxbuf virtio_hdr = {{0, 0, 0, 0, 0, 0}, 0};
+       uint32_t vec_idx = 0;
+       uint16_t cur_idx = res_start_idx;
+       uint64_t desc_addr;
+       uint32_t mbuf_offset, mbuf_avail;
+       uint32_t desc_offset, desc_avail;
+       uint32_t cpy_len;
+       uint16_t desc_idx, used_idx;
+       uint32_t nr_used = 0;

-       cpy_len = RTE_MIN(vb_avail, seg_avail);
+       if (m == NULL)
+               return 0;

-       while (cpy_len > 0) {
-               /* Copy mbuf data to vring buffer */
-               rte_memcpy((void *)(uintptr_t)(vb_addr + vb_offset),
-                       rte_pktmbuf_mtod_offset(pkt, const void *, seg_offset),
-                       cpy_len);
+       LOG_DEBUG(VHOST_DATA,
+               "(%"PRIu64") Current Index %d| End Index %d\n",
+               dev->device_fh, cur_idx, res_end_idx);

-               PRINT_PACKET(dev,
-                       (uintptr_t)(vb_addr + vb_offset),
-                       cpy_len, 0);
+       desc_addr = gpa_to_vva(dev, vq->buf_vec[vec_idx].buf_addr);
+       rte_prefetch0((void *)(uintptr_t)desc_addr);
+
+       virtio_hdr.num_buffers = res_end_idx - res_start_idx;
+       LOG_DEBUG(VHOST_DATA, "(%"PRIu64") RX: Num merge buffers %d\n",
+               dev->device_fh, virtio_hdr.num_buffers);

-               seg_offset += cpy_len;
-               vb_offset += cpy_len;
-               seg_avail -= cpy_len;
-               vb_avail -= cpy_len;
-               entry_len += cpy_len;
-
-               if (seg_avail != 0) {
-                       /*
-                        * The virtio buffer in this vring
-                        * entry reach to its end.
-                        * But the segment doesn't complete.
-                        */
-                       if ((vq->desc[vq->buf_vec[vec_idx].desc_idx].flags &
-                               VRING_DESC_F_NEXT) == 0) {
+       virtio_enqueue_offload(m, &virtio_hdr.hdr);
+       rte_memcpy((void *)(uintptr_t)desc_addr,
+               (const void *)&virtio_hdr, vq->vhost_hlen);
+       PRINT_PACKET(dev, (uintptr_t)desc_addr, vq->vhost_hlen, 0);
+
+       desc_avail  = vq->buf_vec[vec_idx].buf_len - vq->vhost_hlen;
+       desc_offset = vq->vhost_hlen;
+
+       mbuf_avail  = rte_pktmbuf_data_len(m);
+       mbuf_offset = 0;
+       while (1) {
+               /* done with current desc buf, get the next one */
+               if (desc_avail == 0) {
+                       desc_idx = vq->buf_vec[vec_idx].desc_idx;
+
+                       if ((vq->desc[desc_idx].flags & VRING_DESC_F_NEXT) == 
0) {
                                /* Update used ring with desc information */
-                               vq->used->ring[cur_idx & (vq->size - 1)].id
-                                       = vq->buf_vec[vec_idx].desc_idx;
-                               vq->used->ring[cur_idx & (vq->size - 1)].len
-                                       = entry_len;
-                               entry_len = 0;
-                               cur_idx++;
-                               entry_success++;
+                               used_idx = cur_idx++ & (vq->size - 1);
+                               vq->used->ring[used_idx].id  = desc_idx;
+                               vq->used->ring[used_idx].len = desc_offset;
+
+                               nr_used++;
                        }

                        vec_idx++;
-                       vb_addr = gpa_to_vva(dev,
-                               vq->buf_vec[vec_idx].buf_addr);
-                       vb_offset = 0;
-                       vb_avail = vq->buf_vec[vec_idx].buf_len;
-                       cpy_len = RTE_MIN(vb_avail, seg_avail);
-               } else {
-                       /*
-                        * This current segment complete, need continue to
-                        * check if the whole packet complete or not.
-                        */
-                       pkt = pkt->next;
-                       if (pkt != NULL) {
-                               /*
-                                * There are more segments.
-                                */
-                               if (vb_avail == 0) {
-                                       /*
-                                        * This current buffer from vring is
-                                        * used up, need fetch next buffer
-                                        * from buf_vec.
-                                        */
-                                       uint32_t desc_idx =
-                                               vq->buf_vec[vec_idx].desc_idx;
-
-                                       if ((vq->desc[desc_idx].flags &
-                                               VRING_DESC_F_NEXT) == 0) {
-                                               uint16_t wrapped_idx =
-                                                       cur_idx & (vq->size - 
1);
-                                               /*
-                                                * Update used ring with the
-                                                * descriptor information
-                                                */
-                                               vq->used->ring[wrapped_idx].id
-                                                       = desc_idx;
-                                               vq->used->ring[wrapped_idx].len
-                                                       = entry_len;
-                                               entry_success++;
-                                               entry_len = 0;
-                                               cur_idx++;
-                                       }
-
-                                       /* Get next buffer from buf_vec. */
-                                       vec_idx++;
-                                       vb_addr = gpa_to_vva(dev,
-                                               vq->buf_vec[vec_idx].buf_addr);
-                                       vb_avail =
-                                               vq->buf_vec[vec_idx].buf_len;
-                                       vb_offset = 0;
-                               }
-
-                               seg_offset = 0;
-                               seg_avail = rte_pktmbuf_data_len(pkt);
-                               cpy_len = RTE_MIN(vb_avail, seg_avail);
-                       } else {
-                               /*
-                                * This whole packet completes.
-                                */
-                               /* Update used ring with desc information */
-                               vq->used->ring[cur_idx & (vq->size - 1)].id
-                                       = vq->buf_vec[vec_idx].desc_idx;
-                               vq->used->ring[cur_idx & (vq->size - 1)].len
-                                       = entry_len;
-                               entry_success++;
-                               break;
-                       }
+                       desc_addr = gpa_to_vva(dev, 
vq->buf_vec[vec_idx].buf_addr);
+
+                       /* Prefetch buffer address. */
+                       rte_prefetch0((void *)(uintptr_t)desc_addr);
+                       desc_offset = 0;
+                       desc_avail  = vq->buf_vec[vec_idx].buf_len;
                }
-       }

-       return entry_success;
-}
+               /* done with current mbuf, get the next one */
+               if (mbuf_avail == 0) {
+                       m = m->next;
+                       if (!m)
+                               break;

-static inline void __attribute__((always_inline))
-update_secure_len(struct vhost_virtqueue *vq, uint32_t id,
-       uint32_t *secure_len, uint32_t *vec_idx)
-{
-       uint16_t wrapped_idx = id & (vq->size - 1);
-       uint32_t idx = vq->avail->ring[wrapped_idx];
-       uint8_t next_desc;
-       uint32_t len = *secure_len;
-       uint32_t vec_id = *vec_idx;
+                       mbuf_offset = 0;
+                       mbuf_avail  = rte_pktmbuf_data_len(m);
+               }

-       do {
-               next_desc = 0;
-               len += vq->desc[idx].len;
-               vq->buf_vec[vec_id].buf_addr = vq->desc[idx].addr;
-               vq->buf_vec[vec_id].buf_len = vq->desc[idx].len;
-               vq->buf_vec[vec_id].desc_idx = idx;
-               vec_id++;
+               cpy_len = RTE_MIN(desc_avail, mbuf_avail);
+               rte_memcpy((void *)((uintptr_t)(desc_addr + desc_offset)),
+                       rte_pktmbuf_mtod_offset(m, void *, mbuf_offset),
+                       cpy_len);
+               PRINT_PACKET(dev, (uintptr_t)(desc_addr + desc_offset),
+                       cpy_len, 0);

-               if (vq->desc[idx].flags & VRING_DESC_F_NEXT) {
-                       idx = vq->desc[idx].next;
-                       next_desc = 1;
-               }
-       } while (next_desc);
+               mbuf_avail  -= cpy_len;
+               mbuf_offset += cpy_len;
+               desc_avail  -= cpy_len;
+               desc_offset += cpy_len;
+       }
+
+       used_idx = cur_idx & (vq->size - 1);
+       vq->used->ring[used_idx].id = vq->buf_vec[vec_idx].desc_idx;
+       vq->used->ring[used_idx].len = desc_offset;
+       nr_used++;

-       *secure_len = len;
-       *vec_idx = vec_id;
+       return nr_used;
 }

-/*
- * This function works for mergeable RX.
- */
 static inline uint32_t __attribute__((always_inline))
 virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
        struct rte_mbuf **pkts, uint32_t count)
 {
        struct vhost_virtqueue *vq;
-       uint32_t pkt_idx = 0, entry_success = 0;
-       uint16_t avail_idx;
-       uint16_t res_base_idx, res_cur_idx;
-       uint8_t success = 0;
+       uint32_t pkt_idx = 0, nr_used = 0;
+       uint16_t start, end;

        LOG_DEBUG(VHOST_DATA, "(%"PRIu64") virtio_dev_merge_rx()\n",
                dev->device_fh);
@@ -526,57 +489,30 @@ virtio_dev_merge_rx(struct virtio_net *dev, uint16_t queue_id,
                return 0;

        count = RTE_MIN((uint32_t)MAX_PKT_BURST, count);
-
        if (count == 0)
                return 0;

        for (pkt_idx = 0; pkt_idx < count; pkt_idx++) {
                uint32_t pkt_len = pkts[pkt_idx]->pkt_len + vq->vhost_hlen;

-               do {
-                       /*
-                        * As many data cores may want access to available
-                        * buffers, they need to be reserved.
-                        */
-                       uint32_t secure_len = 0;
-                       uint32_t vec_idx = 0;
-
-                       res_base_idx = vq->last_used_idx_res;
-                       res_cur_idx = res_base_idx;
-
-                       do {
-                               avail_idx = *((volatile uint16_t 
*)&vq->avail->idx);
-                               if (unlikely(res_cur_idx == avail_idx))
-                                       goto merge_rx_exit;
-
-                               update_secure_len(vq, res_cur_idx,
-                                                 &secure_len, &vec_idx);
-                               res_cur_idx++;
-                       } while (pkt_len > secure_len);
-
-                       /* vq->last_used_idx_res is atomically updated. */
-                       success = rte_atomic16_cmpset(&vq->last_used_idx_res,
-                                                       res_base_idx,
-                                                       res_cur_idx);
-               } while (success == 0);
-
-               entry_success = copy_from_mbuf_to_vring(dev, queue_id,
-                       res_base_idx, res_cur_idx, pkts[pkt_idx]);
+               if (reserve_avail_buf_mergeable(vq, pkt_len, &start, &end) < 0)
+                       break;

+               nr_used = copy_mbuf_to_desc_mergeable(dev, vq, start, end,
+                                                     pkts[pkt_idx]);
                rte_compiler_barrier();

                /*
                 * Wait until it's our turn to add our buffer
                 * to the used ring.
                 */
-               while (unlikely(vq->last_used_idx != res_base_idx))
+               while (unlikely(vq->last_used_idx != start))
                        rte_pause();

-               *(volatile uint16_t *)&vq->used->idx += entry_success;
-               vq->last_used_idx = res_cur_idx;
+               *(volatile uint16_t *)&vq->used->idx += nr_used;
+               vq->last_used_idx = end;
        }

-merge_rx_exit:
        if (likely(pkt_idx)) {
                /* flush used->idx update before we read avail->flags. */
                rte_mb();
-- 
1.9.0

Reply via email to