The current implementation of meters in the userspace datapath takes
the meter lock for every packet batch.  If more than one thread
hits a flow with the same meter, they will block each other on that lock.

Replace the critical section with atomic operations to avoid
interlocking.  Meters themselves are RCU-protected, so it's safe
to access them without holding a lock.

Implementation does the following:

 1. Tries to advance the 'used' timer of the meter with atomic
    compare+exchange if it's smaller than 'now'.
 2. If the timer change succeeds, atomically update band buckets.
 3. Atomically update packet statistics for a meter.
 4. Go over buckets and try to atomically subtract the amount of
    packets or bytes, recording the highest exceeded band.
 5. Atomically update band statistics and drop packets.

Bucket manipulations are implemented with atomic compare+exchange
operations with extra checks, because bucket size should never
exceed the maximum and it should never go below zero.

Packet statistics may be momentarily inconsistent, i.e., the number
of packets and the number of bytes may reflect different sets
of packets.  But they should be eventually consistent, and the
difference at any given time should be just a few packets.

For the sake of reduced code complexity, the PKTPS meter tries to push
packets through the band one by one, even though they all have
the same weight.  This is also more fair if more than one thread
is passing packets through the same band at the same time.
Trying to predict the number of packets that can pass may also
cause extra atomic operations, reducing performance.

This implementation shows similar performance to the previous one,
but should scale better with more threads hitting the same meter.

Signed-off-by: Ilya Maximets <i.maxim...@ovn.org>
---

@Lin Huang, if you can try this change on your setup, that
would be great.

 NEWS              |   2 +
 lib/dpif-netdev.c | 250 +++++++++++++++++++++++++---------------------
 2 files changed, 140 insertions(+), 112 deletions(-)

diff --git a/NEWS b/NEWS
index 66d5a4ea3..4a2b7dbca 100644
--- a/NEWS
+++ b/NEWS
@@ -37,6 +37,8 @@ Post-v3.1.0
    - SRv6 Tunnel Protocol
      * Added support for userspace datapath (only).
    - Userspace datapath:
+     * Implementation of OpenFlow meters is now lockless allowing for better
+       multi-thread scalability.
      * IP and L4 checksum offload support is now enabled by default for
        interfaces that support it.  See the 'status' column in the 'interface'
        table to check the status.
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index abe63412e..2fa556a62 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -212,21 +212,21 @@ static void dpcls_remove(struct dpcls *, struct dpcls_rule *);
 struct dp_meter_band {
     uint32_t rate;
     uint32_t burst_size;
-    uint64_t bucket; /* In 1/1000 packets (for PKTPS), or in bits (for KBPS) */
-    uint64_t packet_count;
-    uint64_t byte_count;
+    atomic_uint64_t bucket;          /* In 1/1000 packets for PKTPS,
+                                      * or in bits for KBPS. */
+    atomic_uint64_t packet_count;
+    atomic_uint64_t byte_count;
 };
 
 struct dp_meter {
     struct cmap_node node;
-    struct ovs_mutex lock;
     uint32_t id;
     uint16_t flags;
     uint16_t n_bands;
     uint32_t max_delta_t;
-    uint64_t used;
-    uint64_t packet_count;
-    uint64_t byte_count;
+    atomic_uint64_t used;  /* Time of a last use in milliseconds. */
+    atomic_uint64_t packet_count;
+    atomic_uint64_t byte_count;
     struct dp_meter_band bands[];
 };
 
@@ -7165,22 +7165,56 @@ dpif_netdev_meter_get_features(const struct dpif * dpif OVS_UNUSED,
     features->max_color = 0;
 }
 
