This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 549b5d3  Fix WeakHashMap implementation in SharedRateLimiterFactory 
(#1965)
549b5d3 is described below

commit 549b5d3f7481de6064b4dc77481e718529440f6b
Author: Dom G <47725857+domgargu...@users.noreply.github.com>
AuthorDate: Tue Mar 16 11:35:53 2021 -0400

    Fix WeakHashMap implementation in SharedRateLimiterFactory (#1965)
    
    * Fix WeakHashMap implementation
    * Rename `update`/`report` methods to `updateAll`/`reportAll` and
      reference using method reference, and make them private, since they
      don't need to be any more visible than that
    * Reuse the implementation for operating on a copy of the
      `activeLimiters` for both `updateAll` and `reportAll`
    * Convert syncronized long to AtomicLong
    * Make SharedRateLimiter immune to system clock changes
    
    Co-authored-by: Christopher Tubbs <ctubb...@apache.org>
---
 .../util/ratelimit/SharedRateLimiterFactory.java   | 99 +++++++++++-----------
 1 file changed, 50 insertions(+), 49 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
 
b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
index c24a3d2..c5c6890 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -18,10 +18,14 @@
  */
 package org.apache.accumulo.core.util.ratelimit;
 
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.WeakHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.util.threads.ThreadPools;
@@ -38,7 +42,8 @@ public class SharedRateLimiterFactory {
   private static final long UPDATE_RATE = 1000;
   private static SharedRateLimiterFactory instance = null;
   private final Logger log = 
LoggerFactory.getLogger(SharedRateLimiterFactory.class);
-  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new 
WeakHashMap<>();
+  private final WeakHashMap<String,WeakReference<SharedRateLimiter>> 
activeLimiters =
+      new WeakHashMap<>();
 
   private SharedRateLimiterFactory() {}
 
@@ -48,15 +53,13 @@ public class SharedRateLimiterFactory {
       instance = new SharedRateLimiterFactory();
 
       ScheduledThreadPoolExecutor svc = 
ThreadPools.createGeneralScheduledExecutorService(conf);
-      svc.scheduleWithFixedDelay(
-          Threads.createNamedRunnable("SharedRateLimiterFactory update 
polling", () -> {
-            instance.update();
-          }), UPDATE_RATE, UPDATE_RATE, TimeUnit.MILLISECONDS);
+      svc.scheduleWithFixedDelay(Threads
+          .createNamedRunnable("SharedRateLimiterFactory update polling", 
instance::updateAll),
+          UPDATE_RATE, UPDATE_RATE, TimeUnit.MILLISECONDS);
 
-      svc.scheduleWithFixedDelay(
-          Threads.createNamedRunnable("SharedRateLimiterFactory report 
polling", () -> {
-            instance.report();
-          }), REPORT_RATE, REPORT_RATE, TimeUnit.MILLISECONDS);
+      svc.scheduleWithFixedDelay(Threads
+          .createNamedRunnable("SharedRateLimiterFactory report polling", 
instance::reportAll),
+          REPORT_RATE, REPORT_RATE, TimeUnit.MILLISECONDS);
 
     }
     return instance;
@@ -86,57 +89,55 @@ public class SharedRateLimiterFactory {
    */
   public RateLimiter create(String name, RateProvider rateProvider) {
     synchronized (activeLimiters) {
-      if (activeLimiters.containsKey(name)) {
-        return activeLimiters.get(name);
-      } else {
-        long initialRate;
-        initialRate = rateProvider.getDesiredRate();
-        SharedRateLimiter limiter = new SharedRateLimiter(name, rateProvider, 
initialRate);
-        activeLimiters.put(name, limiter);
-        return limiter;
+      var limiterRef = activeLimiters.get(name);
+      var limiter = limiterRef == null ? null : limiterRef.get();
+      if (limiter == null) {
+        limiter = new SharedRateLimiter(name, rateProvider, 
rateProvider.getDesiredRate());
+        activeLimiters.put(name, new WeakReference<>(limiter));
       }
+      return limiter;
     }
   }
 
-  /**
-   * Walk through all of the currently active RateLimiters, having each update 
its current rate.
-   * This is called periodically so that we can dynamically update as 
configuration changes.
-   */
-  protected void update() {
-    Map<String,SharedRateLimiter> limitersCopy;
+  private void copyAndThen(String actionName, Consumer<SharedRateLimiter> 
action) {
+    Map<String,SharedRateLimiter> limitersCopy = new HashMap<>();
+    // synchronize only for copy
     synchronized (activeLimiters) {
-      limitersCopy = Map.copyOf(activeLimiters);
+      activeLimiters.forEach((name, limiterRef) -> {
+        var limiter = limiterRef.get();
+        if (limiter != null) {
+          limitersCopy.put(name, limiter);
+        }
+      });
     }
-    for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
+    limitersCopy.forEach((name, limiter) -> {
       try {
-        entry.getValue().update();
-      } catch (Exception ex) {
-        log.error(String.format("Failed to update limiter %s", 
entry.getKey()), ex);
+        action.accept(limiter);
+      } catch (RuntimeException e) {
+        log.error("Failed to {} limiter {}", actionName, name, e);
       }
-    }
+    });
+  }
+
+  /**
+   * Walk through all of the currently active RateLimiters, having each update 
its current rate.
+   * This is called periodically so that we can dynamically update as 
configuration changes.
+   */
+  private void updateAll() {
+    copyAndThen("update", SharedRateLimiter::update);
   }
 
   /**
    * Walk through all of the currently active RateLimiters, having each report 
its activity to the
    * debug log.
    */
-  protected void report() {
-    Map<String,SharedRateLimiter> limitersCopy;
-    synchronized (activeLimiters) {
-      limitersCopy = Map.copyOf(activeLimiters);
-    }
-    for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
-      try {
-        entry.getValue().report();
-      } catch (Exception ex) {
-        log.error(String.format("Failed to report limiter %s", 
entry.getKey()), ex);
-      }
-    }
+  private void reportAll() {
+    copyAndThen("report", SharedRateLimiter::report);
   }
 
   protected class SharedRateLimiter extends GuavaRateLimiter {
-    private volatile long permitsAcquired = 0;
-    private volatile long lastUpdate;
+    private AtomicLong permitsAcquired = new AtomicLong();
+    private AtomicLong lastUpdate = new AtomicLong();
 
     private final RateProvider rateProvider;
     private final String name;
@@ -145,13 +146,13 @@ public class SharedRateLimiterFactory {
       super(initialRate);
       this.name = name;
       this.rateProvider = rateProvider;
-      this.lastUpdate = System.currentTimeMillis();
+      this.lastUpdate.set(System.nanoTime());
     }
 
     @Override
     public void acquire(long permits) {
       super.acquire(permits);
-      permitsAcquired += permits;
+      permitsAcquired.addAndGet(permits);
     }
 
     /** Poll the callback, updating the current rate if necessary. */
@@ -166,14 +167,14 @@ public class SharedRateLimiterFactory {
     /** Report the current throughput and usage of this rate limiter to the 
debug log. */
     public void report() {
       if (log.isDebugEnabled()) {
-        long duration = System.currentTimeMillis() - lastUpdate;
+        long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
lastUpdate.get());
         if (duration == 0) {
           return;
         }
-        lastUpdate = System.currentTimeMillis();
+        lastUpdate.set(System.nanoTime());
 
-        long sum = permitsAcquired;
-        permitsAcquired = 0;
+        long sum = permitsAcquired.get();
+        permitsAcquired.set(0);
 
         if (sum > 0) {
           log.debug(String.format("RateLimiter '%s': %,d of %,d 
permits/second", name,

Reply via email to