This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 549b5d3 Fix WeakHashMap implementation in SharedRateLimiterFactory (#1965) 549b5d3 is described below commit 549b5d3f7481de6064b4dc77481e718529440f6b Author: Dom G <47725857+domgargu...@users.noreply.github.com> AuthorDate: Tue Mar 16 11:35:53 2021 -0400 Fix WeakHashMap implementation in SharedRateLimiterFactory (#1965) * Fix WeakHashMap implementation * Rename `update`/`report` methods to `updateAll`/`reportAll` and reference using method reference, and make them private, since they don't need to be any more visible than that * Reuse the implementation for operating on a copy of the `activeLimiters` for both `updateAll` and `reportAll` * Convert syncronized long to AtomicLong * Make SharedRateLimiter immune to system clock changes Co-authored-by: Christopher Tubbs <ctubb...@apache.org> --- .../util/ratelimit/SharedRateLimiterFactory.java | 99 +++++++++++----------- 1 file changed, 50 insertions(+), 49 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java index c24a3d2..c5c6890 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java @@ -18,10 +18,14 @@ */ package org.apache.accumulo.core.util.ratelimit; +import java.lang.ref.WeakReference; +import java.util.HashMap; import java.util.Map; import java.util.WeakHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -38,7 +42,8 @@ public class SharedRateLimiterFactory { private static final long UPDATE_RATE = 1000; private static SharedRateLimiterFactory instance = null; private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class); - private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>(); + private final WeakHashMap<String,WeakReference<SharedRateLimiter>> activeLimiters = + new WeakHashMap<>(); private SharedRateLimiterFactory() {} @@ -48,15 +53,13 @@ public class SharedRateLimiterFactory { instance = new SharedRateLimiterFactory(); ScheduledThreadPoolExecutor svc = ThreadPools.createGeneralScheduledExecutorService(conf); - svc.scheduleWithFixedDelay( - Threads.createNamedRunnable("SharedRateLimiterFactory update polling", () -> { - instance.update(); - }), UPDATE_RATE, UPDATE_RATE, TimeUnit.MILLISECONDS); + svc.scheduleWithFixedDelay(Threads + .createNamedRunnable("SharedRateLimiterFactory update polling", instance::updateAll), + UPDATE_RATE, UPDATE_RATE, TimeUnit.MILLISECONDS); - svc.scheduleWithFixedDelay( - Threads.createNamedRunnable("SharedRateLimiterFactory report polling", () -> { - instance.report(); - }), REPORT_RATE, REPORT_RATE, TimeUnit.MILLISECONDS); + svc.scheduleWithFixedDelay(Threads + .createNamedRunnable("SharedRateLimiterFactory report polling", instance::reportAll), + REPORT_RATE, REPORT_RATE, TimeUnit.MILLISECONDS); } return instance; @@ -86,57 +89,55 @@ public class SharedRateLimiterFactory { */ public RateLimiter create(String name, RateProvider rateProvider) { synchronized (activeLimiters) { - if (activeLimiters.containsKey(name)) { - return activeLimiters.get(name); - } else { - long initialRate; - initialRate = rateProvider.getDesiredRate(); - SharedRateLimiter limiter = new SharedRateLimiter(name, rateProvider, initialRate); - activeLimiters.put(name, limiter); - return limiter; + var limiterRef = activeLimiters.get(name); + var limiter = limiterRef == null ? null : limiterRef.get(); + if (limiter == null) { + limiter = new SharedRateLimiter(name, rateProvider, rateProvider.getDesiredRate()); + activeLimiters.put(name, new WeakReference<>(limiter)); } + return limiter; } } - /** - * Walk through all of the currently active RateLimiters, having each update its current rate. - * This is called periodically so that we can dynamically update as configuration changes. - */ - protected void update() { - Map<String,SharedRateLimiter> limitersCopy; + private void copyAndThen(String actionName, Consumer<SharedRateLimiter> action) { + Map<String,SharedRateLimiter> limitersCopy = new HashMap<>(); + // synchronize only for copy synchronized (activeLimiters) { - limitersCopy = Map.copyOf(activeLimiters); + activeLimiters.forEach((name, limiterRef) -> { + var limiter = limiterRef.get(); + if (limiter != null) { + limitersCopy.put(name, limiter); + } + }); } - for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) { + limitersCopy.forEach((name, limiter) -> { try { - entry.getValue().update(); - } catch (Exception ex) { - log.error(String.format("Failed to update limiter %s", entry.getKey()), ex); + action.accept(limiter); + } catch (RuntimeException e) { + log.error("Failed to {} limiter {}", actionName, name, e); } - } + }); + } + + /** + * Walk through all of the currently active RateLimiters, having each update its current rate. + * This is called periodically so that we can dynamically update as configuration changes. + */ + private void updateAll() { + copyAndThen("update", SharedRateLimiter::update); } /** * Walk through all of the currently active RateLimiters, having each report its activity to the * debug log. */ - protected void report() { - Map<String,SharedRateLimiter> limitersCopy; - synchronized (activeLimiters) { - limitersCopy = Map.copyOf(activeLimiters); - } - for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) { - try { - entry.getValue().report(); - } catch (Exception ex) { - log.error(String.format("Failed to report limiter %s", entry.getKey()), ex); - } - } + private void reportAll() { + copyAndThen("report", SharedRateLimiter::report); } protected class SharedRateLimiter extends GuavaRateLimiter { - private volatile long permitsAcquired = 0; - private volatile long lastUpdate; + private AtomicLong permitsAcquired = new AtomicLong(); + private AtomicLong lastUpdate = new AtomicLong(); private final RateProvider rateProvider; private final String name; @@ -145,13 +146,13 @@ public class SharedRateLimiterFactory { super(initialRate); this.name = name; this.rateProvider = rateProvider; - this.lastUpdate = System.currentTimeMillis(); + this.lastUpdate.set(System.nanoTime()); } @Override public void acquire(long permits) { super.acquire(permits); - permitsAcquired += permits; + permitsAcquired.addAndGet(permits); } /** Poll the callback, updating the current rate if necessary. */ @@ -166,14 +167,14 @@ public class SharedRateLimiterFactory { /** Report the current throughput and usage of this rate limiter to the debug log. */ public void report() { if (log.isDebugEnabled()) { - long duration = System.currentTimeMillis() - lastUpdate; + long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastUpdate.get()); if (duration == 0) { return; } - lastUpdate = System.currentTimeMillis(); + lastUpdate.set(System.nanoTime()); - long sum = permitsAcquired; - permitsAcquired = 0; + long sum = permitsAcquired.get(); + permitsAcquired.set(0); if (sum > 0) { log.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name,