+/* Tries to atomically add 'n' to 'value' in terms of saturation arithmetic,
+ * i.e., if the result will be larger than 'max_value', will store 'max_value'
+ * instead. */
+static void
+atomic_sat_add(atomic_uint64_t *value, uint64_t n, uint64_t max_value)
+{
+    uint64_t current, new_value;
+
+    atomic_read_relaxed(value, &current);
+    do {
+        new_value = current + n;
+        new_value = MIN(new_value, max_value);
+    } while (!atomic_compare_exchange_weak_relaxed(value, &current,
+                                                   new_value));
+}
+
+/* Tries to atomically subtract 'n' from 'value'.  Does not perform the
+ * operation and returns 'false' if the result will be less than 'min_value'.
+ * Otherwise, stores the result and returns 'true'. */
+static bool
+atomic_bound_sub(atomic_uint64_t *value, uint64_t n, uint64_t min_value)
+{
+    uint64_t current;
+
+    atomic_read_relaxed(value, &current);
+    do {
+        if (current < min_value + n) {
+            return false;
+        }
+    } while (!atomic_compare_exchange_weak_relaxed(value, &current,
+                                                   current - n));
+    return true;
+}
+
 /* Applies the meter identified by 'meter_id' to 'packets_'.  Packets
  * that exceed a band are dropped in-place. */
 static void
 dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
