On 22 Jun 2023, at 0:32, Ilya Maximets wrote:
> Current implementation of meters in the userspace datapath takes > the meter lock for every packet batch. If more than one thread > hits the flow with the same meter, they will lock each other. > > Replace the critical section with atomic operations to avoid > interlocking. Meters themselves are RCU-protected, so it's safe > to access them without holding a lock. > > Implementation does the following: > > 1. Tries to advance the 'used' timer of the meter with atomic > compare+exchange if it's smaller than 'now'. > 2. If the timer change succeeds, atomically update band buckets. > 3. Atomically update packet statistics for a meter. > 4. Go over buckets and try to atomically subtract the amount of > packets or bytes, recording the highest exceeded band. > 5. Atomically update band statistics and drop packets. > > Bucket manipulations are implemented with atomic compare+exchange > operations with extra checks, because bucket size should never > exceed the maximum and it should never go below zero. > > Packet statistics may be momentarily inconsistent, i.e., number > of packets and the number of bytes may reflect different sets > of packets. But it should be eventually consistent. And the > difference at any given time should be just a few packets. > > For the sake of reduced code complexity PKTPS meter tries to push > packets through the band one by one, even though they all have > the same weight. This is also more fair if more than one thread > is passing packets through the same band at the same time. > Trying to predict the number of packets that can pass may also > cause extra atomic operations reducing the performance. > > This implementation shows similar performance to the previous one, > but should scale better with more threads hitting the same meter. This work looks great!! Some small comments below. I did limited testing, and it seems to work fine. 
Cheers, Eelco > Signed-off-by: Ilya Maximets <i.maxim...@ovn.org> > --- > > @Lin Huang, if you can try this change on your setup, that > would be great. > > NEWS | 2 + > lib/dpif-netdev.c | 250 +++++++++++++++++++++++++--------------------- > 2 files changed, 140 insertions(+), 112 deletions(-) > > diff --git a/NEWS b/NEWS > index 66d5a4ea3..4a2b7dbca 100644 > --- a/NEWS > +++ b/NEWS > @@ -37,6 +37,8 @@ Post-v3.1.0 > - SRv6 Tunnel Protocol > * Added support for userspace datapath (only). > - Userspace datapath: > + * Implementation of OpenFlow meters is now lockless allowing for better > + multi-thread scalability. > * IP and L4 checksum offload support is now enabled by default for > interfaces that support it. See the 'status' column in the > 'interface' > table to check the status. > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c > index abe63412e..2fa556a62 100644 > --- a/lib/dpif-netdev.c > +++ b/lib/dpif-netdev.c > @@ -212,21 +212,21 @@ static void dpcls_remove(struct dpcls *, struct > dpcls_rule *); > struct dp_meter_band { > uint32_t rate; > uint32_t burst_size; > - uint64_t bucket; /* In 1/1000 packets (for PKTPS), or in bits (for KBPS) > */ > - uint64_t packet_count; > - uint64_t byte_count; > + atomic_uint64_t bucket; /* In 1/1000 packets for PKTPS, > + * or in bits for KBPS. */ > + atomic_uint64_t packet_count; > + atomic_uint64_t byte_count; > }; > > struct dp_meter { > struct cmap_node node; > - struct ovs_mutex lock; > uint32_t id; > uint16_t flags; > uint16_t n_bands; > uint32_t max_delta_t; > - uint64_t used; > - uint64_t packet_count; > - uint64_t byte_count; > + atomic_uint64_t used; /* Time of a last use in milliseconds. 
*/ > atomic_uint64_t packet_count; > atomic_uint64_t byte_count; > struct dp_meter_band bands[]; > }; > > @@ -7165,22 +7165,56 @@ dpif_netdev_meter_get_features(const struct dpif * > dpif OVS_UNUSED, > features->max_color = 0; > } > > +/* Tries to atomically add 'n' to 'value' in terms of saturation arithmetic, > + * i.e., if the result will be larger than 'max_value', will store > 'max_value' > + * instead. */ > +static void > +atomic_sat_add(atomic_uint64_t *value, uint64_t n, uint64_t max_value) > +{ > + uint64_t current, new_value; > + > + atomic_read_relaxed(value, &current); > + do { > + new_value = current + n; > + new_value = MIN(new_value, max_value); > + } while (!atomic_compare_exchange_weak_relaxed(value, &current, > + new_value)); > +} > + > +/* Tries to atomically subtract 'n' from 'value'. Does not perform the > + * operation and returns 'false' if the result will be less than 'min_value'. > + * Otherwise, stores the result and returns 'true'. */ > +static bool > +atomic_bound_sub(atomic_uint64_t *value, uint64_t n, uint64_t min_value) > +{ > + uint64_t current; > + > + atomic_read_relaxed(value, &current); > + do { > + if (current < min_value + n) { > + return false; > + } > + } while (!atomic_compare_exchange_weak_relaxed(value, &current, > + current - n)); > + return true; > +} > + > /* Applies the meter identified by 'meter_id' to 'packets_'. Packets > * that exceed a band are dropped in-place.
*/ > static void > dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_, > - uint32_t meter_id, long long int now) > + uint32_t meter_id, long long int now_ms) > { > - struct dp_meter *meter; > - struct dp_meter_band *band; > - struct dp_packet *packet; > - long long int long_delta_t; /* msec */ > - uint32_t delta_t; /* msec */ > const size_t cnt = dp_packet_batch_size(packets_); > - uint32_t bytes, volume; > - int exceeded_band[NETDEV_MAX_BURST]; > uint32_t exceeded_rate[NETDEV_MAX_BURST]; > - int exceeded_pkt = cnt; /* First packet that exceeded a band rate. */ > + uint32_t exceeded_band[NETDEV_MAX_BURST]; > + uint64_t bytes, volume, meter_used, old; > + uint64_t band_packets[MAX_BANDS]; > + uint64_t band_bytes[MAX_BANDS]; > + struct dp_meter_band *band; > + struct dp_packet *packet; > + struct dp_meter *meter; > + bool exceeded = false; > > if (meter_id >= MAX_METERS) { > return; > @@ -7196,116 +7230,102 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct > dp_packet_batch *packets_, > /* Initialize as zeroes. */ > memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate); > > - ovs_mutex_lock(&meter->lock); > - /* All packets will hit the meter at the same time. */ > - long_delta_t = now / 1000 - meter->used / 1000; /* msec */ > + atomic_read_relaxed(&meter->used, &meter_used); > + do { > + if (meter_used >= now_ms) { > + /* The '>' condition means that we have several threads hitting > the > + * same meter, and the other one already advanced the time. */ > + meter_used = now_ms; > + break; > + } > + } while (!atomic_compare_exchange_weak_relaxed(&meter->used, > + &meter_used, now_ms)); > > - if (long_delta_t < 0) { > - /* This condition means that we have several threads fighting for a > - meter lock, and the one who received the packets a bit later wins. > - Assuming that all racing threads received packets at the same time > - to avoid overflow. 
*/ > - long_delta_t = 0; > - } > + /* Refill all buckets right away, since other threads may use them. */ > + if (meter_used < now_ms) { > + /* All packets will hit the meter at the same time. */ > + uint64_t delta_t = now_ms - meter_used; > + > + /* Make sure delta_t will not be too large, so that bucket will not > + * wrap around below. */ > + delta_t = MIN(delta_t, meter->max_delta_t); > > - /* Make sure delta_t will not be too large, so that bucket will not > - * wrap around below. */ > - delta_t = (long_delta_t > (long long int)meter->max_delta_t) > - ? meter->max_delta_t : (uint32_t)long_delta_t; > + for (int m = 0; m < meter->n_bands; m++) { > + band = &meter->bands[m]; > + /* Update band's bucket. We can't just use atomic add here, > + * because we should never add above the max capacity. */ > + atomic_sat_add(&band->bucket, delta_t * band->rate, > + band->burst_size * 1000ULL); > + } > + } > > /* Update meter stats. */ > - meter->used = now; > - meter->packet_count += cnt; > + atomic_add_relaxed(&meter->packet_count, cnt, &old); > bytes = 0; > DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) { > bytes += dp_packet_size(packet); > } > - meter->byte_count += bytes; > + atomic_add_relaxed(&meter->byte_count, bytes, &old); > > /* Meters can operate in terms of packets per second or kilobits per > * second. */ > if (meter->flags & OFPMF13_PKTPS) { > - /* Rate in packets/second, bucket 1/1000 packets. */ > - /* msec * packets/sec = 1/1000 packets. */ > + /* Rate in packets/second, bucket 1/1000 packets. > + * msec * packets/sec = 1/1000 packets. */ > volume = cnt * 1000; /* Take 'cnt' packets from the bucket. */ > } else { > - /* Rate in kbps, bucket in bits. */ > - /* msec * kbps = bits */ > + /* Rate in kbps, bucket in bits. > + * msec * kbps = bits */ > volume = bytes * 8; > } > > - /* Update all bands and find the one hit with the highest rate for each > - * packet (if any). 
*/ > - for (int m = 0; m < meter->n_bands; ++m) { > - uint64_t max_bucket_size; > - > + /* Find the band hit with the highest rate for each packet (if any). */ > + for (int m = 0; m < meter->n_bands; m++) { > band = &meter->bands[m]; > - max_bucket_size = band->burst_size * 1000ULL; > - /* Update band's bucket. */ > - band->bucket += (uint64_t) delta_t * band->rate; > - if (band->bucket > max_bucket_size) { > - band->bucket = max_bucket_size; > - } > > /* Drain the bucket for all the packets, if possible. */ > - if (band->bucket >= volume) { > - band->bucket -= volume; > - } else { > - int band_exceeded_pkt; > - > - /* Band limit hit, must process packet-by-packet. */ > - if (meter->flags & OFPMF13_PKTPS) { > - band_exceeded_pkt = band->bucket / 1000; > - band->bucket %= 1000; /* Remainder stays in bucket. */ > - > - /* Update the exceeding band for each exceeding packet. > - * (Only one band will be fired by a packet, and that > - * can be different for each packet.) */ > - for (int i = band_exceeded_pkt; i < cnt; i++) { > - if (band->rate > exceeded_rate[i]) { > - exceeded_rate[i] = band->rate; > - exceeded_band[i] = m; > - } > - } > - } else { > - /* Packet sizes differ, must process one-by-one. */ > - band_exceeded_pkt = cnt; > - DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) { > - uint32_t bits = dp_packet_size(packet) * 8; > - > - if (band->bucket >= bits) { > - band->bucket -= bits; > - } else { > - if (i < band_exceeded_pkt) { > - band_exceeded_pkt = i; > - } > - /* Update the exceeding band for the exceeding > packet. > - * (Only one band will be fired by a packet, and that > - * can be different for each packet.) */ > - if (band->rate > exceeded_rate[i]) { > - exceeded_rate[i] = band->rate; > - exceeded_band[i] = m; > - } > - } > + if (atomic_bound_sub(&band->bucket, volume, 0)) { > + continue; > + } > + > + /* Band limit hit, must process packet-by-packet. 
*/ > + DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) { > + uint64_t packet_volume = (meter->flags & OFPMF13_PKTPS) > + ? 1000 : (dp_packet_size(packet) * 8); > + > + if (!atomic_bound_sub(&band->bucket, packet_volume, 0)) { > + /* Update the exceeding band for the exceeding packet. > + * Only one band will be fired by a packet, and that can > + * be different for each packet. */ > + if (band->rate > exceeded_rate[i]) { > + exceeded_rate[i] = band->rate; > + exceeded_band[i] = m; > + exceeded = true; > } > } > - /* Remember the first exceeding packet. */ > - if (exceeded_pkt > band_exceeded_pkt) { > - exceeded_pkt = band_exceeded_pkt; > - } > } > } > > + /* No need to iterate over packets if there are no drops. */ > + if (!exceeded) { > + return; > + } > + > /* Fire the highest rate band exceeded by each packet, and drop > * packets if needed. */ > + Don’t think we need this newline. > + memset(band_packets, 0, sizeof band_packets); > + memset(band_bytes, 0, sizeof band_bytes); Are these extra spaces for alignment on purpose? > + > size_t j; > DP_PACKET_BATCH_REFILL_FOR_EACH (j, cnt, packet, packets_) { > - if (exceeded_band[j] >= 0) { > + uint32_t m = exceeded_band[j]; > + > + if (m != UINT32_MAX) { > /* Meter drop packet. */ > - band = &meter->bands[exceeded_band[j]]; > - band->packet_count += 1; > - band->byte_count += dp_packet_size(packet); > - COVERAGE_INC(datapath_drop_meter); > + band = &meter->bands[m]; > + band_packets[m]++; > + band_bytes[m] += dp_packet_size(packet); This code now looks like this (the diff is a mess to comment on): if (m != UINT32_MAX) { /* Meter drop packet. */ band = &meter->bands[m]; ! ^^^ This line can be removed as band is not used. band_packets[m]++; band_bytes[m] += dp_packet_size(packet); dp_packet_delete(packet); } else { /* Meter accepts packet. */ dp_packet_batch_refill(packets_, packet, j); } } > dp_packet_delete(packet); > } else { > /* Meter accepts packet. 
*/ > @@ -7313,7 +7333,15 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct > dp_packet_batch *packets_, > } > } > > - ovs_mutex_unlock(&meter->lock); > + for (int m = 0; m < meter->n_bands; m++) { > + if (!band_packets[m]) { > + continue; > + } > + band = &meter->bands[m]; > + atomic_add_relaxed(&band->packet_count, band_packets[m], &old); > + atomic_add_relaxed(&band->byte_count, band_bytes[m], &old); Are these extra spaces for alignment on purpose? > + COVERAGE_ADD(datapath_drop_meter, band_packets[m]); > + } > } > > /* Meter set/get/del processing is still single-threaded. */ > @@ -7354,13 +7382,13 @@ dpif_netdev_meter_set(struct dpif *dpif, > ofproto_meter_id meter_id, > meter->flags = config->flags; > meter->n_bands = config->n_bands; > meter->max_delta_t = 0; > - meter->used = time_usec(); > meter->id = mid; > - ovs_mutex_init_adaptive(&meter->lock); > + atomic_init(&meter->used, time_msec()); > > /* set up bands */ > for (i = 0; i < config->n_bands; ++i) { > uint32_t band_max_delta_t; > + uint64_t bucket_size; > > /* Set burst size to a workable value if none specified. */ > if (config->bands[i].burst_size == 0) { > @@ -7370,11 +7398,11 @@ dpif_netdev_meter_set(struct dpif *dpif, > ofproto_meter_id meter_id, > meter->bands[i].rate = config->bands[i].rate; > meter->bands[i].burst_size = config->bands[i].burst_size; > /* Start with a full bucket. */ > - meter->bands[i].bucket = meter->bands[i].burst_size * 1000ULL; > + bucket_size = meter->bands[i].burst_size * 1000ULL; > + atomic_init(&meter->bands[i].bucket, bucket_size); > > /* Figure out max delta_t that is enough to fill any bucket. 
*/ > - band_max_delta_t > - = meter->bands[i].bucket / meter->bands[i].rate; > + band_max_delta_t = bucket_size / meter->bands[i].rate; > if (band_max_delta_t > meter->max_delta_t) { > meter->max_delta_t = band_max_delta_t; > } > @@ -7397,7 +7425,7 @@ dpif_netdev_meter_get(const struct dpif *dpif, > { > struct dp_netdev *dp = get_dp_netdev(dpif); > uint32_t meter_id = meter_id_.uint32; > - const struct dp_meter *meter; > + struct dp_meter *meter; > > if (meter_id >= MAX_METERS) { > return EFBIG; > @@ -7411,17 +7439,15 @@ dpif_netdev_meter_get(const struct dpif *dpif, > if (stats) { > int i = 0; > > - ovs_mutex_lock(&meter->lock); > - > - stats->packet_in_count = meter->packet_count; > - stats->byte_in_count = meter->byte_count; > + atomic_read_relaxed(&meter->packet_count, &stats->packet_in_count); > + atomic_read_relaxed(&meter->byte_count, &stats->byte_in_count); > > for (i = 0; i < n_bands && i < meter->n_bands; ++i) { > - stats->bands[i].packet_count = meter->bands[i].packet_count; > - stats->bands[i].byte_count = meter->bands[i].byte_count; > + atomic_read_relaxed(&meter->bands[i].packet_count, > + &stats->bands[i].packet_count); > + atomic_read_relaxed(&meter->bands[i].byte_count, > + &stats->bands[i].byte_count); > } > - > - ovs_mutex_unlock(&meter->lock); > stats->n_bands = i; > } > > @@ -9173,7 +9199,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch > *packets_, > > case OVS_ACTION_ATTR_METER: > dp_netdev_run_meter(pmd->dp, packets_, nl_attr_get_u32(a), > - pmd->ctx.now); > + pmd->ctx.now / 1000); > break; > > case OVS_ACTION_ATTR_PUSH_VLAN: > -- > 2.40.1 _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev