> -----Original Message-----
> From: dev [mailto:dev-boun...@dpdk.org] On Behalf Of Tiwei Bie
> Sent: Wednesday, June 20, 2018 3:41 PM
> To: Liu, Yong <yong....@intel.com>
> Cc: maxime.coque...@redhat.com; Wang, Zhihong <zhihong.w...@intel.com>; dev@dpdk.org
> Subject: Re: [dpdk-dev] [PATCH 5/7] net/virtio: support IN_ORDER Rx and Tx
>
> On Fri, Jun 08, 2018 at 05:07:22PM +0800, Marvin Liu wrote:
> > IN_ORDER Rx function can support merge-able feature. Descriptors
> > allocation and free will be done in bulk.
> >
> > Virtio dequeue logic:
> >     dequeue_burst_rx(burst mbufs)
> >     for (each mbuf b) {
> >             if (b need merge) {
> >                     merge remained mbufs
> >                     add merged mbuf to return mbufs list
> >             } else {
> >                     add mbuf to return mbufs list
> >             }
> >     }
> >     if (last mbuf c need merge) {
> >             dequeue_burst_rx(required mbufs)
> >             merge last mbuf c
> >     }
> >     refill_avail_ring_bulk()
> >     update_avail_ring()
> >     return mbufs list
> >
> > IN_ORDER Tx function can support indirect and offloading features.
> >
> > Virtio enqueue logic:
> >     xmit_cleanup(used descs)
> >     for (each xmit mbuf b) {
> >             if (b can inorder xmit) {
> >                     add mbuf b to inorder burst list
> >             } else {
> >                     xmit inorder burst list
> >                     xmit mbuf b with normal xmit
> >             }
> >     }
> >     if (inorder burst list not empty) {
> >             xmit inorder burst list
> >     }
> >     update_avail_ring()
> >
> > Signed-off-by: Marvin Liu <yong....@intel.com>
> >
> > diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
> > index bb40064ea..25697c872 100644
> > --- a/drivers/net/virtio/virtio_ethdev.h
> > +++ b/drivers/net/virtio/virtio_ethdev.h
> > @@ -83,9 +83,15 @@ uint16_t virtio_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
> >  uint16_t virtio_recv_mergeable_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
> >  		uint16_t nb_pkts);
> >
> > +uint16_t virtio_recv_inorder_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
> > +		uint16_t nb_pkts);
> > +
> >  uint16_t virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
> >  		uint16_t nb_pkts);
> >
> > +uint16_t virtio_xmit_inorder_pkts(void *tx_queue, struct rte_mbuf **tx_pkts,
> > +		uint16_t nb_pkts);
> > +
> >  uint16_t virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
> >  		uint16_t nb_pkts);
> >
> > diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
> > index 0bca29855..d0473d6b4 100644
> > --- a/drivers/net/virtio/virtio_rxtx.c
> > +++ b/drivers/net/virtio/virtio_rxtx.c
> > @@ -122,6 +122,45 @@ virtqueue_dequeue_burst_rx(struct virtqueue *vq, struct rte_mbuf **rx_pkts,
> >  	return i;
> >  }
> >
> > +static uint16_t
> > +virtqueue_dequeue_inorder_rx(struct virtqueue *vq,
>
> virtqueue_dequeue_burst_rx_inorder() will be better.
>
> > +			struct rte_mbuf **rx_pkts,
> > +			uint32_t *len,
> > +			uint16_t num)
> > +{
> > +	struct vring_used_elem *uep;
> > +	struct rte_mbuf *cookie;
> > +	uint16_t used_idx;
> > +	uint16_t i;
> > +
> > +	if (unlikely(num == 0))
> > +		return 0;
> > +
> > +	used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1);
>
> Above assignment is misleading.
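
Good catch. Since the num == 0 case already returns before the loop, the pre-loop
masking can simply be dropped and used_idx computed only inside the loop. A rough,
untested sketch of the V2 shape (with the rename you suggested; the zero init is
only kept to avoid a maybe-uninitialized warning):

	static uint16_t
	virtqueue_dequeue_burst_rx_inorder(struct virtqueue *vq,
				struct rte_mbuf **rx_pkts,
				uint32_t *len,
				uint16_t num)
	{
		struct vring_used_elem *uep;
		struct rte_mbuf *cookie;
		uint16_t used_idx = 0;
		uint16_t i;

		if (unlikely(num == 0))
			return 0;

		for (i = 0; i < num; i++) {
			/* Desc idx same as used idx */
			used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1);
			uep = &vq->vq_ring.used->ring[used_idx];
			len[i] = uep->len;
			......
		}

		vq_ring_free_inorder(vq, used_idx, i);
		return i;
	}
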
>
> > +	for (i = 0; i < num; i++) {
> > +		used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1);
> > +		/* Desc idx same as used idx */
> > +		uep = &vq->vq_ring.used->ring[used_idx];
> > +		len[i] = uep->len;
> > +		cookie = (struct rte_mbuf *)vq->vq_descx[used_idx].cookie;
> > +
> > +		if (unlikely(cookie == NULL)) {
> > +			PMD_DRV_LOG(ERR, "vring descriptor with no mbuf cookie at %u",
> > +				vq->vq_used_cons_idx);
> > +			break;
> > +		}
> > +
> > +		rte_prefetch0(cookie);
> > +		rte_packet_prefetch(rte_pktmbuf_mtod(cookie, void *));
> > +		rx_pkts[i] = cookie;
> > +		vq->vq_used_cons_idx++;
> > +		vq->vq_descx[used_idx].cookie = NULL;
> > +	}
> > +
> > +	vq_ring_free_inorder(vq, used_idx, i);
> > +	return i;
> > +}
> > +
> >  #ifndef DEFAULT_TX_FREE_THRESH
> >  #define DEFAULT_TX_FREE_THRESH 32
> >  #endif
> > @@ -150,6 +189,70 @@ virtio_xmit_cleanup(struct virtqueue *vq, uint16_t num)
> >  	}
> >  }
> >
> > +/* Cleanup from completed inorder transmits. */
> > +static void
> > +virtio_xmit_cleanup_inorder(struct virtqueue *vq, uint16_t num)
> > +{
> > +	uint16_t i, used_idx;
> > +
> > +	used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1);
>
> Above assignment is misleading.
>
> > +	for (i = 0; i < num; i++) {
> > +		struct vq_desc_extra *dxp;
> > +
> > +		used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1);
> > +		dxp = &vq->vq_descx[used_idx];
> > +		vq->vq_used_cons_idx++;
> > +
> > +		if (dxp->cookie != NULL) {
> > +			rte_pktmbuf_free(dxp->cookie);
> > +			dxp->cookie = NULL;
> > +		}
> > +	}
> > +	vq_ring_free_inorder(vq, used_idx, num);
>
> If num is zero, we can't free used_idx.
>
> > +}
> > +
> > +static inline int
> > +virtqueue_enqueue_inorder_refill(struct virtqueue *vq,
>
> virtqueue_enqueue_recv_refill_inorder() will be better.
>
> > +			struct rte_mbuf **cookies,
> > +			uint16_t num)
> > +{
> > +	struct vq_desc_extra *dxp;
> > +	struct virtio_hw *hw = vq->hw;
> > +	struct vring_desc *start_dp;
> > +	uint16_t head_idx, idx, i = 0;
> > +
> > +	if (unlikely(vq->vq_free_cnt == 0))
> > +		return -ENOSPC;
> > +	if (unlikely(vq->vq_free_cnt < num))
> > +		return -EMSGSIZE;
> > +
> > +	head_idx = vq->vq_desc_head_idx & (vq->vq_nentries - 1);
> > +	start_dp = vq->vq_ring.desc;
> > +
> > +	while (i < num) {
> > +		idx = head_idx & (vq->vq_nentries - 1);
> > +		dxp = &vq->vq_descx[idx];
> > +		dxp->cookie = (void *)(struct rte_mbuf *)cookies[i];
>
> Casting isn't necessary.
>
> > +		dxp->ndescs = 1;
> > +
> > +		start_dp[idx].addr =
> > +			VIRTIO_MBUF_ADDR(cookies[i], vq) +
> > +			RTE_PKTMBUF_HEADROOM - hw->vtnet_hdr_size;
> > +		start_dp[idx].len =
> > +			cookies[i]->buf_len -
> > +			RTE_PKTMBUF_HEADROOM +
> > +			hw->vtnet_hdr_size;
> > +		start_dp[idx].flags = VRING_DESC_F_WRITE;
> > +
> > +		head_idx++;
> > +		i++;
> > +	}
> > +
> > +	vq->vq_avail_idx += num;
> > +	vq->vq_desc_head_idx += num;
> > +	vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - num);
> > +	return 0;
> > +}
> >
> >  static inline int
> >  virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf *cookie)
> > @@ -246,9 +349,113 @@ tx_offload_enabled(struct virtio_hw *hw)
> >  	(var) = (val); \
> >  } while (0)
> >
> [...]
> > +
> > +static void
> > +virtio_discard_inorder_rxbuf(struct virtqueue *vq, struct rte_mbuf *m)
>
> virtio_discard_rxbuf_inorder() will be better.
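
OK, will use the suggested names in V2 and drop the redundant cast in the refill
loop. For virtio_xmit_cleanup_inorder() above, V2 will also bail out early when
there is nothing to clean, so used_idx is never passed to the free with num == 0;
roughly (untested):

	if (unlikely(num == 0))
		return;

	for (i = 0; i < num; i++) {
		......
	}

	vq_ring_free_inorder(vq, used_idx, num);
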
>
> > +{
> > +	int error;
> > +
> > +	error = virtqueue_enqueue_inorder_refill(vq, &m, 1);
> >  	if (unlikely(error)) {
> >  		RTE_LOG(ERR, PMD, "cannot requeue discarded mbuf");
> >  		rte_pktmbuf_free(m);
> > @@ -813,6 +1002,191 @@ virtio_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts)
> >  	return nb_rx;
> >  }
> >
> > +uint16_t
> > +virtio_recv_inorder_pkts(void *rx_queue,
>
> virtio_recv_mergeable_pkts_inorder() will be better.
>
> > +			struct rte_mbuf **rx_pkts,
> > +			uint16_t nb_pkts)
> > +{
> > +	struct virtnet_rx *rxvq = rx_queue;
> > +	struct virtqueue *vq = rxvq->vq;
> > +	struct virtio_hw *hw = vq->hw;
> > +	struct rte_mbuf *rxm;
> > +	struct rte_mbuf *prev;
> > +	uint16_t nb_used, num, nb_rx;
> > +	uint32_t len[VIRTIO_MBUF_BURST_SZ];
> > +	struct rte_mbuf *rcv_pkts[VIRTIO_MBUF_BURST_SZ];
> > +	int error;
> > +	uint32_t i, nb_enqueued;
> > +	uint32_t seg_num;
> > +	uint32_t seg_res;
> > +	uint32_t hdr_size;
> > +	int offload;
> > +
> > +	nb_rx = 0;
> > +	if (unlikely(hw->started == 0))
> > +		return nb_rx;
> > +
> > +	nb_used = VIRTQUEUE_NUSED(vq);
> > +	nb_used = RTE_MIN(nb_used, nb_pkts);
> > +	nb_used = RTE_MIN(nb_used, VIRTIO_MBUF_BURST_SZ);
> > +
> > +	virtio_rmb();
> > +
> > +	PMD_RX_LOG(DEBUG, "used:%d", nb_used);
> > +
> > +	i = 0;
> > +	nb_enqueued = 0;
> > +	seg_num = 1;
> > +	seg_res = 0;
> > +	hdr_size = hw->vtnet_hdr_size;
> > +	offload = rx_offload_enabled(hw);
> > +
> > +	num = virtqueue_dequeue_inorder_rx(vq, rcv_pkts, len, nb_used);
> > +
> > +	for (; i < num; i++) {
>
> Why not initialize i in for ()?
>
> > +		struct virtio_net_hdr_mrg_rxbuf *header;
> > +
> > +		PMD_RX_LOG(DEBUG, "dequeue:%d", num);
> > +		PMD_RX_LOG(DEBUG, "packet len:%d", len[i]);
> > +
> > +		rxm = rcv_pkts[i];
> > +
> > +		if (unlikely(len[i] < hdr_size + ETHER_HDR_LEN)) {
> > +			PMD_RX_LOG(ERR, "Packet drop");
> > +			nb_enqueued++;
> > +			virtio_discard_inorder_rxbuf(vq, rxm);
> > +			rxvq->stats.errors++;
> > +			continue;
> > +		}
> > +
> > +		header = (struct virtio_net_hdr_mrg_rxbuf *)((char *)rxm->buf_addr +
> > +			RTE_PKTMBUF_HEADROOM - hdr_size);
> > +		seg_num = header->num_buffers;
> > +
> > +		if (seg_num == 0)
> > +			seg_num = 1;
> > +
> > +		rxm->data_off = RTE_PKTMBUF_HEADROOM;
> > +		rxm->nb_segs = seg_num;
> > +		rxm->ol_flags = 0;
> > +		rxm->vlan_tci = 0;
> > +		rxm->pkt_len = (uint32_t)(len[i] - hdr_size);
> > +		rxm->data_len = (uint16_t)(len[i] - hdr_size);
> > +
> > +		rxm->port = rxvq->port_id;
> > +
> > +		rx_pkts[nb_rx] = rxm;
> > +		prev = rxm;
> > +
> > +		if (offload && virtio_rx_offload(rxm, &header->hdr) < 0) {
> > +			virtio_discard_inorder_rxbuf(vq, rxm);
> > +			rxvq->stats.errors++;
> > +			continue;
> > +		}
> > +
> > +		seg_res = seg_num - 1;
> > +
> > +		/* Merge remaining segments */
> > +		while (seg_res != 0 && i < num) {
> > +			rxm = rcv_pkts[i];
> > +			rxm->data_off = RTE_PKTMBUF_HEADROOM - hdr_size;
> > +			rxm->pkt_len = (uint32_t)(len[i]);
> > +			rxm->data_len = (uint16_t)(len[i]);
> > +
> > +			rx_pkts[nb_rx]->pkt_len += (uint32_t)(len[i]);
> > +			rx_pkts[nb_rx]->data_len += (uint16_t)(len[i]);
> > +
> > +			i++;
> > +
> > +			if (prev)
> > +				prev->next = rxm;
> > +
> > +			prev = rxm;
> > +			seg_res -= 1;
> > +		}
> > +
> > +		if (hw->vlan_strip)
> > +			rte_vlan_strip(rx_pkts[nb_rx]);
> > +
> > +		VIRTIO_DUMP_PACKET(rx_pkts[nb_rx],
> > +			rx_pkts[nb_rx]->data_len);
> > +
> > +		rxvq->stats.bytes += rx_pkts[nb_rx]->pkt_len;
> > +		virtio_update_packet_stats(&rxvq->stats, rx_pkts[nb_rx]);
>
> When seg_res != 0, we cannot run the above code.
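
Right, the stats update (and the nb_rx advance) should only happen once all
segments of the packet have been merged. In V2 I plan to guard it on seg_res,
roughly (untested):

		if (seg_res == 0) {
			rxvq->stats.bytes += rx_pkts[nb_rx]->pkt_len;
			virtio_update_packet_stats(&rxvq->stats, rx_pkts[nb_rx]);
			nb_rx++;
		}

and do the same accounting for the last packet after its remaining segments are
dequeued in the loop below (adjusting the nb_rx - 1 indexing there to match).
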
>
> > +		nb_rx++;
> > +	}
> > +
> > +	/* Last packet still need merge segments */
> > +	while (seg_res != 0) {
> > +		uint16_t rcv_cnt = RTE_MIN((uint16_t)seg_res,
> > +					VIRTIO_MBUF_BURST_SZ);
> > +
> > +		prev = rcv_pkts[nb_rx - 1];
> > +		if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) {
> > +			num = virtqueue_dequeue_inorder_rx(vq, rcv_pkts, len,
> > +							rcv_cnt);
> > +			uint16_t extra_idx = 0;
> > +
> > +			rcv_cnt = num;
> > +			while (extra_idx < rcv_cnt) {
> > +				rxm = rcv_pkts[extra_idx];
> > +				rxm->data_off = RTE_PKTMBUF_HEADROOM - hdr_size;
> > +				rxm->pkt_len = (uint32_t)(len[extra_idx]);
> > +				rxm->data_len = (uint16_t)(len[extra_idx]);
> > +				prev->next = rxm;
> > +				prev = rxm;
> > +				rx_pkts[nb_rx - 1]->pkt_len += len[extra_idx];
> > +				rx_pkts[nb_rx - 1]->data_len += len[extra_idx];
> > +				extra_idx += 1;
> > +			};
> > +			seg_res -= rcv_cnt;
> > +		} else {
> > +			PMD_RX_LOG(ERR,
> > +				"No enough segments for packet.");
> > +			virtio_discard_inorder_rxbuf(vq, prev);
> > +			rxvq->stats.errors++;
> > +			nb_rx--;
> > +			break;
> > +		}
> > +	}
> > +
> > +	rxvq->stats.packets += nb_rx;
> > +
> > +	/* Allocate new mbuf for the used descriptor */
> > +	error = ENOSPC;
>
> Above assignment is meaningless.
Thanks, will remove it in V2.

>
> > +
> > +	if (likely(!virtqueue_full(vq))) {
> > +		/* free_cnt may include mrg descs */
> > +		uint16_t free_cnt = vq->vq_free_cnt;
> > +		struct rte_mbuf *new_pkts[free_cnt];
> > +
> > +		if (!rte_pktmbuf_alloc_bulk(rxvq->mpool, new_pkts, free_cnt)) {
> > +			error = virtqueue_enqueue_inorder_refill(vq, new_pkts,
> > +					free_cnt);
> > +			if (unlikely(error)) {
> > +				for (i = 0; i < free_cnt; i++)
> > +					rte_pktmbuf_free(new_pkts[i]);
> > +			}
> > +			nb_enqueued += free_cnt;
> > +		} else {
> > +			struct rte_eth_dev *dev
> > +				= &rte_eth_devices[rxvq->port_id];
> > +			dev->data->rx_mbuf_alloc_failed += free_cnt;
> > +		}
> > +	}
> > +
> > +	if (likely(nb_enqueued)) {
> > +		vq_update_avail_idx(vq);
> > +
> > +		if (unlikely(virtqueue_kick_prepare(vq))) {
> > +			virtqueue_notify(vq);
> > +			PMD_RX_LOG(DEBUG, "Notified");
> > +		}
> > +	}
> > +
> > +	return nb_rx;
> > +}
> > +
> >  uint16_t
> >  virtio_recv_mergeable_pkts(void *rx_queue,
> >  			struct rte_mbuf **rx_pkts,
> > @@ -1060,7 +1434,8 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
> >  		}
> >
> >  		/* Enqueue Packet buffers */
> > -		virtqueue_enqueue_xmit(txvq, txm, slots, use_indirect, can_push);
> > +		virtqueue_enqueue_xmit(txvq, txm, slots, use_indirect,
> > +			can_push, 0);
> >
> >  		txvq->stats.bytes += txm->pkt_len;
> >  		virtio_update_packet_stats(&txvq->stats, txm);
> > @@ -1079,3 +1454,121 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts)
> >
> >  	return nb_tx;
> >  }
> > +
> > +uint16_t
> > +virtio_xmit_inorder_pkts(void *tx_queue,
>
> virtio_xmit_pkts_inorder() will be better.
>
> > +			struct rte_mbuf **tx_pkts,
> > +			uint16_t nb_pkts)
> > +{
> > +	struct virtnet_tx *txvq = tx_queue;
> > +	struct virtqueue *vq = txvq->vq;
> > +	struct virtio_hw *hw = vq->hw;
> > +	uint16_t hdr_size = hw->vtnet_hdr_size;
> > +	uint16_t nb_used, nb_avail, nb_tx = 0, inorder_tx = 0;
> > +	struct rte_mbuf *inorder_pkts[nb_pkts];
> > +	int error;
> > +
> > +	if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
> > +		return nb_tx;
> > +
> > +	if (unlikely(nb_pkts < 1))
> > +		return nb_pkts;
> > +
> > +	VIRTQUEUE_DUMP(vq);
> > +	PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
> > +	nb_used = VIRTQUEUE_NUSED(vq);
> > +
> > +	virtio_rmb();
> > +	if (likely(nb_used > vq->vq_nentries - vq->vq_free_thresh))
> > +		virtio_xmit_cleanup_inorder(vq, nb_used);
> > +
> > +	nb_avail = RTE_MIN(vq->vq_free_cnt, nb_pkts);
> > +
> > +	for (nb_tx = 0; nb_tx < nb_avail; nb_tx++) {
> > +		struct rte_mbuf *txm = tx_pkts[nb_tx];
> > +		int slots, need;
> > +		int can_push = 0, use_indirect = 0, flush_inorder = 1;
> > +
> > +		/* Do VLAN tag insertion */
> > +		if (unlikely(txm->ol_flags & PKT_TX_VLAN_PKT)) {
> > +			error = rte_vlan_insert(&txm);
> > +			if (unlikely(error)) {
> > +				rte_pktmbuf_free(txm);
> > +				continue;
> > +			}
> > +		}
> > +
> > +		txvq->stats.bytes += txm->pkt_len;
> > +		virtio_update_packet_stats(&txvq->stats, txm);
> > +
> > +		/* optimize ring usage */
> > +		if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
> > +		     vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
> > +		    rte_mbuf_refcnt_read(txm) == 1 &&
> > +		    RTE_MBUF_DIRECT(txm) &&
> > +		    txm->nb_segs == 1 &&
> > +		    rte_pktmbuf_headroom(txm) >= hdr_size &&
> > +		    rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
> > +			__alignof__(struct virtio_net_hdr_mrg_rxbuf))) {
> > +			can_push = 1;
>
> There is no need to have 'can_push' any more.
>
> > +
> > +			inorder_pkts[inorder_tx] = txm;
> > +			inorder_tx++;
>
> inorder_tx isn't a good name.
>
> > +			flush_inorder = 0;
>
> I think there is no need to introduce 'flush_inorder'.
> And it will make the code much more readable by writing
> the code like:
>
> 	for (nb_tx = 0; nb_tx < nb_avail; nb_tx++) {
> 		struct rte_mbuf *txm = tx_pkts[nb_tx];
>
> 		......
>
> 		if ((vtpci_with_feature(hw, VIRTIO_F_ANY_LAYOUT) ||
> 		     vtpci_with_feature(hw, VIRTIO_F_VERSION_1)) &&
> 		    rte_mbuf_refcnt_read(txm) == 1 &&
> 		    RTE_MBUF_DIRECT(txm) &&
> 		    txm->nb_segs == 1 &&
> 		    rte_pktmbuf_headroom(txm) >= hdr_size &&
> 		    rte_is_aligned(rte_pktmbuf_mtod(txm, char *),
> 			__alignof__(struct virtio_net_hdr_mrg_rxbuf))) {
>
> 			inorder_pkts[nb_inorder_pkts++] = txm;
> 			continue;
> 		}
>
> 		/* Flush inorder packets if any. */
> 		if (nb_inorder_pkts) {
> 			virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts,
> 							nb_inorder_pkts);
> 			nb_inorder_pkts = 0;
> 		}
>
> 		......
>
> 		virtqueue_enqueue_xmit(txvq, txm, slots, 0, 0, 1);
> 	}

Thanks Tiwei, this logic is much clearer. Will modify it in V2.

> 	/* Flush inorder packets if any. */
> 	if (nb_inorder_pkts) {
> 		virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts,
> 						nb_inorder_pkts);
> 		nb_inorder_pkts = 0;
> 	}
>
>
> > +		} else if (vtpci_with_feature(hw, VIRTIO_RING_F_INDIRECT_DESC) &&
> > +			txm->nb_segs < VIRTIO_MAX_TX_INDIRECT) {
> > +			use_indirect = 1;
> > +		}
>
> There is no need to try to use INDIRECT.
>
> > +
> > +		if (flush_inorder) {
> > +			/* Flush inorder packets */
> > +			virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts,
> > +							inorder_tx);
> > +			inorder_tx = 0;
> > +
> > +			/* How many main ring entries are needed to this Tx?
> > +			 * any_layout => number of segments
> > +			 * indirect => 1
> > +			 * default => number of segments + 1
> > +			 */
> > +			slots = use_indirect ? 1 : (txm->nb_segs + !can_push);
> > +			need = slots - vq->vq_free_cnt;
> > +
> > +			if (unlikely(need > 0)) {
> > +				nb_used = VIRTQUEUE_NUSED(vq);
> > +				virtio_rmb();
> > +				need = RTE_MIN(need, (int)nb_used);
> > +
> > +				virtio_xmit_cleanup_inorder(vq, need);
> > +				need = slots - vq->vq_free_cnt;
> > +				if (unlikely(need > 0)) {
> > +					PMD_TX_LOG(ERR,
> > +						"No free tx descriptors to transmit");
> > +					break;
> > +				}
> > +			}
> > +			/* Enqueue Packet buffers */
> > +			virtqueue_enqueue_xmit(txvq, txm, slots, use_indirect,
> > +				can_push, 1);
> > +		}
> > +	}
> > +
> > +	/* Transmit all inorder packets */
> > +	if (inorder_tx)
> > +		virtqueue_enqueue_xmit_inorder(txvq, inorder_pkts, inorder_tx);
> > +
> > +	txvq->stats.packets += nb_tx;
> > +
> > +	if (likely(nb_tx)) {
> > +		vq_update_avail_idx(vq);
> > +
> > +		if (unlikely(virtqueue_kick_prepare(vq))) {
> > +			virtqueue_notify(vq);
> > +			PMD_TX_LOG(DEBUG, "Notified backend after xmit");
> > +		}
> > +	}
> > +
> > +	VIRTQUEUE_DUMP(vq);
> > +
> > +	return nb_tx;
> > +}
> > --
> > 2.17.0
> >
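
To summarize the Tx side for V2: can_push, use_indirect and flush_inorder will be
dropped, inorder_tx will be renamed to nb_inorder_pkts, and the loop will follow
the structure you outlined, flushing the collected in-order burst before any
packet that has to take the normal path. On that normal path the slot calculation
then reduces to simply (untested):

		/* can_push == 0 and use_indirect == 0 on this path */
		slots = txm->nb_segs + 1;
		need = slots - vq->vq_free_cnt;

with the existing cleanup-and-retry kept before giving up. Will send V2 with
these changes.
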