vHost async APIs allow copy operations to be offloaded to a DMA device and completed asynchronously, for better performance.
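Each PMD thread is assigned a free DMA device when it attaches (devices on the PMD's NUMA node are preferred); vhost enqueue, dequeue and completion polling are then routed through the DPDK dmadev and vhost async APIs.

The feature is off by default. As a sketch of the intended usage (illustrative invocations; the lcore id below is only an example), it is enabled through the 'vhost-async-support' other_config key added by this patch, and per-lcore DMA device statistics can then be queried with the new unixctl command:

    $ ovs-vsctl set Open_vSwitch . other_config:vhost-async-support=true
    $ ovs-appctl netdev-dpdk/get-dmadev-info 5

Since the key is read in dpdk_init__(), it takes effect when DPDK is next initialized.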
Signed-off-by: Sunil Pai G <sunil.pa...@intel.com>
---
 lib/dpdk-stub.c   |   6 +
 lib/dpdk.c        |  33 ++
 lib/dpdk.h        |   1 +
 lib/dpif-netdev.c |   7 +
 lib/netdev-dpdk.c | 795 ++++++++++++++++++++++++++++++++++++++++++++--
 lib/netdev-dpdk.h |  14 +
 6 files changed, 829 insertions(+), 27 deletions(-)

diff --git a/lib/dpdk-stub.c b/lib/dpdk-stub.c
index 3eee1f485..e8e00abea 100644
--- a/lib/dpdk-stub.c
+++ b/lib/dpdk-stub.c
@@ -67,6 +67,12 @@ dpdk_vhost_postcopy_enabled(void)
     return false;
 }
 
+bool
+dpdk_vhost_async_enabled(void)
+{
+    return false;
+}
+
 bool
 dpdk_per_port_memory(void)
 {
diff --git a/lib/dpdk.c b/lib/dpdk.c
index 6886fbd9d..0ac4ca7d3 100644
--- a/lib/dpdk.c
+++ b/lib/dpdk.c
@@ -51,6 +51,8 @@ static char *vhost_sock_dir = NULL;   /* Location of vhost-user sockets */
 static bool vhost_iommu_enabled = false; /* Status of vHost IOMMU support */
 static bool vhost_postcopy_enabled = false; /* Status of vHost POSTCOPY
                                              * support. */
+static bool vhost_async_copy_enabled = false; /* Status of vHost async
+                                               * support. */
 static bool per_port_memory = false; /* Status of per port memory support */
 
 /* Indicates successful initialization of DPDK. */
@@ -399,6 +401,25 @@ dpdk_init__(const struct smap *ovs_other_config)
     VLOG_INFO("POSTCOPY support for vhost-user-client %s.",
               vhost_postcopy_enabled ? "enabled" : "disabled");
 
+    vhost_async_copy_enabled = smap_get_bool(ovs_other_config,
+                                             "vhost-async-support", false);
+    if (vhost_async_copy_enabled) {
+        if (vhost_postcopy_enabled) {
+            VLOG_WARN("Async-copy and post-copy are not compatible "
+                      "for vhost-user-client. Disabling POSTCOPY support.");
+            vhost_postcopy_enabled = false;
+        }
+
+        if (vhost_iommu_enabled) {
+            vhost_iommu_enabled = false;
+            VLOG_WARN("Async copy is not compatible with IOMMU support for"
+                      " vhost-user-client. IOMMU support disabled.");
+        }
+        VLOG_INFO("Async support enabled for vhost-user-client.");
+    } else {
+        VLOG_INFO("Async support disabled for vhost-user-client.");
+    }
+
     per_port_memory = smap_get_bool(ovs_other_config,
                                     "per-port-memory", false);
     VLOG_INFO("Per port memory for DPDK devices %s.",
@@ -576,6 +597,12 @@ dpdk_vhost_postcopy_enabled(void)
     return vhost_postcopy_enabled;
 }
 
+bool
+dpdk_vhost_async_enabled(void)
+{
+    return vhost_async_copy_enabled;
+}
+
 bool
 dpdk_per_port_memory(void)
 {
@@ -608,6 +635,9 @@ dpdk_attach_thread(unsigned cpu)
     }
     VLOG_INFO("PMD thread uses DPDK lcore %u.", rte_lcore_id());
 
+    if (dpdk_vhost_async_enabled()) {
+        dpdk_dmadev_assign();
+    }
     return true;
 }
 
@@ -617,6 +647,9 @@ dpdk_detach_thread(void)
     unsigned int lcore_id;
 
     lcore_id = rte_lcore_id();
+    if (dpdk_vhost_async_enabled()) {
+        dpdk_dmadev_free();
+    }
     rte_thread_unregister();
     VLOG_INFO("PMD thread released DPDK lcore %u.", lcore_id);
 }
diff --git a/lib/dpdk.h b/lib/dpdk.h
index 64ebca47d..806f440c5 100644
--- a/lib/dpdk.h
+++ b/lib/dpdk.h
@@ -41,6 +41,7 @@ void dpdk_detach_thread(void);
 const char *dpdk_get_vhost_sock_dir(void);
 bool dpdk_vhost_iommu_enabled(void);
 bool dpdk_vhost_postcopy_enabled(void);
+bool dpdk_vhost_async_enabled(void);
 bool dpdk_per_port_memory(void);
 bool dpdk_available(void);
 void print_dpdk_version(void);
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index e28e0b554..c8371175e 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -6972,6 +6972,13 @@ reload:
         atomic_read_explicit(&pmd->reload, &reload, memory_order_acquire);
         if (OVS_UNLIKELY(reload)) {
+            /* Drain all rxqs before the pmd is reloaded. */
+            for (i = 0; i < poll_cnt; i++) {
+                if (!poll_list[i].rxq_enabled) {
+                    continue;
+                }
+                netdev_rxq_drain(poll_list[i].rxq->rx);
+            }
             break;
         }
 
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index 667d758b2..f31c9f9cb 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -29,6 +29,7 @@
 #include <rte_bus_pci.h>
 #include <rte_config.h>
 #include <rte_cycles.h>
+#include <rte_dmadev.h>
 #include <rte_errno.h>
 #include <rte_ethdev.h>
 #include <rte_flow.h>
@@ -36,8 +37,10 @@
 #include <rte_mbuf.h>
 #include <rte_meter.h>
 #include <rte_pci.h>
+#include <rte_ring.h>
 #include <rte_version.h>
 #include <rte_vhost.h>
+#include <rte_vhost_async.h>
 
 #include "cmap.h"
 #include "coverage.h"
@@ -76,6 +79,14 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
 
 COVERAGE_DEFINE(vhost_tx_contention);
 COVERAGE_DEFINE(vhost_notification);
+COVERAGE_DEFINE(vhost_async_tx_poll);
+COVERAGE_DEFINE(vhost_async_tx_poll_empty);
+COVERAGE_DEFINE(vhost_async_rx_poll);
+COVERAGE_DEFINE(vhost_async_rx_poll_empty);
+COVERAGE_DEFINE(vhost_async_rx_enqueue);
+COVERAGE_DEFINE(vhost_async_tx_enqueue);
+COVERAGE_DEFINE(vhost_async_tx_ring_enqueue);
+COVERAGE_DEFINE(vhost_async_tx_ring_dequeue);
 
 #define DPDK_PORT_WATCHDOG_INTERVAL 5
 
@@ -145,6 +156,9 @@ typedef uint16_t dpdk_port_t;
 
 #define IF_NAME_SZ (PATH_MAX > IFNAMSIZ ? PATH_MAX : IFNAMSIZ)
 
+/* vHost async DMA ring size. */
+#define VHOST_ASYNC_DMA_RING_SIZE 4096
+
 /* List of required flags advertised by the hardware that will be used
  * if TSO is enabled. Ideally this should include
  * RTE_ETH_TX_OFFLOAD_SCTP_CKSUM. However, very few drivers support that
@@ -193,6 +207,201 @@ static const struct rte_vhost_device_ops virtio_net_device_ops =
     .guest_notified = vhost_guest_notified,
 };
 
+#define DMADEV_ID_UNASSIGNED UINT16_MAX
+#define DMADEV_ID_INVALID (UINT16_MAX - 1)
+
+/* For the vHost async datapath, dmadev id allocation is per dataplane
+ * thread. */
+DEFINE_STATIC_PER_THREAD_DATA(uint16_t, dmadev_id, DMADEV_ID_UNASSIGNED);
+
+typedef struct {
+    bool used;
+    int lcore_id;
+} dma_use_info;
+
+static dma_use_info dmadev_devices_used[RTE_DMADEV_DEFAULT_MAX];
+static struct ovs_mutex dmadev_mutex = OVS_MUTEX_INITIALIZER;
+
+static uint16_t
+dmadev_find_free_dev(int pmd_numa_id, struct rte_dma_info *dev_info)
+{
+    uint16_t dmadev_id = 0;
+    int other_numa_dmadev_id = DMADEV_ID_INVALID;
+    uint64_t capab = RTE_DMA_CAPA_MEM_TO_MEM | RTE_DMA_CAPA_OPS_COPY;
+
+    for (dmadev_id = 0; dmadev_id < rte_dma_count_avail(); dmadev_id++) {
+        if (!dmadev_devices_used[dmadev_id].used &&
+            !rte_dma_info_get(dmadev_id, dev_info)) {
+            /* The DMA device must be capable of MEM to MEM COPY
+             * operation and have at least one virtual channel. */
+            if (!((dev_info->dev_capa & capab) && dev_info->max_vchans >= 1)) {
+                continue;
+            }
+
+            if (dev_info->numa_node == pmd_numa_id) {
+                return dmadev_id;
+            } else if (other_numa_dmadev_id == DMADEV_ID_INVALID) {
+                other_numa_dmadev_id = dmadev_id;
+            }
+        }
+    }
+
+    if (other_numa_dmadev_id != DMADEV_ID_INVALID) {
+        /* No DMA device found on the same NUMA node, hence
+         * allocating an available DMA device from another NUMA node. */
+        rte_dma_info_get(other_numa_dmadev_id, dev_info);
+        return other_numa_dmadev_id;
+    }
+
+    return DMADEV_ID_INVALID;
+}
+
+static uint16_t
+dmadev_get_free_dev(int pmd_core_id, int pmd_numa_id)
+{
+    uint16_t dmadev_id;
+    struct rte_dma_info dev_info = {0};
+    struct rte_dma_conf dev_conf = {0};
+    struct rte_dma_vchan_conf vchan_conf = {0};
+    size_t ring_size = VHOST_ASYNC_DMA_RING_SIZE;
+
+    dmadev_id = dmadev_find_free_dev(pmd_numa_id, &dev_info);
+    if (dmadev_id == DMADEV_ID_INVALID) {
+        VLOG_INFO("No available DMA device found for vhost async copy "
+                  "offload for this pmd.");
+        goto out;
+    }
+
+    /* Configure the device. */
+    dev_conf.nb_vchans = 1;
+    dev_conf.enable_silent = false;
+    int ret = rte_dma_configure(dmadev_id, &dev_conf);
+    if (OVS_UNLIKELY(ret)) {
+        VLOG_ERR("Configure failed for DMA device with dev id: %u"
+                 " while assigning to pmd for vhost async copy offload.",
+                 dmadev_id);
+        dmadev_devices_used[dmadev_id].used = false;
+        dmadev_id = DMADEV_ID_INVALID;
+    } else {
+        vchan_conf.direction = RTE_DMA_DIR_MEM_TO_MEM;
+        vchan_conf.nb_desc = ring_size;
+        ret = rte_dma_vchan_setup(dmadev_id, 0, &vchan_conf);
+        if (ret < 0) {
+            VLOG_ERR("Virtual channel setup failed with err %d for"
+                     " DMA device with dev id: %d",
+                     ret, dmadev_id);
+            dmadev_devices_used[dmadev_id].used = false;
+            dmadev_id = DMADEV_ID_INVALID;
+            goto out;
+        }
+
+        rte_dma_start(dmadev_id);
+
+        if (rte_vhost_async_dma_configure(dmadev_id, 0) < 0) {
+            VLOG_ERR("Failed to configure DMA device %u in vhost.",
+                     dmadev_id);
+            dmadev_devices_used[dmadev_id].used = false;
+            dmadev_id = DMADEV_ID_INVALID;
+            goto out;
+        }
+
+        if (dev_info.numa_node != pmd_numa_id) {
+            VLOG_WARN("No available DMA device found on numa node %d,"
+                      " assigning DMA with dev id: %d on numa %d to pmd for"
+                      " vhost async copy offload.",
+                      pmd_numa_id, dmadev_id, dev_info.numa_node);
+        } else {
+            VLOG_INFO("DMA device with dev id: %d assigned to pmd for vhost"
+                      " async copy offload.", dmadev_id);
+        }
+        dmadev_devices_used[dmadev_id].used = true;
+        dmadev_devices_used[dmadev_id].lcore_id = pmd_core_id;
+    }
+
+out:
+    return dmadev_id;
+}
+
+static uint16_t
+dmadev_id_init(void)
+{
+    uint16_t new_id = *dmadev_id_get();
+    unsigned int pmd_core_id = rte_lcore_id();
+    int pmd_numa_id = ovs_numa_get_numa_id(pmd_core_id);
+
+    ovs_assert(new_id == DMADEV_ID_UNASSIGNED);
+    ovs_mutex_lock(&dmadev_mutex);
+    new_id = dmadev_get_free_dev(pmd_core_id, pmd_numa_id);
+    ovs_mutex_unlock(&dmadev_mutex);
+
+    return *dmadev_id_get() = new_id;
+}
+
+static uint16_t
+dmadev_get_device(void)
+{
+    uint16_t id = *dmadev_id_get();
+
+    if (id == DMADEV_ID_UNASSIGNED) {
+        id = dmadev_id_init();
+    }
+    return id;
+}
+
+void
+dpdk_dmadev_assign(void)
+{
+    dmadev_get_device();
+}
+
+void
+dpdk_dmadev_free(void)
+{
+    uint16_t dmadev_id = dmadev_get_device();
+    struct rte_dma_stats stats;
+
+    if (dmadev_id == DMADEV_ID_INVALID) {
+        return;
+    }
+
+    ovs_mutex_lock(&dmadev_mutex);
+    rte_dma_stats_get(dmadev_id, 0, &stats);
+    rte_dma_stop(dmadev_id);
+    dmadev_devices_used[dmadev_id].used = false;
+    dmadev_devices_used[dmadev_id].lcore_id = -1;
+    ovs_mutex_unlock(&dmadev_mutex);
+    VLOG_INFO("DMADEV id %u stats: submitted: %lu, completed: %lu, "
+              "errors: %lu", dmadev_id, stats.submitted, stats.completed,
+              stats.errors);
+    VLOG_INFO("DMA device with dev id: %d used for vhost async copy offload "
+              "released from pmd.", dmadev_id);
+
+    *dmadev_id_get() = DMADEV_ID_UNASSIGNED;
+}
+
+static void
+dpdk_dmadev_dump_stats(int dmadev_id, FILE *stream)
+{
+    struct rte_dma_stats stats;
+
+    rte_dma_dump(dmadev_id, stream);
+    rte_dma_stats_get(dmadev_id, 0, &stats);
+    fprintf(stream, "DMA stats: submitted: %lu, completed: %lu, "
+            "errors: %lu\n\n", stats.submitted, stats.completed,
+            stats.errors);
+}
+
+static bool
+dpdk_dmadev_has_inflight(int dmadev_id)
+{
+    struct rte_dma_stats dma_stats;
+
+    rte_dma_stats_get(dmadev_id, 0, &dma_stats);
+    return (dma_stats.completed + dma_stats.errors) < dma_stats.submitted;
+}
+
 /* Custom software stats for dpdk ports */
 struct netdev_dpdk_sw_stats {
     /* No. of retries when unable to transmit. */
@@ -207,6 +416,8 @@ struct netdev_dpdk_sw_stats {
     uint64_t rx_qos_drops;
     /* Packet drops in HWOL processing. */
     uint64_t tx_invalid_hwol_drops;
+    /* No. of packets pending to be tx'ed by the async device. */
+    uint64_t tx_async_inflight;
 };
 
 enum dpdk_dev_type {
@@ -381,6 +592,16 @@ struct dpdk_tx_queue {
          * It is used only if the queue is shared among different pmd threads
          * (see 'concurrent_txq'). */
         rte_spinlock_t tx_lock;
+
+        /* vHost asynchronous channel registration status. */
+        bool is_async_reg;
+
+        /* For vHost async to pause enqueue so that packets can be drained. */
+        atomic_bool pause_enqueue;
+
+        /* MP/SC ring to hold packets. */
+        struct rte_ring *ring;
+
         /* Mapping of configured vhost-user queue to enabled by guest. */
         int map;
     );
@@ -475,6 +696,8 @@ struct netdev_dpdk {
         /* Array of vhost rxq states, see vring_state_changed. */
         bool *vhost_rxq_enabled;
+        /* Array of vhost rxq async registration status. */
+        bool *vhost_rxq_async_reg;
     );
 
     PADDED_MEMBERS(CACHE_LINE_SIZE,
@@ -1295,9 +1518,18 @@ vhost_common_construct(struct netdev *netdev)
     if (!dev->vhost_rxq_enabled) {
         return ENOMEM;
     }
+
+    dev->vhost_rxq_async_reg = dpdk_rte_mzalloc(OVS_VHOST_MAX_QUEUE_NUM *
+                                                sizeof(bool));
+    if (!dev->vhost_rxq_async_reg) {
+        rte_free(dev->vhost_rxq_enabled);
+        return ENOMEM;
+    }
+
     dev->tx_q = netdev_dpdk_alloc_txq(OVS_VHOST_MAX_QUEUE_NUM);
     if (!dev->tx_q) {
         rte_free(dev->vhost_rxq_enabled);
+        rte_free(dev->vhost_rxq_async_reg);
         return ENOMEM;
     }
 
@@ -1334,6 +1566,11 @@ netdev_dpdk_vhost_construct(struct netdev *netdev)
     /* There is no support for multi-segments buffers. */
     dev->vhost_driver_flags |= RTE_VHOST_USER_LINEARBUF_SUPPORT;
 
+    /* Enable the async copy flag, if explicitly requested. */
+    if (dpdk_vhost_async_enabled()) {
+        dev->vhost_driver_flags |= RTE_VHOST_USER_ASYNC_COPY;
+    }
+
     err = rte_vhost_driver_register(dev->vhost_id, dev->vhost_driver_flags);
     if (err) {
         VLOG_ERR("vhost-user socket device setup failure for socket %s\n",
@@ -1417,6 +1654,49 @@ netdev_dpdk_construct(struct netdev *netdev)
     return err;
 }
 
+/* Register the vHost async channel for a queue. */
+static inline int
+netdev_dpdk_vhost_async_reg(const int vid, const int qid,
+                            const int virtq_id, const bool is_rx)
+{
+    int ret = -1;
+
+    if (OVS_UNLIKELY(vid < 0)) {
+        return ret;
+    }
+
+    ret = rte_vhost_async_channel_register_thread_unsafe(vid, virtq_id);
+    if (ret) {
+        VLOG_ERR("Async channel register failed for vid: %d, queue: %s%d "
+                 "with status: %d", vid, is_rx ? "rxq" : "txq", qid, ret);
+        return ret;
+    }
+    VLOG_INFO("Async channel register success for vid: %d, queue: %s%d",
+              vid, is_rx ? "rxq" : "txq", qid);
+    return ret;
+}
+
+static void
+netdev_dpdk_vhost_clear_queue(const int vid, const int virtq_id,
+                              struct netdev_dpdk *dev, bool is_rx);
+
+/* Unregister the vHost async channel for a queue. */
+static inline void
+netdev_dpdk_vhost_async_unreg(const int vid, const int qid,
+                              const int virtq_id, struct netdev_dpdk *dev,
+                              const bool is_rx)
+{
+    int ret;
+
+    netdev_dpdk_vhost_clear_queue(vid, virtq_id, dev, is_rx);
+    ret = rte_vhost_async_channel_unregister_thread_unsafe(vid, virtq_id);
+    if (ret) {
+        VLOG_ERR("Async channel unregister failed for vid: %d, queue: %s%d "
+                 "with status: %d", vid, is_rx ? "rxq" : "txq", qid, ret);
+        return;
+    }
+    VLOG_INFO("Async channel unregister success for vid: %d, queue: %s%d",
+              vid, is_rx ? "rxq" : "txq", qid);
+}
+
 static void
 common_destruct(struct netdev_dpdk *dev)
     OVS_REQUIRES(dpdk_mutex)
@@ -1534,6 +1814,7 @@ netdev_dpdk_vhost_destruct(struct netdev *netdev)
     vhost_id = dev->vhost_id;
     dev->vhost_id = NULL;
     rte_free(dev->vhost_rxq_enabled);
+    rte_free(dev->vhost_rxq_async_reg);
 
     common_destruct(dev);
 
@@ -2372,6 +2653,8 @@ netdev_dpdk_vhost_update_rx_counters(struct netdev_dpdk *dev,
     }
 }
 
+static void
+netdev_dpdk_vhost_async_tx_free(struct netdev_dpdk *dev, int qid, bool drain);
 /*
  * The receive path for the vhost port is the TX path out from guest.
  */
@@ -2385,17 +2668,37 @@ netdev_dpdk_vhost_rxq_recv(struct netdev_rxq *rxq,
     uint16_t qos_drops = 0;
     int qid = rxq->queue_id * VIRTIO_QNUM + VIRTIO_TXQ;
     int vid = netdev_dpdk_get_vid(dev);
+    int async_inflight = 0;
 
     if (OVS_UNLIKELY(vid < 0 || !dev->vhost_reconfigured
                      || !(dev->flags & NETDEV_UP))) {
         return EAGAIN;
     }
 
-    nb_rx = rte_vhost_dequeue_burst(vid, qid, dev->dpdk_mp->mp,
-                                    (struct rte_mbuf **) batch->packets,
-                                    NETDEV_MAX_BURST);
-    if (!nb_rx) {
-        return EAGAIN;
+    if (dev->vhost_rxq_async_reg[rxq->queue_id]
+        && dmadev_get_device() != DMADEV_ID_INVALID) {
+        /* Poll sibling txq packets. */
+        netdev_dpdk_vhost_async_tx_free(dev, rxq->queue_id, false);
+
+        nb_rx = rte_vhost_async_try_dequeue_burst(vid, qid, dev->dpdk_mp->mp,
+                                                  (struct rte_mbuf **)
+                                                  batch->packets,
+                                                  NETDEV_MAX_BURST,
+                                                  &async_inflight,
+                                                  dmadev_get_device(), 0);
+        COVERAGE_INC(vhost_async_rx_poll);
+        if (!nb_rx) {
+            COVERAGE_INC(vhost_async_rx_poll_empty);
+            return EAGAIN;
+        }
+        COVERAGE_ADD(vhost_async_rx_enqueue, nb_rx);
+    } else {
+        nb_rx = rte_vhost_dequeue_burst(vid, qid, dev->dpdk_mp->mp,
+                                        (struct rte_mbuf **) batch->packets,
+                                        NETDEV_MAX_BURST);
+        if (!nb_rx) {
+            return EAGAIN;
+        }
     }
 
     if (qfill) {
@@ -2435,6 +2738,52 @@ netdev_dpdk_vhost_rxq_enabled(struct netdev_rxq *rxq)
     return dev->vhost_rxq_enabled[rxq->queue_id];
 }
 
+static int
+netdev_dpdk_vhost_rxq_drain(struct netdev_rxq *rxq)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(rxq->netdev);
+    int rx_qid = rxq->queue_id * VIRTIO_QNUM + VIRTIO_TXQ;
+    uint16_t async_inflight = 0, n_drops = 0;
+    int vid = netdev_dpdk_get_vid(dev);
+    int dmadev_id = dpdk_vhost_async_enabled() ?
+                    dmadev_get_device() : DMADEV_ID_INVALID;
+
+    if (OVS_UNLIKELY(vid < 0 || !dev->vhost_reconfigured || rx_qid < 0
+                     || !(dev->flags & NETDEV_UP))
+        || dmadev_id == DMADEV_ID_INVALID) {
+        return 0;
+    }
+
+    async_inflight = rte_vhost_async_get_inflight(vid, rx_qid);
+    VLOG_INFO("Draining Rx async inflight packets for vid: %d, qid: %u,"
+              " inflight: %u", vid, rx_qid, async_inflight);
+    while ((async_inflight > 0) || dpdk_dmadev_has_inflight(dmadev_id)) {
+        /* If there are inflight packets left on the DMA,
+         * poll for completions through an RXQ. */
+        struct dp_packet *inflight_pkts[NETDEV_MAX_BURST];
+        n_drops = rte_vhost_clear_queue(vid, rx_qid,
+                                        (struct rte_mbuf **) inflight_pkts,
+                                        NETDEV_MAX_BURST, dmadev_id, 0);
+        if (!n_drops) {
+            continue;
+        }
+        rte_spinlock_lock(&dev->stats_lock);
+        netdev_dpdk_vhost_update_rx_counters(dev, NULL, 0, n_drops);
+        rte_spinlock_unlock(&dev->stats_lock);
+
+        for (int i = 0; i < n_drops; i++) {
+            dp_packet_delete(inflight_pkts[i]);
+        }
+        async_inflight -= n_drops;
+    }
+
+    netdev_dpdk_vhost_async_tx_free(dev, rxq->queue_id, true);
+
+    return 0;
+}
+
 static int
 netdev_dpdk_rxq_recv(struct netdev_rxq *rxq, struct dp_packet_batch *batch,
                      int *qfill)
@@ -2534,14 +2883,15 @@ static inline void
 netdev_dpdk_vhost_update_tx_counters(struct netdev_dpdk *dev,
                                      struct dp_packet **packets,
                                      int attempted,
-                                     struct netdev_dpdk_sw_stats *sw_stats_add)
+                                     struct netdev_dpdk_sw_stats *sw_stats_add,
+                                     bool is_sent)
 {
     int dropped = sw_stats_add->tx_mtu_exceeded_drops +
                   sw_stats_add->tx_qos_drops +
                   sw_stats_add->tx_failure_drops +
                   sw_stats_add->tx_invalid_hwol_drops;
     struct netdev_stats *stats = &dev->stats;
-    int sent = attempted - dropped;
+    int sent = is_sent ? attempted - dropped : 0;
     int i;
 
     stats->tx_packets += sent;
@@ -2563,27 +2913,136 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_dpdk *dev,
 }
 
 static void
-__netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
-                         struct dp_packet **pkts, int cnt)
+netdev_dpdk_vhost_clear_queue(const int vid, const int virtq_id,
+                              struct netdev_dpdk *dev, bool is_rx)
 {
-    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-    struct rte_mbuf **cur_pkts = (struct rte_mbuf **) pkts;
-    struct netdev_dpdk_sw_stats sw_stats_add;
-    unsigned int n_packets_to_free = cnt;
-    unsigned int total_packets = cnt;
-    int i, retries = 0;
-    int max_retries = VHOST_ENQ_RETRY_MIN;
-    int vid = netdev_dpdk_get_vid(dev);
+    uint16_t dma_count = rte_dma_count_avail();
+    uint16_t async_inflight = 0;
 
-    qid = dev->tx_q[qid % netdev->n_txq].map;
+    async_inflight = rte_vhost_async_get_inflight_thread_unsafe(vid, virtq_id);
+    VLOG_INFO("Clearing async inflight packets for vid: %d, qid: %u,"
+              " inflight: %u", vid, virtq_id, async_inflight);
+
+    while (async_inflight > 0) {
+        ovs_mutex_lock(&dmadev_mutex);
+        for (int dmadev_id = 0; dmadev_id < dma_count; dmadev_id++) {
+            if (!dmadev_devices_used[dmadev_id].used) {
+                continue;
+            }
+
+            struct dp_packet *inflight_pkts[NETDEV_MAX_BURST];
+            uint16_t nr_dropped =
+                rte_vhost_clear_queue_thread_unsafe(vid, virtq_id,
+                                                    (struct rte_mbuf **)
+                                                    inflight_pkts,
+                                                    NETDEV_MAX_BURST,
+                                                    dmadev_id, 0);
+
+            rte_spinlock_lock(&dev->stats_lock);
+            if (is_rx) {
+                netdev_dpdk_vhost_update_rx_counters(dev, NULL, 0,
+                                                     nr_dropped);
+            } else {
+                struct netdev_dpdk_sw_stats sw_stats_add = {0};
+
+                dev->sw_stats->tx_async_inflight -= nr_dropped;
+                sw_stats_add.tx_failure_drops = nr_dropped;
+                netdev_dpdk_vhost_update_tx_counters(dev, NULL, 0,
+                                                     &sw_stats_add, false);
+            }
+            rte_spinlock_unlock(&dev->stats_lock);
+
+            for (int i = 0; i < nr_dropped; i++) {
+                dp_packet_delete(inflight_pkts[i]);
+            }
+            async_inflight -= nr_dropped;
+            if (!async_inflight) {
+                /* Do not return with 'dmadev_mutex' held. */
+                ovs_mutex_unlock(&dmadev_mutex);
+                return;
+            }
+        }
+        ovs_mutex_unlock(&dmadev_mutex);
+    }
+}
+
+static unsigned int
+__netdev_dpdk_vhost_send_async(struct netdev_dpdk *dev, int vid,
+                               int qid, bool drain);
+
+/* Free the packets sent via the async data path. */
+static void
+netdev_dpdk_vhost_async_tx_free(struct netdev_dpdk *dev, int qid, bool drain)
+{
+    struct netdev_dpdk_sw_stats sw_stats_add = {0};
+    struct dp_packet *cmpl_pkts[NETDEV_MAX_BURST];
+    uint16_t nr_xfrd_pkts = 0, vhost_qid;
+    int dmadev_id = dmadev_get_device();
+    int vid = netdev_dpdk_get_vid(dev);
+    uint16_t tx_inflight = 0;
+    unsigned int remaining = 0;
+
+    qid = dev->tx_q[qid].map;
 
     if (OVS_UNLIKELY(vid < 0 || !dev->vhost_reconfigured || qid < 0
                      || !(dev->flags & NETDEV_UP))) {
+        return;
+    }
+
+    vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
+    if (drain) {
+        atomic_store_relaxed(&dev->tx_q[qid].pause_enqueue, true);
+        tx_inflight = rte_vhost_async_get_inflight(vid, vhost_qid);
+        VLOG_INFO("Draining Tx async inflight packets for vid: %d, qid: %d, "
+                  "inflight: %d", vid, qid, tx_inflight);
+    } else {
+        bool pause_enqueue = false;
+
+        atomic_read_relaxed(&dev->tx_q[qid].pause_enqueue, &pause_enqueue);
+        if (OVS_UNLIKELY(pause_enqueue)) {
+            atomic_store_relaxed(&dev->tx_q[qid].pause_enqueue, false);
+        }
+    }
+
+    do {
+        remaining = __netdev_dpdk_vhost_send_async(dev, vid, qid, drain);
+    } while (drain && remaining);
+
+    do {
+        tx_inflight = rte_vhost_async_get_inflight(vid, vhost_qid);
+        if (tx_inflight <= 0) {
+            /* No more packets to free, so return. */
+            return;
+        }
+
+        COVERAGE_INC(vhost_async_tx_poll);
+        /* Get the completion status of the async transfer. */
+        nr_xfrd_pkts = rte_vhost_poll_enqueue_completed(vid, vhost_qid,
+                                                        (struct rte_mbuf **)
+                                                        cmpl_pkts,
+                                                        NETDEV_MAX_BURST,
+                                                        dmadev_id, 0);
+
+        if (!nr_xfrd_pkts) {
+            COVERAGE_INC(vhost_async_tx_poll_empty);
+            continue;
+        }
+
+        rte_spinlock_lock(&dev->stats_lock);
+        dev->sw_stats->tx_async_inflight -= nr_xfrd_pkts;
+        netdev_dpdk_vhost_update_tx_counters(dev, cmpl_pkts, nr_xfrd_pkts,
+                                             &sw_stats_add, true);
+        rte_spinlock_unlock(&dev->stats_lock);
+
+        for (int i = 0; i < nr_xfrd_pkts; i++) {
+            dp_packet_delete(cmpl_pkts[i]);
+        }
+    } while (drain);
+}
+
+static void
+__netdev_dpdk_vhost_send_sync(struct netdev_dpdk *dev, int vid, int qid,
+                              struct dp_packet **pkts, int cnt)
+{
+    struct rte_mbuf **cur_pkts = (struct rte_mbuf **) pkts;
+    struct netdev_dpdk_sw_stats sw_stats_add = {0};
+    unsigned int n_packets_to_free = cnt;
+    unsigned int total_packets = cnt;
+    int i, retries = 0;
+    int max_retries = VHOST_ENQ_RETRY_MIN;
 
     if (OVS_UNLIKELY(!rte_spinlock_trylock(&dev->tx_q[qid].tx_lock))) {
         COVERAGE_INC(vhost_tx_contention);
@@ -2605,13 +3064,14 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
     cnt = netdev_dpdk_qos_run(dev, cur_pkts, cnt, true);
     sw_stats_add.tx_qos_drops -= cnt;
 
+    sw_stats_add.tx_async_inflight = 0;
     n_packets_to_free = cnt;
 
     do {
         int vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
         unsigned int tx_pkts;
-
         tx_pkts = rte_vhost_enqueue_burst(vid, vhost_qid, cur_pkts, cnt);
+
         if (OVS_LIKELY(tx_pkts)) {
             /* Packets have been sent.*/
             cnt -= tx_pkts;
             /* Prepare for possible retry.*/
             cur_pkts = &cur_pkts[tx_pkts];
             if (OVS_UNLIKELY(cnt && !retries)) {
                 /*
                  * Read max retries as there are packets not sent
                  * and no retries have already occurred.
                  */
                 atomic_read_relaxed(&dev->vhost_tx_retries_max, &max_retries);
             }
         } else {
             /* No packets sent - do not retry.*/
@@ -2629,7 +3089,6 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
             break;
         }
     } while (cnt && (retries++ < max_retries));
-
     rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
 
     sw_stats_add.tx_failure_drops = cnt;
@@ -2637,9 +3096,166 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
 
     rte_spinlock_lock(&dev->stats_lock);
     netdev_dpdk_vhost_update_tx_counters(dev, pkts, total_packets,
-                                         &sw_stats_add);
+                                         &sw_stats_add, true);
     rte_spinlock_unlock(&dev->stats_lock);
 
+    for (i = 0; i < n_packets_to_free; i++) {
+        dp_packet_delete(pkts[i]);
+    }
+}
+
+static void
+__netdev_dpdk_vhost_submit_lockless(struct netdev_dpdk *dev, int qid,
+                                    struct dp_packet **pkts, int cnt)
+{
+    struct rte_mbuf **cur_pkts = (struct rte_mbuf **) pkts;
+    struct netdev_dpdk_sw_stats sw_stats_add = {0};
+    int enqueues, i;
+
+    enqueues = rte_ring_enqueue_bulk(dev->tx_q[qid].ring, (void **) cur_pkts,
+                                     cnt, NULL);
+    COVERAGE_ADD(vhost_async_tx_ring_enqueue, enqueues);
+    sw_stats_add.tx_failure_drops = cnt - enqueues;
+
+    rte_spinlock_lock(&dev->stats_lock);
+    netdev_dpdk_vhost_update_tx_counters(dev, pkts, cnt, &sw_stats_add, false);
+    rte_spinlock_unlock(&dev->stats_lock);
+
+    for (i = enqueues; i < cnt; i++) {
+        dp_packet_delete(pkts[i]);
+    }
+}
+
+static unsigned int
+__netdev_dpdk_vhost_send_async(struct netdev_dpdk *dev, int vid,
+                               int qid, bool drain)
+{
+    struct netdev_dpdk_sw_stats sw_stats_add = {0};
+    struct dp_packet *pkts[NETDEV_MAX_BURST];
+    unsigned int n_packets_to_free = 0;
+    struct rte_mbuf **cur_pkts = NULL;
+    unsigned int total_packets = 0;
+    unsigned int num_remaining = 0;
+    int i, retries = 0;
+    int max_retries = VHOST_ENQ_RETRY_MIN;
+    int free_start_idx = 0;
+    int cnt = 0;
+
+    cnt = rte_ring_dequeue_burst(dev->tx_q[qid].ring, (void **) pkts,
+                                 NETDEV_MAX_BURST, &num_remaining);
+    if (!cnt) {
+        goto out;
+    }
+
+    COVERAGE_ADD(vhost_async_tx_ring_dequeue, cnt);
+    if (drain) {
+        sw_stats_add.tx_failure_drops = cnt;
+        rte_spinlock_lock(&dev->stats_lock);
+        netdev_dpdk_vhost_update_tx_counters(dev, NULL, 0,
+                                             &sw_stats_add, false);
+        rte_spinlock_unlock(&dev->stats_lock);
+        /* Free all dequeued packets below. */
+        n_packets_to_free = cnt;
+        goto out;
+    }
+
+    cur_pkts = (struct rte_mbuf **) pkts;
+    n_packets_to_free = total_packets = cnt;
+    sw_stats_add.tx_invalid_hwol_drops = cnt;
+    if (userspace_tso_enabled()) {
+        cnt = netdev_dpdk_prep_hwol_batch(dev, cur_pkts, cnt);
+    }
+
+    sw_stats_add.tx_invalid_hwol_drops -= cnt;
+    sw_stats_add.tx_mtu_exceeded_drops = cnt;
+    cnt = netdev_dpdk_filter_packet_len(dev, cur_pkts, cnt);
+    sw_stats_add.tx_mtu_exceeded_drops -= cnt;
+
+    /* Check if QoS has been configured for the netdev. */
+    sw_stats_add.tx_qos_drops = cnt;
+    cnt = netdev_dpdk_qos_run(dev, cur_pkts, cnt, true);
+    sw_stats_add.tx_qos_drops -= cnt;
+
+    sw_stats_add.tx_async_inflight = 0;
+    n_packets_to_free = cnt;
+
+    do {
+        int vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
+        unsigned int tx_pkts;
+
+        tx_pkts = rte_vhost_submit_enqueue_burst(vid, vhost_qid, cur_pkts,
+                                                 cnt, dmadev_get_device(), 0);
+        sw_stats_add.tx_async_inflight += tx_pkts;
+
+        if (OVS_LIKELY(tx_pkts)) {
+            /* Packets have been sent.*/
+            cnt -= tx_pkts;
+            /* Prepare for possible retry.*/
+            cur_pkts = &cur_pkts[tx_pkts];
+            if (OVS_UNLIKELY(cnt && !retries)) {
+                /*
+                 * Read max retries as there are packets not sent
+                 * and no retries have already occurred.
+                 */
+                atomic_read_relaxed(&dev->vhost_tx_retries_max, &max_retries);
+            }
+        } else {
+            /* No packets sent - do not retry.*/
+            break;
+        }
+    } while (cnt && (retries++ < max_retries));
+
+    sw_stats_add.tx_failure_drops = cnt;
+    sw_stats_add.tx_retries = MIN(retries, max_retries);
+
+    rte_spinlock_lock(&dev->stats_lock);
+    dev->sw_stats->tx_async_inflight += sw_stats_add.tx_async_inflight;
+    netdev_dpdk_vhost_update_tx_counters(dev, pkts, total_packets,
+                                         &sw_stats_add, false);
+    rte_spinlock_unlock(&dev->stats_lock);
+
+    /* Since dropped packets are at the end of the burst, update the
+     * index to delete the packets dropped in the current burst. */
+    free_start_idx = sw_stats_add.tx_async_inflight;
+    COVERAGE_ADD(vhost_async_tx_enqueue, free_start_idx);
+
+out:
+    for (i = free_start_idx; i < n_packets_to_free; i++) {
+        dp_packet_delete(pkts[i]);
+    }
+    return num_remaining;
+}
+
+static void
+__netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
+                         struct dp_packet **pkts, int cnt, bool dpdk_buf)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    int vid = netdev_dpdk_get_vid(dev);
+    bool is_async = false, pause_enqueue = false;
+    int i, n_packets_to_free = cnt;
+
+    qid = dev->tx_q[qid % netdev->n_txq].map;
+
+    if (OVS_UNLIKELY(vid < 0 || !dev->vhost_reconfigured || qid < 0
+                     || !(dev->flags & NETDEV_UP))) {
+        rte_spinlock_lock(&dev->stats_lock);
+        dev->stats.tx_dropped += cnt;
+        rte_spinlock_unlock(&dev->stats_lock);
+        goto out;
+    }
+
+    is_async = dev->tx_q[qid].is_async_reg && dpdk_buf
+               && (dmadev_get_device() != DMADEV_ID_INVALID);
+
+    if (is_async) {
+        atomic_read_relaxed(&dev->tx_q[qid].pause_enqueue, &pause_enqueue);
+        if (OVS_UNLIKELY(pause_enqueue)) {
+            goto out;
+        }
+        __netdev_dpdk_vhost_submit_lockless(dev, qid, pkts, cnt);
+        return;
+    } else {
+        __netdev_dpdk_vhost_send_sync(dev, vid, qid, pkts, cnt);
+        return;
+    }
+
 out:
     for (i = 0; i < n_packets_to_free; i++) {
         dp_packet_delete(pkts[i]);
     }
@@ -2803,7 +3419,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
 
     if (OVS_LIKELY(txcnt)) {
         if (dev->type == DPDK_DEV_VHOST) {
-            __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt);
+            __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt, false);
         } else {
             tx_failure += netdev_dpdk_eth_tx_burst(dev, qid,
                                                    (struct rte_mbuf **) pkts,
@@ -2833,7 +3449,7 @@ netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
         dp_packet_delete_batch(batch, true);
     } else {
         __netdev_dpdk_vhost_send(netdev, qid, batch->packets,
-                                 dp_packet_batch_size(batch));
+                                 dp_packet_batch_size(batch), true);
     }
     return 0;
 }
@@ -3226,7 +3842,8 @@ netdev_dpdk_get_sw_custom_stats(const struct netdev *netdev,
     SW_CSTAT(tx_mtu_exceeded_drops)     \
     SW_CSTAT(tx_qos_drops)              \
     SW_CSTAT(rx_qos_drops)              \
-    SW_CSTAT(tx_invalid_hwol_drops)
+    SW_CSTAT(tx_invalid_hwol_drops)     \
+    SW_CSTAT(tx_async_inflight)
 
 #define SW_CSTAT(NAME) + 1
     custom_stats->size = SW_CSTATS;
@@ -3882,6 +4499,63 @@ out:
     netdev_close(netdev);
 }
 
+static void
+netdev_dpdk_get_dma_info(struct unixctl_conn *conn,
+                         int argc,
+                         const char *argv[],
+                         void *aux OVS_UNUSED)
+{
+    size_t size;
+    FILE *stream;
+    char *response = NULL;
+    int lcore_id = -1;
+
+    if (argc == 2) {
+        lcore_id = (int) strtol(argv[1], NULL, 0);
+        if (lcore_id >= RTE_MAX_LCORE) {
+            unixctl_command_reply_error(conn, "Not a valid lcore.");
+            goto out;
+        }
+    }
+
+    stream = open_memstream(&response, &size);
+    if (!stream) {
+        response = xasprintf("Unable to open memstream: %s.",
+                             ovs_strerror(errno));
+        unixctl_command_reply_error(conn, response);
+        goto out;
+    }
+
+    if (lcore_id != -1) {
+        ovs_mutex_lock(&dmadev_mutex);
+        for (int i = 0; i < rte_dma_count_avail(); i++) {
+            if (dmadev_devices_used[i].used &&
+                (lcore_id == dmadev_devices_used[i].lcore_id)) {
+                fprintf(stream, "lcore id: %d, dma id: %d\n",
+                        dmadev_devices_used[i].lcore_id, i);
+                dpdk_dmadev_dump_stats(i, stream);
+                break;
+            }
+        }
+        ovs_mutex_unlock(&dmadev_mutex);
+    } else {
+        ovs_mutex_lock(&dmadev_mutex);
+        for (int i = 0; i < rte_dma_count_avail(); i++) {
+            if (dmadev_devices_used[i].used) {
+                fprintf(stream, "lcore id: %d, dma id: %d\n",
+                        dmadev_devices_used[i].lcore_id, i);
+                dpdk_dmadev_dump_stats(i, stream);
+            }
+        }
+        ovs_mutex_unlock(&dmadev_mutex);
+    }
+    fclose(stream);
+    unixctl_command_reply(conn, response);
+out:
+    free(response);
+}
+
 /*
  * Set virtqueue flags so that we do not receive interrupts.
  */
@@ -4047,6 +4721,8 @@ destroy_device(int vid)
             ovsrcu_index_set(&dev->vid, -1);
             memset(dev->vhost_rxq_enabled, 0,
                    dev->up.n_rxq * sizeof *dev->vhost_rxq_enabled);
+            memset(dev->vhost_rxq_async_reg, 0,
+                   dev->up.n_rxq * sizeof *dev->vhost_rxq_async_reg);
             netdev_dpdk_txq_map_clear(dev);
 
             netdev_change_seq_changed(&dev->up);
@@ -4075,6 +4751,28 @@ destroy_device(int vid)
     }
 }
 
+static int
+netdev_dpdk_vhost_async_tx_ring_create(struct netdev_dpdk *dev, int qid)
+{
+    char r_name[RTE_RING_NAMESIZE];
+
+    snprintf(r_name, sizeof(r_name), "%s_txq%d", dev->up.name, qid);
+    dev->tx_q[qid].ring = rte_ring_create(r_name,
+                                          512,
+                                          rte_socket_id(),
+                                          RING_F_MP_HTS_ENQ | RING_F_SC_DEQ);
+    if (!dev->tx_q[qid].ring) {
+        return -1;
+    }
+    return 0;
+}
+
+static void
+netdev_dpdk_vhost_async_tx_ring_free(struct netdev_dpdk *dev, int qid)
+{
+    rte_ring_free(dev->tx_q[qid].ring);
+    VLOG_INFO("Cleared MP/SC ring for device %s, qid: %d", dev->up.name, qid);
+}
+
 static int
 vring_state_changed(int vid, uint16_t queue_id, int enable)
 {
@@ -4094,13 +4792,46 @@ vring_state_changed(int vid, uint16_t queue_id, int enable)
         bool old_state = dev->vhost_rxq_enabled[qid];
 
         dev->vhost_rxq_enabled[qid] = enable != 0;
+        if (enable) {
+            if ((dev->vhost_driver_flags & RTE_VHOST_USER_ASYNC_COPY)
+                && !dev->vhost_rxq_async_reg[qid]) {
+                if (!netdev_dpdk_vhost_async_reg(vid, qid, queue_id,
+                                                 is_rx)) {
+                    if (!netdev_dpdk_vhost_async_tx_ring_create(dev,
+                                                                qid)) {
+                        dev->vhost_rxq_async_reg[qid] = true;
+                    }
+                }
+            }
+        } else {
+            if ((dev->vhost_driver_flags & RTE_VHOST_USER_ASYNC_COPY)
+                && dev->vhost_rxq_async_reg[qid]) {
+                netdev_dpdk_vhost_async_unreg(vid, qid, queue_id, dev,
+                                              is_rx);
+                dev->vhost_rxq_async_reg[qid] = false;
+                netdev_dpdk_vhost_async_tx_ring_free(dev, qid);
+            }
+        }
         if (old_state != dev->vhost_rxq_enabled[qid]) {
             netdev_change_seq_changed(&dev->up);
         }
     } else {
         if (enable) {
             dev->tx_q[qid].map = qid;
+            if ((dev->vhost_driver_flags & RTE_VHOST_USER_ASYNC_COPY)
+                && !dev->tx_q[qid].is_async_reg) {
+                if (!netdev_dpdk_vhost_async_reg(vid, qid, queue_id,
+                                                 is_rx)) {
+                    dev->tx_q[qid].is_async_reg = true;
+                }
+            }
         } else {
+            if ((dev->vhost_driver_flags & RTE_VHOST_USER_ASYNC_COPY)
+                && dev->tx_q[qid].is_async_reg) {
+                netdev_dpdk_vhost_async_unreg(vid, qid, queue_id, dev,
+                                              is_rx);
+                dev->tx_q[qid].is_async_reg = false;
+            }
             dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
         }
         netdev_dpdk_remap_txqs(dev);
@@ -4220,6 +4951,9 @@ netdev_dpdk_class_init(void)
         unixctl_command_register("netdev-dpdk/get-mempool-info",
                                  "[netdev]", 0, 1,
                                  netdev_dpdk_get_mempool_info, NULL);
+        unixctl_command_register("netdev-dpdk/get-dmadev-info",
                                 "[lcore_id]", 0, 1,
+                                 netdev_dpdk_get_dma_info, NULL);
 
         ret = rte_eth_dev_callback_register(RTE_ETH_ALL,
                                             RTE_ETH_EVENT_INTR_RESET,
@@ -5107,6 +5841,11 @@ netdev_dpdk_vhost_client_reconfigure(struct netdev *netdev)
             vhost_flags |= RTE_VHOST_USER_POSTCOPY_SUPPORT;
         }
 
+        /* Enable the async copy flag, if explicitly requested. */
+        if (dpdk_vhost_async_enabled()) {
+            vhost_flags |= RTE_VHOST_USER_ASYNC_COPY;
+        }
+
         /* Enable External Buffers if TCP Segmentation Offload is enabled. */
         if (userspace_tso_enabled()) {
             vhost_flags |= RTE_VHOST_USER_EXTBUF_SUPPORT;
         }
@@ -5448,6 +6187,7 @@ static const struct netdev_class dpdk_vhost_class = {
     .reconfigure = netdev_dpdk_vhost_reconfigure,
     .rxq_recv = netdev_dpdk_vhost_rxq_recv,
     .rxq_enabled = netdev_dpdk_vhost_rxq_enabled,
+    .rxq_drain = netdev_dpdk_vhost_rxq_drain,
 };
 
 static const struct netdev_class dpdk_vhost_client_class = {
@@ -5464,6 +6204,7 @@ static const struct netdev_class dpdk_vhost_client_class = {
     .reconfigure = netdev_dpdk_vhost_client_reconfigure,
     .rxq_recv = netdev_dpdk_vhost_rxq_recv,
     .rxq_enabled = netdev_dpdk_vhost_rxq_enabled,
+    .rxq_drain = netdev_dpdk_vhost_rxq_drain,
 };
 
 void
diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
index 699be3fb4..f361d9a0f 100644
--- a/lib/netdev-dpdk.h
+++ b/lib/netdev-dpdk.h
@@ -30,6 +30,8 @@ struct netdev;
 
 void netdev_dpdk_register(void);
 void free_dpdk_buf(struct dp_packet *);
+void dpdk_dmadev_assign(void);
+void dpdk_dmadev_free(void);
 
 bool netdev_dpdk_flow_api_supported(struct netdev *);
 
@@ -160,6 +162,18 @@ free_dpdk_buf(struct dp_packet *buf OVS_UNUSED)
     /* Nothing */
 }
 
+static inline void
+dpdk_dmadev_assign(void)
+{
+    /* Nothing. */
+}
+
+static inline void
+dpdk_dmadev_free(void)
+{
+    /* Nothing. */
+}
+
 #endif
 
 #endif /* netdev-dpdk.h */
-- 
2.25.1

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev