These work rings help with handling the asynchronous TX use case. In this use case, netdev_send() is called, but the packets are not sent immediately by the thread calling netdev_send(); they are instead handed off to another resource. Since the TX is not instantaneous, the thread calling netdev_send() can't immediately free the packets being sent or report them as sent. Rather than having the thread poll for completion of the TX, it is desirable for the thread to move on and process more packets.
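To illustrate the idea before the diff: the deferral scheme described below boils down to a small single-producer/single-consumer ring of "completion check" callbacks that the PMD thread drains between rx bursts. A minimal standalone sketch of that style of ring (made-up demo_* names and a toy completion check, not the code in this patch) looks roughly like this:

#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define DEMO_RING_SIZE 8                    /* Must be a power of two. */
#define DEMO_RING_MASK (DEMO_RING_SIZE - 1)
#define DEMO_ATTEMPT_LIMIT 3

typedef int (*demo_work_func)(void *dev, int qid, bool force);

/* One deferred piece of work: re-check TX completion for (dev, qid). */
struct demo_work_item {
    demo_work_func func;
    void *dev;
    int qid;
    uint32_t attempts;
};

/* Indexes run free as uint32_t and are masked only when indexing the array. */
struct demo_defer {
    uint32_t read_idx;
    uint32_t write_idx;
    struct demo_work_item ring[DEMO_RING_SIZE];
};

/* Toy completion check: pretend the TX completes on the second attempt. */
static int
demo_check_tx(void *dev, int qid, bool force)
{
    static int calls;

    (void) dev;
    (void) qid;
    return (force || calls++ > 0) ? 0 : -EINPROGRESS;
}

/* Take one item off the ring.  An item that is still in progress stays at the
 * head (read_idx is not advanced) until it completes or hits the limit. */
static int
demo_do_work(struct demo_defer *d)
{
    if (d->read_idx == d->write_idx) {
        return -ENOENT;                     /* Ring is empty. */
    }

    struct demo_work_item *w = &d->ring[d->read_idx & DEMO_RING_MASK];
    int ret = w->func(w->dev, w->qid, false);

    if (ret == -EINPROGRESS && ++w->attempts <= DEMO_ATTEMPT_LIMIT) {
        return ret;                         /* Leave it queued; retry later. */
    }
    if (ret == -EINPROGRESS) {
        w->func(w->dev, w->qid, true);      /* Limit hit: force completion. */
    }
    d->read_idx++;
    return 0;
}

/* Queue a completion check after a send that returned -EINPROGRESS. */
static void
demo_defer_work(struct demo_defer *d, demo_work_func func, void *dev, int qid)
{
    while (((d->write_idx - d->read_idx) & DEMO_RING_MASK) == DEMO_RING_MASK) {
        demo_do_work(d);                    /* Ring full: drain an item. */
    }

    struct demo_work_item *w = &d->ring[d->write_idx & DEMO_RING_MASK];
    *w = (struct demo_work_item) { func, dev, qid, 0 };
    d->write_idx++;
}

int
main(void)
{
    struct demo_defer d = { 0 };

    demo_defer_work(&d, demo_check_tx, NULL, 0); /* send() said -EINPROGRESS. */
    while (demo_do_work(&d) != -ENOENT) {        /* Drain between rx bursts. */
        continue;
    }
    printf("all deferred work drained\n");
    return 0;
}

The power-of-two ring size keeps the empty/full checks down to a compare and a mask, which is the same trick the real WORK_RING_SIZE/WORK_RING_MASK pair uses.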
The work ring serves as a FIFO queue that keeps track of the asynchronous TX calls which have been kicked off. Work is queued onto the ring when netdev_send() returns -EINPROGRESS, indicating that it kicked off an asynchronous TX. Work is dequeued from the ring in two main cases: 1. In pmd_thread_main(), after processing every rxq assigned to the thread. 2. When the ring is full while trying to queue more work. Some dp_defer functions are defined in dpif-netdev.c. It would be nicer to define these in dpif-netdev-private-defer.h, but they rely on the cycle_timer_stop()/cycle_timer_start() functions and dp_netdev_rxq_add_cycles(). I tried to move those into a header so they could be included, but they rely on yet more structs. I stopped there because it would mean moving larger structs such as "struct dp_netdev_rxq", which in turn rely on even more structs, all defined in dpif-netdev.c. Signed-off-by: Cian Ferriter <cian.ferri...@intel.com> Co-authored-by: Harry van Haaren <harry.van.haa...@intel.com> Signed-off-by: Harry van Haaren <harry.van.haa...@intel.com> Co-authored-by: Sunil Pai G <sunil.pa...@intel.com> Signed-off-by: Sunil Pai G <sunil.pa...@intel.com> --- v2: - Implement and use a simpler ring buffer in OVS, rather than using the DPDK implementation. - Only print work defer stats if some work has actually been deferred. - Add a "force" flag to the "process_async()" API to implement an attempt limit on the number of times an asynchronous piece of work should be attempted. - Do all outstanding work on a PMD thread before allowing a reload to occur. --- lib/automake.mk | 1 + lib/dpif-netdev-perf.c | 20 ++++- lib/dpif-netdev-perf.h | 9 ++ lib/dpif-netdev-private-defer.h | 84 +++++++++++++++++++ lib/dpif-netdev-private-thread.h | 4 + lib/dpif-netdev.c | 139 +++++++++++++++++++++++++++++-- lib/netdev-dpdk.c | 22 +++-- lib/netdev-provider.h | 19 ++++- lib/netdev.c | 3 +- 9 files changed, 286 insertions(+), 15 deletions(-) create mode 100644 lib/dpif-netdev-private-defer.h diff --git a/lib/automake.mk b/lib/automake.mk index 46f869a33..0d910bc92 100644 --- a/lib/automake.mk +++ b/lib/automake.mk @@ -115,6 +115,7 @@ lib_libopenvswitch_la_SOURCES = \ lib/dpif-netdev-lookup-generic.c \ lib/dpif-netdev.c \ lib/dpif-netdev.h \ + lib/dpif-netdev-private-defer.h \ lib/dpif-netdev-private-dfc.c \ lib/dpif-netdev-private-dfc.h \ lib/dpif-netdev-private-dpcls.h \ diff --git a/lib/dpif-netdev-perf.c b/lib/dpif-netdev-perf.c index d7676ea2b..859ef300c 100644 --- a/lib/dpif-netdev-perf.c +++ b/lib/dpif-netdev-perf.c @@ -230,6 +230,7 @@ pmd_perf_format_overall_stats(struct ds *str, struct pmd_perf_stats *s, uint64_t tot_iter = histogram_samples(&s->pkts); uint64_t idle_iter = s->pkts.bin[0]; uint64_t busy_iter = tot_iter >= idle_iter ?
tot_iter - idle_iter : 0; + uint64_t work_deferred = stats[PMD_STAT_WORK_DEFER]; ds_put_format(str, " Iterations: %12"PRIu64" (%.2f us/it)\n" @@ -284,7 +285,24 @@ pmd_perf_format_overall_stats(struct ds *str, struct pmd_perf_stats *s, tx_packets, (tx_packets / duration) / 1000, tx_batches, 1.0 * tx_packets / tx_batches); } else { - ds_put_format(str, " Tx packets: %12d\n\n", 0); + ds_put_format(str, " Tx packets: %12d\n", 0); + } + if (work_deferred > 0) { + uint64_t work_compl_checks = stats[PMD_STAT_WORK_IN_PROG] + + stats[PMD_STAT_WORK_DONE]; + + ds_put_format(str, + " Work deferred: %12"PRIu64"\n" + " - Deferred work done: %12"PRIu64"\n" + " - Work completion checks: %12"PRIu64 + " (%.2f checks/work item)\n" + " - Ring full when deferring work: %12"PRIu64"\n" + " - Deferred work dropped: %12"PRIu64"\n", + work_deferred, stats[PMD_STAT_WORK_DONE], work_compl_checks, + 1.0 * work_compl_checks / stats[PMD_STAT_WORK_DONE], + stats[PMD_STAT_WORK_R_FULL], stats[PMD_STAT_WORK_DROPPED]); + } else { + ds_put_format(str, " Work deferred: %12d\n\n", 0); } } diff --git a/lib/dpif-netdev-perf.h b/lib/dpif-netdev-perf.h index 834c26260..e9c02a866 100644 --- a/lib/dpif-netdev-perf.h +++ b/lib/dpif-netdev-perf.h @@ -76,6 +76,15 @@ enum pmd_stat_type { * recirculation. */ PMD_STAT_SENT_PKTS, /* Packets that have been sent. */ PMD_STAT_SENT_BATCHES, /* Number of batches sent. */ + PMD_STAT_WORK_DEFER, /* Number of times that work was deferred. */ + PMD_STAT_WORK_IN_PROG, /* Number of times that work was still in progress + when checked by a thread. */ + PMD_STAT_WORK_R_FULL, /* Number of times work ring was full when + * deferring work. */ + PMD_STAT_WORK_DONE, /* Number of times that deferred work was + * completed. */ + PMD_STAT_WORK_DROPPED, /* Number of times that deferred work was dropped. + */ PMD_CYCLES_ITER_IDLE, /* Cycles spent in idle iterations. */ PMD_CYCLES_ITER_BUSY, /* Cycles spent in busy iterations. */ PMD_CYCLES_UPCALL, /* Cycles spent processing upcalls. */ diff --git a/lib/dpif-netdev-private-defer.h b/lib/dpif-netdev-private-defer.h new file mode 100644 index 000000000..78c140f56 --- /dev/null +++ b/lib/dpif-netdev-private-defer.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2021 Intel Corporation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef DPIF_NETDEV_PRIVATE_DEFER_H +#define DPIF_NETDEV_PRIVATE_DEFER_H 1 + +#include <stdbool.h> +#include <stdint.h> + +#include "dpif.h" +#include "dpif-netdev-perf.h" +#include "cmap.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* Function definition for deferred work. */ +typedef int (*dp_defer_work_func)(struct netdev *netdev, int qid, bool force); + +/* Structure to track outstanding work to be done. 
*/ +struct dp_defer_work_item { + dp_defer_work_func work_func; + void *netdev; + int qid; + uint32_t attempts; +}; + +#define WORK_RING_SIZE 128 +#define WORK_RING_MASK (WORK_RING_SIZE - 1) + +#define ATTEMPT_LIMIT 1000 + +/* The read and write indexes are between 0 and 2^32, and we mask their value + * when we access the work_ring[] array. */ +struct dp_defer { + uint32_t read_idx; + uint32_t write_idx; + struct dp_defer_work_item work_ring[WORK_RING_SIZE]; +}; + +static inline void +dp_defer_init(struct dp_defer *defer) +{ + defer->read_idx = 0; + defer->write_idx = 0; +} + +static inline int +dp_defer_work_ring_empty(const struct dp_defer *defer) +{ + return defer->write_idx == defer->read_idx; +} + +static inline int +dp_defer_work_ring_full(const struct dp_defer *defer) +{ + /* When the write index is exactly (WORK_RING_SIZE - 1) or WORK_RING_MASK + * elements ahead of the read index, the ring is full. When calculating the + * difference between the indexes, wraparound is not an issue since + * unsigned ints are used. */ + uint16_t count = (defer->write_idx - defer->read_idx) & WORK_RING_MASK; + + return count == WORK_RING_MASK; +} + +#ifdef __cplusplus +} +#endif + +#endif /* dpif-netdev-private-defer.h */ diff --git a/lib/dpif-netdev-private-thread.h b/lib/dpif-netdev-private-thread.h index a782d9678..d14a5ade7 100644 --- a/lib/dpif-netdev-private-thread.h +++ b/lib/dpif-netdev-private-thread.h @@ -20,6 +20,7 @@ #include "dpif.h" #include "dpif-netdev-perf.h" +#include "dpif-netdev-private-defer.h" #include "dpif-netdev-private-dfc.h" #include "dpif-netdev-private-dpif.h" @@ -219,6 +220,9 @@ struct dp_netdev_pmd_thread { /* Next time when PMD should try RCU quiescing. */ long long next_rcu_quiesce; + + /* Structure to track deferred work in this thread. */ + struct dp_defer defer; }; #ifdef __cplusplus diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index b3e57bb95..f4143a93a 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -435,6 +435,7 @@ struct tx_port { long long flush_time; struct dp_packet_batch output_pkts; struct dp_netdev_rxq *output_pkts_rxqs[NETDEV_MAX_BURST]; + dp_defer_work_func cached_work_func; }; /* Contained by struct tx_bond 'member_buckets'. */ @@ -4591,6 +4592,97 @@ pmd_perf_metrics_enabled(const struct dp_netdev_pmd_thread *pmd OVS_UNUSED) } #endif +/* Try to do one piece of work from the work ring. + * + * Returns: + * -ENOENT: No work to do. The ring is empty. + * -EINPROGRESS: The work is still in progress, I can't do it. + * 0: One piece of work was taken from the ring. It was either successfully + * handled, or dropped if attempted too many times. + */ +static inline unsigned int +dp_defer_do_work(struct dp_defer *defer, struct pmd_perf_stats *perf_stats) +{ + struct dp_defer_work_item *work; + uint32_t read_idx; + int ret; + + /* Check that there's a piece of work in the ring to do. 
*/ + if (dp_defer_work_ring_empty(defer)) { + return -ENOENT; + } + + read_idx = defer->read_idx & WORK_RING_MASK; + work = &defer->work_ring[read_idx]; + ret = work->work_func(work->netdev, work->qid, false); + + if (ret == -EINPROGRESS) { + pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_IN_PROG, 1); + + work->attempts++; + if (work->attempts > ATTEMPT_LIMIT) { + ret = work->work_func(work->netdev, work->qid, true); + defer->read_idx++; + + if (ret) { + pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DROPPED, 1); + } else { + pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DONE, 1); + } + + return 0; + } + + return ret; + } + + defer->read_idx++; + + pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DONE, 1); + + return 0; +} + +static inline void +dp_defer_work(struct dp_defer *defer, struct pmd_perf_stats *perf_stats, + struct dp_defer_work_item *work) +{ + struct dp_defer_work_item *ring_item; + uint32_t write_idx; + + /* Check that we have enough room in ring. */ + if (dp_defer_work_ring_full(defer)) { + /* The work ring is full, try to make room by doing work. Doing work + * can fail to make room if the work has to be requeued. Keep trying to + * do work until there is room in the ring. */ + pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_R_FULL, 1); + + while (dp_defer_do_work(defer, perf_stats)) { + continue; + } + } + + write_idx = defer->write_idx & WORK_RING_MASK; + ring_item = &defer->work_ring[write_idx]; + + ring_item->work_func = work->work_func; + ring_item->netdev = work->netdev; + ring_item->qid = work->qid; + ring_item->attempts = 0; + + defer->write_idx++; + + pmd_perf_update_counter(perf_stats, PMD_STAT_WORK_DEFER, 1); +} + +static inline void +dp_defer_do_all_work(struct dp_defer *defer, struct pmd_perf_stats *perf_stats) +{ + while (dp_defer_do_work(defer, perf_stats) != -ENOENT) { + continue; + } +} + static int dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd, struct tx_port *p) @@ -4600,10 +4692,12 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd, int output_cnt; bool dynamic_txqs; struct cycle_timer timer; + struct netdev *netdev; uint64_t cycles; uint32_t tx_flush_interval; + struct pmd_perf_stats *perf_stats = &pmd->perf_stats; - cycle_timer_start(&pmd->perf_stats, &timer); + cycle_timer_start(perf_stats, &timer); dynamic_txqs = p->port->dynamic_txqs; if (dynamic_txqs) { @@ -4615,7 +4709,21 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd, output_cnt = dp_packet_batch_size(&p->output_pkts); ovs_assert(output_cnt > 0); - netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs); + netdev = p->port->netdev; + int ret = netdev_send(netdev, tx_qid, &p->output_pkts, + dynamic_txqs); + + if (ret == -EINPROGRESS) { + struct dp_defer_work_item work = { + .work_func = p->cached_work_func, + .netdev = netdev, + .qid = tx_qid, + }; + + /* Defer the work. */ + dp_defer_work(&pmd->defer, perf_stats, &work); + } + dp_packet_batch_init(&p->output_pkts); /* Update time of the next flush. */ @@ -4625,12 +4733,15 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd, ovs_assert(pmd->n_output_batches > 0); pmd->n_output_batches--; - pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_PKTS, output_cnt); - pmd_perf_update_counter(&pmd->perf_stats, PMD_STAT_SENT_BATCHES, 1); + /* The batch and number of packets are updated as sent here, even though + * some packets might have been dropped, or are in transit asynchronously. 
+ */ + pmd_perf_update_counter(perf_stats, PMD_STAT_SENT_PKTS, output_cnt); + pmd_perf_update_counter(perf_stats, PMD_STAT_SENT_BATCHES, 1); /* Distribute send cycles evenly among transmitted packets and assign to * their respective rx queues. */ - cycles = cycle_timer_stop(&pmd->perf_stats, &timer) / output_cnt; + cycles = cycle_timer_stop(perf_stats, &timer) / output_cnt; for (i = 0; i < output_cnt; i++) { if (p->output_pkts_rxqs[i]) { dp_netdev_rxq_add_cycles(p->output_pkts_rxqs[i], @@ -6196,6 +6307,7 @@ reload: ovs_mutex_lock(&pmd->perf_stats.stats_mutex); for (;;) { uint64_t rx_packets = 0, tx_packets = 0; + struct dp_defer *defer = &pmd->defer; pmd_perf_start_iteration(s); @@ -6228,10 +6340,20 @@ reload: tx_packets = dp_netdev_pmd_flush_output_packets(pmd, false); } + /* Try to clear the work ring. If a piece of work is still in progress, + * don't attempt to do the remaining work items. They will be postponed + * to the next interation of pmd_thread_main(). */ + while (!dp_defer_do_work(defer, s)) { + continue; + } + /* Do RCU synchronization at fixed interval. This ensures that * synchronization would not be delayed long even at high load of * packet processing. */ if (pmd->ctx.now > pmd->next_rcu_quiesce) { + /* Do any work outstanding on this PMD thread. */ + dp_defer_do_all_work(defer, s); + if (!ovsrcu_try_quiesce()) { pmd->next_rcu_quiesce = pmd->ctx.now + PMD_RCU_QUIESCE_INTERVAL; @@ -6240,6 +6362,8 @@ reload: if (lc++ > 1024) { lc = 0; + /* Do any work outstanding on this PMD thread. */ + dp_defer_do_all_work(defer, s); coverage_try_clear(); dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt); @@ -6262,6 +6386,8 @@ reload: atomic_read_explicit(&pmd->reload, &reload, memory_order_acquire); if (OVS_UNLIKELY(reload)) { + /* Do any work outstanding on this PMD thread. */ + dp_defer_do_all_work(defer, s); break; } @@ -6748,6 +6874,8 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp, pmd_perf_stats_init(&pmd->perf_stats); cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node), hash_int(core_id, 0)); + + dp_defer_init(&pmd->defer); } static void @@ -6918,6 +7046,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd, tx->qid = -1; tx->flush_time = 0LL; dp_packet_batch_init(&tx->output_pkts); + tx->cached_work_func = port->netdev->netdev_class->process_async; hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no)); pmd->need_reload = true; diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c index 45a96b9be..96d8210e3 100644 --- a/lib/netdev-dpdk.c +++ b/lib/netdev-dpdk.c @@ -2585,7 +2585,7 @@ netdev_dpdk_vhost_update_tx_counters(struct netdev_dpdk *dev, } } -static void +static int __netdev_dpdk_vhost_send(struct netdev *netdev, int qid, struct dp_packet **pkts, int cnt) { @@ -2667,6 +2667,8 @@ out: for (i = 0; i < n_packets_to_free; i++) { dp_packet_delete(pkts[i]); } + + return 0; } static void @@ -2774,7 +2776,7 @@ dpdk_copy_dp_packet_to_mbuf(struct rte_mempool *mp, struct dp_packet *pkt_orig) } /* Tx function. Transmit packets indefinitely */ -static void +static int dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch) OVS_NO_THREAD_SAFETY_ANALYSIS { @@ -2793,6 +2795,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch) uint32_t tx_failure = 0; uint32_t mtu_drops = 0; uint32_t qos_drops = 0; + int ret = 0; if (dev->type != DPDK_DEV_VHOST) { /* Check if QoS has been configured for this netdev. 
*/ @@ -2826,7 +2829,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch) if (OVS_LIKELY(txcnt)) { if (dev->type == DPDK_DEV_VHOST) { - __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt); + ret = __netdev_dpdk_vhost_send(netdev, qid, pkts, txcnt); } else { tx_failure += netdev_dpdk_eth_tx_burst(dev, qid, (struct rte_mbuf **)pkts, @@ -2843,6 +2846,8 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch) sw_stats->tx_qos_drops += qos_drops; rte_spinlock_unlock(&dev->stats_lock); } + + return ret; } static int @@ -2851,14 +2856,15 @@ netdev_dpdk_vhost_send(struct netdev *netdev, int qid, bool concurrent_txq OVS_UNUSED) { + int ret = 0; if (OVS_UNLIKELY(batch->packets[0]->source != DPBUF_DPDK)) { - dpdk_do_tx_copy(netdev, qid, batch); + ret = dpdk_do_tx_copy(netdev, qid, batch); dp_packet_delete_batch(batch, true); } else { - __netdev_dpdk_vhost_send(netdev, qid, batch->packets, - dp_packet_batch_size(batch)); + ret = __netdev_dpdk_vhost_send(netdev, qid, batch->packets, + dp_packet_batch_size(batch)); } - return 0; + return ret; } static inline void @@ -5468,6 +5474,7 @@ static const struct netdev_class dpdk_vhost_class = { .construct = netdev_dpdk_vhost_construct, .destruct = netdev_dpdk_vhost_destruct, .send = netdev_dpdk_vhost_send, + .process_async = NULL, .get_carrier = netdev_dpdk_vhost_get_carrier, .get_stats = netdev_dpdk_vhost_get_stats, .get_custom_stats = netdev_dpdk_get_sw_custom_stats, @@ -5484,6 +5491,7 @@ static const struct netdev_class dpdk_vhost_client_class = { .destruct = netdev_dpdk_vhost_destruct, .set_config = netdev_dpdk_vhost_client_set_config, .send = netdev_dpdk_vhost_send, + .process_async = NULL, .get_carrier = netdev_dpdk_vhost_get_carrier, .get_stats = netdev_dpdk_vhost_get_stats, .get_custom_stats = netdev_dpdk_get_sw_custom_stats, diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h index b5420947d..a448328e7 100644 --- a/lib/netdev-provider.h +++ b/lib/netdev-provider.h @@ -384,7 +384,11 @@ struct netdev_class { * if it would always return EOPNOTSUPP anyhow. (This will prevent the * network device from being usefully used by the netdev-based "userspace * datapath". It will also prevent the OVS implementation of bonding from - * working properly over 'netdev'.) */ + * working properly over 'netdev'.) + * + * May return EINPROGRESS. This indicates that the netdev has more work to + * do, and needs to have process_async called before sending buffers is + * totally completed. */ int (*send)(struct netdev *netdev, int qid, struct dp_packet_batch *batch, bool concurrent_txq); @@ -402,6 +406,19 @@ struct netdev_class { * implement packet transmission through the 'send' member function. */ void (*send_wait)(struct netdev *netdev, int qid); + /* Performs asynchronous work required by the netdev to complete sending + * buffers. The work done in the process_async function is netdev specific, + * but could include freeing packets or updating port stats. + * + * If called with force = false, may return EINPROGRESS if the async call + * still hasn't completed, indicating process_async should be called on + * this netdev + qid again in the future. + * + * If called with force = true, can't return EINPROGRESS. Must handle stats + * updates and any freeing of buffers even if they haven't been sent yet. 
+ */ + int (*process_async)(struct netdev *netdev, int qid, bool force); + /* Sets 'netdev''s Ethernet address to 'mac' */ int (*set_etheraddr)(struct netdev *netdev, const struct eth_addr mac); diff --git a/lib/netdev.c b/lib/netdev.c index 8305f6c42..e122441cf 100644 --- a/lib/netdev.c +++ b/lib/netdev.c @@ -892,7 +892,8 @@ netdev_send(struct netdev *netdev, int qid, struct dp_packet_batch *batch, } error = netdev->netdev_class->send(netdev, qid, batch, concurrent_txq); - if (!error) { + /* For async, treat netdev_send as called when -EINPROGRESS is returned. */ + if (!error || error == -EINPROGRESS) { COVERAGE_INC(netdev_sent); } return error; -- 2.32.0 _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev