On 29 Jun 2023, at 17:54, Ilya Maximets wrote:
> On 6/29/23 17:43, Eelco Chaudron wrote: >> >> >> On 22 Jun 2023, at 0:32, Ilya Maximets wrote: >> >>> Current implementation of meters in the userspace datapath takes >>> the meter lock for every packet batch. If more than one thread >>> hits the flow with the same meter, they will lock each other. >>> >>> Replace the critical section with atomic operations to avoid >>> interlocking. Meters themselves are RCU-protected, so it's safe >>> to access them without holding a lock. >>> >>> Implementation does the following: >>> >>> 1. Tries to advance the 'used' timer of the meter with atomic >>> compare+exchange if it's smaller than 'now'. >>> 2. If the timer change succeeds, atomically update band buckets. >>> 3. Atomically update packet statistics for a meter. >>> 4. Go over buckets and try to atomically subtract the amount of >>> packets or bytes, recording the highest exceeded band. >>> 5. Atomically update band statistics and drop packets. >>> >>> Bucket manipulations are implemented with atomic compare+exchange >>> operations with extra checks, because bucket size should never >>> exceed the maximum and it should never go below zero. >>> >>> Packet statistics may be momentarily inconsistent, i.e., number >>> of packets and the number of bytes may reflect different sets >>> of packets. But it should be eventually consistent. And the >>> difference at any given time should be in just few packets. >>> >>> For the sake of reduced code complexity PKTPS meter tries to push >>> packets through the band one by one, even though they all have >>> the same weight. This is also more fair if more than one thread >>> is passing packets through the same band at the same time. >>> Trying to predict the number of packets that can pass may also >>> cause extra atomic operations reducing the performance. >>> >>> This implementation shows similar performance to the previous one, >>> but should scale better with more threads hiting the same meter. >> >> This works looks great!! Some small comments below. Did limited testing and >> seems to work fine. >> >> Cheers, >> >> Eelco >> >>> Signed-off-by: Ilya Maximets <i.maxim...@ovn.org> >>> --- >>> >>> @Lin Huang, if you can try this change on your setup, that >>> would be great. >>> >>> NEWS | 2 + >>> lib/dpif-netdev.c | 250 +++++++++++++++++++++++++--------------------- >>> 2 files changed, 140 insertions(+), 112 deletions(-) >>> >>> diff --git a/NEWS b/NEWS >>> index 66d5a4ea3..4a2b7dbca 100644 >>> --- a/NEWS >>> +++ b/NEWS >>> @@ -37,6 +37,8 @@ Post-v3.1.0 >>> - SRv6 Tunnel Protocol >>> * Added support for userspace datapath (only). >>> - Userspace datapath: >>> + * Implementation of OpenFlow meters is now lockless allowing for >>> better >>> + multi-thread scalability. >>> * IP and L4 checksum offload support is now enabled by default for >>> interfaces that support it. See the 'status' column in the >>> 'interface' >>> table to check the status. >>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c >>> index abe63412e..2fa556a62 100644 >>> --- a/lib/dpif-netdev.c >>> +++ b/lib/dpif-netdev.c >>> @@ -212,21 +212,21 @@ static void dpcls_remove(struct dpcls *, struct >>> dpcls_rule *); >>> struct dp_meter_band { >>> uint32_t rate; >>> uint32_t burst_size; >>> - uint64_t bucket; /* In 1/1000 packets (for PKTPS), or in bits (for >>> KBPS) */ >>> - uint64_t packet_count; >>> - uint64_t byte_count; >>> + atomic_uint64_t bucket; /* In 1/1000 packets for PKTPS, >>> + * or in bits for KBPS. */ >>> + atomic_uint64_t packet_count; >>> + atomic_uint64_t byte_count; >>> }; >>> >>> struct dp_meter { >>> struct cmap_node node; >>> - struct ovs_mutex lock; >>> uint32_t id; >>> uint16_t flags; >>> uint16_t n_bands; >>> uint32_t max_delta_t; >>> - uint64_t used; >>> - uint64_t packet_count; >>> - uint64_t byte_count; >>> + atomic_uint64_t used; /* Time of a last use in milliseconds. */ >>> + atomic_uint64_t packet_count; >>> + atomic_uint64_t byte_count; >>> struct dp_meter_band bands[]; >>> }; >>> >>> @@ -7165,22 +7165,56 @@ dpif_netdev_meter_get_features(const struct dpif * >>> dpif OVS_UNUSED, >>> features->max_color = 0; >>> } >>> >>> +/* Tries to atomically add 'n' to 'value' in terms of saturation >>> arithmetic, >>> + * i.e., if the result will be larger than 'max_value', will store >>> 'max_value' >>> + * instead. */ >>> +static void >>> +atomic_sat_add(atomic_uint64_t *value, uint64_t n, uint64_t max_value) >>> +{ >>> + uint64_t current, new_value; >>> + >>> + atomic_read_relaxed(value, ¤t); >>> + do { >>> + new_value = current + n; >>> + new_value = MIN(new_value, max_value); >>> + } while (!atomic_compare_exchange_weak_relaxed(value, ¤t, >>> + new_value)); >>> +} >>> + >>> +/* Tries to atomically subtract 'n' from 'value'. Does not perform the >>> + * operation and returns 'false' if the result will be less than >>> 'min_value'. >>> + * Otherwise, stores the result and returns 'true'. */ >>> +static bool >>> +atomic_bound_sub(atomic_uint64_t *value, uint64_t n, uint64_t min_value) >>> +{ >>> + uint64_t current; >>> + >>> + atomic_read_relaxed(value, ¤t); >>> + do { >>> + if (current < min_value + n) { >>> + return false; >>> + } >>> + } while (!atomic_compare_exchange_weak_relaxed(value, ¤t, >>> + current - n)); >>> + return true; >>> +} >>> + >>> /* Applies the meter identified by 'meter_id' to 'packets_'. Packets >>> * that exceed a band are dropped in-place. */ >>> static void >>> dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_, >>> - uint32_t meter_id, long long int now) >>> + uint32_t meter_id, long long int now_ms) >>> { >>> - struct dp_meter *meter; >>> - struct dp_meter_band *band; >>> - struct dp_packet *packet; >>> - long long int long_delta_t; /* msec */ >>> - uint32_t delta_t; /* msec */ >>> const size_t cnt = dp_packet_batch_size(packets_); >>> - uint32_t bytes, volume; >>> - int exceeded_band[NETDEV_MAX_BURST]; >>> uint32_t exceeded_rate[NETDEV_MAX_BURST]; >>> - int exceeded_pkt = cnt; /* First packet that exceeded a band rate. */ >>> + uint32_t exceeded_band[NETDEV_MAX_BURST]; >>> + uint64_t bytes, volume, meter_used, old; >>> + uint64_t band_packets[MAX_BANDS]; >>> + uint64_t band_bytes[MAX_BANDS]; >>> + struct dp_meter_band *band; >>> + struct dp_packet *packet; >>> + struct dp_meter *meter; >>> + bool exceeded = false; >>> >>> if (meter_id >= MAX_METERS) { >>> return; >>> @@ -7196,116 +7230,102 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct >>> dp_packet_batch *packets_, >>> /* Initialize as zeroes. */ >>> memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate); >>> >>> - ovs_mutex_lock(&meter->lock); >>> - /* All packets will hit the meter at the same time. */ >>> - long_delta_t = now / 1000 - meter->used / 1000; /* msec */ >>> + atomic_read_relaxed(&meter->used, &meter_used); >>> + do { >>> + if (meter_used >= now_ms) { >>> + /* The '>' condition means that we have several threads >>> hitting the >>> + * same meter, and the other one already advanced the time. */ >>> + meter_used = now_ms; >>> + break; >>> + } >>> + } while (!atomic_compare_exchange_weak_relaxed(&meter->used, >>> + &meter_used, now_ms)); >>> >>> - if (long_delta_t < 0) { >>> - /* This condition means that we have several threads fighting for a >>> - meter lock, and the one who received the packets a bit later >>> wins. >>> - Assuming that all racing threads received packets at the same >>> time >>> - to avoid overflow. */ >>> - long_delta_t = 0; >>> - } >>> + /* Refill all buckets right away, since other threads may use them. */ >>> + if (meter_used < now_ms) { >>> + /* All packets will hit the meter at the same time. */ >>> + uint64_t delta_t = now_ms - meter_used; >>> + >>> + /* Make sure delta_t will not be too large, so that bucket will not >>> + * wrap around below. */ >>> + delta_t = MIN(delta_t, meter->max_delta_t); >>> >>> - /* Make sure delta_t will not be too large, so that bucket will not >>> - * wrap around below. */ >>> - delta_t = (long_delta_t > (long long int)meter->max_delta_t) >>> - ? meter->max_delta_t : (uint32_t)long_delta_t; >>> + for (int m = 0; m < meter->n_bands; m++) { >>> + band = &meter->bands[m]; >>> + /* Update band's bucket. We can't just use atomic add here, >>> + * because we should never add above the max capacity. */ >>> + atomic_sat_add(&band->bucket, delta_t * band->rate, >>> + band->burst_size * 1000ULL); >>> + } >>> + } >>> >>> /* Update meter stats. */ >>> - meter->used = now; >>> - meter->packet_count += cnt; >>> + atomic_add_relaxed(&meter->packet_count, cnt, &old); >>> bytes = 0; >>> DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) { >>> bytes += dp_packet_size(packet); >>> } >>> - meter->byte_count += bytes; >>> + atomic_add_relaxed(&meter->byte_count, bytes, &old); >>> >>> /* Meters can operate in terms of packets per second or kilobits per >>> * second. */ >>> if (meter->flags & OFPMF13_PKTPS) { >>> - /* Rate in packets/second, bucket 1/1000 packets. */ >>> - /* msec * packets/sec = 1/1000 packets. */ >>> + /* Rate in packets/second, bucket 1/1000 packets. >>> + * msec * packets/sec = 1/1000 packets. */ >>> volume = cnt * 1000; /* Take 'cnt' packets from the bucket. */ >>> } else { >>> - /* Rate in kbps, bucket in bits. */ >>> - /* msec * kbps = bits */ >>> + /* Rate in kbps, bucket in bits. >>> + * msec * kbps = bits */ >>> volume = bytes * 8; >>> } >>> >>> - /* Update all bands and find the one hit with the highest rate for each >>> - * packet (if any). */ >>> - for (int m = 0; m < meter->n_bands; ++m) { >>> - uint64_t max_bucket_size; >>> - >>> + /* Find the band hit with the highest rate for each packet (if any). */ >>> + for (int m = 0; m < meter->n_bands; m++) { >>> band = &meter->bands[m]; >>> - max_bucket_size = band->burst_size * 1000ULL; >>> - /* Update band's bucket. */ >>> - band->bucket += (uint64_t) delta_t * band->rate; >>> - if (band->bucket > max_bucket_size) { >>> - band->bucket = max_bucket_size; >>> - } >>> >>> /* Drain the bucket for all the packets, if possible. */ >>> - if (band->bucket >= volume) { >>> - band->bucket -= volume; >>> - } else { >>> - int band_exceeded_pkt; >>> - >>> - /* Band limit hit, must process packet-by-packet. */ >>> - if (meter->flags & OFPMF13_PKTPS) { >>> - band_exceeded_pkt = band->bucket / 1000; >>> - band->bucket %= 1000; /* Remainder stays in bucket. */ >>> - >>> - /* Update the exceeding band for each exceeding packet. >>> - * (Only one band will be fired by a packet, and that >>> - * can be different for each packet.) */ >>> - for (int i = band_exceeded_pkt; i < cnt; i++) { >>> - if (band->rate > exceeded_rate[i]) { >>> - exceeded_rate[i] = band->rate; >>> - exceeded_band[i] = m; >>> - } >>> - } >>> - } else { >>> - /* Packet sizes differ, must process one-by-one. */ >>> - band_exceeded_pkt = cnt; >>> - DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) { >>> - uint32_t bits = dp_packet_size(packet) * 8; >>> - >>> - if (band->bucket >= bits) { >>> - band->bucket -= bits; >>> - } else { >>> - if (i < band_exceeded_pkt) { >>> - band_exceeded_pkt = i; >>> - } >>> - /* Update the exceeding band for the exceeding >>> packet. >>> - * (Only one band will be fired by a packet, and >>> that >>> - * can be different for each packet.) */ >>> - if (band->rate > exceeded_rate[i]) { >>> - exceeded_rate[i] = band->rate; >>> - exceeded_band[i] = m; >>> - } >>> - } >>> + if (atomic_bound_sub(&band->bucket, volume, 0)) { >>> + continue; >>> + } >>> + >>> + /* Band limit hit, must process packet-by-packet. */ >>> + DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) { >>> + uint64_t packet_volume = (meter->flags & OFPMF13_PKTPS) >>> + ? 1000 : (dp_packet_size(packet) * 8); >>> + >>> + if (!atomic_bound_sub(&band->bucket, packet_volume, 0)) { >>> + /* Update the exceeding band for the exceeding packet. >>> + * Only one band will be fired by a packet, and that can >>> + * be different for each packet. */ >>> + if (band->rate > exceeded_rate[i]) { >>> + exceeded_rate[i] = band->rate; >>> + exceeded_band[i] = m; >>> + exceeded = true; >>> } >>> } >>> - /* Remember the first exceeding packet. */ >>> - if (exceeded_pkt > band_exceeded_pkt) { >>> - exceeded_pkt = band_exceeded_pkt; >>> - } >>> } >>> } >>> >>> + /* No need to iterate over packets if there are no drops. */ >>> + if (!exceeded) { >>> + return; >>> + } >>> + >>> /* Fire the highest rate band exceeded by each packet, and drop >>> * packets if needed. */ >>> + >> >> Don’t think we need this newline. > > The comment above applies to the whole section below. Without an empty > line it looks like it's a comment for memsets, and it is not. > > I'd like to keep it, but can remove if you insist. :) It’s fine, it just triggered my OCD… >> >>> + memset(band_packets, 0, sizeof band_packets); >>> + memset(band_bytes, 0, sizeof band_bytes); >> >> Are these extra spaces for alignmgent on purpose? > > Yes, seemed easier to read this way. I’m fine with it, but it just stood out when reviewing. If I set the code style, I even like to do this for variable declarations (and sort by type); https://github.com/xdp-project/xdp-tools/blob/740c839806a02517da5bce7bd0ccaba908b3f675/xdp-dump/xdpdump.c#L963 >> >>> + >>> size_t j; >>> DP_PACKET_BATCH_REFILL_FOR_EACH (j, cnt, packet, packets_) { >>> - if (exceeded_band[j] >= 0) { >>> + uint32_t m = exceeded_band[j]; >>> + >>> + if (m != UINT32_MAX) { >>> /* Meter drop packet. */ >>> - band = &meter->bands[exceeded_band[j]]; >>> - band->packet_count += 1; >>> - band->byte_count += dp_packet_size(packet); >>> - COVERAGE_INC(datapath_drop_meter); >>> + band = &meter->bands[m]; >>> + band_packets[m]++; >>> + band_bytes[m] += dp_packet_size(packet); >> >> >> This code now looks like this (the diff is a mess to comment on): >> >> if (m != UINT32_MAX) { >> /* Meter drop packet. */ >> band = &meter->bands[m]; >> >> ! ^^^ This line can be removed as band is not used. > > True. Can remove. Thanks, now I see that even shows up with clang-analyze. I guess this is the only real change needed. So if you roll it in during commit time: Acked-by: Eelco Chaudron <echau...@redhat.com> >> >> band_packets[m]++; >> band_bytes[m] += dp_packet_size(packet); >> dp_packet_delete(packet); >> } else { >> /* Meter accepts packet. */ >> dp_packet_batch_refill(packets_, packet, j); >> } >> } >> >>> dp_packet_delete(packet); >>> } else { >>> /* Meter accepts packet. */ >>> @@ -7313,7 +7333,15 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct >>> dp_packet_batch *packets_, >>> } >>> } >>> >>> - ovs_mutex_unlock(&meter->lock); >>> + for (int m = 0; m < meter->n_bands; m++) { >>> + if (!band_packets[m]) { >>> + continue; >>> + } >>> + band = &meter->bands[m]; >>> + atomic_add_relaxed(&band->packet_count, band_packets[m], &old); >>> + atomic_add_relaxed(&band->byte_count, band_bytes[m], &old); >> >> Are these extra spaces for alignment on purpose? > > Yep, same as with memset. This code a is a bit complex and alignment > makes it easier to read, IMO. Fine here, and yes code is complex enough for me to go over it multiple times ending up with a lot of questions that got all answered in the end ;). >> >>> + COVERAGE_ADD(datapath_drop_meter, band_packets[m]); >>> + } >>> } >>> >>> /* Meter set/get/del processing is still single-threaded. */ >>> @@ -7354,13 +7382,13 @@ dpif_netdev_meter_set(struct dpif *dpif, >>> ofproto_meter_id meter_id, >>> meter->flags = config->flags; >>> meter->n_bands = config->n_bands; >>> meter->max_delta_t = 0; >>> - meter->used = time_usec(); >>> meter->id = mid; >>> - ovs_mutex_init_adaptive(&meter->lock); >>> + atomic_init(&meter->used, time_msec()); >>> >>> /* set up bands */ >>> for (i = 0; i < config->n_bands; ++i) { >>> uint32_t band_max_delta_t; >>> + uint64_t bucket_size; >>> >>> /* Set burst size to a workable value if none specified. */ >>> if (config->bands[i].burst_size == 0) { >>> @@ -7370,11 +7398,11 @@ dpif_netdev_meter_set(struct dpif *dpif, >>> ofproto_meter_id meter_id, >>> meter->bands[i].rate = config->bands[i].rate; >>> meter->bands[i].burst_size = config->bands[i].burst_size; >>> /* Start with a full bucket. */ >>> - meter->bands[i].bucket = meter->bands[i].burst_size * 1000ULL; >>> + bucket_size = meter->bands[i].burst_size * 1000ULL; >>> + atomic_init(&meter->bands[i].bucket, bucket_size); >>> >>> /* Figure out max delta_t that is enough to fill any bucket. */ >>> - band_max_delta_t >>> - = meter->bands[i].bucket / meter->bands[i].rate; >>> + band_max_delta_t = bucket_size / meter->bands[i].rate; >>> if (band_max_delta_t > meter->max_delta_t) { >>> meter->max_delta_t = band_max_delta_t; >>> } >>> @@ -7397,7 +7425,7 @@ dpif_netdev_meter_get(const struct dpif *dpif, >>> { >>> struct dp_netdev *dp = get_dp_netdev(dpif); >>> uint32_t meter_id = meter_id_.uint32; >>> - const struct dp_meter *meter; >>> + struct dp_meter *meter; >>> >>> if (meter_id >= MAX_METERS) { >>> return EFBIG; >>> @@ -7411,17 +7439,15 @@ dpif_netdev_meter_get(const struct dpif *dpif, >>> if (stats) { >>> int i = 0; >>> >>> - ovs_mutex_lock(&meter->lock); >>> - >>> - stats->packet_in_count = meter->packet_count; >>> - stats->byte_in_count = meter->byte_count; >>> + atomic_read_relaxed(&meter->packet_count, &stats->packet_in_count); >>> + atomic_read_relaxed(&meter->byte_count, &stats->byte_in_count); >>> >>> for (i = 0; i < n_bands && i < meter->n_bands; ++i) { >>> - stats->bands[i].packet_count = meter->bands[i].packet_count; >>> - stats->bands[i].byte_count = meter->bands[i].byte_count; >>> + atomic_read_relaxed(&meter->bands[i].packet_count, >>> + &stats->bands[i].packet_count); >>> + atomic_read_relaxed(&meter->bands[i].byte_count, >>> + &stats->bands[i].byte_count); >>> } >>> - >>> - ovs_mutex_unlock(&meter->lock); >>> stats->n_bands = i; >>> } >>> >>> @@ -9173,7 +9199,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch >>> *packets_, >>> >>> case OVS_ACTION_ATTR_METER: >>> dp_netdev_run_meter(pmd->dp, packets_, nl_attr_get_u32(a), >>> - pmd->ctx.now); >>> + pmd->ctx.now / 1000); >>> break; >>> >>> case OVS_ACTION_ATTR_PUSH_VLAN: >>> -- >>> 2.40.1 >> _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev