Hi Jiayu,
> -----Original Message-----
> From: Hu, Jiayu <jiayu...@intel.com>
> Sent: Monday, December 28, 2020 12:04 PM
> To: Jiang, Cheng1 <cheng1.ji...@intel.com>; maxime.coque...@redhat.com;
> Xia, Chenbo <chenbo....@intel.com>
> Cc: dev@dpdk.org; Yang, YvonneX <yvonnex.y...@intel.com>
> Subject: RE: [PATCH v4 2/2] examples/vhost: refactor vhost data path
>
> Hi Cheng,
>
> Some comments are inline.
>
> Thanks,
> Jiayu
>
> > -----Original Message-----
> > From: Jiang, Cheng1 <cheng1.ji...@intel.com>
> > Sent: Friday, December 25, 2020 4:07 PM
> > To: maxime.coque...@redhat.com; Xia, Chenbo <chenbo....@intel.com>
> > Cc: dev@dpdk.org; Hu, Jiayu <jiayu...@intel.com>; Yang, YvonneX
> > <yvonnex.y...@intel.com>; Jiang, Cheng1 <cheng1.ji...@intel.com>
> > Subject: [PATCH v4 2/2] examples/vhost: refactor vhost data path
> >
> > Change the vm2vm data path to batch enqueue for better performance.
> > Support latest async vhost API, refactor vhost async data path,
> > replase rte_atomicNN_xxx to atomic_XXX and clean some codes.
>
> Typo: replase -> replace

I'll fix it in the next version.

> >
> > Signed-off-by: Cheng Jiang <cheng1.ji...@intel.com>
> > ---
> >  examples/vhost/main.c | 202 +++++++++++++++++++++++++++++++-----------
> >  examples/vhost/main.h |   7 +-
> >  2 files changed, 154 insertions(+), 55 deletions(-)
> >
> > diff --git a/examples/vhost/main.c b/examples/vhost/main.c
> > index 8d8c3038b..3ea12a474 100644
> > --- a/examples/vhost/main.c
> > +++ b/examples/vhost/main.c
> > @@ -179,9 +179,18 @@ struct mbuf_table {
> >  	struct rte_mbuf *m_table[MAX_PKT_BURST];
> >  };
> >
> > +struct vhost_bufftable {
> > +	uint32_t len;
> > +	uint64_t pre_tsc;
> > +	struct rte_mbuf *m_table[MAX_PKT_BURST];
> > +};
> > +
> >  /* TX queue for each data core. */
> >  struct mbuf_table lcore_tx_queue[RTE_MAX_LCORE];
> >
> > +/* TX queue for each vhost device. */
>
> Every lcore maintains a TX buffer for every vhost device, which is to batch
> pkts to enqueue for higher performance.
> I suggest you to update the description of vhost_txbuff above, as it is not
> very clear.

Sure, will add some comments in the next version.
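Something like the following, as a rough draft based on your description (exact wording may change in the actual patch):

	/*
	 * Vhost TX buffer for each data core.
	 * Every data core maintains a TX buffer for every vhost device,
	 * which is used for batch pkts enqueue for higher performance.
	 */
	struct vhost_bufftable *vhost_txbuff[RTE_MAX_LCORE * MAX_VHOST_DEVICE];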
> > +struct vhost_bufftable *vhost_txbuff[RTE_MAX_LCORE * MAX_VHOST_DEVICE];
> > +
> >  #define MBUF_TABLE_DRAIN_TSC	((rte_get_tsc_hz() + US_PER_S - 1) \
> >  				 / US_PER_S * BURST_TX_DRAIN_US)
> >  #define VLAN_HLEN       4
> > @@ -804,39 +813,114 @@ unlink_vmdq(struct vhost_dev *vdev)
> >  	}
> >  }
> >
> > +static inline void
> > +free_pkts(struct rte_mbuf **pkts, uint16_t n)
> > +{
> > +	while (n--)
> > +		rte_pktmbuf_free(pkts[n]);
> > +}
> > +
> >  static __rte_always_inline void
> > -virtio_xmit(struct vhost_dev *dst_vdev, struct vhost_dev *src_vdev,
> > +complete_async_pkts(struct vhost_dev *vdev)
> > +{
> > +	struct rte_mbuf *p_cpl[MAX_PKT_BURST];
> > +	uint16_t complete_count;
> > +
> > +	complete_count = rte_vhost_poll_enqueue_completed(vdev->vid,
> > +					VIRTIO_RXQ, p_cpl, MAX_PKT_BURST);
> > +	if (complete_count) {
> > +		atomic_fetch_sub(&vdev->nr_async_pkts, complete_count);
> > +		free_pkts(p_cpl, complete_count);
> > +	}
> > +}
> > +
> > +static __rte_always_inline void
> > +sync_virtio_xmit(struct vhost_dev *dst_vdev, struct vhost_dev *src_vdev,
> >  	    struct rte_mbuf *m)
> >  {
> >  	uint16_t ret;
> > -	struct rte_mbuf *m_cpl[1];
> >
> >  	if (builtin_net_driver) {
> >  		ret = vs_enqueue_pkts(dst_vdev, VIRTIO_RXQ, &m, 1);
> > -	} else if (async_vhost_driver) {
> > -		ret = rte_vhost_submit_enqueue_burst(dst_vdev->vid, VIRTIO_RXQ,
> > -						&m, 1);
> > -
> > -		if (likely(ret))
> > -			dst_vdev->nr_async_pkts++;
> > -
> > -		while (likely(dst_vdev->nr_async_pkts)) {
> > -			if (rte_vhost_poll_enqueue_completed(dst_vdev->vid,
> > -					VIRTIO_RXQ, m_cpl, 1))
> > -				dst_vdev->nr_async_pkts--;
> > -		}
> >  	} else {
> >  		ret = rte_vhost_enqueue_burst(dst_vdev->vid, VIRTIO_RXQ, &m, 1);
> >  	}
> >
> >  	if (enable_stats) {
> > -		rte_atomic64_inc(&dst_vdev->stats.rx_total_atomic);
> > -		rte_atomic64_add(&dst_vdev->stats.rx_atomic, ret);
> > +		atomic_fetch_add(&dst_vdev->stats.rx_total_atomic, 1);
> > +		atomic_fetch_add(&dst_vdev->stats.rx_atomic, ret);
> >  		src_vdev->stats.tx_total++;
> >  		src_vdev->stats.tx += ret;
> >  	}
> >  }
> >
> > +static __rte_always_inline void
> > +drain_vhost(struct vhost_dev *vdev)
> > +{
> > +	uint16_t ret;
> > +	uint64_t queue_id = rte_lcore_id() * MAX_VHOST_DEVICE + vdev->vid;
> > +	uint16_t nr_xmit = vhost_txbuff[queue_id]->len;
> > +	struct rte_mbuf **m = vhost_txbuff[queue_id]->m_table;
>
> "queue_id" is not a very good name, as it's not the queue id of vhost device,
> but a buffer index which holds pkts to enqueue.

Sure, I will change it to buff_idx.
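For reference, the renamed prologue would then read roughly like this (name change only, untested):

	uint64_t buff_idx = rte_lcore_id() * MAX_VHOST_DEVICE + vdev->vid;
	uint16_t nr_xmit = vhost_txbuff[buff_idx]->len;
	struct rte_mbuf **m = vhost_txbuff[buff_idx]->m_table;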
> > +
> > +	if (builtin_net_driver) {
> > +		ret = vs_enqueue_pkts(vdev, VIRTIO_RXQ, m, nr_xmit);
> > +	} else if (async_vhost_driver) {
> > +		uint32_t cpu_cpl_nr = 0;
> > +		uint16_t enqueue_fail = 0;
> > +		struct rte_mbuf *m_cpu_cpl[nr_xmit];
> > +
> > +		complete_async_pkts(vdev);
> > +		ret = rte_vhost_submit_enqueue_burst(vdev->vid, VIRTIO_RXQ,
> > +					m, nr_xmit, m_cpu_cpl, &cpu_cpl_nr);
> > +		atomic_fetch_add(&vdev->nr_async_pkts, ret - cpu_cpl_nr);
> > +
> > +		if (cpu_cpl_nr)
> > +			free_pkts(m_cpu_cpl, cpu_cpl_nr);
> > +
> > +		enqueue_fail = nr_xmit - ret;
> > +		if (enqueue_fail)
> > +			free_pkts(&m[ret], nr_xmit - ret);
> > +	} else {
> > +		ret = rte_vhost_enqueue_burst(vdev->vid, VIRTIO_RXQ, m, nr_xmit);
> > +	}
> > +
> > +	if (enable_stats) {
> > +		atomic_fetch_add(&vdev->stats.rx_total_atomic, nr_xmit);
> > +		atomic_fetch_add(&vdev->stats.rx_atomic, ret);
> > +	}
> > +
> > +	if (!async_vhost_driver)
> > +		free_pkts(m, nr_xmit);
> > +}
> > +
> > +static __rte_always_inline void
> > +drain_vhost_table(void)
> > +{
> > +	const uint16_t lcore_id = rte_lcore_id();
> > +	struct vhost_bufftable *vhost_txq;
> > +	struct vhost_dev *vdev;
> > +	uint64_t cur_tsc;
> > +
> > +	TAILQ_FOREACH(vdev, &lcore_info[lcore_id].vdev_list, lcore_vdev_entry) {
>
> A lcore may have pkts to enqueue for any vhost device, as it's decided by
> mac address. So drain_vhost_table() shouldn't just process vhost ports
> allocated to its lcore, but all vhost ports that have un-sent packets.

Sure, I will fix it in the next version.
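To give an idea of the direction (an untested sketch, details may change in the next version), the loop would walk the global device list instead of the per-lcore list:

	TAILQ_FOREACH(vdev, &vhost_dev_list, global_vdev_entry) {
		vhost_txq = vhost_txbuff[lcore_id * MAX_VHOST_DEVICE
						+ vdev->vid];
		if (vhost_txq->len == 0)
			continue;
		/* timeout-based drain stays as below */
	}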
> > +		if (!vdev->remove) {
> > +			vhost_txq = vhost_txbuff[lcore_id * MAX_VHOST_DEVICE
> > +						+ vdev->vid];
> > +			cur_tsc = rte_rdtsc();
> > +
> > +			if (unlikely(cur_tsc - vhost_txq->pre_tsc
> > +					> MBUF_TABLE_DRAIN_TSC)) {
> > +				RTE_LOG_DP(DEBUG, VHOST_DATA,
> > +					"Vhost tX queue drained after timeout with burst size %u\n",
>
> "tX" -> "TX"

I will fix it in the next version.

> > +					vhost_txq->len);
> > +				drain_vhost(vdev);
> > +				vhost_txq->len = 0;
> > +				vhost_txq->pre_tsc = cur_tsc;
> > +			}
> > +		}
> > +	}
> > +}
> > +
> >  /*
> >   * Check if the packet destination MAC address is for a local device. If so then put
> >   * the packet on that devices RX queue. If not then return.
> >   */
> > @@ -846,7 +930,8 @@ virtio_tx_local(struct vhost_dev *vdev, struct rte_mbuf *m)
> >  {
> >  	struct rte_ether_hdr *pkt_hdr;
> >  	struct vhost_dev *dst_vdev;
> > -
> > +	struct vhost_bufftable *vhost_txq;
> > +	const uint16_t lcore_id = rte_lcore_id();
>
> Why use "const"?

No particular reason, maybe I just forgot to delete it. I will fix it in the next version.

> >  	pkt_hdr = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
> >
> >  	dst_vdev = find_vhost_dev(&pkt_hdr->d_addr);
> > @@ -869,7 +954,19 @@ virtio_tx_local(struct vhost_dev *vdev, struct rte_mbuf *m)
> >  		return 0;
> >  	}
> >
> > -	virtio_xmit(dst_vdev, vdev, m);
> > +	vhost_txq = vhost_txbuff[lcore_id * MAX_VHOST_DEVICE + dst_vdev->vid];
> > +	vhost_txq->m_table[vhost_txq->len++] = m;
> > +
> > +	if (enable_stats) {
> > +		vdev->stats.tx_total++;
> > +		vdev->stats.tx++;
> > +	}
> > +
> > +	if (unlikely(vhost_txq->len == MAX_PKT_BURST)) {
> > +		drain_vhost(dst_vdev);
> > +		vhost_txq->len = 0;
> > +		vhost_txq->pre_tsc = rte_rdtsc();
> > +	}
> >  	return 0;
> >  }
> >
> > @@ -940,13 +1037,6 @@ static void virtio_tx_offload(struct rte_mbuf *m)
> >  	tcp_hdr->cksum = get_psd_sum(l3_hdr, m->ol_flags);
> >  }
> >
> > -static inline void
> > -free_pkts(struct rte_mbuf **pkts, uint16_t n)
> > -{
> > -	while (n--)
> > -		rte_pktmbuf_free(pkts[n]);
> > -}
> > -
> >  static __rte_always_inline void
> >  do_drain_mbuf_table(struct mbuf_table *tx_q)
> >  {
> > @@ -979,16 +1069,14 @@ virtio_tx_route(struct vhost_dev *vdev, struct rte_mbuf *m, uint16_t vlan_tag)
> >
> >  		TAILQ_FOREACH(vdev2, &vhost_dev_list, global_vdev_entry) {
> >  			if (vdev2 != vdev)
> > -				virtio_xmit(vdev2, vdev, m);
> > +				sync_virtio_xmit(vdev2, vdev, m);
> >  		}
> >  		goto queue2nic;
> >  	}
> >
> >  	/*check if destination is local VM*/
> > -	if ((vm2vm_mode == VM2VM_SOFTWARE) && (virtio_tx_local(vdev, m) == 0)) {
> > -		rte_pktmbuf_free(m);
> > +	if ((vm2vm_mode == VM2VM_SOFTWARE) && (virtio_tx_local(vdev, m) == 0))
> >  		return;
> > -	}
> >
> >  	if (unlikely(vm2vm_mode == VM2VM_HARDWARE)) {
> >  		if (unlikely(find_local_dest(vdev, m, &offset,
> > @@ -1073,19 +1161,6 @@ drain_mbuf_table(struct mbuf_table *tx_q)
> >  	}
> >  }
> >
> > -static __rte_always_inline void
> > -complete_async_pkts(struct vhost_dev *vdev, uint16_t qid)
> > -{
> > -	struct rte_mbuf *p_cpl[MAX_PKT_BURST];
> > -	uint16_t complete_count;
> > -
> > -	complete_count = rte_vhost_poll_enqueue_completed(vdev->vid,
> > -						qid, p_cpl, MAX_PKT_BURST);
> > -	vdev->nr_async_pkts -= complete_count;
> > -	if (complete_count)
> > -		free_pkts(p_cpl, complete_count);
> > -}
> > -
> >  static __rte_always_inline void
> >  drain_eth_rx(struct vhost_dev *vdev)
> >  {
> > @@ -1095,9 +1170,6 @@ drain_eth_rx(struct vhost_dev *vdev)
> >  	rx_count = rte_eth_rx_burst(ports[0], vdev->vmdq_rx_q,
> >  				    pkts, MAX_PKT_BURST);
> >
> > -	while (likely(vdev->nr_async_pkts))
> > -		complete_async_pkts(vdev, VIRTIO_RXQ);
> > -
> >  	if (!rx_count)
> >  		return;
> >
> > @@ -1123,17 +1195,31 @@ drain_eth_rx(struct vhost_dev *vdev)
> >  		enqueue_count = vs_enqueue_pkts(vdev, VIRTIO_RXQ, pkts, rx_count);
> >  	} else if (async_vhost_driver) {
> > +		uint32_t cpu_cpl_nr = 0;
> > +		uint16_t enqueue_fail = 0;
> > +		struct rte_mbuf *m_cpu_cpl[MAX_PKT_BURST];
> > +
> > +		complete_async_pkts(vdev);
> >  		enqueue_count = rte_vhost_submit_enqueue_burst(vdev->vid,
> > -					VIRTIO_RXQ, pkts, rx_count);
> > -		vdev->nr_async_pkts += enqueue_count;
> > +					VIRTIO_RXQ, pkts, rx_count,
> > +					m_cpu_cpl, &cpu_cpl_nr);
> > +		atomic_fetch_add(&vdev->nr_async_pkts,
> > +					enqueue_count - cpu_cpl_nr);
> > +		if (cpu_cpl_nr)
> > +			free_pkts(m_cpu_cpl, cpu_cpl_nr);
> > +
> > +		enqueue_fail = rx_count - enqueue_count;
> > +		if (enqueue_fail)
> > +			free_pkts(&pkts[enqueue_count], enqueue_fail);
> > +
> >  	} else {
> >  		enqueue_count = rte_vhost_enqueue_burst(vdev->vid, VIRTIO_RXQ,
> >  						pkts, rx_count);
> >  	}
> >
> >  	if (enable_stats) {
> > -		rte_atomic64_add(&vdev->stats.rx_total_atomic, rx_count);
> > -		rte_atomic64_add(&vdev->stats.rx_atomic, enqueue_count);
> > +		atomic_fetch_add(&vdev->stats.rx_total_atomic, rx_count);
> > +		atomic_fetch_add(&vdev->stats.rx_atomic, enqueue_count);
> >  	}
> >
> >  	if (!async_vhost_driver)
> > @@ -1202,7 +1288,7 @@ switch_worker(void *arg __rte_unused)
> >
> >  	while(1) {
> >  		drain_mbuf_table(tx_q);
> > -
> > +		drain_vhost_table();
> >  		/*
> >  		 * Inform the configuration core that we have exited the
> >  		 * linked list and that no devices are in use if requested.
> > @@ -1298,6 +1384,7 @@ static int
> >  new_device(int vid)
> >  {
> >  	int lcore, core_add = 0;
> > +	uint16_t i;
> >  	uint32_t device_num_min = num_devices;
> >  	struct vhost_dev *vdev;
> >  	vdev = rte_zmalloc("vhost device", sizeof(*vdev), RTE_CACHE_LINE_SIZE);
> > @@ -1309,6 +1396,13 @@ new_device(int vid)
> >  	}
> >  	vdev->vid = vid;
> >
> > +	for (i = 0; i < RTE_MAX_LCORE; i++) {
> > +		vhost_txbuff[i * MAX_VHOST_DEVICE + vid]
> > +			= rte_zmalloc("vhost bufftable",
> > +			sizeof(struct vhost_bufftable), RTE_CACHE_LINE_SIZE);
>
> Rte_zmalloc() may fail. Need to handle failure.

Sure, I will add a failure handler in the next version.
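Roughly like this (untested sketch; the exact message and error path may differ in the actual patch):

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		vhost_txbuff[i * MAX_VHOST_DEVICE + vid]
			= rte_zmalloc("vhost bufftable",
			sizeof(struct vhost_bufftable), RTE_CACHE_LINE_SIZE);
		if (vhost_txbuff[i * MAX_VHOST_DEVICE + vid] == NULL) {
			RTE_LOG(INFO, VHOST_DATA,
				"(%d) couldn't allocate memory for vhost TX\n",
				vid);
			return -1;
		}
	}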
Thanks.
Cheng

> > +	}
> > +
> >  	if (builtin_net_driver)
> >  		vs_vhost_net_setup(vdev);
> >
> > @@ -1343,12 +1437,15 @@ new_device(int vid)
> >  	if (async_vhost_driver) {
> >  		struct rte_vhost_async_features f;
> >  		struct rte_vhost_async_channel_ops channel_ops;
> > +
> >  		if (strncmp(dma_type, "ioat", 4) == 0) {
> >  			channel_ops.transfer_data = ioat_transfer_data_cb;
> >  			channel_ops.check_completed_copies =
> >  				ioat_check_completed_copies_cb;
> > +
> >  			f.async_inorder = 1;
> >  			f.async_threshold = 256;
> > +
> >  			return rte_vhost_async_channel_register(vid, VIRTIO_RXQ,
> >  				f.intval, &channel_ops);
> >  		}
> > @@ -1392,8 +1489,8 @@ print_stats(__rte_unused void *arg)
> >  			tx         = vdev->stats.tx;
> >  			tx_dropped = tx_total - tx;
> >
> > -			rx_total = rte_atomic64_read(&vdev->stats.rx_total_atomic);
> > -			rx       = rte_atomic64_read(&vdev->stats.rx_atomic);
> > +			rx_total = atomic_load(&vdev->stats.rx_total_atomic);
> > +			rx       = atomic_load(&vdev->stats.rx_atomic);
> >  			rx_dropped = rx_total - rx;
> >
> >  			printf("Statistics for device %d\n"
> > @@ -1592,6 +1689,7 @@ main(int argc, char *argv[])
> >  	/* Register vhost user driver to handle vhost messages. */
> >  	for (i = 0; i < nb_sockets; i++) {
> >  		char *file = socket_files + i * PATH_MAX;
> > +
> >  		if (async_vhost_driver)
> >  			flags = flags | RTE_VHOST_USER_ASYNC_COPY;
> >
> > diff --git a/examples/vhost/main.h b/examples/vhost/main.h
> > index 4317b6ae8..6aa798a3e 100644
> > --- a/examples/vhost/main.h
> > +++ b/examples/vhost/main.h
> > @@ -8,6 +8,7 @@
> >  #include <sys/queue.h>
> >
> >  #include <rte_ether.h>
> > +#include <stdatomic.h>
> >
> >  /* Macros for printing using RTE_LOG */
> >  #define RTE_LOGTYPE_VHOST_CONFIG RTE_LOGTYPE_USER1
> > @@ -21,8 +22,8 @@ enum {VIRTIO_RXQ, VIRTIO_TXQ, VIRTIO_QNUM};
> >  struct device_statistics {
> >  	uint64_t	tx;
> >  	uint64_t	tx_total;
> > -	rte_atomic64_t	rx_atomic;
> > -	rte_atomic64_t	rx_total_atomic;
> > +	atomic_int_least64_t	rx_atomic;
> > +	atomic_int_least64_t	rx_total_atomic;
> >  };
> >
> >  struct vhost_queue {
> > @@ -51,7 +52,7 @@ struct vhost_dev {
> >  	uint64_t features;
> >  	size_t hdr_len;
> >  	uint16_t nr_vrings;
> > -	uint16_t nr_async_pkts;
> > +	atomic_int_least16_t nr_async_pkts;
> >  	struct rte_vhost_memory *mem;
> >  	struct device_statistics stats;
> >  	TAILQ_ENTRY(vhost_dev) global_vdev_entry;
> > --
> > 2.29.2
>