This patch adds initial support for the DPDK vHost async APIs to offload
memory copy operations for vhost-user ports to hardware (an IOAT DMA device).
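
The feature is controlled by two new configuration knobs added in this patch:

  * other_config:vhost-async-copy-support=true enables the vHost async copy
    path globally and sets RTE_VHOST_USER_ASYNC_COPY on vhost-user-client
    ports.
  * options:vhost-async-attr on a vhost-user-client interface selects the
    IOAT device and async threshold per txq.  Based on the parser added
    below, the expected value is a list of per-queue tuples such as
    (assumed example) "(txq0,<IOAT PCI address>,<threshold>)".

Packets submitted through the async path are freed only after the copy
completes, via netdev_dpdk_vhost_async_free_pkts().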

Signed-off-by: Sunil Pai G <sunil.pa...@intel.com>
---
 lib/dpdk-stub.c   |   6 +
 lib/dpdk.c        |  13 ++
 lib/dpdk.h        |   1 +
 lib/dpif-netdev.c |  19 +-
 lib/netdev-dpdk.c | 548 +++++++++++++++++++++++++++++++++++++++++++++-
 lib/netdev-dpdk.h |   3 +
 6 files changed, 579 insertions(+), 11 deletions(-)

diff --git a/lib/dpdk-stub.c b/lib/dpdk-stub.c
index b7d577870..b9272cbed 100644
--- a/lib/dpdk-stub.c
+++ b/lib/dpdk-stub.c
@@ -62,6 +62,12 @@ dpdk_vhost_postcopy_enabled(void)
     return false;
 }
 
+bool
+dpdk_vhost_async_copy_enabled(void)
+{
+    return false;
+}
+
 bool
 dpdk_per_port_memory(void)
 {
diff --git a/lib/dpdk.c b/lib/dpdk.c
index 319540394..586479bfb 100644
--- a/lib/dpdk.c
+++ b/lib/dpdk.c
@@ -49,6 +49,8 @@ static char *vhost_sock_dir = NULL;   /* Location of vhost-user sockets */
 static bool vhost_iommu_enabled = false; /* Status of vHost IOMMU support */
 static bool vhost_postcopy_enabled = false; /* Status of vHost POSTCOPY
                                              * support. */
+static bool vhost_async_copy_enabled = false; /* Status of vHost async copy
+                                             * support. */
 static bool dpdk_initialized = false; /* Indicates successful initialization
                                        * of DPDK. */
 static bool per_port_memory = false; /* Status of per port memory support */
@@ -413,6 +415,11 @@ dpdk_init__(const struct smap *ovs_other_config)
     VLOG_INFO("POSTCOPY support for vhost-user-client %s.",
               vhost_postcopy_enabled ? "enabled" : "disabled");
 
+    vhost_async_copy_enabled = smap_get_bool(ovs_other_config,
+                                            "vhost-async-copy-support", false);
+    VLOG_INFO("Async copy support for vhost-user-client %s.",
+              vhost_async_copy_enabled ? "enabled" : "disabled");
+
     per_port_memory = smap_get_bool(ovs_other_config,
                                     "per-port-memory", false);
     VLOG_INFO("Per port memory for DPDK devices %s.",
@@ -588,6 +595,12 @@ dpdk_vhost_postcopy_enabled(void)
     return vhost_postcopy_enabled;
 }
 
+bool
+dpdk_vhost_async_copy_enabled(void)
+{
+    return vhost_async_copy_enabled;
+}
+
 bool
 dpdk_per_port_memory(void)
 {
diff --git a/lib/dpdk.h b/lib/dpdk.h
index 445a51d06..b767c1728 100644
--- a/lib/dpdk.h
+++ b/lib/dpdk.h
@@ -40,6 +40,7 @@ void dpdk_set_lcore_id(unsigned cpu);
 const char *dpdk_get_vhost_sock_dir(void);
 bool dpdk_vhost_iommu_enabled(void);
 bool dpdk_vhost_postcopy_enabled(void);
+bool dpdk_vhost_async_copy_enabled(void);
 bool dpdk_per_port_memory(void);
 bool dpdk_available(void);
 void print_dpdk_version(void);
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 02df8f11e..bb8eba32b 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -4621,6 +4621,7 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
 
     netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
     dp_packet_batch_init(&p->output_pkts);
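+    /* Free any packets whose vHost async copies have completed. */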
+    netdev_dpdk_vhost_async_free_pkts(p->port->netdev, tx_qid, dynamic_txqs);
 
     /* Update time of the next flush. */
     atomic_read_relaxed(&pmd->dp->tx_flush_interval, &tx_flush_interval);
@@ -4650,9 +4651,11 @@ dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
                                    bool force)
 {
     struct tx_port *p;
-    int output_cnt = 0;
+    int output_cnt = 0, tx_qid;
+    bool concurrent_txq = false;
+    const bool is_async = dpdk_vhost_async_copy_enabled();
 
-    if (!pmd->n_output_batches) {
+    if (!pmd->n_output_batches && !is_async) {
         return 0;
     }
 
@@ -4660,6 +4663,18 @@ dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
         if (!dp_packet_batch_is_empty(&p->output_pkts)
             && (force || pmd->ctx.now >= p->flush_time)) {
             output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p);
+        } else {
+            /* Free packets whose vHost async copies have completed. */
+            if (is_async) {
+                concurrent_txq = p->port->dynamic_txqs;
+                if (concurrent_txq) {
+                    tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p);
+                } else {
+                    tx_qid = pmd->static_tx_qid;
+                }
+                netdev_dpdk_vhost_async_free_pkts(p->port->netdev, tx_qid,
+                                                  concurrent_txq);
+            }
         }
     }
     return output_cnt;
diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index a62a75b87..15edfa18d 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -36,8 +36,11 @@
 #include <rte_mbuf.h>
 #include <rte_meter.h>
 #include <rte_pci.h>
+#include <rte_rawdev.h>
 #include <rte_version.h>
 #include <rte_vhost.h>
+#include <rte_vhost_async.h>
+#include <rte_ioat_rawdev.h>
 
 #include "cmap.h"
 #include "coverage.h"
@@ -145,6 +148,13 @@ typedef uint16_t dpdk_port_t;
 
 #define IF_NAME_SZ (PATH_MAX > IFNAMSIZ ? PATH_MAX : IFNAMSIZ)
 
+/* vHost async IOAT max threshold size. */
+#define VHOST_ASYNC_MAX_THRESHOLD 4096
+/* vHost async IOAT ring size. */
+#define VHOST_ASYNC_IOAT_RING_SIZE 4096
+/* vHost async IOAT max enqueue size. */
+#define VHOST_ASYNC_MAX_ENQUEUED_SIZE VHOST_ASYNC_IOAT_RING_SIZE
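+/* Note: must be a power of two, as the packet tracker indices below are
+ * wrapped with a bitmask of (size - 1). */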
+
 /* List of required flags advertised by the hardware that will be used
  * if TSO is enabled. Ideally this should include DEV_TX_OFFLOAD_SCTP_CKSUM.
  * However, very few drivers supports that the moment and SCTP is not a
@@ -192,6 +202,27 @@ static const struct vhost_device_ops virtio_net_device_ops =
     .guest_notified = vhost_guest_notified,
 };
 
+/*
+ * vHost async callbacks.
+ */
+static uint32_t
+ioat_transfer_data_cb(int vid,
+                      uint16_t queue_id,
+                      struct rte_vhost_async_desc *descs,
+                      struct rte_vhost_async_status *opaque_data,
+                      uint16_t count);
+static uint32_t
+ioat_check_completed_copies_cb(int vid,
+                               uint16_t queue_id,
+                               struct rte_vhost_async_status *opaque_data,
+                               uint16_t max_packets);
+
+/* Async channel operations for vhost ports. */
+static struct rte_vhost_async_channel_ops vhost_async_chnl_ops = {
+    .transfer_data = ioat_transfer_data_cb,
+    .check_completed_copies = ioat_check_completed_copies_cb,
+};
+
 /* Custom software stats for dpdk ports */
 struct netdev_dpdk_sw_stats {
     /* No. of retries when unable to transmit. */
@@ -371,6 +402,40 @@ struct dpdk_mp {
      struct ovs_list list_node OVS_GUARDED_BY(dpdk_mp_mutex);
  };
 
+struct seg_info {
+    uint16_t count;
+    bool is_incompl;
+};
+
+/* Tracks packets enqueued via the async path that still need to be freed. */
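+/* The tracker is a ring buffer: 'next_write' and 'next_read' wrap modulo
+ * VHOST_ASYNC_MAX_ENQUEUED_SIZE, while 'last_remain' carries completed
+ * segments that do not yet add up to a whole packet across poll calls. */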
+struct async_pkt_tracker {
+    struct seg_info seg_info[VHOST_ASYNC_MAX_ENQUEUED_SIZE];
+    unsigned short seg_info_count;
+    unsigned short next_read;
+    unsigned short next_write;
+    unsigned short last_remain;
+};
+
+struct netdev_vhost_async {
+    /* vHost async attributes were successfully parsed for this queue. */
+    bool is_valid;
+
+    /* vHost async channel is registered for this queue. */
+    bool is_reg;
+
+    /* Number of packets submitted but not yet completed and freed. */
+    uint64_t nr_async_pkts;
+
+    /* vHost async device feature bits. */
+    struct rte_vhost_async_features feat;
+
+    /* vHost async (IOAT rawdev) device id. */
+    uint16_t dev_id;
+
+    /* vHost async packet tracker. */
+    struct async_pkt_tracker pkt_tracker;
+};
+
 /* There should be one 'struct dpdk_tx_queue' created for
  * each netdev tx queue. */
 struct dpdk_tx_queue {
@@ -380,6 +445,10 @@ struct dpdk_tx_queue {
          * It is used only if the queue is shared among different pmd threads
          * (see 'concurrent_txq'). */
         rte_spinlock_t tx_lock;
+
+        /* vHost async attributes. */
+        struct netdev_vhost_async vhost_async;
+
         /* Mapping of configured vhost-user queue to enabled by guest. */
         int map;
     );
@@ -2012,6 +2081,138 @@ out:
     return err;
 }
 
+/* Start IOAT channel for the given device id. */
+static inline int
+netdev_dpdk_vhost_async_start_chnl(uint16_t dev_id)
+{
+    int ret = 0;
+    struct rte_ioat_rawdev_config rawdev_config = {0};
+    struct rte_rawdev_info rawdev_info = { .dev_private = &rawdev_config };
+
+    /* Fetch the device info and make sure this is an IOAT rawdev. */
+    ret = rte_rawdev_info_get(dev_id, &rawdev_info, sizeof(rawdev_config));
+    if (ret != 0 || strstr(rawdev_info.driver_name, "ioat") == NULL) {
+        VLOG_ERR("rte_rawdev_info_get failed or device id %u is not an "
+                 "IOAT rawdev\n", dev_id);
+        ret = ret ? ret : -1;
+        goto out;
+    }
+
+    /* Configure the device. */
+    rawdev_config.ring_size = VHOST_ASYNC_IOAT_RING_SIZE;
+    rawdev_config.hdls_disable = true;
+    ret = rte_rawdev_configure(dev_id, &rawdev_info, sizeof(rawdev_config));
+    if (ret != 0) {
+        VLOG_ERR("rte_rawdev_configure for device id %u failed\n", dev_id);
+        goto out;
+    }
+
+    /* Start the device. */
+    ret = rte_rawdev_start(dev_id);
+    if (ret != 0) {
+        VLOG_ERR("rte_rawdev_start failed for device id %u\n", dev_id);
+        goto out;
+    }
+
+out:
+    return ret;
+}
+
+/* Close IOAT channel for given device id. */
+static inline void
+netdev_dpdk_vhost_async_stop_chnl(uint16_t dev_id)
+{
+    rte_rawdev_stop(dev_id);
+}
+
+/* Parse the vhost async attributes. */
+static int
+netdev_dpdk_vhost_async_parse_attr(const char *vhost_async_attr,
+                                   struct netdev *netdev)
+{
+    int ret = 0;
+    int64_t qid;
+    uint16_t dev_id;
+    char *str_start;
+    char *str_end;
+    char *str_queue_attr;
+    char *queue_attrs[3];
+    char name[32] = {0};
+    char *dup_attr_str = xstrdup(vhost_async_attr);
+    char *in_str = dup_attr_str;
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    struct rte_vhost_async_features async_features = {0};
+    struct rte_pci_addr pci_addr = {0};
+
+    /* Process the per-queue parameters: txq id, DMA device PCI address and
+     * async threshold. */
+    for (int i = 0; (str_queue_attr = strtok_r(in_str, ")", &in_str)); i++) {
+        if (i != 0) {
+            str_queue_attr += 2;
+        } else {
+            str_queue_attr++;
+        }
+
+        /* Split the string into three parts */
+        rte_strsplit(str_queue_attr, strlen(str_queue_attr),
+                     queue_attrs, 3, ',');
+
+        /* Get the txq id */
+        str_start = strstr(queue_attrs[0], "txq");
+        if (str_start == NULL) {
+            ret = -1;
+            goto out;
+        }
+
+        /* Skip "txq" to get qid */
+        str_start += 3;
+        qid = strtol(str_start, &str_end, 0);
+
+        if (str_end == str_start || (qid < 0)
+            || (qid > OVS_VHOST_MAX_QUEUE_NUM)) {
+            ret = -1;
+            goto out;
+        }
+
+        /* Get the PCI address of the DMA device to use. */
+        ret = rte_pci_addr_parse(queue_attrs[1], &pci_addr);
+        if (ret != 0) {
+            VLOG_ERR("rte_pci_addr_parse for device %s failed\n",
+                     queue_attrs[1]);
+            goto out;
+        }
+
+        rte_pci_device_name(&pci_addr, name, sizeof(name));
+        dev_id = rte_rawdev_get_dev_id(name);
+        if (dev_id == (uint16_t)(-ENODEV) ||
+            dev_id == (uint16_t)(-EINVAL)) {
+            VLOG_ERR("rte_rawdev_get_dev_id for device %s failed\n",name);
+            ret = dev_id;
+            goto out;
+        }
+        /* Assume that OVS uses the queues sequentially as QEMU enables
+         * them. */
+        dev->tx_q[qid].vhost_async.dev_id = dev_id;
+
+        /* Get the async threshold. */
+        async_features.async_threshold = strtol(queue_attrs[2], &str_end, 0);
+        if ((str_end == queue_attrs[2]) || (async_features.async_threshold < 0)
+             || (async_features.async_threshold > VHOST_ASYNC_MAX_THRESHOLD)) {
+            VLOG_ERR("Invalid async threshold %d, must be in range 0 to %d\n",
+                     async_features.async_threshold,
+                     VHOST_ASYNC_MAX_THRESHOLD);
+            ret = -1;
+            goto out;
+        }
+
+        async_features.async_inorder = 1;
+        dev->tx_q[qid].vhost_async.feat.intval = async_features.intval;
+        dev->tx_q[qid].vhost_async.is_valid = true;
+    }
+
+out:
+    free(dup_attr_str);
+    return ret;
+}
+
 static int
 netdev_dpdk_vhost_client_set_config(struct netdev *netdev,
                                     const struct smap *args,
@@ -2020,6 +2221,8 @@ netdev_dpdk_vhost_client_set_config(struct netdev *netdev,
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
     const char *path;
     int max_tx_retries, cur_max_tx_retries;
+    int err = 0;
+    const char *vhost_async_attr = NULL;
 
     ovs_mutex_lock(&dev->mutex);
     if (!(dev->vhost_driver_flags & RTE_VHOST_USER_CLIENT)) {
@@ -2031,6 +2234,20 @@ netdev_dpdk_vhost_client_set_config(struct netdev *netdev,
         }
     }
 
+    /* Get vhost async attributes. */
+    if (dpdk_vhost_async_copy_enabled()) {
+        vhost_async_attr = smap_get_def(args, "vhost-async-attr", NULL);
+        if (vhost_async_attr && (strlen(vhost_async_attr) <= NAME_MAX)) {
+            err = netdev_dpdk_vhost_async_parse_attr(vhost_async_attr,
+                                                     netdev);
+            if (err) {
+                VLOG_ERR("vhost async attribute parsing failed "
+                         "for %s with err: %d\n",netdev->name,err);
+                goto out;
+            }
+        }
+    }
+
     max_tx_retries = smap_get_int(args, "tx-retries-max",
                                   VHOST_ENQ_RETRY_DEF);
     if (max_tx_retries < VHOST_ENQ_RETRY_MIN
@@ -2043,9 +2260,10 @@ netdev_dpdk_vhost_client_set_config(struct netdev *netdev,
         VLOG_INFO("Max Tx retries for vhost device '%s' set to %d",
                   netdev_get_name(netdev), max_tx_retries);
     }
-    ovs_mutex_unlock(&dev->mutex);
 
-    return 0;
+out:
+    ovs_mutex_unlock(&dev->mutex);
+    return err;
 }
 
 static int
@@ -2544,6 +2762,55 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_dpdk *dev,
     }
 }
 
+/* Free the packets sent via the async path. */
+void
+netdev_dpdk_vhost_async_free_pkts(struct netdev *netdev, int qid,
+                                  const bool concurrent_txq)
+{
+    int vhost_qid;
+    uint16_t nr_copied_pkts = 0;
+    uint64_t tx_bytes = 0;
+    struct dp_packet *comp_pkts[NETDEV_MAX_BURST];
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    int vid = netdev_dpdk_get_vid(dev);
+
+    if (OVS_UNLIKELY(vid < 0 || (dev->type != DPDK_DEV_VHOST)
+                     || !(dev->flags & NETDEV_UP)
+                     || !dev->vhost_reconfigured)) {
+        return;
+    }
+
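+    /* Map the OVS txq to the vring enabled by the guest; a negative value
+     * means the queue is disabled. */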
+    qid = dev->tx_q[qid % netdev->n_txq].map;
+    if (qid < 0) {
+        return;
+    }
+
+    if (concurrent_txq) {
+        if (OVS_UNLIKELY(!rte_spinlock_trylock(&dev->tx_q[qid].tx_lock))) {
+            COVERAGE_INC(vhost_tx_contention);
+            rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
+        }
+    }
+
+    if (dev->tx_q[qid].vhost_async.nr_async_pkts) {
+        vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
+        /* Get the completion status of async transfer. */
+        nr_copied_pkts = rte_vhost_poll_enqueue_completed(vid, vhost_qid,
+                                                    (struct rte_mbuf **)
+                                                    comp_pkts,
+                                                    NETDEV_MAX_BURST);
+        dev->tx_q[qid].vhost_async.nr_async_pkts -= nr_copied_pkts;
+        for (int i = 0; i < nr_copied_pkts; i++) {
+            tx_bytes += dp_packet_size(comp_pkts[i]);
+            dp_packet_delete(comp_pkts[i]);
+        }
+    }
+
+    if (concurrent_txq) {
+        rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
+    }
+}
+
 static void
 __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                          struct dp_packet **pkts, int cnt)
@@ -2553,9 +2820,13 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
     struct netdev_dpdk_sw_stats sw_stats_add;
     unsigned int n_packets_to_free = cnt;
     unsigned int total_packets = cnt;
-    int i, retries = 0;
+    int i = 0, retries = 0;
     int max_retries = VHOST_ENQ_RETRY_MIN;
     int vid = netdev_dpdk_get_vid(dev);
+    int vhost_qid;
+    unsigned int tx_pkts;
+    bool is_async = false;
+    struct netdev_vhost_async *vasync = NULL;
 
     qid = dev->tx_q[qid % netdev->n_txq].map;
 
@@ -2588,12 +2859,20 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
     sw_stats_add.tx_qos_drops -= cnt;
 
     n_packets_to_free = cnt;
+    vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
+    vasync = &(dev->tx_q[qid].vhost_async);
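+    /* Take the async path only if an async channel is registered for this
+     * vring (see netdev_dpdk_vhost_async_reg()). */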
+    is_async = vasync->is_reg;
 
     do {
-        int vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
-        unsigned int tx_pkts;
+        if (is_async) {
+            /* Submit the burst for async copy; the registered async channel
+             * callbacks perform the actual DMA transfer. */
+            tx_pkts = rte_vhost_submit_enqueue_burst(vid, vhost_qid,
+                                                     cur_pkts, cnt);
+            vasync->nr_async_pkts += tx_pkts;
+        } else {
+            tx_pkts = rte_vhost_enqueue_burst(vid, vhost_qid, cur_pkts, cnt);
+        }
 
-        tx_pkts = rte_vhost_enqueue_burst(vid, vhost_qid, cur_pkts, cnt);
         if (OVS_LIKELY(tx_pkts)) {
             /* Packets have been sent.*/
             cnt -= tx_pkts;
@@ -2622,8 +2901,13 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
                                          &sw_stats_add);
     rte_spinlock_unlock(&dev->stats_lock);
 
+    if (is_async) {
+        /* Packets submitted for async copy are freed later, once the copy
+         * completes; only the dropped tail of the batch is freed here. */
+        i = n_packets_to_free - cnt;
+    }
+
 out:
-    for (i = 0; i < n_packets_to_free; i++) {
+    for (; i < n_packets_to_free; i++) {
         dp_packet_delete(pkts[i]);
     }
 }
@@ -3851,6 +4135,63 @@ set_irq_status(int vid)
     }
 }
 
+/* Register the vHost async device for a queue. */
+static inline int
+netdev_dpdk_vhost_async_reg(int vid,
+                            struct netdev_dpdk *dev,
+                            const int qid)
+{
+    int vhost_qid, ret = -1;
+    struct netdev_vhost_async *vasync = &(dev->tx_q[qid].vhost_async);
+
+    if (vid >= 0 && !vasync->is_reg) {
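+        /* OVS transmits into the guest RX virtqueue, hence VIRTIO_RXQ. */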
+        vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
+        ret = netdev_dpdk_vhost_async_start_chnl(vasync->dev_id);
+        if (ret != 0) {
+            VLOG_ERR("Start failed for device %s "
+                     "queue : txq %d vid :%d\n", dev->up.name, qid, vid);
+            goto out;
+        }
+
+        ret = rte_vhost_async_channel_register(vid,
+                                               (uint16_t)vhost_qid,
+                                               vasync->feat.intval,
+                                               &vhost_async_chnl_ops);
+        if (ret != 0) {
+            netdev_dpdk_vhost_async_stop_chnl(vasync->dev_id);
+            VLOG_ERR("Channel register failed for device %s "
+                     "queue : txq %d vid :%d\n", dev->up.name, qid, vid);
+        } else {
+            vasync->is_reg = true;
+        }
+    }
+out:
+    return ret;
+}
+
+/* Unregister the vHost async channel for a queue. */
+static inline void
+netdev_dpdk_vhost_async_unreg(int vid,
+                              struct netdev_dpdk *dev,
+                              const int qid)
+{
+    int vhost_qid, ret = -1;
+    struct netdev_vhost_async *vasync = &(dev->tx_q[qid].vhost_async);
+
+    if (vid >= 0 && vasync->is_reg) {
+        vhost_qid = qid * VIRTIO_QNUM + VIRTIO_RXQ;
+        netdev_dpdk_vhost_async_stop_chnl(vasync->dev_id);
+        ret = rte_vhost_async_channel_unregister(vid, (uint16_t)vhost_qid);
+
+        if (ret != 0) {
+            VLOG_ERR("Channel unregister detach failed for device %s "
+                     "queue : txq%d vid :%d\n", dev->up.name, qid, vid);
+        } else {
+            vasync->is_reg = false;
+        }
+    }
+}
+
 /*
  * Fixes mapping for vhost-user tx queues. Must be called after each
  * enabling/disabling of queues and n_txq modifications.
@@ -3900,6 +4241,179 @@ netdev_dpdk_remap_txqs(struct netdev_dpdk *dev)
     free(enabled_queues);
 }
 
+/* Fetch the device. */
+static inline struct netdev_dpdk *
+netdev_dpdk_vhost_get_dev(int vid)
+{
+    char ifname[IF_NAME_SZ] = {0};
+    struct netdev_dpdk *dev = NULL;
+    rte_vhost_get_ifname(vid, ifname, sizeof ifname);
+
+    ovs_mutex_lock(&dpdk_mutex);
+    LIST_FOR_EACH (dev, list_node, &dpdk_list) {
+        ovs_mutex_lock(&dev->mutex);
+        if (nullable_string_is_equal(ifname, dev->vhost_id)) {
+            ovs_mutex_unlock(&dev->mutex);
+            break;
+        }
+        ovs_mutex_unlock(&dev->mutex);
+    }
+    ovs_mutex_unlock(&dpdk_mutex);
+    return dev;
+}
+
+/* vHost async callback to transfer data via hardware. */
+static uint32_t
+ioat_transfer_data_cb(int vid,
+                      uint16_t queue_id,
+                      struct rte_vhost_async_desc *descs,
+                      struct rte_vhost_async_status *opaque_data,
+                      uint16_t count)
+{
+    int is_enqueued;
+    bool is_ring_full = false;
+    uint16_t dev_id;
+    uint32_t desc_idx = 0;
+    unsigned long seg_idx = 0;
+    unsigned int dma_src_len;
+    unsigned short write_idx = 0;
+    struct netdev_dpdk *dev = NULL;
+    struct async_pkt_tracker *pkt_tracker = NULL;
+    struct rte_vhost_iov_iter *src_ptr, *dst_ptr;
+    struct netdev_vhost_async *vhost_async_attr;
+    phys_addr_t iov_base, dma_src_start_addr, dma_dst_start_addr;
+
+    if (OVS_LIKELY(!opaque_data)) {
+        dev = netdev_dpdk_vhost_get_dev(vid);
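+        /* 'queue_id' is a virtqueue index here; convert it to the OVS txq
+         * enabled by the guest. */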
+        queue_id = dev->tx_q[queue_id / VIRTIO_QNUM].map;
+        vhost_async_attr = &(dev->tx_q[queue_id].vhost_async);
+        if (vhost_async_attr) {
+            dev_id = vhost_async_attr->dev_id;
+            pkt_tracker = &(vhost_async_attr->pkt_tracker);
+            write_idx = pkt_tracker->next_write;
+            if (pkt_tracker->seg_info_count == VHOST_ASYNC_MAX_ENQUEUED_SIZE) {
+                /* Pkt tracker buffer is full, do not enqueue. */
+                goto out;
+            }
+            for (; (uint16_t)desc_idx < count; desc_idx++) {
+                src_ptr = descs[desc_idx].src;
+                dst_ptr = descs[desc_idx].dst;
+                for (seg_idx = 0; seg_idx < src_ptr->nr_segs; seg_idx++) {
+                    /* Fetch DMA source start addr. */
+                    iov_base = (uintptr_t)(src_ptr->iov[seg_idx].iov_base);
+                    dma_src_start_addr = src_ptr->offset + iov_base;
+                    /* Fetch DMA destination start addr. */
+                    iov_base = (uintptr_t)(dst_ptr->iov[seg_idx].iov_base);
+                    dma_dst_start_addr = dst_ptr->offset + iov_base;
+                    /* Fetch packet segment length. */
+                    dma_src_len = src_ptr->iov[seg_idx].iov_len;
+                    is_enqueued = rte_ioat_enqueue_copy(dev_id,
+                                                        dma_src_start_addr,
+                                                        dma_dst_start_addr,
+                                                        dma_src_len,
+                                                        0, 0);
+                    if (OVS_UNLIKELY(!is_enqueued)) {
+                        /* Device ring is full. */
+                        is_ring_full = true;
+                        goto out;
+                    }
+                }
+                write_idx &= (VHOST_ASYNC_MAX_ENQUEUED_SIZE - 1);
+                pkt_tracker->seg_info[write_idx++].count = seg_idx;
+                pkt_tracker->seg_info_count++;
+                if (pkt_tracker->seg_info_count ==
+                                             VHOST_ASYNC_MAX_ENQUEUED_SIZE) {
+                    /* Pkt tracker buffer is full, do not enqueue. */
+                    goto out;
+                }
+            }
+        }
+    }
+
+out:
+    if (desc_idx != 0) {
+        /* Ring the doorbell. */
+        rte_ioat_perform_ops(dev_id);
+        if (is_ring_full && seg_idx) {
+            write_idx &= (VHOST_ASYNC_MAX_ENQUEUED_SIZE - 1);
+            pkt_tracker->seg_info[write_idx].count = seg_idx;
+            pkt_tracker->seg_info[write_idx++].is_incompl = true;
+            pkt_tracker->seg_info_count++;
+        }
+        pkt_tracker->next_write = write_idx;
+    }
+    return desc_idx;
+}
+
+/* vHost async callback to query transfer status via hardware. */
+static uint32_t
+ioat_check_completed_copies_cb(int vid,
+                               uint16_t queue_id,
+                               struct rte_vhost_async_status *opaque_data,
+                               uint16_t max_packets)
+{
+    int qid;
+    int ret_segs = 0;
+    struct netdev_dpdk *dev = NULL;
+    struct async_pkt_tracker *pkt_tracker = NULL;
+    struct netdev_vhost_async *vhost_async_attr;
+    unsigned short read, enq_segs;
+    unsigned short nr_pkts = 0;
+
+    if (OVS_LIKELY(!opaque_data)) {
+        dev = netdev_dpdk_vhost_get_dev(vid);
+        if (OVS_UNLIKELY(!dev)) {
+            goto out;
+        }
+
+        qid = dev->tx_q[queue_id / VIRTIO_QNUM].map;
+        if (OVS_UNLIKELY(qid < 0)) {
+            goto out;
+        }
+
+        vhost_async_attr = &(dev->tx_q[qid].vhost_async);
+
+        if (vhost_async_attr->is_valid) {
+            uint16_t dev_id = vhost_async_attr->dev_id;
+            ret_segs = rte_ioat_completed_ops(dev_id, 0, NULL, NULL);
+            if (OVS_UNLIKELY(ret_segs < 0)) {
+                VLOG_WARN_RL(&rl, "rte_ioat_completed_ops failed for vid: %d"
+                             " device: %d with err: %s\n",
+                             vid, dev_id, rte_strerror(rte_errno));
+            } else {
+                pkt_tracker = &(vhost_async_attr->pkt_tracker);
+                ret_segs += pkt_tracker->last_remain;
+                if (ret_segs == 0) {
+                    goto out;
+                }
+                read = pkt_tracker->next_read;
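+                /* Fold completed segments back into whole packets: a packet
+                 * is reported to the vhost library only once all of its
+                 * enqueued segments have completed. */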
+                for (uint16_t i = 0; i < max_packets; i++) {
+                    read &= (VHOST_ASYNC_MAX_ENQUEUED_SIZE - 1);
+                    enq_segs = pkt_tracker->seg_info[read].count;
+                    if ((pkt_tracker->seg_info_count == 0)
+                         || (ret_segs < enq_segs)) {
+                        break;
+                    }
+                    ret_segs -= enq_segs;
+                    if (OVS_LIKELY(!pkt_tracker->seg_info[read].is_incompl)) {
+                        nr_pkts++;
+                    } else {
+                        pkt_tracker->seg_info[read].is_incompl = false;
+                    }
+                    pkt_tracker->seg_info[read].count = 0;
+                    pkt_tracker->seg_info_count--;
+                    read++;
+                }
+                vhost_async_attr->pkt_tracker.next_read = read;
+                vhost_async_attr->pkt_tracker.last_remain =
+                                                      (unsigned short)ret_segs;
+            }
+        }
+    }
+out:
+    return nr_pkts;
+}
+
 /*
  * A new virtio-net device is added to a vhost port.
  */
@@ -4000,6 +4514,11 @@ destroy_device(int vid)
 
             ovs_mutex_lock(&dev->mutex);
             dev->vhost_reconfigured = false;
+            if (dpdk_vhost_async_copy_enabled()) {
+                for (int qid = 0; qid < dev->up.n_txq; qid++) {
+                    netdev_dpdk_vhost_async_unreg(vid, dev, qid);
+                }
+            }
             ovsrcu_index_set(&dev->vid, -1);
             memset(dev->vhost_rxq_enabled, 0,
                    dev->up.n_rxq * sizeof *dev->vhost_rxq_enabled);
@@ -4039,6 +4558,8 @@ vring_state_changed(int vid, uint16_t queue_id, int enable)
     int qid = queue_id / VIRTIO_QNUM;
     bool is_rx = (queue_id % VIRTIO_QNUM) == VIRTIO_TXQ;
     char ifname[IF_NAME_SZ];
+    bool is_async = false;
+    int ret = 0;
 
     rte_vhost_get_ifname(vid, ifname, sizeof ifname);
 
@@ -4054,8 +4575,12 @@ vring_state_changed(int vid, uint16_t queue_id, int enable)
                     netdev_change_seq_changed(&dev->up);
                 }
             } else {
+                is_async = dev->tx_q[qid].vhost_async.is_valid;
                 if (enable) {
                     dev->tx_q[qid].map = qid;
+                    if (is_async) {
+                        ret = netdev_dpdk_vhost_async_reg(vid, dev, qid);
+                    }
                 } else {
                     dev->tx_q[qid].map = OVS_VHOST_QUEUE_DISABLED;
                 }
@@ -4075,10 +4600,10 @@ vring_state_changed(int vid, uint16_t queue_id, int enable)
                   qid, ifname, (enable == 1) ? "enabled" : "disabled");
     } else {
         VLOG_INFO("vHost Device '%s' not found", ifname);
-        return -1;
+        ret = -1;
     }
 
-    return 0;
+    return ret;
 }
 
 static void
@@ -5042,6 +5567,11 @@ netdev_dpdk_vhost_client_reconfigure(struct netdev *netdev)
             vhost_flags |= RTE_VHOST_USER_POSTCOPY_SUPPORT;
         }
 
+        /* Enable async copy flag, if requested. */
+        if (dpdk_vhost_async_copy_enabled()) {
+            vhost_flags |= RTE_VHOST_USER_ASYNC_COPY;
+        }
+
         /* Enable External Buffers if TCP Segmentation Offload is enabled. */
         if (userspace_tso_enabled()) {
             vhost_flags |= RTE_VHOST_USER_EXTBUF_SUPPORT;
diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
index 848346cb4..b343c1b56 100644
--- a/lib/netdev-dpdk.h
+++ b/lib/netdev-dpdk.h
@@ -56,6 +56,9 @@ netdev_dpdk_rte_flow_query_count(struct netdev *netdev,
 int
 netdev_dpdk_get_port_id(struct netdev *netdev);
 
+void
+netdev_dpdk_vhost_async_free_pkts(struct netdev *netdev, int qid,
+                                  const bool concurrent_txq);
 #else
 
 static inline void
-- 
2.17.1
