> hi Ilya,
> 
> On 12/14/2017 11:59 AM, Ilya Maximets wrote:
> > This allows to collect packets from more than one RX burst and send
> > them together with a configurable intervals.
> >
> > 'other_config:tx-flush-interval' can be used to configure time that a
> > packet can wait in output batch for sending.
> >
> > dpif-netdev turned to microsecond resolution for time measuring to
> > ensure desired resolution of 'tx-flush-interval'.
> >
> > Acked-by: Eelco Chaudron <echau...@redhat.com>
> > Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
> > ---
> >  lib/dpif-netdev.c    | 149 +++++++++++++++++++++++++++++++++++++++-----
> -------
> >  vswitchd/vswitch.xml |  16 ++++++
> >  2 files changed, 131 insertions(+), 34 deletions(-)
> >
> > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index
> > d7f6171..f5a7793 100644
> > --- a/lib/dpif-netdev.c
> > +++ b/lib/dpif-netdev.c
> > @@ -85,6 +85,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
> >  #define MAX_RECIRC_DEPTH 6
> >  DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
> >
> > +/* Use instant packet send by default. */ #define
> > +DEFAULT_TX_FLUSH_INTERVAL 0
> > +
> >  /* Configuration parameters. */
> >  enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow
> table. */
> >  enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
> > @@ -178,12 +181,13 @@ struct emc_cache {
> >  

> >  /* Simple non-wildcarding single-priority classifier. */
> >
> > -/* Time in ms between successive optimizations of the dpcls subtable
> > vector */ -#define DPCLS_OPTIMIZATION_INTERVAL 1000
> > +/* Time in microseconds between successive optimizations of the dpcls
> > + * subtable vector */
> > +#define DPCLS_OPTIMIZATION_INTERVAL 1000000LL
> >
> > -/* Time in ms of the interval in which rxq processing cycles used in
> > - * rxq to pmd assignments is measured and stored. */ -#define
> > PMD_RXQ_INTERVAL_LEN 10000
> > +/* Time in microseconds of the interval in which rxq processing
> > +cycles used
> > + * in rxq to pmd assignments is measured and stored. */ #define
> > +PMD_RXQ_INTERVAL_LEN 10000000LL
> >
> >  /* Number of intervals for which cycles are stored
> >   * and used during rxq to pmd assignment. */ @@ -270,6 +274,9 @@
> > struct dp_netdev {
> >      struct hmap ports;
> >      struct seq *port_seq;       /* Incremented whenever a port changes.
> */
> >
> > +    /* The time that a packet can wait in output batch for sending. */
> > +    atomic_uint32_t tx_flush_interval;
> > +
> >      /* Meters. */
> >      struct ovs_mutex meter_locks[N_METER_LOCKS];
> >      struct dp_meter *meters[MAX_METERS]; /* Meter bands. */ @@ -356,7
> > +363,7 @@ enum rxq_cycles_counter_type {
> >      RXQ_N_CYCLES
> >  };
> >
> > -#define XPS_TIMEOUT_MS 500LL
> > +#define XPS_TIMEOUT 500000LL    /* In microseconds. */
> >
> >  /* Contained by struct dp_netdev_port's 'rxqs' member.  */  struct
> > dp_netdev_rxq { @@ -526,6 +533,7 @@ struct tx_port {
> >      int qid;
> >      long long last_used;
> >      struct hmap_node node;
> > +    long long flush_time;
> >      struct dp_packet_batch output_pkts;  };
> >
> > @@ -612,6 +620,9 @@ struct dp_netdev_pmd_thread {
> >       * than 'cmap_count(dp->poll_threads)'. */
> >      uint32_t static_tx_qid;
> >
> > +    /* Number of filled output batches. */
> > +    int n_output_batches;
> > +
> >      struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and
> 'tx_ports'. */
> >      /* List of rx queues to poll. */
> >      struct hmap poll_list OVS_GUARDED; @@ -705,8 +716,9 @@ static
> > void dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
> > static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
> >                                         struct rxq_poll *poll)
> >      OVS_REQUIRES(pmd->port_mutex);
> > -static void
> > -dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd);
> > +static int
> > +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
> > +                                   bool force);
> >
> >  static void reconfigure_datapath(struct dp_netdev *dp)
> >      OVS_REQUIRES(dp->port_mutex);
> > @@ -797,7 +809,7 @@ emc_cache_slow_sweep(struct emc_cache *flow_cache)
> > static inline void  pmd_thread_ctx_time_update(struct
> > dp_netdev_pmd_thread *pmd)  {
> > -    pmd->ctx.now = time_msec();
> > +    pmd->ctx.now = time_usec();
> >  }
> >
> >  /* Returns true if 'dpif' is a netdev or dummy dpif, false otherwise.
> > */ @@ -1297,6 +1309,7 @@ create_dp_netdev(const char *name, const struct
> dpif_class *class,
> >      conntrack_init(&dp->conntrack);
> >
> >      atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
> > +    atomic_init(&dp->tx_flush_interval, DEFAULT_TX_FLUSH_INTERVAL);
> >
> >      cmap_init(&dp->poll_threads);
> >
> > @@ -2967,7 +2980,7 @@ dpif_netdev_execute(struct dpif *dpif, struct
> dpif_execute *execute)
> >      dp_packet_batch_init_packet(&pp, execute->packet);
> >      dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
> >                                execute->actions, execute->actions_len);
> > -    dp_netdev_pmd_flush_output_packets(pmd);
> > +    dp_netdev_pmd_flush_output_packets(pmd, true);
> >
> >      if (pmd->core_id == NON_PMD_CORE_ID) {
> >          ovs_mutex_unlock(&dp->non_pmd_mutex);
> > @@ -3016,6 +3029,16 @@ dpif_netdev_set_config(struct dpif *dpif, const
> struct smap *other_config)
> >          smap_get_ullong(other_config, "emc-insert-inv-prob",
> >                          DEFAULT_EM_FLOW_INSERT_INV_PROB);
> >      uint32_t insert_min, cur_min;
> > +    uint32_t tx_flush_interval, cur_tx_flush_interval;
> > +
> > +    tx_flush_interval = smap_get_int(other_config, "tx-flush-interval",
> > +                                     DEFAULT_TX_FLUSH_INTERVAL);
> > +    atomic_read_relaxed(&dp->tx_flush_interval,
> &cur_tx_flush_interval);
> > +    if (tx_flush_interval != cur_tx_flush_interval) {
> > +        atomic_store_relaxed(&dp->tx_flush_interval,
> tx_flush_interval);
> > +        VLOG_INFO("Flushing interval for tx queues set to %"PRIu32"
> us",
> > +                  tx_flush_interval);
> > +    }
> >
> >      if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
> >          free(dp->pmd_cmask);
> > @@ -3254,12 +3277,14 @@ dp_netdev_rxq_get_intrvl_cycles(struct
> dp_netdev_rxq *rx, unsigned idx)
> >      return processing_cycles;
> >  }
> >
> > -static void
> > +static int
> >  dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
> >                                     struct tx_port *p)  {
> >      int tx_qid;
> > +    int output_cnt;
> >      bool dynamic_txqs;
> > +    uint32_t tx_flush_interval;
> >
> >      dynamic_txqs = p->port->dynamic_txqs;
> >      if (dynamic_txqs) {
> > @@ -3268,20 +3293,40 @@ dp_netdev_pmd_flush_output_on_port(struct
> dp_netdev_pmd_thread *pmd,
> >          tx_qid = pmd->static_tx_qid;
> >      }
> >
> > +    output_cnt = dp_packet_batch_size(&p->output_pkts);
> > +    ovs_assert(output_cnt > 0);
> > +
> >      netdev_send(p->port->netdev, tx_qid, &p->output_pkts,
> dynamic_txqs);
> >      dp_packet_batch_init(&p->output_pkts);
> > +
> > +    /* Update time of the next flush. */
> > +    atomic_read_relaxed(&pmd->dp->tx_flush_interval,
> &tx_flush_interval);
> > +    p->flush_time = pmd->ctx.now + tx_flush_interval;
> > +
> > +    ovs_assert(pmd->n_output_batches > 0);
> > +    pmd->n_output_batches--;
> > +
> > +    return output_cnt;
> >  }
> >
> > -static void
> > -dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd)
> > +static int
> > +dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
> > +                                   bool force)
> >  {
> >      struct tx_port *p;
> > +    int output_cnt = 0;
> > +
> > +    if (!pmd->n_output_batches) {
> > +        return 0;
> > +    }
> >
> >      HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
> > -        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
> > -            dp_netdev_pmd_flush_output_on_port(pmd, p);
> > +        if (!dp_packet_batch_is_empty(&p->output_pkts)
> > +            && (force || pmd->ctx.now >= p->flush_time)) {
> > +            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p);
> >          }
> >      }
> > +    return output_cnt;
> >  }
> >
> >  static int
> > @@ -3291,7 +3336,7 @@ dp_netdev_process_rxq_port(struct
> > dp_netdev_pmd_thread *pmd,  {
> >      struct dp_packet_batch batch;
> >      int error;
> > -    int batch_cnt = 0;
> > +    int batch_cnt = 0, output_cnt = 0;
> >
> >      dp_packet_batch_init(&batch);
> >      error = netdev_rxq_recv(rx, &batch); @@ -3301,7 +3346,7 @@
> > dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
> >
> >          batch_cnt = batch.count;
> >          dp_netdev_input(pmd, &batch, port_no);
> > -        dp_netdev_pmd_flush_output_packets(pmd);
> > +        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, false);
> >      } else if (error != EAGAIN && error != EOPNOTSUPP) {
> >          static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1,
> > 5);
> >
> > @@ -3309,7 +3354,7 @@ dp_netdev_process_rxq_port(struct
> dp_netdev_pmd_thread *pmd,
> >                      netdev_rxq_get_name(rx), ovs_strerror(error));
> >      }
> >
> > -    return batch_cnt;
> > +    return batch_cnt + output_cnt;
> >  }
> >
> >  static struct tx_port *
> > @@ -3932,7 +3977,8 @@ dpif_netdev_run(struct dpif *dpif)
> >      struct dp_netdev *dp = get_dp_netdev(dpif);
> >      struct dp_netdev_pmd_thread *non_pmd;
> >      uint64_t new_tnl_seq;
> > -    int process_packets = 0;
> > +    int process_packets;
> > +    bool need_to_flush = true;
> >
> >      ovs_mutex_lock(&dp->port_mutex);
> >      non_pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID); @@ -3952,11
> > +3998,25 @@ dpif_netdev_run(struct dpif *dpif)
> >                                                process_packets
> >                                                ? PMD_CYCLES_PROCESSING
> >                                                : PMD_CYCLES_IDLE);
> > +                    if (process_packets) {
> > +                        need_to_flush = false;
> > +                    }
> >                  }
> >              }
> >          }
> > +        if (need_to_flush) {
> > +            /* We didn't receive anything in the process loop.
> > +             * Check if we need to send something.
> > +             * There was no time updates on current iteration. */
> > +            pmd_thread_ctx_time_update(non_pmd);
> > +            process_packets =
> dp_netdev_pmd_flush_output_packets(non_pmd,
> > +
> false);
> > +            cycles_count_intermediate(non_pmd, NULL, process_packets
> > +                                                     ?
> PMD_CYCLES_PROCESSING
> > +                                                     :
> PMD_CYCLES_IDLE);
> > +        }
> > +
> >          cycles_count_end(non_pmd, PMD_CYCLES_IDLE);
> > -        pmd_thread_ctx_time_update(non_pmd);
> >          dpif_netdev_xps_revalidate_pmd(non_pmd, false);
> >          ovs_mutex_unlock(&dp->non_pmd_mutex);
> >
> > @@ -4007,6 +4067,8 @@ pmd_free_cached_ports(struct
> > dp_netdev_pmd_thread *pmd)  {
> >      struct tx_port *tx_port_cached;
> >
> > +    /* Flush all the queued packets. */
> > +    dp_netdev_pmd_flush_output_packets(pmd, true);
> >      /* Free all used tx queue ids. */
> >      dpif_netdev_xps_revalidate_pmd(pmd, true);
> >
> > @@ -4105,7 +4167,6 @@ pmd_thread_main(void *f_)
> >      bool exiting;
> >      int poll_cnt;
> >      int i;
> > -    int process_packets = 0;
> >
> >      poll_list = NULL;
> >
> > @@ -4135,6 +4196,9 @@ reload:
> >
> >      cycles_count_start(pmd);
> >      for (;;) {
> > +        int process_packets;
> > +        bool need_to_flush = true;
> > +
> >          for (i = 0; i < poll_cnt; i++) {
> >              process_packets =
> >                  dp_netdev_process_rxq_port(pmd, poll_list[i].rxq->rx,
> > @@ -4142,6 +4206,20 @@ reload:
> >              cycles_count_intermediate(pmd, poll_list[i].rxq,
> >                                        process_packets ?
> PMD_CYCLES_PROCESSING
> >                                                        :
> > PMD_CYCLES_IDLE);
> > +            if (process_packets) {
> > +                need_to_flush = false;
> > +            }
> > +        }
> > +
> > +        if (need_to_flush) {
> > +            /* We didn't receive anything in the process loop.
> > +             * Check if we need to send something.
> > +             * There was no time updates on current iteration. */
> > +            pmd_thread_ctx_time_update(pmd);
> > +            process_packets = dp_netdev_pmd_flush_output_packets(pmd,
> false);
> > +            cycles_count_intermediate(pmd, NULL,
> > +                                      process_packets ?
> PMD_CYCLES_PROCESSING
> > +                                                      :
> > + PMD_CYCLES_IDLE);
> 
> I noticed the processing cycles for an rxq are not counted here. It means
> those processing cycles to tx pkts will no longer be reflected in the rxq
> to pmd assignment (or any rxq stats). I realize the tx processing cycles
> are now shared so we will have some inaccuracy anyway but for an
> individual rxq that has to send to vhost, it could be a significant % of
> it's measured cycles.
> 
> OTOH, this code seems like it would only hit when there are low rates (no
> packets on any queue during the last poll)? so I'm not sure how
> significant it would be in the overall rxq-pmd assignment. e.g. if the rxq
> is measured as using 2% or 7% of a pmd it's probably fine for rxq-pmd
> assignment, whereas 20% or 70% could create a very imbalanced
> distribution.
> 
> If it was significant for rxq-pmd assignment, I'm thinking the best way
> would be to add in the cycles required to tx flush each port to each rxq
> that has packets in the flush. It's over counting rather than under
> counting but we can't assume any shared batching after a new assignment.
> 
> Let me know what you think? Do you think it would only impact the rxq
> measurements during low traffic rates?
> 
> Kevin.

Good catch,

I was going to push this today but I'll hold off until this issue is resolved, 
I don’t want an existing feature such as the rxq balancing being negatively 
impacted upon if we can avoid it.

Ian
> 
> >          }
> >
> >          if (lc++ > 1024) {
> > @@ -4150,9 +4228,6 @@ reload:
> >              lc = 0;
> >
> >              coverage_try_clear();
> > -            /* It's possible that the time was not updated on current
> > -             * iteration, if there were no received packets. */
> > -            pmd_thread_ctx_time_update(pmd);
> >              dp_netdev_pmd_try_optimize(pmd, poll_list, poll_cnt);
> >              if (!ovsrcu_try_quiesce()) {
> >                  emc_cache_slow_sweep(&pmd->flow_cache);
> > @@ -4238,7 +4313,7 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct
> dp_packet_batch *packets_,
> >      memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
> >
> >      /* All packets will hit the meter at the same time. */
> > -    long_delta_t = (now - meter->used); /* msec */
> > +    long_delta_t = (now - meter->used) / 1000; /* msec */
> >
> >      /* Make sure delta_t will not be too large, so that bucket will not
> >       * wrap around below. */
> > @@ -4394,7 +4469,7 @@ dpif_netdev_meter_set(struct dpif *dpif,
> ofproto_meter_id *meter_id,
> >          meter->flags = config->flags;
> >          meter->n_bands = config->n_bands;
> >          meter->max_delta_t = 0;
> > -        meter->used = time_msec();
> > +        meter->used = time_usec();
> >
> >          /* set up bands */
> >          for (i = 0; i < config->n_bands; ++i) { @@ -4592,6 +4667,7 @@
> > dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct
> dp_netdev *dp,
> >      pmd->core_id = core_id;
> >      pmd->numa_id = numa_id;
> >      pmd->need_reload = false;
> > +    pmd->n_output_batches = 0;
> >
> >      ovs_refcount_init(&pmd->ref_cnt);
> >      latch_init(&pmd->exit_latch);
> > @@ -4779,6 +4855,7 @@ dp_netdev_add_port_tx_to_pmd(struct
> > dp_netdev_pmd_thread *pmd,
> >
> >      tx->port = port;
> >      tx->qid = -1;
> > +    tx->flush_time = 0LL;
> >      dp_packet_batch_init(&tx->output_pkts);
> >
> >      hmap_insert(&pmd->tx_ports, &tx->node,
> > hash_port_no(tx->port->port_no)); @@ -4942,7 +5019,7 @@
> packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
> >      struct dp_netdev_flow *flow = batch->flow;
> >
> >      dp_netdev_flow_used(flow, batch->array.count, batch->byte_count,
> > -                        batch->tcp_flags, pmd->ctx.now);
> > +                        batch->tcp_flags, pmd->ctx.now / 1000);
> >
> >      actions = dp_netdev_flow_get_actions(flow);
> >
> > @@ -5317,7 +5394,7 @@ dpif_netdev_xps_revalidate_pmd(const struct
> dp_netdev_pmd_thread *pmd,
> >              continue;
> >          }
> >          interval = pmd->ctx.now - tx->last_used;
> > -        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
> > +        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT)) {
> >              port = tx->port;
> >              ovs_mutex_lock(&port->txq_used_mutex);
> >              port->txq_used[tx->qid]--; @@ -5338,7 +5415,7 @@
> > dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
> >      interval = pmd->ctx.now - tx->last_used;
> >      tx->last_used = pmd->ctx.now;
> >
> > -    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
> > +    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT)) {
> >          return tx->qid;
> >      }
> >
> > @@ -5470,12 +5547,16 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
> *packets_,
> >                  dp_netdev_pmd_flush_output_on_port(pmd, p);
> >              }
> >  #endif
> > -            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
> > -                       + dp_packet_batch_size(packets_) >
> NETDEV_MAX_BURST)) {
> > -                /* Some packets was generated while input batch
> processing.
> > -                 * Flush here to avoid overflow. */
> > +            if (dp_packet_batch_size(&p->output_pkts)
> > +                + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
> > +                /* Flush here to avoid overflow. */
> >                  dp_netdev_pmd_flush_output_on_port(pmd, p);
> >              }
> > +
> > +            if (dp_packet_batch_is_empty(&p->output_pkts)) {
> > +                pmd->n_output_batches++;
> > +            }
> > +
> >              DP_PACKET_BATCH_FOR_EACH (packet, packets_) {
> >                  dp_packet_batch_add(&p->output_pkts, packet);
> >              }
> > @@ -5717,7 +5798,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
> *packets_,
> >          conntrack_execute(&dp->conntrack, packets_, aux->flow->dl_type,
> force,
> >                            commit, zone, setmark, setlabel, aux->flow-
> >tp_src,
> >                            aux->flow->tp_dst, helper,
> nat_action_info_ref,
> > -                          pmd->ctx.now);
> > +                          pmd->ctx.now / 1000);
> >          break;
> >      }
> >
> > diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml index
> > 21ffaf5..bc6b1be 100644
> > --- a/vswitchd/vswitch.xml
> > +++ b/vswitchd/vswitch.xml
> > @@ -359,6 +359,22 @@
> >          </p>
> >        </column>
> >
> > +      <column name="other_config" key="tx-flush-interval"
> > +              type='{"type": "integer",
> > +                     "minInteger": 0, "maxInteger": 1000000}'>
> > +        <p>
> > +          Specifies the time in microseconds that a packet can wait in
> output
> > +          batch for sending i.e. amount of time that packet can spend
> in an
> > +          intermediate output queue before sending to netdev.
> > +          This option can be used to configure balance between
> throughput
> > +          and latency. Lower values decreases latency while higher
> values
> > +          may be useful to achieve higher performance.
> > +        </p>
> > +        <p>
> > +          Defaults to 0 i.e. instant packet sending (latency
> optimized).
> > +        </p>
> > +      </column>
> > +
> >        <column name="other_config" key="n-handler-threads"
> >                type='{"type": "integer", "minInteger": 1}'>
> >          <p>
> >

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to