-                    uint32_t meter_id, long long int now)
+                    uint32_t meter_id, long long int now_ms)
 {
-    struct dp_meter *meter;
-    struct dp_meter_band *band;
-    struct dp_packet *packet;
-    long long int long_delta_t; /* msec */
-    uint32_t delta_t; /* msec */
     const size_t cnt = dp_packet_batch_size(packets_);
-    uint32_t bytes, volume;
-    int exceeded_band[NETDEV_MAX_BURST];
     uint32_t exceeded_rate[NETDEV_MAX_BURST];
-    int exceeded_pkt = cnt; /* First packet that exceeded a band rate. */
+    uint32_t exceeded_band[NETDEV_MAX_BURST];
+    uint64_t bytes, volume, meter_used, old;
+    uint64_t band_packets[MAX_BANDS];
+    uint64_t band_bytes[MAX_BANDS];
+    struct dp_meter_band *band;
+    struct dp_packet *packet;
+    struct dp_meter *meter;
+    bool exceeded = false;
 
     if (meter_id >= MAX_METERS) {
         return;
@@ -7196,116 +7230,102 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
     /* Initialize as zeroes. */
     memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
 
-    ovs_mutex_lock(&meter->lock);
-    /* All packets will hit the meter at the same time. */
-    long_delta_t = now / 1000 - meter->used / 1000; /* msec */
+    atomic_read_relaxed(&meter->used, &meter_used);
+    do {
+        if (meter_used >= now_ms) {
+            /* The '>' condition means that we have several threads hitting the
+             * same meter, and the other one already advanced the time. */
+            meter_used = now_ms;
+            break;
+        }
+    } while (!atomic_compare_exchange_weak_relaxed(&meter->used,
+                                                   &meter_used, now_ms));
 
-    if (long_delta_t < 0) {
-        /* This condition means that we have several threads fighting for a
-           meter lock, and the one who received the packets a bit later wins.
-           Assuming that all racing threads received packets at the same time
-           to avoid overflow. */
-        long_delta_t = 0;
-    }
+    /* Refill all buckets right away, since other threads may use them. */
+    if (meter_used < now_ms) {
+        /* All packets will hit the meter at the same time. */
+        uint64_t delta_t = now_ms - meter_used;
+
+        /* Make sure delta_t will not be too large, so that bucket will not
+         * wrap around below. */
+        delta_t = MIN(delta_t, meter->max_delta_t);
 
-    /* Make sure delta_t will not be too large, so that bucket will not
-     * wrap around below. */
-    delta_t = (long_delta_t > (long long int)meter->max_delta_t)
-        ? meter->max_delta_t : (uint32_t)long_delta_t;
+        for (int m = 0; m < meter->n_bands; m++) {
+            band = &meter->bands[m];
+            /* Update band's bucket.  We can't just use atomic add here,
+             * because we should never add above the max capacity. */
+            atomic_sat_add(&band->bucket, delta_t * band->rate,
+                           band->burst_size * 1000ULL);
+        }
+    }
 
     /* Update meter stats. */
-    meter->used = now;
-    meter->packet_count += cnt;
+    atomic_add_relaxed(&meter->packet_count, cnt, &old);
     bytes = 0;
     DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
         bytes += dp_packet_size(packet);
     }
-    meter->byte_count += bytes;
+    atomic_add_relaxed(&meter->byte_count, bytes, &old);
 
     /* Meters can operate in terms of packets per second or kilobits per
      * second. */
     if (meter->flags & OFPMF13_PKTPS) {
-        /* Rate in packets/second, bucket 1/1000 packets. */
-        /* msec * packets/sec = 1/1000 packets. */
+        /* Rate in packets/second, bucket 1/1000 packets.
+         * msec * packets/sec = 1/1000 packets. */
         volume = cnt * 1000; /* Take 'cnt' packets from the bucket. */
     } else {
-        /* Rate in kbps, bucket in bits. */
-        /* msec * kbps = bits */
+        /* Rate in kbps, bucket in bits.
+         * msec * kbps = bits */
         volume = bytes * 8;
     }
 
-    /* Update all bands and find the one hit with the highest rate for each
-     * packet (if any). */
-    for (int m = 0; m < meter->n_bands; ++m) {
-        uint64_t max_bucket_size;
-
+    /* Find the band hit with the highest rate for each packet (if any). */
+    for (int m = 0; m < meter->n_bands; m++) {
         band = &meter->bands[m];
-        max_bucket_size = band->burst_size * 1000ULL;
-        /* Update band's bucket. */
-        band->bucket += (uint64_t) delta_t * band->rate;
-        if (band->bucket > max_bucket_size) {
-            band->bucket = max_bucket_size;
-        }
 
         /* Drain the bucket for all the packets, if possible. */
-        if (band->bucket >= volume) {
-            band->bucket -= volume;
-        } else {
-            int band_exceeded_pkt;
-
-            /* Band limit hit, must process packet-by-packet. */
-            if (meter->flags & OFPMF13_PKTPS) {
-                band_exceeded_pkt = band->bucket / 1000;
-                band->bucket %= 1000; /* Remainder stays in bucket. */
-
-                /* Update the exceeding band for each exceeding packet.
-                 * (Only one band will be fired by a packet, and that
-                 * can be different for each packet.) */
-                for (int i = band_exceeded_pkt; i < cnt; i++) {
-                    if (band->rate > exceeded_rate[i]) {
-                        exceeded_rate[i] = band->rate;
-                        exceeded_band[i] = m;
-                    }
-                }
-            } else {
-                /* Packet sizes differ, must process one-by-one. */
-                band_exceeded_pkt = cnt;
-                DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
-                    uint32_t bits = dp_packet_size(packet) * 8;
-
-                    if (band->bucket >= bits) {
-                        band->bucket -= bits;
-                    } else {
-                        if (i < band_exceeded_pkt) {
-                            band_exceeded_pkt = i;
-                        }
-                        /* Update the exceeding band for the exceeding packet.
-                         * (Only one band will be fired by a packet, and that
-                         * can be different for each packet.) */
-                        if (band->rate > exceeded_rate[i]) {
-                            exceeded_rate[i] = band->rate;
-                            exceeded_band[i] = m;
-                        }
-                    }
+        if (atomic_bound_sub(&band->bucket, volume, 0)) {
+            continue;
+        }
+
+        /* Band limit hit, must process packet-by-packet. */
+        DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
+            uint64_t packet_volume = (meter->flags & OFPMF13_PKTPS)
+                                     ? 1000 : (dp_packet_size(packet) * 8);
+
+            if (!atomic_bound_sub(&band->bucket, packet_volume, 0)) {
+                /* Update the exceeding band for the exceeding packet.
+                 * Only one band will be fired by a packet, and that can
+                 * be different for each packet. */
+                if (band->rate > exceeded_rate[i]) {
+                    exceeded_rate[i] = band->rate;
+                    exceeded_band[i] = m;
+                    exceeded = true;
                 }
             }
-            /* Remember the first exceeding packet. */
-            if (exceeded_pkt > band_exceeded_pkt) {
-                exceeded_pkt = band_exceeded_pkt;
-            }
         }
     }
 
+    /* No need to iterate over packets if there are no drops. */
+    if (!exceeded) {
+        return;
+    }
+
     /* Fire the highest rate band exceeded by each packet, and drop
      * packets if needed. */
+
+    memset(band_packets, 0, sizeof band_packets);
+    memset(band_bytes,   0, sizeof band_bytes);
+
     size_t j;
     DP_PACKET_BATCH_REFILL_FOR_EACH (j, cnt, packet, packets_) {
-        if (exceeded_band[j] >= 0) {
+        uint32_t m = exceeded_band[j];
+
+        if (m != UINT32_MAX) {
             /* Meter drop packet. */
-            band = &meter->bands[exceeded_band[j]];
-            band->packet_count += 1;
-            band->byte_count += dp_packet_size(packet);
-            COVERAGE_INC(datapath_drop_meter);
+            band = &meter->bands[m];
+            band_packets[m]++;
+            band_bytes[m] += dp_packet_size(packet);
             dp_packet_delete(packet);
         } else {
             /* Meter accepts packet. */
@@ -7313,7 +7333,15 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
         }
     }
 
-    ovs_mutex_unlock(&meter->lock);
+    for (int m = 0; m < meter->n_bands; m++) {
+        if (!band_packets[m]) {
+            continue;
+        }
+        band = &meter->bands[m];
+        atomic_add_relaxed(&band->packet_count, band_packets[m], &old);
+        atomic_add_relaxed(&band->byte_count,   band_bytes[m],   &old);
+        COVERAGE_ADD(datapath_drop_meter, band_packets[m]);
+    }
 }
 
 /* Meter set/get/del processing is still single-threaded. */
@@ -7354,13 +7382,13 @@ dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id meter_id,
     meter->flags = config->flags;
     meter->n_bands = config->n_bands;
     meter->max_delta_t = 0;
-    meter->used = time_usec();
     meter->id = mid;
-    ovs_mutex_init_adaptive(&meter->lock);
+    atomic_init(&meter->used, time_msec());
 
     /* set up bands */
     for (i = 0; i < config->n_bands; ++i) {
         uint32_t band_max_delta_t;
+        uint64_t bucket_size;
 
         /* Set burst size to a workable value if none specified. */
         if (config->bands[i].burst_size == 0) {
@@ -7370,11 +7398,11 @@ dpif_netdev_meter_set(struct dpif *dpif, ofproto_meter_id meter_id,
         meter->bands[i].rate = config->bands[i].rate;
         meter->bands[i].burst_size = config->bands[i].burst_size;
         /* Start with a full bucket. */
-        meter->bands[i].bucket = meter->bands[i].burst_size * 1000ULL;
+        bucket_size = meter->bands[i].burst_size * 1000ULL;
+        atomic_init(&meter->bands[i].bucket, bucket_size);
 
         /* Figure out max delta_t that is enough to fill any bucket. */
-        band_max_delta_t
-            = meter->bands[i].bucket / meter->bands[i].rate;
+        band_max_delta_t = bucket_size / meter->bands[i].rate;
         if (band_max_delta_t > meter->max_delta_t) {
             meter->max_delta_t = band_max_delta_t;
         }
@@ -7397,7 +7425,7 @@ dpif_netdev_meter_get(const struct dpif *dpif,
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     uint32_t meter_id = meter_id_.uint32;
-    const struct dp_meter *meter;
+    struct dp_meter *meter;
 
     if (meter_id >= MAX_METERS) {
         return EFBIG;
@@ -7411,17 +7439,15 @@ dpif_netdev_meter_get(const struct dpif *dpif,
     if (stats) {
         int i = 0;
 
-        ovs_mutex_lock(&meter->lock);
-
-        stats->packet_in_count = meter->packet_count;
-        stats->byte_in_count = meter->byte_count;
+        atomic_read_relaxed(&meter->packet_count, &stats->packet_in_count);
+        atomic_read_relaxed(&meter->byte_count, &stats->byte_in_count);
 
         for (i = 0; i < n_bands && i < meter->n_bands; ++i) {
-            stats->bands[i].packet_count = meter->bands[i].packet_count;
-            stats->bands[i].byte_count = meter->bands[i].byte_count;
+            atomic_read_relaxed(&meter->bands[i].packet_count,
+                                &stats->bands[i].packet_count);
+            atomic_read_relaxed(&meter->bands[i].byte_count,
+                                &stats->bands[i].byte_count);
         }
-
-        ovs_mutex_unlock(&meter->lock);
         stats->n_bands = i;
     }
 
@@ -9173,7 +9199,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
 
     case OVS_ACTION_ATTR_METER:
         dp_netdev_run_meter(pmd->dp, packets_, nl_attr_get_u32(a),
-                            pmd->ctx.now);
+                            pmd->ctx.now / 1000);
         break;
 
     case OVS_ACTION_ATTR_PUSH_VLAN:
-- 
2.40.1

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to