> hi Ilya,
>
> On 12/14/2017 11:59 AM, Ilya Maximets wrote:
> > This allows collecting packets from more than one RX burst and sending
> > them together, with a configurable interval.
> >
> > 'other_config:tx-flush-interval' can be used to configure the time that
> > a packet may wait in the output batch before sending.
> >
> > dpif-netdev is switched to microsecond resolution for time measurement
> > to ensure the desired resolution of 'tx-flush-interval'.
> >
> > Acked-by: Eelco Chaudron <echau...@redhat.com>
> > Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
> > ---
> >  lib/dpif-netdev.c    | 149 +++++++++++++++++++++++++++++++++++-----------
> >  vswitchd/vswitch.xml |  16 ++++++
> >  2 files changed, 131 insertions(+), 34 deletions(-)
> >
> > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> > index d7f6171..f5a7793 100644
> > --- a/lib/dpif-netdev.c
> > +++ b/lib/dpif-netdev.c
> > @@ -85,6 +85,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
> >  #define MAX_RECIRC_DEPTH 6
> >  DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
> >
> > +/* Use instant packet send by default. */
> > +#define DEFAULT_TX_FLUSH_INTERVAL 0
> > +
> >  /* Configuration parameters. */
> >  enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
> >  enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
> > @@ -178,12 +181,13 @@ struct emc_cache {
> >
> >  /* Simple non-wildcarding single-priority classifier. */
> >
> > -/* Time in ms between successive optimizations of the dpcls subtable vector */
> > -#define DPCLS_OPTIMIZATION_INTERVAL 1000
> > +/* Time in microseconds between successive optimizations of the dpcls
> > + * subtable vector */
> > +#define DPCLS_OPTIMIZATION_INTERVAL 1000000LL
> >
> > -/* Time in ms of the interval in which rxq processing cycles used in
> > - * rxq to pmd assignments is measured and stored. */
> > -#define PMD_RXQ_INTERVAL_LEN 10000
> > +/* Time in microseconds of the interval in which rxq processing cycles used
> > + * in rxq to pmd assignments is measured and stored. */
> > +#define PMD_RXQ_INTERVAL_LEN 10000000LL
> >
> >  /* Number of intervals for which cycles are stored
> >   * and used during rxq to pmd assignment. */
> > @@ -270,6 +274,9 @@ struct dp_netdev {
> >      struct hmap ports;
> >      struct seq *port_seq;       /* Incremented whenever a port changes. */
> >
> > +    /* The time that a packet can wait in output batch for sending. */
> > +    atomic_uint32_t tx_flush_interval;
> > +
> >      /* Meters. */
> >      struct ovs_mutex meter_locks[N_METER_LOCKS];
> >      struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
> > @@ -356,7 +363,7 @@ enum rxq_cycles_counter_type {
> >      RXQ_N_CYCLES
> >  };
> >
> > -#define XPS_TIMEOUT_MS 500LL
> > +#define XPS_TIMEOUT 500000LL    /* In microseconds. */
> >
> >  /* Contained by struct dp_netdev_port's 'rxqs' member. */
> >  struct dp_netdev_rxq {
> > @@ -526,6 +533,7 @@ struct tx_port {
> >      int qid;
> >      long long last_used;
> >      struct hmap_node node;
> > +    long long flush_time;
> >      struct dp_packet_batch output_pkts;
> >  };
> >
> > @@ -612,6 +620,9 @@ struct dp_netdev_pmd_thread {
> >       * than 'cmap_count(dp->poll_threads)'. */
> >      uint32_t static_tx_qid;
> >
> > +    /* Number of filled output batches. */
> > +    int n_output_batches;
> > +
> >      struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
> >      /* List of rx queues to poll. */
> >      struct hmap poll_list OVS_GUARDED;
> > @@ -705,8 +716,9 @@ static void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
> >  static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
> >                                         struct rxq_poll *poll)
> >      OVS_REQUIRES(pmd->port_mutex);
> > -static void
> > -dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd);
> > +static int
> > +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
> > +                                   bool force);
> >
> >  static void reconfigure_datapath(struct dp_netdev *dp)
> >      OVS_REQUIRES(dp->port_mutex);
> > @@ -797,7 +809,7 @@ emc_cache_slow_sweep(struct emc_cache *flow_cache)
> >  static inline void
> >  pmd_thread_ctx_time_update(struct dp_netdev_pmd_thread *pmd)
> >  {
> > -    pmd->ctx.now = time_msec();
> > +    pmd->ctx.now = time_usec();
> >  }
> >
> >  /* Returns true if 'dpif' is a netdev or dummy dpif, false otherwise. */
> > @@ -1297,6 +1309,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
> >      conntrack_init(&dp->conntrack);
> >
> >      atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
> > +    atomic_init(&dp->tx_flush_interval, DEFAULT_TX_FLUSH_INTERVAL);
> >
> >      cmap_init(&dp->poll_threads);
> >
> > @@ -2967,7 +2980,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
> >      dp_packet_batch_init_packet(&pp, execute->packet);
> >      dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
> >                                execute->actions, execute->actions_len);
> > -    dp_netdev_pmd_flush_output_packets(pmd);
> > +    dp_netdev_pmd_flush_output_packets(pmd, true);
> >
> >      if (pmd->core_id == NON_PMD_CORE_ID) {
> >          ovs_mutex_unlock(&dp->non_pmd_mutex);
> > @@ -3016,6 +3029,16 @@ dpif_netdev_set_config(struct dpif *dpif, const struct smap *other_config)
> >          smap_get_ullong(other_config, "emc-insert-inv-prob",
> >                          DEFAULT_EM_FLOW_INSERT_INV_PROB);
> >      uint32_t insert_min, cur_min;
> > +    uint32_t tx_flush_interval, cur_tx_flush_interval;
> > +
> > +    tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
> > +                                     DEFAULT_TX_FLUSH_INTERVAL);
> > +    atomic_read_relaxed(&dp->tx_flush_interval, &cur_tx_flush_interval);
> > +    if (tx_flush_interval != cur_tx_flush_interval) {
> > +        atomic_store_relaxed(&dp->tx_flush_interval, tx_flush_interval);
> > +        VLOG_INFO("Flushing interval for tx queues set to %"PRIu32" us",
> > +                  tx_flush_interval);
> > +    }
> >
> >      if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
> >          free(dp->pmd_cmask);
> > @@ -3254,12 +3277,14 @@ dp_netdev_rxq_get_intrvl_cycles(struct dp_netdev_rxq *rx, unsigned idx)
> >      return processing_cycles;
> >  }
> >
> > -static void
> > +static int
> >  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
> >                                     struct tx_port *p)
> >  {
> >      int tx_qid;
> > +    int output_cnt;
> >      bool dynamic_txqs;
> > +    uint32_t tx_flush_interval;
> >
> >      dynamic_txqs = p->port->dynamic_txqs;
> >      if (dynamic_txqs) {
> > @@ -3268,20 +3293,40 @@ dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
> >          tx_qid = pmd->static_tx_qid;
> >      }
> >
> > +    output_cnt = dp_packet_batch_size(&p->output_pkts);
> > +    ovs_assert(output_cnt > 0);
> > +
> >      netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
> >      dp_packet_batch_init(&p->output_pkts);
> >
> > +    /* Update time of the next flush. */
> > +    atomic_read_relaxed(&pmd->dp->tx_flush_interval, &tx_flush_interval);
> > +    p->flush_time = pmd->ctx.now + tx_flush_interval;
> > +
> > +    ovs_assert(pmd->n_output_batches > 0);
> > +    pmd->n_output_batches--;
> > +
> > +    return output_cnt;
> >  }
> >
> > -static void
> > -dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd)
> > +static int
> > +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
> > +                                   bool force)
> >  {
> >      struct tx_port *p;
> > +    int output_cnt = 0;
> > +
> > +    if (!pmd->n_output_batches) {
> > +        return 0;
> > +    }
> >
> >      HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
> > -        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
> > -            dp_netdev_pmd_flush_output_on_port(pmd, p);
> > +        if (!dp_packet_batch_is_empty(&p->output_pkts)
> > +            && (force || pmd->ctx.now >= p->flush_time)) {
> > +            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p);
> >          }
> >      }
> > +    return output_cnt;
> >  }
> >
> >  static int
> > @@ -3291,7 +3336,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
> >  {
> >      struct dp_packet_batch batch;
> >      int error;
> > -    int batch_cnt = 0;
> > +    int batch_cnt = 0, output_cnt = 0;
> >
> >      dp_packet_batch_init(&batch);
> >      error = netdev_rxq_recv(rx, &batch);
> > @@ -3301,7 +3346,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
> >
> >          batch_cnt = batch.count;
> >          dp_netdev_input(pmd, &batch, port_no);
> > -        dp_netdev_pmd_flush_output_packets(pmd);
> > +        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
> >      } else if (error != EAGAIN && error != EOPNOTSUPP) {
> >          static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
> >
> > @@ -3309,7 +3354,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
> >                      netdev_rxq_get_name(rx), ovs_strerror(error));
> >      }
> >
> > -    return batch_cnt;
> > +    return batch_cnt + output_cnt;
> >  }
> >
> >  static struct tx_port *
> > @@ -3932,7 +3977,8 @@ dpif_netdev_run(struct dpif *dpif)
> >      struct dp_netdev *dp = get_dp_netdev(dpif);
> >      struct dp_netdev_pmd_thread *non_pmd;
> >      uint64_t new_tnl_seq;
> > -    int process_packets = 0;
> > +    int process_packets;
> > +    bool need_to_flush = true;
> >
> >      ovs_mutex_lock(&dp->port_mutex);
> >      non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
> > @@ -3952,11 +3998,25 @@ dpif_netdev_run(struct dpif *dpif)
> >                                                process_packets
> >                                                ? PMD_CYCLES_PROCESSING
> >                                                : PMD_CYCLES_IDLE);
> > +                if (process_packets) {
> > +                    need_to_flush = false;
> > +                }
> >              }
> >          }
> >      }
> > +    if (need_to_flush) {
> > +        /* We didn't receive anything in the process loop.
> > +         * Check if we need to send something.
> > +         * There were no time updates on the current iteration. */
> > +        pmd_thread_ctx_time_update(non_pmd);
> > +        process_packets = dp_netdev_pmd_flush_output_packets(non_pmd,
> > +                                                             false);
> > +        cycles_count_intermediate(non_pmd, NULL,
> > +                                  process_packets ? PMD_CYCLES_PROCESSING
> > +                                                  : PMD_CYCLES_IDLE);
> > +    }
> > +
> >      cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
> > -    pmd_thread_ctx_time_update(non_pmd);
> >      dpif_netdev_xps_revalidate_pmd(non_pmd, false);
> >      ovs_mutex_unlock(&dp->non_pmd_mutex);
> >
> > @@ -4007,6 +4067,8 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
> >  {
> >      struct tx_port *tx_port_cached;
> >
> > +    /* Flush all the queued packets. */
> > +    dp_netdev_pmd_flush_output_packets(pmd, true);
> >      /* Free all used tx queue ids. */
> >      dpif_netdev_xps_revalidate_pmd(pmd, true);
> >
> > @@ -4105,7 +4167,6 @@ pmd_thread_main(void *f_)
> >      bool exiting;
> >      int poll_cnt;
> >      int i;
> > -    int process_packets = 0;
> >
> >      poll_list = NULL;
> >
> > @@ -4135,6 +4196,9 @@ reload:
> >
> >      cycles_count_start(pmd);
> >      for (;;) {
> > +        int process_packets;
> > +        bool need_to_flush = true;
> > +
> >          for (i = 0; i < poll_cnt; i++) {
> >              process_packets =
> >                  dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx,
> > @@ -4142,6 +4206,20 @@ reload:
> >              cycles_count_intermediate(pmd, poll_list[i].rxq,
> >                                        process_packets ? PMD_CYCLES_PROCESSING
> >                                                        : PMD_CYCLES_IDLE);
> > +            if (process_packets) {
> > +                need_to_flush = false;
> > +            }
> > +        }
> > +
> > +        if (need_to_flush) {
> > +            /* We didn't receive anything in the process loop.
> > +             * Check if we need to send something.
> > +             * There were no time updates on the current iteration. */
> > +            pmd_thread_ctx_time_update(pmd);
> > +            process_packets = dp_netdev_pmd_flush_output_packets(pmd, false);
> > +            cycles_count_intermediate(pmd, NULL,
> > +                                      process_packets ? PMD_CYCLES_PROCESSING
> > +                                                      : PMD_CYCLES_IDLE);
>
> I noticed the processing cycles for an rxq are not counted here. It means
> the cycles spent processing tx pkts will no longer be reflected in the
> rxq to pmd assignment (or any rxq stats). I realize the tx processing
> cycles are now shared, so we will have some inaccuracy anyway, but for an
> individual rxq that has to send to vhost, it could be a significant % of
> its measured cycles.
>
> OTOH, this code seems like it would only be hit at low rates (no packets
> on any queue during the last poll), so I'm not sure how significant it
> would be in the overall rxq-pmd assignment. e.g. if the rxq is measured
> as using 2% or 7% of a pmd it's probably fine for rxq-pmd assignment,
> whereas 20% or 70% could create a very imbalanced distribution.
>
> If it was significant for rxq-pmd assignment, I'm thinking the best way
> would be to add in the cycles required to tx flush each port to each rxq
> that has packets in the flush. It's over-counting rather than
> under-counting, but we can't assume any shared batching after a new
> assignment.
>
> Let me know what you think? Do you think it would only impact the rxq
> measurements during low traffic rates?
>
> Kevin.

Good catch. I was going to push this today, but I'll hold off until this
issue is resolved; I don't want an existing feature such as the rxq
balancing to be negatively impacted if we can avoid it.

Ian
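To make the over-counting idea concrete, below is a rough sketch of what
charging the flush cost back to the contributing rxqs could look like.
This is illustrative only: 'contrib_rxqs' and 'n_contrib_rxqs' are
hypothetical fields that would need to be added to struct tx_port and
recorded when packets are queued in dp_execute_cb(); it assumes the
cycles_counter() and dp_netdev_rxq_add_cycles() helpers visible in the
context above.

    /* Sketch, not part of the patch: wrap the flush and charge the cycles
     * it took to every rxq that has packets in the flushed batch.  This
     * over-counts rather than under-counts, since shared batching cannot
     * be assumed to survive a new rxq-to-pmd assignment. */
    static int
    flush_output_on_port_accounted(struct dp_netdev_pmd_thread *pmd,
                                   struct tx_port *p)
    {
        uint64_t start, spent;
        int output_cnt, i;

        start = cycles_counter();
        output_cnt = dp_netdev_pmd_flush_output_on_port(pmd, p);
        spent = cycles_counter() - start;

        /* 'contrib_rxqs'/'n_contrib_rxqs' are the hypothetical per-batch
         * bookkeeping described above. */
        for (i = 0; i < p->n_contrib_rxqs; i++) {
            dp_netdev_rxq_add_cycles(p->contrib_rxqs[i],
                                     RXQ_CYCLES_PROC_CURR, spent);
        }
        return output_cnt;
    }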
> >          }
> >
> >          if (lc++ > 1024) {
> > @@ -4150,9 +4228,6 @@ reload:
> >              lc = 0;
> >
> >              coverage_try_clear();
> > -            /* It's possible that the time was not updated on current
> > -             * iteration, if there were no received packets. */
> > -            pmd_thread_ctx_time_update(pmd);
> >              dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
> >              if (!ovsrcu_try_quiesce()) {
> >                  emc_cache_slow_sweep(&pmd->flow_cache);
> > @@ -4238,7 +4313,7 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
> >      memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
> >
> >      /* All packets will hit the meter at the same time. */
> > -    long_delta_t = (now - meter->used); /* msec */
> > +    long_delta_t = (now - meter->used) / 1000; /* msec */
> >
> >      /* Make sure delta_t will not be too large, so that bucket will not
> >       * wrap around below. */
> > @@ -4394,7 +4469,7 @@ dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id *meter_id,
> >      meter->flags = config->flags;
> >      meter->n_bands = config->n_bands;
> >      meter->max_delta_t = 0;
> > -    meter->used = time_msec();
> > +    meter->used = time_usec();
> >
> >      /* set up bands */
> >      for (i = 0; i < config->n_bands; ++i) {
> > @@ -4592,6 +4667,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
> >      pmd->core_id = core_id;
> >      pmd->numa_id = numa_id;
> >      pmd->need_reload = false;
> > +    pmd->n_output_batches = 0;
> >
> >      ovs_refcount_init(&pmd->ref_cnt);
> >      latch_init(&pmd->exit_latch);
> > @@ -4779,6 +4855,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
> >
> >      tx->port = port;
> >      tx->qid = -1;
> > +    tx->flush_time = 0LL;
> >      dp_packet_batch_init(&tx->output_pkts);
> >
> >      hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
> > @@ -4942,7 +5019,7 @@ packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
> >      struct dp_netdev_flow *flow = batch->flow;
> >
> >      dp_netdev_flow_used(flow, batch->array.count, batch->byte_count,
> > -                        batch->tcp_flags, pmd->ctx.now);
> > +                        batch->tcp_flags, pmd->ctx.now / 1000);
> >
> >      actions = dp_netdev_flow_get_actions(flow);
> >
> > @@ -5317,7 +5394,7 @@ dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
> >              continue;
> >          }
> >          interval = pmd->ctx.now - tx->last_used;
> > -        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
> > +        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT)) {
> >              port = tx->port;
> >              ovs_mutex_lock(&port->txq_used_mutex);
> >              port->txq_used[tx->qid]--;
> > @@ -5338,7 +5415,7 @@ dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
> >      interval = pmd->ctx.now - tx->last_used;
> >      tx->last_used = pmd->ctx.now;
> >
> > -    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
> > +    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT)) {
> >          return tx->qid;
> >      }
> >
> > @@ -5470,12 +5547,16 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
> >              dp_netdev_pmd_flush_output_on_port(pmd, p);
> >          }
> >  #endif
> > -        if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
> > -                         + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
> > -            /* Some packets was generated while input batch processing.
> > -             * Flush here to avoid overflow. */
> > +        if (dp_packet_batch_size(&p->output_pkts)
> > +            + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
> > +            /* Flush here to avoid overflow. */
> > +            dp_netdev_pmd_flush_output_on_port(pmd, p);
> >          }
> > +
> > +        if (dp_packet_batch_is_empty(&p->output_pkts)) {
> > +            pmd->n_output_batches++;
> > +        }
> > +
> >          DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
> >              dp_packet_batch_add(&p->output_pkts, packet);
> >          }
> > @@ -5717,7 +5798,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
> >          conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type, force,
> >                            commit, zone, setmark, setlabel, aux->flow->tp_src,
> >                            aux->flow->tp_dst, helper, nat_action_info_ref,
> > -                          pmd->ctx.now);
> > +                          pmd->ctx.now / 1000);
> >          break;
> >      }
> >
> > diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
> > index 21ffaf5..bc6b1be 100644
> > --- a/vswitchd/vswitch.xml
> > +++ b/vswitchd/vswitch.xml
> > @@ -359,6 +359,22 @@
> >          </p>
> >        </column>
> >
> > +      <column name="other_config" key="tx-flush-interval"
> > +              type='{"type": "integer",
> > +                     "minInteger": 0, "maxInteger": 1000000}'>
> > +        <p>
> > +          Specifies the time in microseconds that a packet can wait in the
> > +          output batch for sending, i.e. the amount of time that a packet
> > +          can spend in an intermediate output queue before being sent to
> > +          the netdev.  This option can be used to balance throughput and
> > +          latency.  Lower values decrease latency, while higher values may
> > +          be useful to achieve higher performance.
> > +        </p>
> > +        <p>
> > +          Defaults to 0, i.e. instant packet sending (latency optimized).
> > +        </p>
> > +      </column>
> > +
> >        <column name="other_config" key="n-handler-threads"
> >                type='{"type": "integer", "minInteger": 1}'>
> >          <p>
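As a usage note for anyone wanting to try this once it's merged: the knob
is an ordinary datapath other_config option, so, assuming the option name
above survives review, something along the lines of

    ovs-vsctl set Open_vSwitch . other_config:tx-flush-interval=50

would let packets sit in an output batch for up to 50 microseconds, while
setting it back to 0 restores the current instant-send behaviour.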