I have not tested yet

However, I would have expected something like the max latency config to be
specific to netdev-dpdk port types.

This type of code also seems to intersect with present and future QoS 
considerations in netdev-dpdk



-----Original Message-----
From: Ilya Maximets <i.maxim...@samsung.com>
Date: Wednesday, July 26, 2017 at 8:21 AM
To: "ovs-dev@openvswitch.org" <ovs-dev@openvswitch.org>, Bhanuprakash Bodireddy 
<bhanuprakash.bodire...@intel.com>
Cc: Heetae Ahn <heetae82....@samsung.com>, Ben Pfaff <b...@ovn.org>, Antonio 
Fischetti <antonio.fische...@intel.com>, Eelco Chaudron <echau...@redhat.com>, 
Ciara Loftus <ciara.lof...@intel.com>, Kevin Traynor <ktray...@redhat.com>, 
Darrell Ball <db...@vmware.com>, Ilya Maximets <i.maxim...@samsung.com>
Subject: [PATCH RFC v2 4/4] dpif-netdev: Time based output batching.

    This allows collecting packets from more than one RX burst
    and sending them together with a configurable maximum latency.
    
    'other_config:output-max-latency' can be used to configure
    time that a packet can wait in output batch for sending.
    
    Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
    ---
    
    Millisecond granularity is used for now. Can be easily switched to use
    microseconds instead.
    
     lib/dpif-netdev.c    | 97 
+++++++++++++++++++++++++++++++++++++++++++---------
     vswitchd/vswitch.xml | 15 ++++++++
     2 files changed, 95 insertions(+), 17 deletions(-)
    
    diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
    index 07c7dad..e5f8a3d 100644
    --- a/lib/dpif-netdev.c
    +++ b/lib/dpif-netdev.c
    @@ -84,6 +84,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
     #define MAX_RECIRC_DEPTH 5
     DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
     
    +/* Use instant packet send by default. */
    +#define DEFAULT_OUTPUT_MAX_LATENCY 0
    +
     /* Configuration parameters. */
     enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. 
