On 6/29/23 17:43, Eelco Chaudron wrote: > > > On 22 Jun 2023, at 0:32, Ilya Maximets wrote: > >> Current implementation of meters in the userspace datapath takes >> the meter lock for every packet batch. If more than one thread >> hits the flow with the same meter, they will lock each other. >> >> Replace the critical section with atomic operations to avoid >> interlocking. Meters themselves are RCU-protected, so it's safe >> to access them without holding a lock. >> >> Implementation does the following: >> >> 1. Tries to advance the 'used' timer of the meter with atomic >> compare+exchange if it's smaller than 'now'. >> 2. If the timer change succeeds, atomically update band buckets. >> 3. Atomically update packet statistics for a meter. >> 4. Go over buckets and try to atomically subtract the amount of >> packets or bytes, recording the highest exceeded band. >> 5. Atomically update band statistics and drop packets. >> >> Bucket manipulations are implemented with atomic compare+exchange >> operations with extra checks, because bucket size should never >> exceed the maximum and it should never go below zero. >> >> Packet statistics may be momentarily inconsistent, i.e., number >> of packets and the number of bytes may reflect different sets >> of packets. But it should be eventually consistent. And the >> difference at any given time should be in just a few packets. >> >> For the sake of reduced code complexity PKTPS meter tries to push >> packets through the band one by one, even though they all have >> the same weight. This is also more fair if more than one thread >> is passing packets through the same band at the same time. >> Trying to predict the number of packets that can pass may also >> cause extra atomic operations reducing the performance. >> >> This implementation shows similar performance to the previous one, >> but should scale better with more threads hitting the same meter. > > This work looks great!! Some small comments below.
Did limited testing and > seems to work fine. > > Cheers, > > Eelco > >> Signed-off-by: Ilya Maximets <i.maxim...@ovn.org> >> --- >> >> @Lin Huang, if you can try this change on your setup, that >> would be great. >> >> NEWS | 2 + >> lib/dpif-netdev.c | 250 +++++++++++++++++++++++++--------------------- >> 2 files changed, 140 insertions(+), 112 deletions(-) >> >> diff --git a/NEWS b/NEWS >> index 66d5a4ea3..4a2b7dbca 100644 >> --- a/NEWS >> +++ b/NEWS >> @@ -37,6 +37,8 @@ Post-v3.1.0 >> - SRv6 Tunnel Protocol >> * Added support for userspace datapath (only). >> - Userspace datapath: >> + * Implementation of OpenFlow meters is now lockless allowing for better >> + multi-thread scalability. >> * IP and L4 checksum offload support is now enabled by default for >> interfaces that support it. See the 'status' column in the >> 'interface' >> table to check the status. >> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c >> index abe63412e..2fa556a62 100644 >> --- a/lib/dpif-netdev.c >> +++ b/lib/dpif-netdev.c >> @@ -212,21 +212,21 @@ static void dpcls_remove(struct dpcls *, struct >> dpcls_rule *); >> struct dp_meter_band { >> uint32_t rate; >> uint32_t burst_size; >> - uint64_t bucket; /* In 1/1000 packets (for PKTPS), or in bits (for >> KBPS) */ >> - uint64_t packet_count; >> - uint64_t byte_count; >> + atomic_uint64_t bucket; /* In 1/1000 packets for PKTPS, >> + * or in bits for KBPS. */ >> + atomic_uint64_t packet_count; >> + atomic_uint64_t byte_count; >> }; >> >> struct dp_meter { >> struct cmap_node node; >> - struct ovs_mutex lock; >> uint32_t id; >> uint16_t flags; >> uint16_t n_bands; >> uint32_t max_delta_t; >> - uint64_t used; >> - uint64_t packet_count; >> - uint64_t byte_count; >> + atomic_uint64_t used; /* Time of a last use in milliseconds. 
*/ >> + atomic_uint64_t packet_count; >> + atomic_uint64_t byte_count; >> struct dp_meter_band bands[]; >> }; >> >> @@ -7165,22 +7165,56 @@ dpif_netdev_meter_get_features(const struct dpif * >> dpif OVS_UNUSED, >> features->max_color = 0; >> } >> >> +/* Tries to atomically add 'n' to 'value' in terms of saturation arithmetic, >> + * i.e., if the result will be larger than 'max_value', will store >> 'max_value' >> + * instead. */ >> +static void >> +atomic_sat_add(atomic_uint64_t *value, uint64_t n, uint64_t max_value) >> +{ >> + uint64_t current, new_value; >> + >> + atomic_read_relaxed(value, &current); >> + do { >> + new_value = current + n; >> + new_value = MIN(new_value, max_value); >> + } while (!atomic_compare_exchange_weak_relaxed(value, &current, >> + new_value)); >> +} >> + >> +/* Tries to atomically subtract 'n' from 'value'. Does not perform the >> + * operation and returns 'false' if the result will be less than >> 'min_value'. >> + * Otherwise, stores the result and returns 'true'. */ >> +static bool >> +atomic_bound_sub(atomic_uint64_t *value, uint64_t n, uint64_t min_value) >> +{ >> + uint64_t current; >> + >> + atomic_read_relaxed(value, &current); >> + do { >> + if (current < min_value + n) { >> + return false; >> + } >> + } while (!atomic_compare_exchange_weak_relaxed(value, &current, >> + current - n)); >> + return true; >> +} >> + >> /* Applies the meter identified by 'meter_id' to 'packets_'. Packets >> * that exceed a band are dropped in-place.
*/ >> static void >> dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_, >> - uint32_t meter_id, long long int now) >> + uint32_t meter_id, long long int now_ms) >> { >> - struct dp_meter *meter; >> - struct dp_meter_band *band; >> - struct dp_packet *packet; >> - long long int long_delta_t; /* msec */ >> - uint32_t delta_t; /* msec */ >> const size_t cnt = dp_packet_batch_size(packets_); >> - uint32_t bytes, volume; >> - int exceeded_band[NETDEV_MAX_BURST]; >> uint32_t exceeded_rate[NETDEV_MAX_BURST]; >> - int exceeded_pkt = cnt; /* First packet that exceeded a band rate. */ >> + uint32_t exceeded_band[NETDEV_MAX_BURST]; >> + uint64_t bytes, volume, meter_used, old; >> + uint64_t band_packets[MAX_BANDS]; >> + uint64_t band_bytes[MAX_BANDS]; >> + struct dp_meter_band *band; >> + struct dp_packet *packet; >> + struct dp_meter *meter; >> + bool exceeded = false; >> >> if (meter_id >= MAX_METERS) { >> return; >> @@ -7196,116 +7230,102 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct >> dp_packet_batch *packets_, >> /* Initialize as zeroes. */ >> memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate); >> >> - ovs_mutex_lock(&meter->lock); >> - /* All packets will hit the meter at the same time. */ >> - long_delta_t = now / 1000 - meter->used / 1000; /* msec */ >> + atomic_read_relaxed(&meter->used, &meter_used); >> + do { >> + if (meter_used >= now_ms) { >> + /* The '>' condition means that we have several threads hitting >> the >> + * same meter, and the other one already advanced the time. */ >> + meter_used = now_ms; >> + break; >> + } >> + } while (!atomic_compare_exchange_weak_relaxed(&meter->used, >> + &meter_used, now_ms)); >> >> - if (long_delta_t < 0) { >> - /* This condition means that we have several threads fighting for a >> - meter lock, and the one who received the packets a bit later >> wins. >> - Assuming that all racing threads received packets at the same >> time >> - to avoid overflow. 
*/ >> - long_delta_t = 0; >> - } >> + /* Refill all buckets right away, since other threads may use them. */ >> + if (meter_used < now_ms) { >> + /* All packets will hit the meter at the same time. */ >> + uint64_t delta_t = now_ms - meter_used; >> + >> + /* Make sure delta_t will not be too large, so that bucket will not >> + * wrap around below. */ >> + delta_t = MIN(delta_t, meter->max_delta_t); >> >> - /* Make sure delta_t will not be too large, so that bucket will not >> - * wrap around below. */ >> - delta_t = (long_delta_t > (long long int)meter->max_delta_t) >> - ? meter->max_delta_t : (uint32_t)long_delta_t; >> + for (int m = 0; m < meter->n_bands; m++) { >> + band = &meter->bands[m]; >> + /* Update band's bucket. We can't just use atomic add here, >> + * because we should never add above the max capacity. */ >> + atomic_sat_add(&band->bucket, delta_t * band->rate, >> + band->burst_size * 1000ULL); >> + } >> + } >> >> /* Update meter stats. */ >> - meter->used = now; >> - meter->packet_count += cnt; >> + atomic_add_relaxed(&meter->packet_count, cnt, &old); >> bytes = 0; >> DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) { >> bytes += dp_packet_size(packet); >> } >> - meter->byte_count += bytes; >> + atomic_add_relaxed(&meter->byte_count, bytes, &old); >> >> /* Meters can operate in terms of packets per second or kilobits per >> * second. */ >> if (meter->flags & OFPMF13_PKTPS) { >> - /* Rate in packets/second, bucket 1/1000 packets. */ >> - /* msec * packets/sec = 1/1000 packets. */ >> + /* Rate in packets/second, bucket 1/1000 packets. >> + * msec * packets/sec = 1/1000 packets. */ >> volume = cnt * 1000; /* Take 'cnt' packets from the bucket. */ >> } else { >> - /* Rate in kbps, bucket in bits. */ >> - /* msec * kbps = bits */ >> + /* Rate in kbps, bucket in bits. >> + * msec * kbps = bits */ >> volume = bytes * 8; >> } >> >> - /* Update all bands and find the one hit with the highest rate for each >> - * packet (if any). 
*/ >> - for (int m = 0; m < meter->n_bands; ++m) { >> - uint64_t max_bucket_size; >> - >> + /* Find the band hit with the highest rate for each packet (if any). */ >> + for (int m = 0; m < meter->n_bands; m++) { >> band = &meter->bands[m]; >> - max_bucket_size = band->burst_size * 1000ULL; >> - /* Update band's bucket. */ >> - band->bucket += (uint64_t) delta_t * band->rate; >> - if (band->bucket > max_bucket_size) { >> - band->bucket = max_bucket_size; >> - } >> >> /* Drain the bucket for all the packets, if possible. */ >> - if (band->bucket >= volume) { >> - band->bucket -= volume; >> - } else { >> - int band_exceeded_pkt; >> - >> - /* Band limit hit, must process packet-by-packet. */ >> - if (meter->flags & OFPMF13_PKTPS) { >> - band_exceeded_pkt = band->bucket / 1000; >> - band->bucket %= 1000; /* Remainder stays in bucket. */ >> - >> - /* Update the exceeding band for each exceeding packet. >> - * (Only one band will be fired by a packet, and that >> - * can be different for each packet.) */ >> - for (int i = band_exceeded_pkt; i < cnt; i++) { >> - if (band->rate > exceeded_rate[i]) { >> - exceeded_rate[i] = band->rate; >> - exceeded_band[i] = m; >> - } >> - } >> - } else { >> - /* Packet sizes differ, must process one-by-one. */ >> - band_exceeded_pkt = cnt; >> - DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) { >> - uint32_t bits = dp_packet_size(packet) * 8; >> - >> - if (band->bucket >= bits) { >> - band->bucket -= bits; >> - } else { >> - if (i < band_exceeded_pkt) { >> - band_exceeded_pkt = i; >> - } >> - /* Update the exceeding band for the exceeding >> packet. >> - * (Only one band will be fired by a packet, and >> that >> - * can be different for each packet.) */ >> - if (band->rate > exceeded_rate[i]) { >> - exceeded_rate[i] = band->rate; >> - exceeded_band[i] = m; >> - } >> - } >> + if (atomic_bound_sub(&band->bucket, volume, 0)) { >> + continue; >> + } >> + >> + /* Band limit hit, must process packet-by-packet. 
*/ >> + DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) { >> + uint64_t packet_volume = (meter->flags & OFPMF13_PKTPS) >> + ? 1000 : (dp_packet_size(packet) * 8); >> + >> + if (!atomic_bound_sub(&band->bucket, packet_volume, 0)) { >> + /* Update the exceeding band for the exceeding packet. >> + * Only one band will be fired by a packet, and that can >> + * be different for each packet. */ >> + if (band->rate > exceeded_rate[i]) { >> + exceeded_rate[i] = band->rate; >> + exceeded_band[i] = m; >> + exceeded = true; >> } >> } >> - /* Remember the first exceeding packet. */ >> - if (exceeded_pkt > band_exceeded_pkt) { >> - exceeded_pkt = band_exceeded_pkt; >> - } >> } >> } >> >> + /* No need to iterate over packets if there are no drops. */ >> + if (!exceeded) { >> + return; >> + } >> + >> /* Fire the highest rate band exceeded by each packet, and drop >> * packets if needed. */ >> + > > Don’t think we need this newline.
The comment above applies to the whole section below. Without an empty line it looks like it's a comment for memsets, and it is not. I'd like to keep it, but can remove if you insist. :) > >> + memset(band_packets, 0, sizeof band_packets); >> + memset(band_bytes, 0, sizeof band_bytes); > > Are these extra spaces for alignment on purpose? Yes, seemed easier to read this way. > >> + >> size_t j; >> DP_PACKET_BATCH_REFILL_FOR_EACH (j, cnt, packet, packets_) { >> - if (exceeded_band[j] >= 0) { >> + uint32_t m = exceeded_band[j]; >> + >> + if (m != UINT32_MAX) { >> /* Meter drop packet. */ >> - band = &meter->bands[exceeded_band[j]]; >> - band->packet_count += 1; >> - band->byte_count += dp_packet_size(packet); >> - COVERAGE_INC(datapath_drop_meter); >> + band = &meter->bands[m]; >> + band_packets[m]++; >> + band_bytes[m] += dp_packet_size(packet); > > > This code now looks like this (the diff is a mess to comment on): > > if (m != UINT32_MAX) { > /* Meter drop packet. */ > band = &meter->bands[m]; > > ! ^^^ This line can be removed as band is not used. True. Can remove. > > band_packets[m]++; > band_bytes[m] += dp_packet_size(packet); > dp_packet_delete(packet); > } else { > /* Meter accepts packet. */ > dp_packet_batch_refill(packets_, packet, j); > } > } > >> dp_packet_delete(packet); >> } else { >> /* Meter accepts packet. */ >> @@ -7313,7 +7333,15 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct >> dp_packet_batch *packets_, >> } >> } >> >> - ovs_mutex_unlock(&meter->lock); >> + for (int m = 0; m < meter->n_bands; m++) { >> + if (!band_packets[m]) { >> + continue; >> + } >> + band = &meter->bands[m]; >> + atomic_add_relaxed(&band->packet_count, band_packets[m], &old); >> + atomic_add_relaxed(&band->byte_count, band_bytes[m], &old); > > Are these extra spaces for alignment on purpose? Yep, same as with memset. This code is a bit complex and alignment makes it easier to read, IMO.
> >> + COVERAGE_ADD(datapath_drop_meter, band_packets[m]); >> + } >> } >> >> /* Meter set/get/del processing is still single-threaded. */ >> @@ -7354,13 +7382,13 @@ dpif_netdev_meter_set(struct dpif *dpif, >> ofproto_meter_id meter_id, >> meter->flags = config->flags; >> meter->n_bands = config->n_bands; >> meter->max_delta_t = 0; >> - meter->used = time_usec(); >> meter->id = mid; >> - ovs_mutex_init_adaptive(&meter->lock); >> + atomic_init(&meter->used, time_msec()); >> >> /* set up bands */ >> for (i = 0; i < config->n_bands; ++i) { >> uint32_t band_max_delta_t; >> + uint64_t bucket_size; >> >> /* Set burst size to a workable value if none specified. */ >> if (config->bands[i].burst_size == 0) { >> @@ -7370,11 +7398,11 @@ dpif_netdev_meter_set(struct dpif *dpif, >> ofproto_meter_id meter_id, >> meter->bands[i].rate = config->bands[i].rate; >> meter->bands[i].burst_size = config->bands[i].burst_size; >> /* Start with a full bucket. */ >> - meter->bands[i].bucket = meter->bands[i].burst_size * 1000ULL; >> + bucket_size = meter->bands[i].burst_size * 1000ULL; >> + atomic_init(&meter->bands[i].bucket, bucket_size); >> >> /* Figure out max delta_t that is enough to fill any bucket. 
*/ >> - band_max_delta_t >> - = meter->bands[i].bucket / meter->bands[i].rate; >> + band_max_delta_t = bucket_size / meter->bands[i].rate; >> if (band_max_delta_t > meter->max_delta_t) { >> meter->max_delta_t = band_max_delta_t; >> } >> @@ -7397,7 +7425,7 @@ dpif_netdev_meter_get(const struct dpif *dpif, >> { >> struct dp_netdev *dp = get_dp_netdev(dpif); >> uint32_t meter_id = meter_id_.uint32; >> - const struct dp_meter *meter; >> + struct dp_meter *meter; >> >> if (meter_id >= MAX_METERS) { >> return EFBIG; >> @@ -7411,17 +7439,15 @@ dpif_netdev_meter_get(const struct dpif *dpif, >> if (stats) { >> int i = 0; >> >> - ovs_mutex_lock(&meter->lock); >> - >> - stats->packet_in_count = meter->packet_count; >> - stats->byte_in_count = meter->byte_count; >> + atomic_read_relaxed(&meter->packet_count, &stats->packet_in_count); >> + atomic_read_relaxed(&meter->byte_count, &stats->byte_in_count); >> >> for (i = 0; i < n_bands && i < meter->n_bands; ++i) { >> - stats->bands[i].packet_count = meter->bands[i].packet_count; >> - stats->bands[i].byte_count = meter->bands[i].byte_count; >> + atomic_read_relaxed(&meter->bands[i].packet_count, >> + &stats->bands[i].packet_count); >> + atomic_read_relaxed(&meter->bands[i].byte_count, >> + &stats->bands[i].byte_count); >> } >> - >> - ovs_mutex_unlock(&meter->lock); >> stats->n_bands = i; >> } >> >> @@ -9173,7 +9199,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch >> *packets_, >> >> case OVS_ACTION_ATTR_METER: >> dp_netdev_run_meter(pmd->dp, packets_, nl_attr_get_u32(a), >> - pmd->ctx.now); >> + pmd->ctx.now / 1000); >> break; >> >> case OVS_ACTION_ATTR_PUSH_VLAN: >> -- >> 2.40.1 > _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev