On 29 Jun 2023, at 17:54, Ilya Maximets wrote:

> On 6/29/23 17:43, Eelco Chaudron wrote:
>>
>>
>> On 22 Jun 2023, at 0:32, Ilya Maximets wrote:
>>
>>> Current implementation of meters in the userspace datapath takes
>>> the meter lock for every packet batch.  If more than one thread
>>> hits the flow with the same meter, they will lock each other.
>>>
>>> Replace the critical section with atomic operations to avoid
>>> interlocking.  Meters themselves are RCU-protected, so it's safe
>>> to access them without holding a lock.
>>>
>>> Implementation does the following:
>>>
>>>  1. Tries to advance the 'used' timer of the meter with atomic
>>>     compare+exchange if it's smaller than 'now'.
>>>  2. If the timer change succeeds, atomically update band buckets.
>>>  3. Atomically update packet statistics for a meter.
>>>  4. Go over buckets and try to atomically subtract the amount of
>>>     packets or bytes, recording the highest exceeded band.
>>>  5. Atomically update band statistics and drop packets.
>>>
>>> Bucket manipulations are implemented with atomic compare+exchange
>>> operations with extra checks, because bucket size should never
>>> exceed the maximum and it should never go below zero.
>>>
>>> Packet statistics may be momentarily inconsistent, i.e., number
>>> of packets and the number of bytes may reflect different sets
>>> of packets.  But it should be eventually consistent.  And the
>>> difference at any given time should be just a few packets.
>>>
>>> For the sake of reduced code complexity PKTPS meter tries to push
>>> packets through the band one by one, even though they all have
>>> the same weight.  This is also more fair if more than one thread
>>> is passing packets through the same band at the same time.
>>> Trying to predict the number of packets that can pass may also
>>> cause extra atomic operations reducing the performance.
>>>
>>> This implementation shows similar performance to the previous one,
>>> but should scale better with more threads hitting the same meter.
>>
>> This work looks great!! Some small comments below. I did limited testing and
>> it seems to work fine.
>>
>> Cheers,
>>
>> Eelco
>>
>>> Signed-off-by: Ilya Maximets <i.maxim...@ovn.org>
>>> ---
>>>
>>> @Lin Huang, if you can try this change on your setup, that
>>> would be great.
>>>
>>>  NEWS              |   2 +
>>>  lib/dpif-netdev.c | 250 +++++++++++++++++++++++++---------------------
>>>  2 files changed, 140 insertions(+), 112 deletions(-)
>>>
>>> diff --git a/NEWS b/NEWS
>>> index 66d5a4ea3..4a2b7dbca 100644
>>> --- a/NEWS
>>> +++ b/NEWS
>>> @@ -37,6 +37,8 @@ Post-v3.1.0
>>>     - SRv6 Tunnel Protocol
>>>       * Added support for userspace datapath (only).
>>>     - Userspace datapath:
>>> +     * Implementation of OpenFlow meters is now lockless allowing for 
>>> better
>>> +       multi-thread scalability.
>>>       * IP and L4 checksum offload support is now enabled by default for
>>>         interfaces that support it.  See the 'status' column in the 
>>> 'interface'
>>>         table to check the status.
>>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>>> index abe63412e..2fa556a62 100644
>>> --- a/lib/dpif-netdev.c
>>> +++ b/lib/dpif-netdev.c
>>> @@ -212,21 +212,21 @@ static void dpcls_remove(struct dpcls *, struct 
>>> dpcls_rule *);
>>>  struct dp_meter_band {
>>>      uint32_t rate;
>>>      uint32_t burst_size;
>>> -    uint64_t bucket; /* In 1/1000 packets (for PKTPS), or in bits (for 
>>> KBPS) */
>>> -    uint64_t packet_count;
>>> -    uint64_t byte_count;
>>> +    atomic_uint64_t bucket;          /* In 1/1000 packets for PKTPS,
>>> +                                      * or in bits for KBPS. */
>>> +    atomic_uint64_t packet_count;
>>> +    atomic_uint64_t byte_count;
>>>  };
>>>
>>>  struct dp_meter {
>>>      struct cmap_node node;
>>> -    struct ovs_mutex lock;
>>>      uint32_t id;
>>>      uint16_t flags;
>>>      uint16_t n_bands;
>>>      uint32_t max_delta_t;
>>> -    uint64_t used;
>>> -    uint64_t packet_count;
>>> -    uint64_t byte_count;
>>> +    atomic_uint64_t used;  /* Time of a last use in milliseconds. */
>>> +    atomic_uint64_t packet_count;
>>> +    atomic_uint64_t byte_count;
>>>      struct dp_meter_band bands[];
>>>  };
>>>
>>> @@ -7165,22 +7165,56 @@ dpif_netdev_meter_get_features(const struct dpif * 
>>> dpif OVS_UNUSED,
>>>      features->max_color = 0;
>>>  }
>>>
>>> +/* Tries to atomically add 'n' to 'value' in terms of saturation 
>>> arithmetic,
>>> + * i.e., if the result will be larger than 'max_value', will store 
>>> 'max_value'
>>> + * instead. */
>>> +static void
>>> +atomic_sat_add(atomic_uint64_t *value, uint64_t n, uint64_t max_value)
>>> +{
>>> +    uint64_t current, new_value;
>>> +
>>> +    atomic_read_relaxed(value, &current);
>>> +    do {
>>> +        new_value = current + n;
>>> +        new_value = MIN(new_value, max_value);
>>> +    } while (!atomic_compare_exchange_weak_relaxed(value, &current,
>>> +                                                   new_value));
>>> +}
>>> +
>>> +/* Tries to atomically subtract 'n' from 'value'.  Does not perform the
>>> + * operation and returns 'false' if the result will be less than 
>>> 'min_value'.
>>> + * Otherwise, stores the result and returns 'true'. */
>>> +static bool
>>> +atomic_bound_sub(atomic_uint64_t *value, uint64_t n, uint64_t min_value)
>>> +{
>>> +    uint64_t current;
>>> +
>>> +    atomic_read_relaxed(value, &current);
>>> +    do {
>>> +        if (current < min_value + n) {
>>> +            return false;
>>> +        }
>>> +    } while (!atomic_compare_exchange_weak_relaxed(value, &current,
>>> +                                                   current - n));
>>> +    return true;
>>> +}
>>> +
>>>  /* Applies the meter identified by 'meter_id' to 'packets_'.  Packets
>>>   * that exceed a band are dropped in-place. */
>>>  static void
>>>  dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
>>> -                    uint32_t meter_id, long long int now)
>>> +                    uint32_t meter_id, long long int now_ms)
>>>  {
>>> -    struct dp_meter *meter;
>>> -    struct dp_meter_band *band;
>>> -    struct dp_packet *packet;
>>> -    long long int long_delta_t; /* msec */
>>> -    uint32_t delta_t; /* msec */
>>>      const size_t cnt = dp_packet_batch_size(packets_);
>>> -    uint32_t bytes, volume;
>>> -    int exceeded_band[NETDEV_MAX_BURST];
>>>      uint32_t exceeded_rate[NETDEV_MAX_BURST];
>>> -    int exceeded_pkt = cnt; /* First packet that exceeded a band rate. */
>>> +    uint32_t exceeded_band[NETDEV_MAX_BURST];
>>> +    uint64_t bytes, volume, meter_used, old;
>>> +    uint64_t band_packets[MAX_BANDS];
>>> +    uint64_t band_bytes[MAX_BANDS];
>>> +    struct dp_meter_band *band;
>>> +    struct dp_packet *packet;
>>> +    struct dp_meter *meter;
>>> +    bool exceeded = false;
>>>
>>>      if (meter_id >= MAX_METERS) {
>>>          return;
>>> @@ -7196,116 +7230,102 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct 
>>> dp_packet_batch *packets_,
>>>      /* Initialize as zeroes. */
>>>      memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
>>>
>>> -    ovs_mutex_lock(&meter->lock);
>>> -    /* All packets will hit the meter at the same time. */
>>> -    long_delta_t = now / 1000 - meter->used / 1000; /* msec */
>>> +    atomic_read_relaxed(&meter->used, &meter_used);
>>> +    do {
>>> +        if (meter_used >= now_ms) {
>>> +            /* The '>' condition means that we have several threads 
>>> hitting the
>>> +             * same meter, and the other one already advanced the time. */
>>> +            meter_used = now_ms;
>>> +            break;
>>> +        }
>>> +    } while (!atomic_compare_exchange_weak_relaxed(&meter->used,
>>> +                                                   &meter_used, now_ms));
>>>
>>> -    if (long_delta_t < 0) {
>>> -        /* This condition means that we have several threads fighting for a
>>> -           meter lock, and the one who received the packets a bit later 
>>> wins.
>>> -           Assuming that all racing threads received packets at the same 
>>> time
>>> -           to avoid overflow. */
>>> -        long_delta_t = 0;
>>> -    }
>>> +    /* Refill all buckets right away, since other threads may use them. */
>>> +    if (meter_used < now_ms) {
>>> +        /* All packets will hit the meter at the same time. */
>>> +        uint64_t delta_t = now_ms - meter_used;
>>> +
>>> +        /* Make sure delta_t will not be too large, so that bucket will not
>>> +         * wrap around below. */
>>> +        delta_t = MIN(delta_t, meter->max_delta_t);
>>>
>>> -    /* Make sure delta_t will not be too large, so that bucket will not
>>> -     * wrap around below. */
>>> -    delta_t = (long_delta_t > (long long int)meter->max_delta_t)
>>> -        ? meter->max_delta_t : (uint32_t)long_delta_t;
>>> +        for (int m = 0; m < meter->n_bands; m++) {
>>> +            band = &meter->bands[m];
>>> +            /* Update band's bucket.  We can't just use atomic add here,
>>> +             * because we should never add above the max capacity. */
>>> +            atomic_sat_add(&band->bucket, delta_t * band->rate,
>>> +                           band->burst_size * 1000ULL);
>>> +        }
>>> +    }
>>>
>>>      /* Update meter stats. */
>>> -    meter->used = now;
>>> -    meter->packet_count += cnt;
>>> +    atomic_add_relaxed(&meter->packet_count, cnt, &old);
>>>      bytes = 0;
>>>      DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
>>>          bytes += dp_packet_size(packet);
>>>      }
>>> -    meter->byte_count += bytes;
>>> +    atomic_add_relaxed(&meter->byte_count, bytes, &old);
>>>
>>>      /* Meters can operate in terms of packets per second or kilobits per
>>>       * second. */
>>>      if (meter->flags & OFPMF13_PKTPS) {
>>> -        /* Rate in packets/second, bucket 1/1000 packets. */
>>> -        /* msec * packets/sec = 1/1000 packets. */
>>> +        /* Rate in packets/second, bucket 1/1000 packets.
>>> +         * msec * packets/sec = 1/1000 packets. */
>>>          volume = cnt * 1000; /* Take 'cnt' packets from the bucket. */
>>>      } else {
>>> -        /* Rate in kbps, bucket in bits. */
>>> -        /* msec * kbps = bits */
>>> +        /* Rate in kbps, bucket in bits.
>>> +         * msec * kbps = bits */
>>>          volume = bytes * 8;
>>>      }
>>>
>>> -    /* Update all bands and find the one hit with the highest rate for each
>>> -     * packet (if any). */
>>> -    for (int m = 0; m < meter->n_bands; ++m) {
>>> -        uint64_t max_bucket_size;
>>> -
>>> +    /* Find the band hit with the highest rate for each packet (if any). */
>>> +    for (int m = 0; m < meter->n_bands; m++) {
>>>          band = &meter->bands[m];
>>> -        max_bucket_size = band->burst_size * 1000ULL;
>>> -        /* Update band's bucket. */
>>> -        band->bucket += (uint64_t) delta_t * band->rate;
>>> -        if (band->bucket > max_bucket_size) {
>>> -            band->bucket = max_bucket_size;
>>> -        }
>>>
>>>          /* Drain the bucket for all the packets, if possible. */
>>> -        if (band->bucket >= volume) {
>>> -            band->bucket -= volume;
>>> -        } else {
>>> -            int band_exceeded_pkt;
>>> -
>>> -            /* Band limit hit, must process packet-by-packet. */
>>> -            if (meter->flags & OFPMF13_PKTPS) {
>>> -                band_exceeded_pkt = band->bucket / 1000;
>>> -                band->bucket %= 1000; /* Remainder stays in bucket. */
>>> -
>>> -                /* Update the exceeding band for each exceeding packet.
>>> -                 * (Only one band will be fired by a packet, and that
>>> -                 * can be different for each packet.) */
>>> -                for (int i = band_exceeded_pkt; i < cnt; i++) {
>>> -                    if (band->rate > exceeded_rate[i]) {
>>> -                        exceeded_rate[i] = band->rate;
>>> -                        exceeded_band[i] = m;
>>> -                    }
>>> -                }
>>> -            } else {
>>> -                /* Packet sizes differ, must process one-by-one. */
>>> -                band_exceeded_pkt = cnt;
>>> -                DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
>>> -                    uint32_t bits = dp_packet_size(packet) * 8;
>>> -
>>> -                    if (band->bucket >= bits) {
>>> -                        band->bucket -= bits;
>>> -                    } else {
>>> -                        if (i < band_exceeded_pkt) {
>>> -                            band_exceeded_pkt = i;
>>> -                        }
>>> -                        /* Update the exceeding band for the exceeding 
>>> packet.
>>> -                         * (Only one band will be fired by a packet, and 
>>> that
>>> -                         * can be different for each packet.) */
>>> -                        if (band->rate > exceeded_rate[i]) {
>>> -                            exceeded_rate[i] = band->rate;
>>> -                            exceeded_band[i] = m;
>>> -                        }
>>> -                    }
>>> +        if (atomic_bound_sub(&band->bucket, volume, 0)) {
>>> +            continue;
>>> +        }
>>> +
>>> +        /* Band limit hit, must process packet-by-packet. */
>>> +        DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
>>> +            uint64_t packet_volume = (meter->flags & OFPMF13_PKTPS)
>>> +                                     ? 1000 : (dp_packet_size(packet) * 8);
>>> +
>>> +            if (!atomic_bound_sub(&band->bucket, packet_volume, 0)) {
>>> +                /* Update the exceeding band for the exceeding packet.
>>> +                 * Only one band will be fired by a packet, and that can
>>> +                 * be different for each packet. */
>>> +                if (band->rate > exceeded_rate[i]) {
>>> +                    exceeded_rate[i] = band->rate;
>>> +                    exceeded_band[i] = m;
>>> +                    exceeded = true;
>>>                  }
>>>              }
>>> -            /* Remember the first exceeding packet. */
>>> -            if (exceeded_pkt > band_exceeded_pkt) {
>>> -                exceeded_pkt = band_exceeded_pkt;
>>> -            }
>>>          }
>>>      }
>>>
>>> +    /* No need to iterate over packets if there are no drops. */
>>> +    if (!exceeded) {
>>> +        return;
>>> +    }
>>> +
>>>      /* Fire the highest rate band exceeded by each packet, and drop
>>>       * packets if needed. */
>>> +
>>
>> Don’t think we need this newline.
>
> The comment above applies to the whole section below.  Without an empty
> line it looks like it's a comment for memsets, and it is not.
>
> I'd like to keep it, but can remove if you insist. :)

It’s fine, it just triggered my OCD…

>>
>>> +    memset(band_packets, 0, sizeof band_packets);
>>> +    memset(band_bytes,   0, sizeof band_bytes);
>>
>> Are these extra spaces for alignment on purpose?
>
> Yes, seemed easier to read this way.

I’m fine with it, but it just stood out when reviewing. If I set the code 
style, I even like to do this for variable declarations (and sort by type); 
https://github.com/xdp-project/xdp-tools/blob/740c839806a02517da5bce7bd0ccaba908b3f675/xdp-dump/xdpdump.c#L963

>>
>>> +
>>>      size_t j;
>>>      DP_PACKET_BATCH_REFILL_FOR_EACH (j, cnt, packet, packets_) {
>>> -        if (exceeded_band[j] >= 0) {
>>> +        uint32_t m = exceeded_band[j];
>>> +
>>> +        if (m != UINT32_MAX) {
>>>              /* Meter drop packet. */
>>> -            band = &meter->bands[exceeded_band[j]];
>>> -            band->packet_count += 1;
>>> -            band->byte_count += dp_packet_size(packet);
>>> -            COVERAGE_INC(datapath_drop_meter);
>>> +            band = &meter->bands[m];
>>> +            band_packets[m]++;
>>> +            band_bytes[m] += dp_packet_size(packet);
>>
>>
>> This code now looks like this (the diff is a mess to comment on):
>>
>>         if (m != UINT32_MAX) {
>>             /* Meter drop packet. */
>>             band = &meter->bands[m];
>>
>> !           ^^^ This line can be removed as band is not used.
>
> True.  Can remove.

Thanks, now I see that it even shows up with clang-analyze.

I guess this is the only real change needed. So if you roll it in during commit 
time:

Acked-by: Eelco Chaudron <echau...@redhat.com>

>>
>>             band_packets[m]++;
>>             band_bytes[m] += dp_packet_size(packet);
>>             dp_packet_delete(packet);
>>         } else {
>>             /* Meter accepts packet. */
>>             dp_packet_batch_refill(packets_, packet, j);
>>         }
>>     }
>>
>>>              dp_packet_delete(packet);
>>>          } else {
>>>              /* Meter accepts packet. */
>>> @@ -7313,7 +7333,15 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct 
>>> dp_packet_batch *packets_,
>>>          }
>>>      }
>>>
>>> -    ovs_mutex_unlock(&meter->lock);
>>> +    for (int m = 0; m < meter->n_bands; m++) {
>>> +        if (!band_packets[m]) {
>>> +            continue;
>>> +        }
>>> +        band = &meter->bands[m];
>>> +        atomic_add_relaxed(&band->packet_count, band_packets[m], &old);
>>> +        atomic_add_relaxed(&band->byte_count,   band_bytes[m],   &old);
>>
>> Are these extra spaces for alignment on purpose?
>
> Yep, same as with memset.  This code is a bit complex and alignment
> makes it easier to read, IMO.

Fine here, and yes code is complex enough for me to go over it multiple times 
ending up with a lot of questions that got all answered in the end ;).

>>
>>> +        COVERAGE_ADD(datapath_drop_meter, band_packets[m]);
>>> +    }
>>>  }
>>>
>>>  /* Meter set/get/del processing is still single-threaded. */
>>> @@ -7354,13 +7382,13 @@ dpif_netdev_meter_set(struct dpif *dpif, 
>>> ofproto_meter_id meter_id,
>>>      meter->flags = config->flags;
>>>      meter->n_bands = config->n_bands;
>>>      meter->max_delta_t = 0;
>>> -    meter->used = time_usec();
>>>      meter->id = mid;
>>> -    ovs_mutex_init_adaptive(&meter->lock);
>>> +    atomic_init(&meter->used, time_msec());
>>>
>>>      /* set up bands */
>>>      for (i = 0; i < config->n_bands; ++i) {
>>>          uint32_t band_max_delta_t;
>>> +        uint64_t bucket_size;
>>>
>>>          /* Set burst size to a workable value if none specified. */
>>>          if (config->bands[i].burst_size == 0) {
>>> @@ -7370,11 +7398,11 @@ dpif_netdev_meter_set(struct dpif *dpif, 
>>> ofproto_meter_id meter_id,
>>>          meter->bands[i].rate = config->bands[i].rate;
>>>          meter->bands[i].burst_size = config->bands[i].burst_size;
>>>          /* Start with a full bucket. */
>>> -        meter->bands[i].bucket = meter->bands[i].burst_size * 1000ULL;
>>> +        bucket_size = meter->bands[i].burst_size * 1000ULL;
>>> +        atomic_init(&meter->bands[i].bucket, bucket_size);
>>>
>>>          /* Figure out max delta_t that is enough to fill any bucket. */
>>> -        band_max_delta_t
>>> -            = meter->bands[i].bucket / meter->bands[i].rate;
>>> +        band_max_delta_t = bucket_size / meter->bands[i].rate;
>>>          if (band_max_delta_t > meter->max_delta_t) {
>>>              meter->max_delta_t = band_max_delta_t;
>>>          }
>>> @@ -7397,7 +7425,7 @@ dpif_netdev_meter_get(const struct dpif *dpif,
>>>  {
>>>      struct dp_netdev *dp = get_dp_netdev(dpif);
>>>      uint32_t meter_id = meter_id_.uint32;
>>> -    const struct dp_meter *meter;
>>> +    struct dp_meter *meter;
>>>
>>>      if (meter_id >= MAX_METERS) {
>>>          return EFBIG;
>>> @@ -7411,17 +7439,15 @@ dpif_netdev_meter_get(const struct dpif *dpif,
>>>      if (stats) {
>>>          int i = 0;
>>>
>>> -        ovs_mutex_lock(&meter->lock);
>>> -
>>> -        stats->packet_in_count = meter->packet_count;
>>> -        stats->byte_in_count = meter->byte_count;
>>> +        atomic_read_relaxed(&meter->packet_count, &stats->packet_in_count);
>>> +        atomic_read_relaxed(&meter->byte_count, &stats->byte_in_count);
>>>
>>>          for (i = 0; i < n_bands && i < meter->n_bands; ++i) {
>>> -            stats->bands[i].packet_count = meter->bands[i].packet_count;
>>> -            stats->bands[i].byte_count = meter->bands[i].byte_count;
>>> +            atomic_read_relaxed(&meter->bands[i].packet_count,
>>> +                                &stats->bands[i].packet_count);
>>> +            atomic_read_relaxed(&meter->bands[i].byte_count,
>>> +                                &stats->bands[i].byte_count);
>>>          }
>>> -
>>> -        ovs_mutex_unlock(&meter->lock);
>>>          stats->n_bands = i;
>>>      }
>>>
>>> @@ -9173,7 +9199,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch 
>>> *packets_,
>>>
>>>      case OVS_ACTION_ATTR_METER:
>>>          dp_netdev_run_meter(pmd->dp, packets_, nl_attr_get_u32(a),
>>> -                            pmd->ctx.now);
>>> +                            pmd->ctx.now / 1000);
>>>          break;
>>>
>>>      case OVS_ACTION_ATTR_PUSH_VLAN:
>>> -- 
>>> 2.40.1
>>

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to