*/
     enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
    @@ -261,6 +264,9 @@ struct dp_netdev {
         struct hmap ports;
         struct seq *port_seq;       /* Incremented whenever a port changes. */
     
    +    /* The time that a packet can wait in output batch for sending. */
    +    atomic_uint32_t output_max_latency;
    +
         /* Meters. */
         struct ovs_mutex meter_locks[N_METER_LOCKS];
         struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
    @@ -498,6 +504,7 @@ struct tx_port {
         int qid;
         long long last_used;
         struct hmap_node node;
    +    long long output_time;
         struct dp_packet_batch output_pkts;
     };
     
    @@ -570,6 +577,9 @@ struct dp_netdev_pmd_thread {
          * than 'cmap_count(dp->poll_threads)'. */
         const int static_tx_qid;
     
    +    /* Number of filled output batches. */
    +    int n_output_batches;
    +
         struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 
'tx_ports'. */
         /* List of rx queues to poll. */
         struct hmap poll_list OVS_GUARDED;
    @@ -663,9 +673,9 @@ static void dp_netdev_add_rxq_to_pmd(struct 
dp_netdev_pmd_thread *pmd,
     static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
                                            struct rxq_poll *poll)
         OVS_REQUIRES(pmd->port_mutex);
    -static void
    +static int
     dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
    -                                   long long now);
    +                                   long long now, bool force);
     static void reconfigure_datapath(struct dp_netdev *dp)
         OVS_REQUIRES(dp->port_mutex);
     static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
    @@ -1188,6 +1198,7 @@ create_dp_netdev(const char *name, const struct 
dpif_class *class,
         conntrack_init(&dp->conntrack);
     
         atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
    +    atomic_init(&dp->output_max_latency, DEFAULT_OUTPUT_MAX_LATENCY);
     
         cmap_init(&dp->poll_threads);
         ovs_mutex_init_recursive(&dp->non_pmd_mutex);
    @@ -2843,7 +2854,7 @@ dpif_netdev_execute(struct dpif *dpif, struct 
dpif_execute *execute)
         dp_packet_batch_init_packet(&pp, execute->packet);
         dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
                                   execute->actions, execute->actions_len, now);
    -    dp_netdev_pmd_flush_output_packets(pmd, now);
    +    dp_netdev_pmd_flush_output_packets(pmd, now, true);
     
         if (pmd->core_id == NON_PMD_CORE_ID) {
             ovs_mutex_unlock(&dp->non_pmd_mutex);
    @@ -2892,6 +2903,16 @@ dpif_netdev_set_config(struct dpif *dpif, const 
struct smap *other_config)
             smap_get_ullong(other_config, "emc-insert-inv-prob",
                             DEFAULT_EM_FLOW_INSERT_INV_PROB);
         uint32_t insert_min, cur_min;
    +    uint32_t output_max_latency, cur_max_latency;
    +
    +    output_max_latency = smap_get_int(other_config, "output-max-latency",
    +                                      DEFAULT_OUTPUT_MAX_LATENCY);
    +    atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
    +    if (output_max_latency != cur_max_latency) {
    +        atomic_store_relaxed(&dp->output_max_latency, output_max_latency);
    +        VLOG_INFO("Output maximum latency set to %"PRIu32" ms",
    +                  output_max_latency);
    +    }
     
         if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
             free(dp->pmd_cmask);
    @@ -3092,11 +3113,12 @@ cycles_count_intermediate(struct 
dp_netdev_pmd_thread *pmd,
         non_atomic_ullong_add(&pmd->cycles.n[type], interval);
     }
     
    -static void
    +static int
     dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
                                        struct tx_port *p, long long now)
     {
         int tx_qid;
    +    int output_cnt;
         bool dynamic_txqs;
     
         dynamic_txqs = p->port->dynamic_txqs;
    @@ -3106,21 +3128,39 @@ dp_netdev_pmd_flush_output_on_port(struct 
dp_netdev_pmd_thread *pmd,
             tx_qid = pmd->static_tx_qid;
         }
     
    +    output_cnt = dp_packet_batch_size(&p->output_pkts);
         netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
         dp_packet_batch_init(&p->output_pkts);
    +
    +    if (output_cnt) {
    +        ovs_assert(pmd->n_output_batches > 0);
    +        pmd->n_output_batches--;
    +    }
    +    return output_cnt;
     }
     
    -static void
    +static int
     dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
    -                                   long long now)
    +                                   long long now, bool force)
     {
         struct tx_port *p;
    +    int output_cnt = 0;
    +
    +    if (!pmd->n_output_batches) {
    +        return 0;
    +    }
    +
    +    if (!now) {
    +        now = time_msec();
    +    }
     
         HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
    -        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
    -            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
    +        if (!dp_packet_batch_is_empty(&p->output_pkts)
    +            && (force || p->output_time <= now)) {
    +            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p, now);
             }
         }
    +    return output_cnt;
     }
     
     static int
    @@ -3130,7 +3170,7 @@ dp_netdev_process_rxq_port(struct 
dp_netdev_pmd_thread *pmd,
     {
         struct dp_packet_batch batch;
         int error;
    -    int batch_cnt = 0;
    +    int batch_cnt = 0, output_cnt = 0;
     
         dp_packet_batch_init(&batch);
         error = netdev_rxq_recv(rx, &batch);
    @@ -3141,7 +3181,7 @@ dp_netdev_process_rxq_port(struct 
dp_netdev_pmd_thread *pmd,
     
             batch_cnt = batch.count;
             dp_netdev_input(pmd, &batch, port_no, now);
    -        dp_netdev_pmd_flush_output_packets(pmd, now);
    +        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, now, false);
         } else if (error != EAGAIN && error != EOPNOTSUPP) {
             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
     
    @@ -3149,7 +3189,7 @@ dp_netdev_process_rxq_port(struct 
dp_netdev_pmd_thread *pmd,
                         netdev_rxq_get_name(rx), ovs_strerror(error));
         }
     
    -    return batch_cnt;
    +    return batch_cnt + output_cnt;
     }
     
     static struct tx_port *
    @@ -3685,6 +3725,8 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread 
