Hi Jiayu,
This is a first review, I need to spend more time on the series to
understand it well. Do you have a prototype of the OVS part, so that it
helps us to grasp how the full integration would look like?
On 11/22/21 11:54, Jiayu Hu wrote:
Since dmadev is introduced in 21.11, to avoid the overhead of vhost DMA
abstraction layer and simplify application logics, this patch integrates
dmadev in asynchronous data path.
Signed-off-by: Jiayu Hu <jiayu...@intel.com>
Signed-off-by: Sunil Pai G <sunil.pa...@intel.com>
---
doc/guides/prog_guide/vhost_lib.rst | 63 ++++----
examples/vhost/ioat.c | 218 ----------------------------
examples/vhost/ioat.h | 63 --------
examples/vhost/main.c | 144 +++++++++++++++---
examples/vhost/main.h | 12 ++
examples/vhost/meson.build | 6 +-
lib/vhost/meson.build | 3 +-
lib/vhost/rte_vhost_async.h | 73 +++-------
lib/vhost/vhost.c | 37 ++---
lib/vhost/vhost.h | 45 +++++-
lib/vhost/virtio_net.c | 198 ++++++++++++++++++++-----
11 files changed, 410 insertions(+), 452 deletions(-)
delete mode 100644 examples/vhost/ioat.c
delete mode 100644 examples/vhost/ioat.h
diff --git a/doc/guides/prog_guide/vhost_lib.rst
b/doc/guides/prog_guide/vhost_lib.rst
index 76f5d303c9..32969a1c41 100644
--- a/doc/guides/prog_guide/vhost_lib.rst
+++ b/doc/guides/prog_guide/vhost_lib.rst
@@ -113,8 +113,8 @@ The following is an overview of some key Vhost API
functions:
the async capability. Only packets enqueued/dequeued by async APIs are
processed through the async data path.
- Currently this feature is only implemented on split ring enqueue data
- path.
+ Currently this feature is only implemented on split and packed ring
+ enqueue data path.
That's not related to the topic of this patch, you may move it in a
dedicated patch in v1.
It is disabled by default.
@@ -218,11 +218,10 @@ The following is an overview of some key Vhost API functions:
Enable or disable zero copy feature of the vhost crypto backend.
-* ``rte_vhost_async_channel_register(vid, queue_id, config, ops)``
+* ``rte_vhost_async_channel_register(vid, queue_id, config)``
Register an async copy device channel for a vhost queue after vring
- is enabled. Following device ``config`` must be specified together
- with the registration:
+ is enabled.
* ``features``
@@ -235,21 +234,7 @@ The following is an overview of some key Vhost API functions:
Currently, only ``RTE_VHOST_ASYNC_INORDER`` capable device is
supported by vhost.
- Applications must provide following ``ops`` callbacks for vhost lib to
- work with the async copy devices:
-
- * ``transfer_data(vid, queue_id, descs, opaque_data, count)``
-
- vhost invokes this function to submit copy data to the async devices.
- For non-async_inorder capable devices, ``opaque_data`` could be used
- for identifying the completed packets.
-
- * ``check_completed_copies(vid, queue_id, opaque_data, max_packets)``
-
- vhost invokes this function to get the copy data completed by async
- devices.
-
-* ``rte_vhost_async_channel_register_thread_unsafe(vid, queue_id, config,
ops)``
+* ``rte_vhost_async_channel_register_thread_unsafe(vid, queue_id, config)``
Register an async copy device channel for a vhost queue without
performing any locking.
@@ -277,18 +262,13 @@ The following is an overview of some key Vhost API
functions:
This function is only safe to call in vhost callback functions
(i.e., struct rte_vhost_device_ops).
-* ``rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, count, comp_pkts, comp_count)``
+* ``rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, count, dma_id,
dma_vchan)``
Submit an enqueue request to transmit ``count`` packets from host to guest
- by async data path. Successfully enqueued packets can be transfer completed
- or being occupied by DMA engines; transfer completed packets are returned in
- ``comp_pkts``, but others are not guaranteed to finish, when this API
- call returns.
-
- Applications must not free the packets submitted for enqueue until the
- packets are completed.
+ by async data path. Applications must not free the packets submitted for
+ enqueue until the packets are completed.
-* ``rte_vhost_poll_enqueue_completed(vid, queue_id, pkts, count)``
+* ``rte_vhost_poll_enqueue_completed(vid, queue_id, pkts, count, dma_id,
dma_vchan)``
Poll enqueue completion status from async data path. Completed packets
are returned to applications through ``pkts``.
@@ -298,7 +278,7 @@ The following is an overview of some key Vhost API
functions:
This function returns the amount of in-flight packets for the vhost
queue using async acceleration.
-* ``rte_vhost_clear_queue_thread_unsafe(vid, queue_id, **pkts, count)``
+* ``rte_vhost_clear_queue_thread_unsafe(vid, queue_id, **pkts, count, dma_id,
dma_vchan)``
Clear inflight packets which are submitted to DMA engine in vhost async data
path. Completed packets are returned to applications through ``pkts``.
@@ -442,3 +422,26 @@ Finally, a set of device ops is defined for device
specific operations:
* ``get_notify_area``
Called to get the notify area info of the queue.
+
+Vhost asynchronous data path
+----------------------------
+
+Vhost asynchronous data path leverages DMA devices to offload memory
+copies from the CPU and it is implemented in an asynchronous way. It
+enables applcations, like OVS, to save CPU cycles and hide memory copy
+overhead, thus achieving higher throughput.
+
+Vhost doesn't manage DMA devices and applications, like OVS, need to
+manage and configure DMA devices. Applications need to tell vhost what
+DMA devices to use in every data path function call. This design enables
+the flexibility for applications to dynamically use DMA channels in
+different function modules, not limited in vhost.
+
+In addition, vhost supports M:N mapping between vrings and DMA virtual
+channels. Specifically, one vring can use multiple different DMA channels
+and one DMA channel can be shared by multiple vrings at the same time.
+The reason of enabling one vring to use multiple DMA channels is that
+it's possible that more than one dataplane threads enqueue packets to
+the same vring with their own DMA virtual channels. Besides, the number
+of DMA devices is limited. For the purpose of scaling, it's necessary to
+support sharing DMA channels among vrings.
diff --git a/examples/vhost/ioat.c b/examples/vhost/ioat.c
deleted file mode 100644
index 9aeeb12fd9..0000000000
--- a/examples/vhost/ioat.c
+++ /dev/null
Nice to see platform-specific code not being necessary for the
application.
diff --git a/examples/vhost/main.c b/examples/vhost/main.c
index 33d023aa39..16a02b9219 100644
--- a/examples/vhost/main.c
+++ b/examples/vhost/main.c
@@ -24,8 +24,9 @@
#include <rte_ip.h>
#include <rte_tcp.h>
#include <rte_pause.h>
+#include <rte_dmadev.h>
+#include <rte_vhost_async.h>
-#include "ioat.h"
#include "main.h"
#ifndef MAX_QUEUES
@@ -57,6 +58,11 @@
#define INVALID_PORT_ID 0xFF
+#define MAX_VHOST_DEVICE 1024
+#define DMA_RING_SIZE 4096
+
+struct dma_for_vhost dma_bind[MAX_VHOST_DEVICE];
+
/* mask of enabled ports */
static uint32_t enabled_port_mask = 0;
@@ -199,10 +205,113 @@ struct vhost_bufftable *vhost_txbuff[RTE_MAX_LCORE * MAX_VHOST_DEVICE];
static inline int
open_dma(const char *value)
{
- if (dma_type != NULL && strncmp(dma_type, "ioat", 4) == 0)
- return open_ioat(value);
+ struct dma_for_vhost *dma_info = dma_bind;
+ char *input = strndup(value, strlen(value) + 1);
+ char *addrs = input;
+ char *ptrs[2];
+ char *start, *end, *substr;
+ int64_t vid, vring_id;
+
+ struct rte_dma_info info;
+ struct rte_dma_conf dev_config = { .nb_vchans = 1 };
+ struct rte_dma_vchan_conf qconf = {
+ .direction = RTE_DMA_DIR_MEM_TO_MEM,
+ .nb_desc = DMA_RING_SIZE
+ };
+
+ int dev_id;
+ int ret = 0;
+ uint16_t i = 0;
+ char *dma_arg[MAX_VHOST_DEVICE];
+ int args_nr;
+
+ while (isblank(*addrs))
+ addrs++;
+ if (*addrs == '\0') {
+ ret = -1;
+ goto out;
+ }
+
+ /* process DMA devices within bracket. */
+ addrs++;
+ substr = strtok(addrs, ";]");
+ if (!substr) {
+ ret = -1;
+ goto out;
+ }
+
+ args_nr = rte_strsplit(substr, strlen(substr),
+ dma_arg, MAX_VHOST_DEVICE, ',');
+ if (args_nr <= 0) {
+ ret = -1;
+ goto out;
+ }
+
+ while (i < args_nr) {
+ char *arg_temp = dma_arg[i];
+ uint8_t sub_nr;
+
+ sub_nr = rte_strsplit(arg_temp, strlen(arg_temp), ptrs, 2, '@');
+ if (sub_nr != 2) {
+ ret = -1;
+ goto out;
+ }
+
+ start = strstr(ptrs[0], "txd");
+ if (start == NULL) {
+ ret = -1;
+ goto out;
+ }
+
+ start += 3;
+ vid = strtol(start, &end, 0);
+ if (end == start) {
+ ret = -1;
+ goto out;
+ }
+
+ vring_id = 0 + VIRTIO_RXQ;
+
+ dev_id = rte_dma_get_dev_id_by_name(ptrs[1]);
+ if (dev_id < 0) {
+ RTE_LOG(ERR, VHOST_CONFIG, "Fail to find DMA %s.\n",
ptrs[1]);
+ ret = -1;
+ goto out;
+ }
+
+ if (rte_dma_configure(dev_id, &dev_config) != 0) {
+ RTE_LOG(ERR, VHOST_CONFIG, "Fail to configure DMA
%d.\n", dev_id);
+ ret = -1;
+ goto out;
+ }
+
+ if (rte_dma_vchan_setup(dev_id, 0, &qconf) != 0) {
+ RTE_LOG(ERR, VHOST_CONFIG, "Fail to set up DMA %d.\n",
dev_id);
+ ret = -1;
+ goto out;
+ }
- return -1;
+ rte_dma_info_get(dev_id, &info);
+ if (info.nb_vchans != 1) {
+ RTE_LOG(ERR, VHOST_CONFIG, "DMA %d has no queues.\n",
dev_id);
+ ret = -1;
+ goto out;
+ }
+
+ if (rte_dma_start(dev_id) != 0) {
+ RTE_LOG(ERR, VHOST_CONFIG, "Fail to start DMA %u.\n",
dev_id);
+ ret = -1;
+ goto out;
+ }
+
+ (dma_info + vid)->dmas[vring_id].dev_id = dev_id;
+ (dma_info + vid)->dmas[vring_id].is_valid = true;
This is_valid field is never used AFAICT, my understanding is that it
had been added to differentiate between not valid and first dmadev,
where dev_id will be both zero. Either make use of is_valid in the code,
or change dev_id to int, and initialize it with -1 value.
+ dma_info->nr++;
+ i++;
+ }
+out:
+ free(input);
+ return ret;
}
/*
@@ -841,9 +950,10 @@ complete_async_pkts(struct vhost_dev *vdev)
{
struct rte_mbuf *p_cpl[MAX_PKT_BURST];
uint16_t complete_count;
+ uint16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
complete_count = rte_vhost_poll_enqueue_completed(vdev->vid,
- VIRTIO_RXQ, p_cpl, MAX_PKT_BURST);
+ VIRTIO_RXQ, p_cpl, MAX_PKT_BURST,
dma_id, 0);
if (complete_count) {
free_pkts(p_cpl, complete_count);
__atomic_sub_fetch(&vdev->pkts_inflight, complete_count,
__ATOMIC_SEQ_CST);
@@ -880,6 +990,7 @@ drain_vhost(struct vhost_dev *vdev)
uint32_t buff_idx = rte_lcore_id() * MAX_VHOST_DEVICE + vdev->vid;
uint16_t nr_xmit = vhost_txbuff[buff_idx]->len;
struct rte_mbuf **m = vhost_txbuff[buff_idx]->m_table;
+ uint16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
if (builtin_net_driver) {
ret = vs_enqueue_pkts(vdev, VIRTIO_RXQ, m, nr_xmit);
@@ -887,7 +998,7 @@ drain_vhost(struct vhost_dev *vdev)
uint16_t enqueue_fail = 0;
complete_async_pkts(vdev);
- ret = rte_vhost_submit_enqueue_burst(vdev->vid, VIRTIO_RXQ, m,
nr_xmit);
+ ret = rte_vhost_submit_enqueue_burst(vdev->vid, VIRTIO_RXQ, m,
nr_xmit, dma_id, 0);
__atomic_add_fetch(&vdev->pkts_inflight, ret, __ATOMIC_SEQ_CST);
enqueue_fail = nr_xmit - ret;
@@ -1213,10 +1324,11 @@ drain_eth_rx(struct vhost_dev *vdev)
pkts, rx_count);
} else if (async_vhost_driver) {
uint16_t enqueue_fail = 0;
+ uint16_t dma_id = dma_bind[vdev->vid].dmas[VIRTIO_RXQ].dev_id;
complete_async_pkts(vdev);
enqueue_count = rte_vhost_submit_enqueue_burst(vdev->vid,
- VIRTIO_RXQ, pkts, rx_count);
+ VIRTIO_RXQ, pkts, rx_count, dma_id, 0);
__atomic_add_fetch(&vdev->pkts_inflight, enqueue_count,
__ATOMIC_SEQ_CST);
enqueue_fail = rx_count - enqueue_count;
@@ -1389,11 +1501,12 @@ destroy_device(int vid)
if (async_vhost_driver) {
uint16_t n_pkt = 0;
+ uint16_t dma_id = dma_bind[vid].dmas[VIRTIO_RXQ].dev_id;
struct rte_mbuf *m_cpl[vdev->pkts_inflight];
while (vdev->pkts_inflight) {
n_pkt = rte_vhost_clear_queue_thread_unsafe(vid,
VIRTIO_RXQ,
- m_cpl, vdev->pkts_inflight);
+ m_cpl, vdev->pkts_inflight,
dma_id, 0);
free_pkts(m_cpl, n_pkt);
__atomic_sub_fetch(&vdev->pkts_inflight, n_pkt,
__ATOMIC_SEQ_CST);
}
@@ -1470,18 +1583,10 @@ new_device(int vid)
if (async_vhost_driver) {
struct rte_vhost_async_config config = {0};
- struct rte_vhost_async_channel_ops channel_ops;
-
- if (dma_type != NULL && strncmp(dma_type, "ioat", 4) == 0) {
- channel_ops.transfer_data = ioat_transfer_data_cb;
- channel_ops.check_completed_copies =
- ioat_check_completed_copies_cb;
- config.features = RTE_VHOST_ASYNC_INORDER;
+ config.features = RTE_VHOST_ASYNC_INORDER;
- return rte_vhost_async_channel_register(vid, VIRTIO_RXQ,
- config, &channel_ops);
- }
+ return rte_vhost_async_channel_register(vid, VIRTIO_RXQ,
config);
}
return 0;
@@ -1505,11 +1610,12 @@ vring_state_changed(int vid, uint16_t queue_id, int
enable)
if (async_vhost_driver) {
if (!enable) {
uint16_t n_pkt = 0;
+ uint16_t dma_id = dma_bind[vid].dmas[VIRTIO_RXQ].dev_id;
struct rte_mbuf *m_cpl[vdev->pkts_inflight];
while (vdev->pkts_inflight) {
n_pkt =
rte_vhost_clear_queue_thread_unsafe(vid, queue_id,
- m_cpl,
vdev->pkts_inflight);
+ m_cpl,
vdev->pkts_inflight, dma_id, 0);
free_pkts(m_cpl, n_pkt);
__atomic_sub_fetch(&vdev->pkts_inflight, n_pkt,
__ATOMIC_SEQ_CST);
}
diff --git a/examples/vhost/main.h b/examples/vhost/main.h
index e7b1ac60a6..609fb406aa 100644
--- a/examples/vhost/main.h
+++ b/examples/vhost/main.h
@@ -8,6 +8,7 @@
#include <sys/queue.h>
#include <rte_ether.h>
+#include <rte_pci.h>
/* Macros for printing using RTE_LOG */
#define RTE_LOGTYPE_VHOST_CONFIG RTE_LOGTYPE_USER1
@@ -79,6 +80,17 @@ struct lcore_info {
struct vhost_dev_tailq_list vdev_list;
};
+struct dma_info {
+ struct rte_pci_addr addr;
+ uint16_t dev_id;
+ bool is_valid;
+};
+
+struct dma_for_vhost {
+ struct dma_info dmas[RTE_MAX_QUEUES_PER_PORT * 2];
+ uint16_t nr;
+};
+
/* we implement non-extra virtio net features */
#define VIRTIO_NET_FEATURES 0
diff --git a/examples/vhost/meson.build b/examples/vhost/meson.build
index 3efd5e6540..87a637f83f 100644
--- a/examples/vhost/meson.build
+++ b/examples/vhost/meson.build
@@ -12,13 +12,9 @@ if not is_linux
endif
deps += 'vhost'
+deps += 'dmadev'
allow_experimental_apis = true
sources = files(
'main.c',
'virtio_net.c',
)
-
-if dpdk_conf.has('RTE_RAW_IOAT')
- deps += 'raw_ioat'
- sources += files('ioat.c')
-endif
diff --git a/lib/vhost/meson.build b/lib/vhost/meson.build
index cdb37a4814..8107329400 100644
--- a/lib/vhost/meson.build
+++ b/lib/vhost/meson.build
@@ -33,7 +33,8 @@ headers = files(
'rte_vhost_async.h',
'rte_vhost_crypto.h',
)
+
driver_sdk_headers = files(
'vdpa_driver.h',
)
-deps += ['ethdev', 'cryptodev', 'hash', 'pci']
+deps += ['ethdev', 'cryptodev', 'hash', 'pci', 'dmadev']
diff --git a/lib/vhost/rte_vhost_async.h b/lib/vhost/rte_vhost_async.h
index a87ea6ba37..0594ae5fc5 100644
--- a/lib/vhost/rte_vhost_async.h
+++ b/lib/vhost/rte_vhost_async.h
@@ -36,48 +36,6 @@ struct rte_vhost_async_status {
uintptr_t *dst_opaque_data;
};
-/**
- * dma operation callbacks to be implemented by applications
- */
-struct rte_vhost_async_channel_ops {
- /**
- * instruct async engines to perform copies for a batch of packets
- *
- * @param vid
- * id of vhost device to perform data copies
- * @param queue_id
- * queue id to perform data copies
- * @param iov_iter
- * an array of IOV iterators
- * @param opaque_data
- * opaque data pair sending to DMA engine
- * @param count
- * number of elements in the "descs" array
- * @return
- * number of IOV iterators processed, negative value means error
- */
- int32_t (*transfer_data)(int vid, uint16_t queue_id,
- struct rte_vhost_iov_iter *iov_iter,
- struct rte_vhost_async_status *opaque_data,
- uint16_t count);
- /**
- * check copy-completed packets from the async engine
- * @param vid
- * id of vhost device to check copy completion
- * @param queue_id
- * queue id to check copy completion
- * @param opaque_data
- * buffer to receive the opaque data pair from DMA engine
- * @param max_packets
- * max number of packets could be completed
- * @return
- * number of async descs completed, negative value means error
- */
- int32_t (*check_completed_copies)(int vid, uint16_t queue_id,
- struct rte_vhost_async_status *opaque_data,
- uint16_t max_packets);
-};
-
/**
* async channel features
*/
@@ -102,15 +60,12 @@ struct rte_vhost_async_config {
* vhost queue id async channel to be attached to
* @param config
* Async channel configuration structure
- * @param ops
- * Async channel operation callbacks
* @return
* 0 on success, -1 on failures
*/
__rte_experimental
int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
- struct rte_vhost_async_config config,
- struct rte_vhost_async_channel_ops *ops);
+ struct rte_vhost_async_config config);
/**
* Unregister an async channel for a vhost queue
@@ -136,8 +91,6 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t
queue_id);
* vhost device id async channel to be attached to
* @param queue_id
* vhost queue id async channel to be attached to
- * @param config
- * Async channel configuration
* @param ops
* Async channel operation callbacks
* @return
@@ -145,8 +98,7 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t
queue_id);
*/
__rte_experimental
int rte_vhost_async_channel_register_thread_unsafe(int vid, uint16_t queue_id,
- struct rte_vhost_async_config config,
- struct rte_vhost_async_channel_ops *ops);
+ struct rte_vhost_async_config config);
/**
* Unregister an async channel for a vhost queue without performing any
@@ -179,12 +131,17 @@ int rte_vhost_async_channel_unregister_thread_unsafe(int
vid,
* array of packets to be enqueued
* @param count
* packets num to be enqueued
+ * @param dma_id
+ * the identifier of the DMA device
+ * @param dma_vchan
+ * the identifier of virtual DMA channel
* @return
* num of packets enqueued
*/
__rte_experimental
uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
- struct rte_mbuf **pkts, uint16_t count);
+ struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id,
+ uint16_t dma_vchan);
/**
* This function checks async completion status for a specific vhost
@@ -199,12 +156,17 @@ uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t
queue_id,
* blank array to get return packet pointer
* @param count
* size of the packet array
+ * @param dma_id
+ * the identifier of the DMA device
+ * @param dma_vchan
+ * the identifier of virtual DMA channel
* @return
* num of packets returned
*/
__rte_experimental
uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
- struct rte_mbuf **pkts, uint16_t count);
+ struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id,
+ uint16_t dma_vchan);
/**
* This function returns the amount of in-flight packets for the vhost
@@ -235,11 +197,16 @@ int rte_vhost_async_get_inflight(int vid, uint16_t
queue_id);
* Blank array to get return packet pointer
* @param count
* Size of the packet array
+ * @param dma_id
+ * the identifier of the DMA device
+ * @param dma_vchan
+ * the identifier of virtual DMA channel
* @return
* Number of packets returned
*/
__rte_experimental
uint16_t rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
- struct rte_mbuf **pkts, uint16_t count);
+ struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id,
+ uint16_t dma_vchan);
#endif /* _RTE_VHOST_ASYNC_H_ */
diff --git a/lib/vhost/vhost.c b/lib/vhost/vhost.c
index 13a9bb9dd1..595cf63b8d 100644
--- a/lib/vhost/vhost.c
+++ b/lib/vhost/vhost.c
@@ -344,6 +344,7 @@ vhost_free_async_mem(struct vhost_virtqueue *vq)
return;
rte_free(vq->async->pkts_info);
+ rte_free(vq->async->pkts_cmpl_flag);
rte_free(vq->async->buffers_packed);
vq->async->buffers_packed = NULL;
@@ -1626,8 +1627,7 @@ rte_vhost_extern_callback_register(int vid,
}
static __rte_always_inline int
-async_channel_register(int vid, uint16_t queue_id,
- struct rte_vhost_async_channel_ops *ops)
+async_channel_register(int vid, uint16_t queue_id)
{
struct virtio_net *dev = get_device(vid);
struct vhost_virtqueue *vq = dev->virtqueue[queue_id];
@@ -1656,6 +1656,14 @@ async_channel_register(int vid, uint16_t queue_id,
goto out_free_async;
}
+ async->pkts_cmpl_flag = rte_zmalloc_socket(NULL, vq->size * sizeof(bool),
+ RTE_CACHE_LINE_SIZE, node);
+ if (!async->pkts_cmpl_flag) {
+ VHOST_LOG_CONFIG(ERR, "failed to allocate async pkts_cmpl_flag (vid
%d, qid: %d)\n",
+ vid, queue_id);
+ goto out_free_async;
+ }
+
if (vq_is_packed(dev)) {
async->buffers_packed = rte_malloc_socket(NULL,
vq->size * sizeof(struct
vring_used_elem_packed),
@@ -1676,9 +1684,6 @@ async_channel_register(int vid, uint16_t queue_id,
}
}
- async->ops.check_completed_copies = ops->check_completed_copies;
- async->ops.transfer_data = ops->transfer_data;
-
vq->async = async;
return 0;
@@ -1692,14 +1697,13 @@ async_channel_register(int vid, uint16_t queue_id,
int
rte_vhost_async_channel_register(int vid, uint16_t queue_id,
- struct rte_vhost_async_config config,
- struct rte_vhost_async_channel_ops *ops)
+ struct rte_vhost_async_config config)
{
struct vhost_virtqueue *vq;
struct virtio_net *dev = get_device(vid);
int ret;
- if (dev == NULL || ops == NULL)
+ if (dev == NULL)
return -1;
if (queue_id >= VHOST_MAX_VRING)
@@ -1717,12 +1721,8 @@ rte_vhost_async_channel_register(int vid, uint16_t
queue_id,
return -1;
}
- if (unlikely(ops->check_completed_copies == NULL ||
- ops->transfer_data == NULL))
- return -1;
-
rte_spinlock_lock(&vq->access_lock);
- ret = async_channel_register(vid, queue_id, ops);
+ ret = async_channel_register(vid, queue_id);
rte_spinlock_unlock(&vq->access_lock);
return ret;
@@ -1730,13 +1730,12 @@ rte_vhost_async_channel_register(int vid, uint16_t
queue_id,
int
rte_vhost_async_channel_register_thread_unsafe(int vid, uint16_t queue_id,
- struct rte_vhost_async_config config,
- struct rte_vhost_async_channel_ops *ops)
+ struct rte_vhost_async_config config)
{
struct vhost_virtqueue *vq;
struct virtio_net *dev = get_device(vid);
- if (dev == NULL || ops == NULL)
+ if (dev == NULL)
return -1;
if (queue_id >= VHOST_MAX_VRING)
@@ -1754,11 +1753,7 @@ rte_vhost_async_channel_register_thread_unsafe(int vid,
uint16_t queue_id,
return -1;
}
- if (unlikely(ops->check_completed_copies == NULL ||
- ops->transfer_data == NULL))
- return -1;
-
- return async_channel_register(vid, queue_id, ops);
+ return async_channel_register(vid, queue_id);
}
int
diff --git a/lib/vhost/vhost.h b/lib/vhost/vhost.h
index 7085e0885c..974e495b56 100644
--- a/lib/vhost/vhost.h
+++ b/lib/vhost/vhost.h
@@ -51,6 +51,11 @@
#define VHOST_MAX_ASYNC_IT (MAX_PKT_BURST)
#define VHOST_MAX_ASYNC_VEC 2048
+/* DMA device copy operation tracking ring size. */
+#define VHOST_ASYNC_DMA_TRACK_RING_SIZE (uint32_t)4096
How is this value chosen? Is that specific to your hardware?
+#define VHOST_ASYNC_DMA_TRACK_RING_MASK (VHOST_ASYNC_DMA_TRACK_RING_SIZE - 1)
+#define VHOST_ASYNC_DMA_BATCHING_SIZE 32
+
#define PACKED_DESC_ENQUEUE_USED_FLAG(w) \
((w) ? (VRING_DESC_F_AVAIL | VRING_DESC_F_USED | VRING_DESC_F_WRITE) : \
VRING_DESC_F_WRITE)
@@ -119,6 +124,29 @@ struct vring_used_elem_packed {
uint32_t count;
};
+struct async_dma_info {
+ /* circular array to track copy metadata */
+ bool *metadata[VHOST_ASYNC_DMA_TRACK_RING_SIZE];
+
+ /* batching copies before a DMA doorbell */
+ uint16_t nr_batching;
+
+ /**
+ * DMA virtual channel lock. Although it is able to bind DMA
+ * virtual channels to data plane threads, vhost control plane
+ * thread could call data plane functions too, thus causing
+ * DMA device contention.
+ *
+ * For example, in VM exit case, vhost control plane thread needs
+ * to clear in-flight packets before disable vring, but there could
+ * be anotther data plane thread is enqueuing packets to the same
+ * vring with the same DMA virtual channel. But dmadev PMD functions
+ * are lock-free, so the control plane and data plane threads
+ * could operate the same DMA virtual channel at the same time.
+ */
+ rte_spinlock_t dma_lock;
+};
+
/**
* inflight async packet information
*/
@@ -129,9 +157,6 @@ struct async_inflight_info {
};
struct vhost_async {
- /* operation callbacks for DMA */
- struct rte_vhost_async_channel_ops ops;
-
struct rte_vhost_iov_iter iov_iter[VHOST_MAX_ASYNC_IT];
struct rte_vhost_iovec iovec[VHOST_MAX_ASYNC_VEC];
uint16_t iter_idx;
@@ -139,8 +164,22 @@ struct vhost_async {
/* data transfer status */
struct async_inflight_info *pkts_info;
+ /**
+ * packet reorder array. "true" indicates that DMA
+ * device completes all copies for the packet.
+ *
+ * Note that this arry could be written by multiple
array
+ * threads at the same time. For example, two threads
+ * enqueue packets to the same virtqueue with their
+ * own DMA devices. However, since offloading is
+ * per-packet basis, each packet flag will only be
+ * written by one thread. And single byte write is
+ * atomic, so no lock is needed.
+ */
+ bool *pkts_cmpl_flag;
uint16_t pkts_idx;
uint16_t pkts_inflight_n;
+
union {
struct vring_used_elem *descs_split;
struct vring_used_elem_packed *buffers_packed;
diff --git a/lib/vhost/virtio_net.c b/lib/vhost/virtio_net.c
index b3d954aab4..95ecfeb64b 100644
--- a/lib/vhost/virtio_net.c
+++ b/lib/vhost/virtio_net.c
@@ -11,6 +11,7 @@
#include <rte_net.h>
#include <rte_ether.h>
#include <rte_ip.h>
+#include <rte_dmadev.h>
#include <rte_vhost.h>
#include <rte_tcp.h>
#include <rte_udp.h>
@@ -25,6 +26,9 @@
#define MAX_BATCH_LEN 256
+/* DMA device copy operation tracking array. */
+static struct async_dma_info dma_copy_track[RTE_DMADEV_DEFAULT_MAX];
+
static __rte_always_inline bool
rxvq_is_mergeable(struct virtio_net *dev)
{
@@ -43,6 +47,108 @@ is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t
nr_vring)
return (is_tx ^ (idx & 1)) == 0 && idx < nr_vring;
}
+static uint16_t
+vhost_async_dma_transfer(struct vhost_virtqueue *vq, uint16_t dma_id,
+ uint16_t dma_vchan, uint16_t head_idx,
+ struct rte_vhost_iov_iter *pkts, uint16_t nr_pkts)
+{
+ struct async_dma_info *dma_info = &dma_copy_track[dma_id];
+ uint16_t dma_space_left = rte_dma_burst_capacity(dma_id, 0);
+ uint16_t pkt_idx = 0;
+
+ rte_spinlock_lock(&dma_info->dma_lock);
+
+ while (pkt_idx < nr_pkts) {
A for loop would be prefered here.
+ struct rte_vhost_iovec *iov = pkts[pkt_idx].iov;
+ int copy_idx = 0;
+ uint16_t nr_segs = pkts[pkt_idx].nr_segs;
+ uint16_t i;
+
+ if (unlikely(dma_space_left < nr_segs)) {
+ goto out;
+ }
+
+ for (i = 0; i < nr_segs; i++) {
+ copy_idx = rte_dma_copy(dma_id, dma_vchan,
+ (rte_iova_t)iov[i].src_addr,
+ (rte_iova_t)iov[i].dst_addr,
+ iov[i].len, RTE_DMA_OP_FLAG_LLC);
+ if (unlikely(copy_idx < 0)) {
+ VHOST_LOG_DATA(ERR, "DMA device %u (%u) copy
failed\n",
+ dma_id, dma_vchan);
+ dma_info->nr_batching += i;
+ goto out;
+ }
+
+ dma_info->metadata[copy_idx &
VHOST_ASYNC_DMA_TRACK_RING_MASK] = NULL;
+ }
+
+ /**
+ * Only store packet completion flag address in the last copy's
+ * slot, and other slots are set to NULL.
+ */
+ dma_info->metadata[copy_idx & VHOST_ASYNC_DMA_TRACK_RING_MASK] =
+ &vq->async->pkts_cmpl_flag[head_idx % vq->size];
+
+ dma_info->nr_batching += nr_segs;
+ if (unlikely(dma_info->nr_batching >
VHOST_ASYNC_DMA_BATCHING_SIZE)) {
+ rte_dma_submit(dma_id, 0);
+ dma_info->nr_batching = 0;
+ }
+
+ dma_space_left -= nr_segs;
+ pkt_idx++;
+ head_idx++;
+ }
+
+out:
+ if (dma_info->nr_batching > 0) {
+ rte_dma_submit(dma_id, 0);
+ dma_info->nr_batching = 0;
+ }
+ rte_spinlock_unlock(&dma_info->dma_lock);
At a first sight, that looks like a lot of thing being done while the
spinlock is held. But maybe there will be only contention in some corner
cases?
+
+ return pkt_idx;
+}
+
+static uint16_t
+vhost_async_dma_check_completed(uint16_t dma_id, uint16_t dma_vchan, uint16_t
max_pkts)
+{
+ struct async_dma_info *dma_info = &dma_copy_track[dma_id];
+ uint16_t last_idx = 0;
+ uint16_t nr_copies;
+ uint16_t copy_idx;
+ uint16_t i;
+
+ rte_spinlock_lock(&dma_info->dma_lock);
+
+ nr_copies = rte_dma_completed(dma_id, dma_vchan, max_pkts, &last_idx,
NULL);
+ if (nr_copies == 0) {
+ goto out;
+ }
+
+ copy_idx = last_idx - nr_copies + 1;
+ for (i = 0; i < nr_copies; i++) {
+ bool *flag;
+
+ flag = dma_info->metadata[copy_idx &
VHOST_ASYNC_DMA_TRACK_RING_MASK];
+ if (flag) {
+ /**
+ * Mark the packet flag as received. The flag
+ * could belong to another virtqueue but write
+ * is atomic.
+ */
+ *flag = true;
+ dma_info->metadata[copy_idx &
VHOST_ASYNC_DMA_TRACK_RING_MASK] = NULL;
+ }
+ copy_idx++;
+ }
+
+out:
+ rte_spinlock_unlock(&dma_info->dma_lock);
+ return nr_copies;
+}
+
static inline void
do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
{
@@ -1451,7 +1557,8 @@ store_dma_desc_info_packed(struct vring_used_elem_packed
*s_ring,
static __rte_noinline uint32_t
virtio_dev_rx_async_submit_split(struct virtio_net *dev,
struct vhost_virtqueue *vq, uint16_t queue_id,
- struct rte_mbuf **pkts, uint32_t count)
+ struct rte_mbuf **pkts, uint32_t count, uint16_t dma_id,
+ uint16_t dma_vchan)
{
struct buf_vector buf_vec[BUF_VECTOR_MAX];
uint32_t pkt_idx = 0;
@@ -1463,6 +1570,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
uint32_t pkt_err = 0;
int32_t n_xfer;
uint16_t slot_idx = 0;
+ uint16_t head_idx = async->pkts_idx & (vq->size - 1);
/*
* The ordering between avail index and desc reads need to be enforced.
@@ -1503,17 +1611,16 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
if (unlikely(pkt_idx == 0))
return 0;
- n_xfer = async->ops.transfer_data(dev->vid, queue_id, async->iov_iter, 0, pkt_idx);
- if (unlikely(n_xfer < 0)) {
- VHOST_LOG_DATA(ERR, "(%d) %s: failed to transfer data for queue id
%d.\n",
- dev->vid, __func__, queue_id);
- n_xfer = 0;
- }
+ n_xfer = vhost_async_dma_transfer(vq, dma_id, dma_vchan, head_idx,
async->iov_iter,
+ pkt_idx);
pkt_err = pkt_idx - n_xfer;
if (unlikely(pkt_err)) {
uint16_t num_descs = 0;
+ VHOST_LOG_DATA(ERR, "(%d) %s: failed to transfer %u packets for queue %u.\n",
+ dev->vid, __func__, pkt_err, queue_id);
+
/* update number of completed packets */
pkt_idx = n_xfer;
@@ -1658,11 +1765,12 @@ dma_error_handler_packed(struct vhost_virtqueue *vq, uint16_t slot_idx,
static __rte_noinline uint32_t
virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
struct vhost_virtqueue *vq, uint16_t queue_id,
- struct rte_mbuf **pkts, uint32_t count)
+ struct rte_mbuf **pkts, uint32_t count, uint16_t dma_id,
+ uint16_t dma_vchan)
{
uint32_t pkt_idx = 0;
uint32_t remained = count;
- int32_t n_xfer;
+ uint16_t n_xfer;
uint16_t num_buffers;
uint16_t num_descs;
@@ -1670,6 +1778,7 @@ virtio_dev_rx_async_submit_packed(struct virtio_net *dev,
struct async_inflight_info *pkts_info = async->pkts_info;
uint32_t pkt_err = 0;
uint16_t slot_idx = 0;
+ uint16_t head_idx = async->pkts_idx % vq->size;
do {
rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
@@ -1694,19 +1803,17 @@ virtio_dev_rx_async_submit_packed(struct virtio_net
*dev,
if (unlikely(pkt_idx == 0))
return 0;
- n_xfer = async->ops.transfer_data(dev->vid, queue_id, async->iov_iter, 0, pkt_idx);
- if (unlikely(n_xfer < 0)) {
- VHOST_LOG_DATA(ERR, "(%d) %s: failed to transfer data for queue id
%d.\n",
- dev->vid, __func__, queue_id);
- n_xfer = 0;
- }
-
- pkt_err = pkt_idx - n_xfer;
+ n_xfer = vhost_async_dma_transfer(vq, dma_id, dma_vchan, head_idx,
+ async->iov_iter, pkt_idx);
async_iter_reset(async);
- if (unlikely(pkt_err))
+ pkt_err = pkt_idx - n_xfer;
+ if (unlikely(pkt_err)) {
+ VHOST_LOG_DATA(ERR, "(%d) %s: failed to transfer %u packets for
queue %u.\n",
+ dev->vid, __func__, pkt_err, queue_id);
dma_error_handler_packed(vq, slot_idx, pkt_err, &pkt_idx);
+ }
if (likely(vq->shadow_used_idx)) {
/* keep used descriptors. */
@@ -1826,28 +1933,37 @@ write_back_completed_descs_packed(struct
vhost_virtqueue *vq,
static __rte_always_inline uint16_t
vhost_poll_enqueue_completed(struct virtio_net *dev, uint16_t queue_id,
- struct rte_mbuf **pkts, uint16_t count)
+ struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id,
+ uint16_t dma_vchan)
{
struct vhost_virtqueue *vq = dev->virtqueue[queue_id];
struct vhost_async *async = vq->async;
struct async_inflight_info *pkts_info = async->pkts_info;
- int32_t n_cpl;
+ uint16_t nr_cpl_copies, nr_cpl_pkts = 0;
uint16_t n_descs = 0, n_buffers = 0;
uint16_t start_idx, from, i;
- n_cpl = async->ops.check_completed_copies(dev->vid, queue_id, 0, count);
- if (unlikely(n_cpl < 0)) {
- VHOST_LOG_DATA(ERR, "(%d) %s: failed to check completed copies for
queue id %d.\n",
- dev->vid, __func__, queue_id);
+ nr_cpl_copies = vhost_async_dma_check_completed(dma_id, dma_vchan,
count);
+ if (nr_cpl_copies == 0)
return 0;
- }
- if (n_cpl == 0)
- return 0;
+ /**
+ * The order of updating packet completion flag needs to be
+ * enforced.
+ */
+ rte_atomic_thread_fence(__ATOMIC_RELEASE);
start_idx = async_get_first_inflight_pkt_idx(vq);
- for (i = 0; i < n_cpl; i++) {
+ /* Calculate the number of copy completed packets */
+ from = start_idx;
+ while (vq->async->pkts_cmpl_flag[from]) {
+ vq->async->pkts_cmpl_flag[from] = false;
+ from = (from + 1) % vq->size;
+ nr_cpl_pkts++;
+ }
+
+ for (i = 0; i < nr_cpl_pkts; i++) {
from = (start_idx + i) % vq->size;
/* Only used with packed ring */
n_buffers += pkts_info[from].nr_buffers;
@@ -1856,7 +1972,7 @@ vhost_poll_enqueue_completed(struct virtio_net *dev,
uint16_t queue_id,
pkts[i] = pkts_info[from].mbuf;
}
- async->pkts_inflight_n -= n_cpl;
+ async->pkts_inflight_n -= nr_cpl_pkts;
if (likely(vq->enabled && vq->access_ok)) {
if (vq_is_packed(dev)) {
@@ -1877,12 +1993,13 @@ vhost_poll_enqueue_completed(struct virtio_net *dev,
uint16_t queue_id,
}
}
- return n_cpl;
+ return nr_cpl_pkts;
}
uint16_t
rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
- struct rte_mbuf **pkts, uint16_t count)
+ struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id,
+ uint16_t dma_vchan)
{
struct virtio_net *dev = get_device(vid);
struct vhost_virtqueue *vq;
@@ -1908,7 +2025,7 @@ rte_vhost_poll_enqueue_completed(int vid, uint16_t
queue_id,
rte_spinlock_lock(&vq->access_lock);
- n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count);
+ n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count,
dma_id, dma_vchan);
rte_spinlock_unlock(&vq->access_lock);
@@ -1917,7 +2034,8 @@ rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
uint16_t
rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t queue_id,
- struct rte_mbuf **pkts, uint16_t count)
+ struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id,
+ uint16_t dma_vchan)
{
struct virtio_net *dev = get_device(vid);
struct vhost_virtqueue *vq;
@@ -1941,14 +2059,15 @@ rte_vhost_clear_queue_thread_unsafe(int vid, uint16_t
queue_id,
return 0;
}
- n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count);
+ n_pkts_cpl = vhost_poll_enqueue_completed(dev, queue_id, pkts, count,
dma_id, dma_vchan);
return n_pkts_cpl;
}
static __rte_always_inline uint32_t
virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
- struct rte_mbuf **pkts, uint32_t count)
+ struct rte_mbuf **pkts, uint32_t count, uint16_t dma_id,
+ uint16_t dma_vchan)
{
struct vhost_virtqueue *vq;
uint32_t nb_tx = 0;
@@ -1980,10 +2099,10 @@ virtio_dev_rx_async_submit(struct virtio_net *dev,
uint16_t queue_id,
if (vq_is_packed(dev))
nb_tx = virtio_dev_rx_async_submit_packed(dev, vq, queue_id,
- pkts, count);
+ pkts, count, dma_id, dma_vchan);
else
nb_tx = virtio_dev_rx_async_submit_split(dev, vq, queue_id,
- pkts, count);
+ pkts, count, dma_id, dma_vchan);
out:
if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
@@ -1997,7 +2116,8 @@ virtio_dev_rx_async_submit(struct virtio_net *dev,
uint16_t queue_id,
uint16_t
rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
- struct rte_mbuf **pkts, uint16_t count)
+ struct rte_mbuf **pkts, uint16_t count, uint16_t dma_id,
+ uint16_t dma_vchan)
{
struct virtio_net *dev = get_device(vid);
@@ -2011,7 +2131,7 @@ rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
return 0;
}
- return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
+ return virtio_dev_rx_async_submit(dev, queue_id, pkts, count, dma_id,
dma_vchan);
}
static inline bool