Optimize the packed ring Tx datapath in the same way as the Rx datapath:
split the Tx datapath into batch and single Tx functions.

Signed-off-by: Marvin Liu <yong....@intel.com>

diff --git a/drivers/net/virtio/virtio_ethdev.h b/drivers/net/virtio/virtio_ethdev.h
index 10e39670e..c9aaef0af 100644
--- a/drivers/net/virtio/virtio_ethdev.h
+++ b/drivers/net/virtio/virtio_ethdev.h
@@ -107,6 +107,9 @@ uint16_t virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
 uint16_t virtio_recv_pkts_packed_vec(void *rx_queue, struct rte_mbuf **rx_pkts,
                uint16_t nb_pkts);
 
+uint16_t virtio_xmit_pkts_packed_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
+               uint16_t nb_pkts);
+
 int eth_virtio_dev_init(struct rte_eth_dev *eth_dev);
 
 void virtio_interrupt_handler(void *param);
diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c
index b8b4d3c25..125df3a13 100644
--- a/drivers/net/virtio/virtio_rxtx.c
+++ b/drivers/net/virtio/virtio_rxtx.c
@@ -2174,3 +2174,11 @@ virtio_recv_pkts_packed_vec(void __rte_unused *rx_queue,
 {
        return 0;
 }
+
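+/* Weak stub; overridden by the vectorized implementation when it is built. */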
+__rte_weak uint16_t
+virtio_xmit_pkts_packed_vec(void __rte_unused *tx_queue,
+                           struct rte_mbuf __rte_unused **tx_pkts,
+                           uint16_t __rte_unused nb_pkts)
+{
+       return 0;
+}
diff --git a/drivers/net/virtio/virtio_rxtx_packed_avx.c b/drivers/net/virtio/virtio_rxtx_packed_avx.c
index d8cda9d71..0872f2083 100644
--- a/drivers/net/virtio/virtio_rxtx_packed_avx.c
+++ b/drivers/net/virtio/virtio_rxtx_packed_avx.c
@@ -15,6 +15,11 @@
 #include "virtio_pci.h"
 #include "virtqueue.h"
 
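+/*
+ * Expected mbuf rearm_data for the batch Tx path: data_off equal to
+ * RTE_PKTMBUF_HEADROOM, refcnt 1 and nb_segs 1, laid out as in rte_mbuf.
+ */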
+#define REF_CNT_OFFSET 16
+#define SEG_NUM_OFFSET 32
+#define BATCH_REARM_DATA (1ULL << SEG_NUM_OFFSET | \
+                         1ULL << REF_CNT_OFFSET | \
+                         RTE_PKTMBUF_HEADROOM)
 #define PACKED_FLAGS_MASK (1ULL << 55 | 1ULL << 63)
 
 #define PACKED_BATCH_SIZE (RTE_CACHE_LINE_SIZE / \
@@ -41,6 +46,48 @@
        for (iter = val; iter < num; iter++)
 #endif
 
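+/*
+ * Reclaim completed Tx descriptors: walk the used ring from
+ * vq_used_cons_idx up to the id of the last used descriptor, collect the
+ * transmitted mbufs, free them in one pass and return the descriptors to
+ * the free pool.
+ */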
+static void
+virtio_xmit_cleanup_packed_vec(struct virtqueue *vq)
+{
+       struct vring_packed_desc *desc = vq->vq_packed.ring.desc;
+       struct vq_desc_extra *dxp;
+       uint16_t used_idx, id, curr_id, free_cnt = 0;
+       uint16_t size = vq->vq_nentries;
+       struct rte_mbuf *mbufs[size];
+       uint16_t nb_mbuf = 0, i;
+
+       used_idx = vq->vq_used_cons_idx;
+
+       if (desc_is_used(&desc[used_idx], vq))
+               id = desc[used_idx].id;
+       else
+               return;
+
+       do {
+               curr_id = used_idx;
+               dxp = &vq->vq_descx[used_idx];
+               used_idx += dxp->ndescs;
+               free_cnt += dxp->ndescs;
+
+               if (dxp->cookie != NULL) {
+                       mbufs[nb_mbuf] = dxp->cookie;
+                       dxp->cookie = NULL;
+                       nb_mbuf++;
+               }
+
+               if (used_idx >= size) {
+                       used_idx -= size;
+                       vq->vq_packed.used_wrap_counter ^= 1;
+               }
+       } while (curr_id != id);
+
+       for (i = 0; i < nb_mbuf; i++)
+               rte_pktmbuf_free(mbufs[i]);
+
+       vq->vq_used_cons_idx = used_idx;
+       vq->vq_free_cnt += free_cnt;
+}
+
 static inline void
 virtio_update_batch_stats(struct virtnet_stats *stats,
                          uint16_t pkt_len1,
@@ -54,6 +101,185 @@ virtio_update_batch_stats(struct virtnet_stats *stats,
        stats->bytes += pkt_len4;
 }
 
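+/*
+ * Try to enqueue a batch of PACKED_BATCH_SIZE packets with one 64B store.
+ * The avail index must be batch aligned and every mbuf must be a
+ * single-segment mbuf with refcnt 1 and untouched headroom; otherwise
+ * return -1 so the caller falls back to the single-packet path.
+ */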
+static inline int
+virtqueue_enqueue_batch_packed_vec(struct virtnet_tx *txvq,
+                                  struct rte_mbuf **tx_pkts)
+{
+       struct virtqueue *vq = txvq->vq;
+       uint16_t head_size = vq->hw->vtnet_hdr_size;
+       struct vq_desc_extra *dxps[PACKED_BATCH_SIZE];
+       uint16_t idx = vq->vq_avail_idx;
+       uint64_t descs[PACKED_BATCH_SIZE];
+       struct virtio_net_hdr *hdrs[PACKED_BATCH_SIZE];
+       uint16_t i;
+
+       if (vq->vq_avail_idx & PACKED_BATCH_MASK)
+               return -1;
+
+       /* Load the rearm data of the four mbufs */
+       __m256i mbufs = _mm256_set_epi64x(
+                       *tx_pkts[3]->rearm_data,
+                       *tx_pkts[2]->rearm_data,
+                       *tx_pkts[1]->rearm_data,
+                       *tx_pkts[0]->rearm_data);
+
+       /* Expected data_off = RTE_PKTMBUF_HEADROOM, refcnt = 1 and nb_segs = 1 */
+       __m256i mbuf_ref = _mm256_set_epi64x(
+                       BATCH_REARM_DATA, BATCH_REARM_DATA,
+                       BATCH_REARM_DATA, BATCH_REARM_DATA);
+
+       /* Check data_off, refcnt and nb_segs; 0x7777 masks out the port
+        * lane of each mbuf's rearm quadword.
+        */
+       uint16_t cmp = _mm256_cmpneq_epu16_mask(mbufs, mbuf_ref);
+       if (cmp & 0x7777)
+               return -1;
+
+       virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+               dxps[i] = &vq->vq_descx[idx + i];
+               dxps[i]->ndescs = 1;
+               dxps[i]->cookie = tx_pkts[i];
+       }
+
+       virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+               rte_pktmbuf_prepend(tx_pkts[i], head_size);
+               tx_pkts[i]->pkt_len -= head_size;
+       }
+
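+       /* Pack len (bits 0-31), id (bits 32-47) and avail/used flags
+        * (bits 48-63) into the non-address half of each descriptor,
+        * matching the vring_packed_desc layout.
+        */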
+       virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+               descs[i] = (uint64_t)tx_pkts[i]->data_len |
+               (uint64_t)(idx + i) << 32 |
+               (uint64_t)vq->vq_packed.cached_flags << 48;
+
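+       /* Interleave the buffer addresses with the packed quadwords so the
+        * single 64B store below writes all four descriptors at once.
+        */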
+       __m512i new_descs = _mm512_set_epi64(
+                       descs[3], VIRTIO_MBUF_DATA_DMA_ADDR(tx_pkts[3], vq),
+                       descs[2], VIRTIO_MBUF_DATA_DMA_ADDR(tx_pkts[2], vq),
+                       descs[1], VIRTIO_MBUF_DATA_DMA_ADDR(tx_pkts[1], vq),
+                       descs[0], VIRTIO_MBUF_DATA_DMA_ADDR(tx_pkts[0], vq));
+
+       /* The virtio-net header sits at the start of the prepended area */
+       virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+               hdrs[i] = rte_pktmbuf_mtod(tx_pkts[i],
+                               struct virtio_net_hdr *);
+
+       if (!vq->hw->has_tx_offload) {
+               virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+                       virtqueue_clear_net_hdr(hdrs[i]);
+       } else {
+               virtio_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+                       virtqueue_xmit_offload(hdrs[i], tx_pkts[i], true);
+       }
+
+       /* Enqueue packet buffers: make header writes visible before the
+        * descriptors are exposed to the device.
+        */
+       rte_smp_wmb();
+       _mm512_storeu_si512((void *)&vq->vq_packed.ring.desc[idx], new_descs);
+
+       virtio_update_batch_stats(&txvq->stats, tx_pkts[0]->pkt_len,
+                       tx_pkts[1]->pkt_len, tx_pkts[2]->pkt_len,
+                       tx_pkts[3]->pkt_len);
+
+       vq->vq_avail_idx += PACKED_BATCH_SIZE;
+       vq->vq_free_cnt -= PACKED_BATCH_SIZE;
+
+       if (vq->vq_avail_idx >= vq->vq_nentries) {
+               vq->vq_avail_idx -= vq->vq_nentries;
+               vq->vq_packed.cached_flags ^=
+                       VRING_PACKED_DESC_F_AVAIL_USED;
+       }
+
+       return 0;
+}
+
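+/*
+ * Single-packet fallback: reuse the scalar virtqueue_enqueue_xmit_packed()
+ * helper and reclaim used descriptors on demand when the ring runs short.
+ */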
+static inline int
+virtqueue_enqueue_single_packed_vec(struct virtnet_tx *txvq,
+                                   struct rte_mbuf *txm)
+{
+       struct virtqueue *vq = txvq->vq;
+       struct virtio_hw *hw = vq->hw;
+       uint16_t hdr_size = hw->vtnet_hdr_size;
+       uint16_t slots, can_push;
+       int16_t need;
+
+       /* How many main ring entries are needed for this Tx?
+        * any_layout => number of segments
+        * default    => number of segments + 1
+        */
+       can_push = rte_mbuf_refcnt_read(txm) == 1 &&
+                  RTE_MBUF_DIRECT(txm) &&
+                  txm->nb_segs == 1 &&
+                  rte_pktmbuf_headroom(txm) >= hdr_size;
+
+       slots = txm->nb_segs + !can_push;
+       need = slots - vq->vq_free_cnt;
+
+       /* A positive value means used descriptors need to be reclaimed first */
+       if (unlikely(need > 0)) {
+               virtio_xmit_cleanup_packed_vec(vq);
+               need = slots - vq->vq_free_cnt;
+               if (unlikely(need > 0)) {
+                       PMD_TX_LOG(ERR,
+                                  "No free tx descriptors to transmit");
+                       return -1;
+               }
+       }
+
+       /* Enqueue Packet buffers */
+       virtqueue_enqueue_xmit_packed(txvq, txm, slots, can_push, 1);
+
+       txvq->stats.bytes += txm->pkt_len;
+       return 0;
+}
+
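+/*
+ * Vectorized packed ring Tx burst: drain used descriptors when the ring is
+ * getting full, enqueue in batches of PACKED_BATCH_SIZE where possible,
+ * fall back to single-packet enqueue, then kick the backend if needed.
+ */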
+uint16_t
+virtio_xmit_pkts_packed_vec(void *tx_queue, struct rte_mbuf **tx_pkts,
+                       uint16_t nb_pkts)
+{
+       struct virtnet_tx *txvq = tx_queue;
+       struct virtqueue *vq = txvq->vq;
+       struct virtio_hw *hw = vq->hw;
+       uint16_t nb_tx = 0;
+       uint16_t remained;
+
+       if (unlikely(hw->started == 0 && tx_pkts != hw->inject_pkts))
+               return nb_tx;
+
+       if (unlikely(nb_pkts < 1))
+               return nb_pkts;
+
+       PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts);
+
+       if (vq->vq_free_cnt <= vq->vq_nentries - vq->vq_free_thresh)
+               virtio_xmit_cleanup_packed_vec(vq);
+
+       remained = RTE_MIN(nb_pkts, vq->vq_free_cnt);
+
+       while (remained) {
+               if (remained >= PACKED_BATCH_SIZE) {
+                       if (!virtqueue_enqueue_batch_packed_vec(txvq,
+                                               &tx_pkts[nb_tx])) {
+                               nb_tx += PACKED_BATCH_SIZE;
+                               remained -= PACKED_BATCH_SIZE;
+                               continue;
+                       }
+               }
+               if (!virtqueue_enqueue_single_packed_vec(txvq,
+                                       tx_pkts[nb_tx])) {
+                       nb_tx++;
+                       remained--;
+                       continue;
+               }
+               break;
+       }
+
+       txvq->stats.packets += nb_tx;
+
+       if (likely(nb_tx)) {
+               if (unlikely(virtqueue_kick_prepare_packed(vq))) {
+                       virtqueue_notify(vq);
+                       PMD_TX_LOG(DEBUG, "Notified backend after xmit");
+               }
+       }
+
+       return nb_tx;
+}
+
 /* Optionally fill offload information in structure */
 static inline int
 virtio_vec_rx_offload(struct rte_mbuf *m, struct virtio_net_hdr *hdr)
-- 
2.17.1