*pmd)
     {
         struct tx_port *tx_port_cached;
     
    +    /* Flush all the queued packets. */
    +    dp_netdev_pmd_flush_output_packets(pmd, 0, true);
         /* Free all used tx queue ids. */
         dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
     
    @@ -3759,7 +3801,6 @@ pmd_thread_main(void *f_)
         bool exiting;
         int poll_cnt;
         int i;
    -    int process_packets = 0;
     
         poll_list = NULL;
     
    @@ -3788,8 +3829,10 @@ reload:
     
         cycles_count_start(pmd);
         for (;;) {
    +        int process_packets = 0;
    +
             for (i = 0; i < poll_cnt; i++) {
    -            process_packets =
    +            process_packets +=
                     dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
                                                poll_list[i].port_no);
                 cycles_count_intermediate(pmd,
    @@ -3797,6 +3840,16 @@ reload:
                                                           : PMD_CYCLES_IDLE);
             }
     
    +        if (!process_packets) {
    +            /* We didn't receive anything in the process loop.
    +             * Check if we need to send something. */
    +            process_packets = dp_netdev_pmd_flush_output_packets(pmd,
    +                                                                 0, false);
    +            cycles_count_intermediate(pmd,
    +                                      process_packets ? 
PMD_CYCLES_PROCESSING
    +                                                      : PMD_CYCLES_IDLE);
    +        }
    +
             if (lc++ > 1024) {
                 bool reload;
     
    @@ -4234,6 +4287,8 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread 
*pmd, struct dp_netdev *dp,
         pmd->numa_id = numa_id;
         pmd->need_reload = false;
     
    +    pmd->n_output_batches = 0;
    +
         *CONST_CAST(int *, &pmd->static_tx_qid) = 
cmap_count(&dp->poll_threads);
     
         ovs_refcount_init(&pmd->ref_cnt);
    @@ -4418,6 +4473,7 @@ dp_netdev_add_port_tx_to_pmd(struct 
dp_netdev_pmd_thread *pmd,
     
         tx->port = port;
         tx->qid = -1;
    +    tx->output_time = 0LL;
         dp_packet_batch_init(&tx->output_pkts);
     
         hmap_insert(&pmd->tx_ports, &tx->node, 
hash_port_no(tx->port->port_no));
    @@ -5094,11 +5150,18 @@ dp_execute_cb(void *aux_, struct dp_packet_batch 
*packets_,
                     dp_netdev_pmd_flush_output_on_port(pmd, p, now);
                 }
     #endif
    +            if (dp_packet_batch_is_empty(&p->output_pkts)) {
    +                uint32_t cur_max_latency;
    +
    +                atomic_read_relaxed(&dp->output_max_latency, 
&cur_max_latency);
    +                p->output_time = now + cur_max_latency;
     
    -            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
    -                       + dp_packet_batch_size(packets_) > 
NETDEV_MAX_BURST)) {
    -                /* Some packets was generated while input batch processing.
    -                 * Flush here to avoid overflow. */
    +                if (dp_packet_batch_size(packets_)) {
    +                    pmd->n_output_batches++;
    +                }
    +            } else if (dp_packet_batch_size(&p->output_pkts)
    +                       + dp_packet_batch_size(packets_) > 
NETDEV_MAX_BURST) {
    +                /* Flush here to avoid overflow. */
                     dp_netdev_pmd_flush_output_on_port(pmd, p, now);
                 }
     
    diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
    index 074535b..23930f0 100644
    --- a/vswitchd/vswitch.xml
    +++ b/vswitchd/vswitch.xml
    @@ -344,6 +344,21 @@
             </p>
           </column>
     
    +      <column name="other_config" key="output-max-latency"
    +              type='{"type": "integer", "minInteger": 0, "maxInteger": 
1000}'>
    +        <p>
    +          Specifies the time in milliseconds that a packet can wait in 
output
    +          batch for sending i.e. amount of time that packet can spend in an
    +          intermediate output queue before sending to netdev.
    +          This option can be used to configure balance between throughput
    +          and latency. Lower values decrease latency while higher values
    +          may be useful to achieve higher performance.
    +        </p>
    +        <p>
    +          Defaults to 0 i.e. instant packet sending (latency optimized).
    +        </p>
    +      </column>
    +
           <column name="other_config" key="n-handler-threads"
                   type='{"type": "integer", "minInteger": 1}'>
             <p>
    -- 
    2.7.4
    
    

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to