This allows collecting packets from more than one RX burst
and sending them together with a configurable maximum latency.

'other_config:output-max-latency' can be used to configure the
time that a packet can wait in an output batch before being sent.

Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
---

Millisecond granularity is used for now. This can easily be switched to
microseconds instead.

 lib/dpif-netdev.c    | 97 +++++++++++++++++++++++++++++++++++++++++++---------
 vswitchd/vswitch.xml | 15 ++++++++
 2 files changed, 95 insertions(+), 17 deletions(-)

diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index 07c7dad..e5f8a3d 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -84,6 +84,9 @@ VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 #define MAX_RECIRC_DEPTH 5
 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
 
+/* Use instant packet send by default. */
+#define DEFAULT_OUTPUT_MAX_LATENCY 0
+
 /* Configuration parameters. */
 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
 enum { MAX_METERS = 65536 };    /* Maximum number of meters. */
@@ -261,6 +264,9 @@ struct dp_netdev {
     struct hmap ports;
     struct seq *port_seq;       /* Incremented whenever a port changes. */
 
+    /* The time that a packet can wait in output batch for sending. */
+    atomic_uint32_t output_max_latency;
+
     /* Meters. */
     struct ovs_mutex meter_locks[N_METER_LOCKS];
     struct dp_meter *meters[MAX_METERS]; /* Meter bands. */
@@ -498,6 +504,7 @@ struct tx_port {
     int qid;
     long long last_used;
     struct hmap_node node;
+    long long output_time;
     struct dp_packet_batch output_pkts;
 };
 
@@ -570,6 +577,9 @@ struct dp_netdev_pmd_thread {
      * than 'cmap_count(dp->poll_threads)'. */
     const int static_tx_qid;
 
+    /* Number of filled output batches. */
+    int n_output_batches;
+
     struct ovs_mutex port_mutex;    /* Mutex for 'poll_list' and 'tx_ports'. */
     /* List of rx queues to poll. */
     struct hmap poll_list OVS_GUARDED;
@@ -663,9 +673,9 @@ static void dp_netdev_add_rxq_to_pmd(struct 
dp_netdev_pmd_thread *pmd,
 static void dp_netdev_del_rxq_from_pmd(struct dp_netdev_pmd_thread *pmd,
                                        struct rxq_poll *poll)
     OVS_REQUIRES(pmd->port_mutex);
-static void
+static int
 dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
-                                   long long now);
+                                   long long now, bool force);
 static void reconfigure_datapath(struct dp_netdev *dp)
     OVS_REQUIRES(dp->port_mutex);
 static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
@@ -1188,6 +1198,7 @@ create_dp_netdev(const char *name, const struct 
dpif_class *class,
     conntrack_init(&dp->conntrack);
 
     atomic_init(&dp->emc_insert_min, DEFAULT_EM_FLOW_INSERT_MIN);
+    atomic_init(&dp->output_max_latency, DEFAULT_OUTPUT_MAX_LATENCY);
 
     cmap_init(&dp->poll_threads);
     ovs_mutex_init_recursive(&dp->non_pmd_mutex);
@@ -2843,7 +2854,7 @@ dpif_netdev_execute(struct dpif *dpif, struct 
dpif_execute *execute)
     dp_packet_batch_init_packet(&pp, execute->packet);
     dp_netdev_execute_actions(pmd, &pp, false, execute->flow,
                               execute->actions, execute->actions_len, now);
-    dp_netdev_pmd_flush_output_packets(pmd, now);
+    dp_netdev_pmd_flush_output_packets(pmd, now, true);
 
     if (pmd->core_id == NON_PMD_CORE_ID) {
         ovs_mutex_unlock(&dp->non_pmd_mutex);
@@ -2892,6 +2903,16 @@ dpif_netdev_set_config(struct dpif *dpif, const struct 
smap *other_config)
         smap_get_ullong(other_config, "emc-insert-inv-prob",
                         DEFAULT_EM_FLOW_INSERT_INV_PROB);
     uint32_t insert_min, cur_min;
+    uint32_t output_max_latency, cur_max_latency;
+
+    output_max_latency = smap_get_int(other_config, "output-max-latency",
+                                      DEFAULT_OUTPUT_MAX_LATENCY);
+    atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
+    if (output_max_latency != cur_max_latency) {
+        atomic_store_relaxed(&dp->output_max_latency, output_max_latency);
+        VLOG_INFO("Output maximum latency set to %"PRIu32" ms",
+                  output_max_latency);
+    }
 
     if (!nullable_string_is_equal(dp->pmd_cmask, cmask)) {
         free(dp->pmd_cmask);
@@ -3092,11 +3113,12 @@ cycles_count_intermediate(struct dp_netdev_pmd_thread 
*pmd,
     non_atomic_ullong_add(&pmd->cycles.n[type], interval);
 }
 
-static void
+static int
 dp_netdev_pmd_flush_output_on_port(struct dp_netdev_pmd_thread *pmd,
                                    struct tx_port *p, long long now)
 {
     int tx_qid;
+    int output_cnt;
     bool dynamic_txqs;
 
     dynamic_txqs = p->port->dynamic_txqs;
@@ -3106,21 +3128,39 @@ dp_netdev_pmd_flush_output_on_port(struct 
dp_netdev_pmd_thread *pmd,
         tx_qid = pmd->static_tx_qid;
     }
 
+    output_cnt = dp_packet_batch_size(&p->output_pkts);
     netdev_send(p->port->netdev, tx_qid, &p->output_pkts, dynamic_txqs);
     dp_packet_batch_init(&p->output_pkts);
+
+    if (output_cnt) {
+        ovs_assert(pmd->n_output_batches > 0);
+        pmd->n_output_batches--;
+    }
+    return output_cnt;
 }
 
-static void
+static int
 dp_netdev_pmd_flush_output_packets(struct dp_netdev_pmd_thread *pmd,
-                                   long long now)
+                                   long long now, bool force)
 {
     struct tx_port *p;
+    int output_cnt = 0;
+
+    if (!pmd->n_output_batches) {
+        return 0;
+    }
+
+    if (!now) {
+        now = time_msec();
+    }
 
     HMAP_FOR_EACH (p, node, &pmd->send_port_cache) {
-        if (!dp_packet_batch_is_empty(&p->output_pkts)) {
-            dp_netdev_pmd_flush_output_on_port(pmd, p, now);
+        if (!dp_packet_batch_is_empty(&p->output_pkts)
+            && (force || p->output_time <= now)) {
+            output_cnt += dp_netdev_pmd_flush_output_on_port(pmd, p, now);
         }
     }
+    return output_cnt;
 }
 
 static int
@@ -3130,7 +3170,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread 
*pmd,
 {
     struct dp_packet_batch batch;
     int error;
-    int batch_cnt = 0;
+    int batch_cnt = 0, output_cnt = 0;
 
     dp_packet_batch_init(&batch);
     error = netdev_rxq_recv(rx, &batch);
@@ -3141,7 +3181,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread 
*pmd,
 
         batch_cnt = batch.count;
         dp_netdev_input(pmd, &batch, port_no, now);
-        dp_netdev_pmd_flush_output_packets(pmd, now);
+        output_cnt = dp_netdev_pmd_flush_output_packets(pmd, now, false);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
@@ -3149,7 +3189,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread 
*pmd,
                     netdev_rxq_get_name(rx), ovs_strerror(error));
     }
 
-    return batch_cnt;
+    return batch_cnt + output_cnt;
 }
 
 static struct tx_port *
@@ -3685,6 +3725,8 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
 {
     struct tx_port *tx_port_cached;
 
+    /* Flush all the queued packets. */
+    dp_netdev_pmd_flush_output_packets(pmd, 0, true);
     /* Free all used tx queue ids. */
     dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
 
@@ -3759,7 +3801,6 @@ pmd_thread_main(void *f_)
     bool exiting;
     int poll_cnt;
     int i;
-    int process_packets = 0;
 
     poll_list = NULL;
 
@@ -3788,8 +3829,10 @@ reload:
 
     cycles_count_start(pmd);
     for (;;) {
+        int process_packets = 0;
+
         for (i = 0; i < poll_cnt; i++) {
-            process_packets =
+            process_packets +=
                 dp_netdev_process_rxq_port(pmd, poll_list[i].rx,
                                            poll_list[i].port_no);
             cycles_count_intermediate(pmd,
@@ -3797,6 +3840,16 @@ reload:
                                                       : PMD_CYCLES_IDLE);
         }
 
+        if (!process_packets) {
+            /* We didn't receive anything in the process loop.
+             * Check if we need to send something. */
+            process_packets = dp_netdev_pmd_flush_output_packets(pmd,
+                                                                 0, false);
+            cycles_count_intermediate(pmd,
+                                      process_packets ? PMD_CYCLES_PROCESSING
+                                                      : PMD_CYCLES_IDLE);
+        }
+
         if (lc++ > 1024) {
             bool reload;
 
@@ -4234,6 +4287,8 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, 
struct dp_netdev *dp,
     pmd->numa_id = numa_id;
     pmd->need_reload = false;
 
+    pmd->n_output_batches = 0;
+
     *CONST_CAST(int *, &pmd->static_tx_qid) = cmap_count(&dp->poll_threads);
 
     ovs_refcount_init(&pmd->ref_cnt);
@@ -4418,6 +4473,7 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread 
*pmd,
 
     tx->port = port;
     tx->qid = -1;
+    tx->output_time = 0LL;
     dp_packet_batch_init(&tx->output_pkts);
 
     hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
@@ -5094,11 +5150,18 @@ dp_execute_cb(void *aux_, struct dp_packet_batch 
*packets_,
                 dp_netdev_pmd_flush_output_on_port(pmd, p, now);
             }
 #endif
+            if (dp_packet_batch_is_empty(&p->output_pkts)) {
+                uint32_t cur_max_latency;
+
+                atomic_read_relaxed(&dp->output_max_latency, &cur_max_latency);
+                p->output_time = now + cur_max_latency;
 
-            if (OVS_UNLIKELY(dp_packet_batch_size(&p->output_pkts)
-                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST)) {
-                /* Some packets was generated while input batch processing.
-                 * Flush here to avoid overflow. */
+                if (dp_packet_batch_size(packets_)) {
+                    pmd->n_output_batches++;
+                }
+            } else if (dp_packet_batch_size(&p->output_pkts)
+                       + dp_packet_batch_size(packets_) > NETDEV_MAX_BURST) {
+                /* Flush here to avoid overflow. */
                 dp_netdev_pmd_flush_output_on_port(pmd, p, now);
             }
 
diff --git a/vswitchd/vswitch.xml b/vswitchd/vswitch.xml
index 074535b..23930f0 100644
--- a/vswitchd/vswitch.xml
+++ b/vswitchd/vswitch.xml
@@ -344,6 +344,21 @@
         </p>
       </column>
 
+      <column name="other_config" key="output-max-latency"
+              type='{"type": "integer", "minInteger": 0, "maxInteger": 1000}'>
+        <p>
+          Specifies the time in milliseconds that a packet can wait in an
+          output batch before being sent, i.e. the amount of time that a
+          packet can spend in an intermediate output queue before it is sent
+          to the netdev.  This option can be used to balance throughput
+          against latency.  Lower values decrease latency while higher values
+          may be useful to achieve higher performance.
+        </p>
+        <p>
+          Defaults to 0, i.e. instant packet sending (latency optimized).
+        </p>
+      </column>
+
       <column name="other_config" key="n-handler-threads"
               type='{"type": "integer", "minInteger": 1}'>
         <p>
-- 
2.7.4

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to