> -----Original Message-----
> From: dev <dev-boun...@dpdk.org> On Behalf Of Maxime Coquelin
> Sent: Friday, December 21, 2018 1:27 AM
> To: dev@dpdk.org; jfreim...@redhat.com; tiwei....@intel.com;
> zhihong.w...@intel.com
> Cc: Maxime Coquelin <maxime.coque...@redhat.com>
> Subject: [dpdk-dev] [PATCH v3 3/3] net/virtio: improve batching in
> mergeable path
> 
> This patch improves both descriptor dequeue and refill
> by using the same batching strategy as in the in-order path.
> 
> Signed-off-by: Maxime Coquelin <maxime.coque...@redhat.com>
> Tested-by: Jens Freimann <jfreim...@redhat.com>
> Reviewed-by: Jens Freimann <jfreim...@redhat.com>
> ---
>  drivers/net/virtio/virtio_rxtx.c | 239 +++++++++++++++++--------------
>  1 file changed, 129 insertions(+), 110 deletions(-)
> 
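For list readers following the batching change: below is a rough, standalone sketch of the burst-trimming idea described in the commit message. It is not the driver's code; CACHE_DESCS, MAX_BURST and align_burst() are illustrative stand-ins for DESC_PER_CACHELINE, VIRTIO_MBUF_BURST_SZ and the inline computation added to virtio_recv_mergeable_pkts() below.

/*
 * Standalone sketch (not driver code) of the batch-size trimming applied
 * before virtqueue_dequeue_burst_rx(). Names are illustrative only.
 */
#include <stdint.h>
#include <stdio.h>

#define CACHE_DESCS 4   /* assumed descriptors per cacheline */
#define MAX_BURST   32  /* assumed maximum burst size */

/* Trim the burst so the next dequeue starts on a cacheline boundary. */
static uint16_t
align_burst(uint16_t used_cons_idx, uint16_t nb_used, uint16_t nb_pkts)
{
	uint16_t num = nb_used <= nb_pkts ? nb_used : nb_pkts;

	if (num > MAX_BURST)
		num = MAX_BURST;
	if (num > CACHE_DESCS)
		num -= (used_cons_idx + num) % CACHE_DESCS;

	return num;
}

int
main(void)
{
	/* used index 3, 64 used descriptors, application asked for 16 */
	printf("burst = %d\n", align_burst(3, 64, 16)); /* prints 13 */
	return 0;
}

The trimmed burst ends exactly on a descriptor-cacheline boundary, so the next dequeue starts on a fresh cacheline; the refill side batches similarly by bulk-allocating mbufs and filling one descriptor per mbuf, as the hunks below show.
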
> diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
> index 58376ced3..1cfa2f0d6 100644
> --- a/drivers/net/virtio/virtio_rxtx.c
> +++ b/drivers/net/virtio/virtio_rxtx.c
> @@ -353,41 +353,44 @@ virtqueue_enqueue_refill_inorder(struct virtqueue *vq,
>  }
> 
>  static inline int
> -virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf *cookie)
> +virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf **cookie,
> +                             uint16_t num)
>  {
>       struct vq_desc_extra *dxp;
>       struct virtio_hw *hw = vq->hw;
> -     struct vring_desc *start_dp;
> -     uint16_t needed = 1;
> -     uint16_t head_idx, idx;
> +     struct vring_desc *start_dp = vq->vq_ring.desc;
> +     uint16_t idx, i;
> 
>       if (unlikely(vq->vq_free_cnt == 0))
>               return -ENOSPC;
> -     if (unlikely(vq->vq_free_cnt < needed))
> +     if (unlikely(vq->vq_free_cnt < num))
>               return -EMSGSIZE;
> 
> -     head_idx = vq->vq_desc_head_idx;
> -     if (unlikely(head_idx >= vq->vq_nentries))
> +     if (unlikely(vq->vq_desc_head_idx >= vq->vq_nentries))
>               return -EFAULT;
> 
> -     idx = head_idx;
> -     dxp = &vq->vq_descx[idx];
> -     dxp->cookie = (void *)cookie;
> -     dxp->ndescs = needed;
> +     for (i = 0; i < num; i++) {
> +             idx = vq->vq_desc_head_idx;
> +             dxp = &vq->vq_descx[idx];
> +             dxp->cookie = (void *)cookie[i];
> +             dxp->ndescs = 1;
> 
> -     start_dp = vq->vq_ring.desc;
> -     start_dp[idx].addr =
> -             VIRTIO_MBUF_ADDR(cookie, vq) +
> -             RTE_PKTMBUF_HEADROOM - hw->vtnet_hdr_size;
> -     start_dp[idx].len =
> -             cookie->buf_len - RTE_PKTMBUF_HEADROOM + hw->vtnet_hdr_size;
> -     start_dp[idx].flags =  VRING_DESC_F_WRITE;
> -     idx = start_dp[idx].next;
> -     vq->vq_desc_head_idx = idx;
> -     if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
> -             vq->vq_desc_tail_idx = idx;
> -     vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);
> -     vq_update_avail_ring(vq, head_idx);
> +             start_dp[idx].addr =
> +                     VIRTIO_MBUF_ADDR(cookie[i], vq) +
> +                     RTE_PKTMBUF_HEADROOM - hw->vtnet_hdr_size;
> +             start_dp[idx].len =
> +                     cookie[i]->buf_len - RTE_PKTMBUF_HEADROOM +
> +                     hw->vtnet_hdr_size;
> +             start_dp[idx].flags = VRING_DESC_F_WRITE;
> +             vq->vq_desc_head_idx = start_dp[idx].next;
> +             vq_update_avail_ring(vq, idx);
> +             if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END) {
> +                     vq->vq_desc_tail_idx = vq->vq_desc_head_idx;
> +                     break;
> +             }
> +     }
> +
> +     vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - num);
> 
>       return 0;
>  }
> @@ -892,7 +895,8 @@ virtio_dev_rx_queue_setup_finish(struct rte_eth_dev *dev, uint16_t queue_idx)
>                               error = virtqueue_enqueue_recv_refill_packed(vq,
>                                               &m, 1);
>                       else
> -                             error = virtqueue_enqueue_recv_refill(vq, m);
> +                             error = virtqueue_enqueue_recv_refill(vq,
> +                                             &m, 1);
>                       if (error) {
>                               rte_pktmbuf_free(m);
>                               break;
> @@ -991,7 +995,7 @@ virtio_discard_rxbuf(struct virtqueue *vq, struct rte_mbuf *m)
>       if (vtpci_packed_queue(vq->hw))
>               error = virtqueue_enqueue_recv_refill_packed(vq, &m, 1);
>       else
> -             error = virtqueue_enqueue_recv_refill(vq, m);
> +             error = virtqueue_enqueue_recv_refill(vq, &m, 1);
> 
>       if (unlikely(error)) {
>               RTE_LOG(ERR, PMD, "cannot requeue discarded mbuf");
> @@ -1211,7 +1215,7 @@ virtio_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
>                       dev->data->rx_mbuf_alloc_failed++;
>                       break;
>               }
> -             error = virtqueue_enqueue_recv_refill(vq, new_mbuf);
> +             error = virtqueue_enqueue_recv_refill(vq, &new_mbuf, 1);
>               if (unlikely(error)) {
>                       rte_pktmbuf_free(new_mbuf);
>                       break;
> @@ -1528,19 +1532,18 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>       struct virtnet_rx *rxvq = rx_queue;
>       struct virtqueue *vq = rxvq->vq;
>       struct virtio_hw *hw = vq->hw;
> -     struct rte_mbuf *rxm, *new_mbuf;
> -     uint16_t nb_used, num, nb_rx;
> +     struct rte_mbuf *rxm;
> +     struct rte_mbuf *prev;
> +     uint16_t nb_used, num, nb_rx = 0;
>       uint32_t len[VIRTIO_MBUF_BURST_SZ];
>       struct rte_mbuf *rcv_pkts[VIRTIO_MBUF_BURST_SZ];
> -     struct rte_mbuf *prev;
>       int error;
> -     uint32_t i, nb_enqueued;
> -     uint32_t seg_num;
> -     uint16_t extra_idx;
> -     uint32_t seg_res;
> -     uint32_t hdr_size;
> +     uint32_t nb_enqueued = 0;
> +     uint32_t seg_num = 0;
> +     uint32_t seg_res = 0;
> +     uint32_t hdr_size = hw->vtnet_hdr_size;
> +     int32_t i;
> 
> -     nb_rx = 0;
>       if (unlikely(hw->started == 0))
>               return nb_rx;
> 
> @@ -1550,31 +1553,25 @@ virtio_recv_mergeable_pkts(void *rx_queue,
> 
>       PMD_RX_LOG(DEBUG, "used:%d", nb_used);
> 
> -     i = 0;
> -     nb_enqueued = 0;
> -     seg_num = 0;
> -     extra_idx = 0;
> -     seg_res = 0;
> -     hdr_size = hw->vtnet_hdr_size;
> -
> -     while (i < nb_used) {
> -             struct virtio_net_hdr_mrg_rxbuf *header;
> +     num = likely(nb_used <= nb_pkts) ? nb_used : nb_pkts;
> +     if (unlikely(num > VIRTIO_MBUF_BURST_SZ))
> +             num = VIRTIO_MBUF_BURST_SZ;
> +     if (likely(num > DESC_PER_CACHELINE))
> +             num = num - ((vq->vq_used_cons_idx + num) %
> +                             DESC_PER_CACHELINE);
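(Side note, not part of the patch: this trims the burst so it ends on a descriptor-cacheline boundary. For example, with vq->vq_used_cons_idx = 6, num = 32 and DESC_PER_CACHELINE = 4, num becomes 32 - ((6 + 32) % 4) = 30, and 6 + 30 is a multiple of 4, so the next dequeue starts on a fresh cacheline.)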
> 
> -             if (nb_rx == nb_pkts)
> -                     break;
> 
> -             num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len, 1);
> -             if (num != 1)
> -                     continue;
> +     num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len, num);
> 
> -             i++;
> +     for (i = 0; i < num; i++) {
> +             struct virtio_net_hdr_mrg_rxbuf *header;
> 
>               PMD_RX_LOG(DEBUG, "dequeue:%d", num);
> -             PMD_RX_LOG(DEBUG, "packet len:%d", len[0]);
> +             PMD_RX_LOG(DEBUG, "packet len:%d", len[i]);
> 
> -             rxm = rcv_pkts[0];
> +             rxm = rcv_pkts[i];
> 
> -             if (unlikely(len[0] < hdr_size + ETHER_HDR_LEN)) {
> +             if (unlikely(len[i] < hdr_size + ETHER_HDR_LEN)) {
>                       PMD_RX_LOG(ERR, "Packet drop");
>                       nb_enqueued++;
>                       virtio_discard_rxbuf(vq, rxm);
> @@ -1582,10 +1579,10 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>                       continue;
>               }
> 
> -             header = (struct virtio_net_hdr_mrg_rxbuf *)((char *)rxm->buf_addr +
> -                     RTE_PKTMBUF_HEADROOM - hdr_size);
> +             header = (struct virtio_net_hdr_mrg_rxbuf *)
> +                      ((char *)rxm->buf_addr + RTE_PKTMBUF_HEADROOM
> +                      - hdr_size);
>               seg_num = header->num_buffers;
> -
>               if (seg_num == 0)
>                       seg_num = 1;
> 
> @@ -1593,10 +1590,11 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>               rxm->nb_segs = seg_num;
>               rxm->ol_flags = 0;
>               rxm->vlan_tci = 0;
> -             rxm->pkt_len = (uint32_t)(len[0] - hdr_size);
> -             rxm->data_len = (uint16_t)(len[0] - hdr_size);
> +             rxm->pkt_len = (uint32_t)(len[i] - hdr_size);
> +             rxm->data_len = (uint16_t)(len[i] - hdr_size);
> 
>               rxm->port = rxvq->port_id;
> +
>               rx_pkts[nb_rx] = rxm;
>               prev = rxm;
> 
> @@ -1607,75 +1605,96 @@ virtio_recv_mergeable_pkts(void *rx_queue,
>                       continue;
>               }
> 
> +             if (hw->vlan_strip)
> +                     rte_vlan_strip(rx_pkts[nb_rx]);
> +
>               seg_res = seg_num - 1;
> 
> -             while (seg_res != 0) {
> -                     /*
> -                      * Get extra segments for current uncompleted packet.
> -                      */
> -                     uint16_t  rcv_cnt =
> -                             RTE_MIN(seg_res, RTE_DIM(rcv_pkts));
> -                     if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) {
> -                             uint32_t rx_num =
> -                                     virtqueue_dequeue_burst_rx(vq,
> -                                     rcv_pkts, len, rcv_cnt);
> -                             i += rx_num;
> -                             rcv_cnt = rx_num;
> -                     } else {
> -                             PMD_RX_LOG(ERR,
> -                                        "No enough segments for packet.");
> -                             nb_enqueued++;
> -                             virtio_discard_rxbuf(vq, rxm);
> -                             rxvq->stats.errors++;
> -                             break;
> -                     }
> +             /* Merge remaining segments */
> +             while (seg_res != 0 && i < (num - 1)) {
> +                     i++;
> +
> +                     rxm = rcv_pkts[i];
> +                     rxm->data_off = RTE_PKTMBUF_HEADROOM - hdr_size;
> +                     rxm->pkt_len = (uint32_t)(len[i]);
> +                     rxm->data_len = (uint16_t)(len[i]);
> +
> +                     rx_pkts[nb_rx]->pkt_len += (uint32_t)(len[i]);
> +                     rx_pkts[nb_rx]->data_len += (uint16_t)(len[i]);
> +
> +                     if (prev)
> +                             prev->next = rxm;
> +
> +                     prev = rxm;
> +                     seg_res -= 1;
> +             }
> +
> +             if (!seg_res) {
> +                     virtio_rx_stats_updated(rxvq, rx_pkts[nb_rx]);
> +                     nb_rx++;
> +             }
> +     }
> +
> +     /* Last packet still need merge segments */
> +     while (seg_res != 0) {
> +             uint16_t rcv_cnt = RTE_MIN((uint16_t)seg_res,
> +                                     VIRTIO_MBUF_BURST_SZ);
> 
> -                     extra_idx = 0;
> +             prev = rcv_pkts[nb_rx];
> +             if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) {
> +                     num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len,
> +                                                        rcv_cnt);
> +                     uint16_t extra_idx = 0;
> 
> +                     rcv_cnt = num;
>                       while (extra_idx < rcv_cnt) {
>                               rxm = rcv_pkts[extra_idx];
> -
> -                             rxm->data_off = RTE_PKTMBUF_HEADROOM - hdr_size;
> +                             rxm->data_off =
> +                                     RTE_PKTMBUF_HEADROOM - hdr_size;
>                               rxm->pkt_len = (uint32_t)(len[extra_idx]);
>                               rxm->data_len = (uint16_t)(len[extra_idx]);
> -
> -                             if (prev)
> -                                     prev->next = rxm;
> -
> +                             prev->next = rxm;
>                               prev = rxm;
> -                             rx_pkts[nb_rx]->pkt_len += rxm->pkt_len;
> -                             extra_idx++;
> +                             rx_pkts[nb_rx]->pkt_len += len[extra_idx];
> +                             rx_pkts[nb_rx]->data_len += len[extra_idx];
> +                             extra_idx += 1;
>                       };
>                       seg_res -= rcv_cnt;
> -             }
> -
> -             if (hw->vlan_strip)
> -                     rte_vlan_strip(rx_pkts[nb_rx]);
> -
> -             VIRTIO_DUMP_PACKET(rx_pkts[nb_rx],
> -                     rx_pkts[nb_rx]->data_len);
> 
> -             virtio_update_packet_stats(&rxvq->stats, rx_pkts[nb_rx]);
> -             nb_rx++;
> +                     if (!seg_res) {
> +                             virtio_rx_stats_updated(rxvq, rx_pkts[nb_rx]);
> +                             nb_rx++;
> +                     }
> +             } else {
> +                     PMD_RX_LOG(ERR,
> +                                     "No enough segments for packet.");
> +                     virtio_discard_rxbuf(vq, prev);
> +                     rxvq->stats.errors++;
> +                     break;
> +             }
>       }
> 
>       rxvq->stats.packets += nb_rx;
> 
>       /* Allocate new mbuf for the used descriptor */
> -     while (likely(!virtqueue_full(vq))) {
> -             new_mbuf = rte_mbuf_raw_alloc(rxvq->mpool);
> -             if (unlikely(new_mbuf == NULL)) {
> -                     struct rte_eth_dev *dev
> -                             = &rte_eth_devices[rxvq->port_id];
> -                     dev->data->rx_mbuf_alloc_failed++;
> -                     break;
> -             }
> -             error = virtqueue_enqueue_recv_refill(vq, new_mbuf);
> -             if (unlikely(error)) {
> -                     rte_pktmbuf_free(new_mbuf);
> -                     break;
> +     if (likely(!virtqueue_full(vq))) {
> +             /* free_cnt may include mrg descs */
> +             uint16_t free_cnt = vq->vq_free_cnt;
> +             struct rte_mbuf *new_pkts[free_cnt];
> +
> +             if (!rte_pktmbuf_alloc_bulk(rxvq->mpool, new_pkts, free_cnt)) {
> +                     error = virtqueue_enqueue_recv_refill(vq, new_pkts,
> +                                     free_cnt);
> +                     if (unlikely(error)) {
> +                             for (i = 0; i < free_cnt; i++)
> +                                     rte_pktmbuf_free(new_pkts[i]);
Missing error handling here? Execution keeps going without the mbufs?
/Gavin

> +                     }
> +                     nb_enqueued += free_cnt;
> +             } else {
> +                     struct rte_eth_dev *dev =
> +                             &rte_eth_devices[rxvq->port_id];
> +                     dev->data->rx_mbuf_alloc_failed += free_cnt;
>               }
> -             nb_enqueued++;
>       }
> 
>       if (likely(nb_enqueued)) {
> --
> 2.17.2
