This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cbe124841f [improve][broker] Optimize AsyncTokenBucket overflow solution further to reduce fallback to BigInteger (#25269)
4cbe124841f is described below

commit 4cbe124841fdc92ab671b1e438a86522a70bd622
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 26 21:14:48 2026 +0200

    [improve][broker] Optimize AsyncTokenBucket overflow solution further to reduce fallback to BigInteger (#25269)
---
 .../apache/pulsar/broker/qos/AsyncTokenBucket.java | 130 ++++++++++++++-------
 .../pulsar/broker/qos/AsyncTokenBucketTest.java    |   2 +
 2 files changed, 93 insertions(+), 39 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
index f7fc0031ccd..37b7d090522 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
@@ -103,6 +103,11 @@ public abstract class AsyncTokenBucket {
      * which has a complex solution to prevent the CAS loop content problem.
      */
     private final LongAdder pendingConsumedTokens = new LongAdder();
+    /**
+     * Cached pre-reduced rate parameters. Invalidated whenever {@link 
#getRate()} or
+     * {@link #getRatePeriodNanos()} returns a different value (relevant for 
dynamic-rate buckets).
+     */
+    private volatile RateParameters rateParameters;
 
     protected AsyncTokenBucket(MonotonicClock clockSource, long 
addTokensResolutionNanos) {
         this.clockSource = clockSource;
@@ -122,6 +127,18 @@ public abstract class AsyncTokenBucket {
 
     protected abstract long getTargetAmountOfTokensAfterThrottling();
 
+    private RateParameters resolveRateParameters() {
+        long rate = getRate();
+        long ratePeriodNanos = getRatePeriodNanos();
+        RateParameters current = rateParameters;
+        if (current != null && current.rate == rate && current.ratePeriodNanos 
== ratePeriodNanos) {
+            return current;
+        }
+        RateParameters updated = new RateParameters(rate, ratePeriodNanos);
+        rateParameters = updated;
+        return updated;
+    }
+
     /**
      * Consumes tokens and possibly updates the token balance. New tokens are 
calculated if the last new token
      * calculation occurred more than addTokensResolutionNanos nanoseconds 
ago. When new tokens are added, the
@@ -203,13 +220,12 @@ public abstract class AsyncTokenBucket {
             newTokens = 0;
         } else {
             long durationNanos = currentNanos - previousLastNanos + 
REMAINDER_NANOS_UPDATER.getAndSet(this, 0);
-            long currentRate = getRate();
-            long currentRatePeriodNanos = getRatePeriodNanos();
+            RateParameters rp = resolveRateParameters();
             // new tokens is the amount of tokens that are created in the 
duration since the last update
             // with the configured rate
-            newTokens = safeMulDivFloor(durationNanos, currentRate, 
currentRatePeriodNanos);
+            newTokens = rp.calculateTokens(durationNanos);
             // carry forward the remainder nanos so that the rounding error is 
eliminated
-            long consumedNanos = safeMulDivFloor(newTokens, 
currentRatePeriodNanos, currentRate);
+            long consumedNanos = rp.calculateDuration(newTokens);
             long remainderNanos = durationNanos >= consumedNanos ? 
durationNanos - consumedNanos : 0;
             if (remainderNanos > 0) {
                 REMAINDER_NANOS_UPDATER.addAndGet(this, remainderNanos);
@@ -277,41 +293,8 @@ public abstract class AsyncTokenBucket {
         } catch (ArithmeticException e) {
             needTokens = Long.MAX_VALUE;
         }
-        return safeMulDivFloor(needTokens, getRatePeriodNanos(), getRate());
-    }
-
-    private static long safeMulDivFloor(long multiplicand, long multiplier, 
long divisor) {
-        if (multiplicand < 0 || multiplier < 0) {
-            throw new IllegalArgumentException("multiplicand and multiplier 
must be >= 0");
-        }
-        if (divisor <= 0) {
-            throw new IllegalArgumentException("divisor must be > 0");
-        }
-        if (multiplicand == 0 || multiplier == 0) {
-            return 0;
-        }
-        // Fast path
-        // Check if multiplication fits in a 64-bit value
-        // Math.multiplyHigh is intrinsified by the JVM (single mulq/mul 
instruction),
-        // avoiding the cost of a division-based overflow check.
-        // It returns the upper 64 bits of the full 128-bit multiplication 
result.
-        // When the result is 0, the product fits in 64 bits.
-        if (Math.multiplyHigh(multiplicand, multiplier) == 0) {
-            long product = multiplicand * multiplier;
-            if (product >= 0) {
-                // product fits in signed 64-bit
-                return product / divisor;
-            }
-            // product is in [2^63, 2^64): fits unsigned but not signed
-            long result = Long.divideUnsigned(product, divisor);
-            // cap at Long.MAX_VALUE if result itself overflows signed long
-            return result >= 0 ? result : Long.MAX_VALUE;
-        }
-        // Fallback to BigInteger division
-        BigInteger result = BigInteger.valueOf(multiplicand)
-                .multiply(BigInteger.valueOf(multiplier))
-                .divide(BigInteger.valueOf(divisor));
-        return result.bitLength() < Long.SIZE ? result.longValue() : 
Long.MAX_VALUE;
+        RateParameters rp = resolveRateParameters();
+        return rp.calculateDuration(needTokens);
     }
 
     /**
@@ -342,4 +325,73 @@ public abstract class AsyncTokenBucket {
         return tokens() > 0;
     }
 
+    /**
+     * Holds pre-computed rate parameters where {@code rate} and {@code 
ratePeriodNanos} have been
+     * divided by their highest common power of ten. This reduction keeps the 
operands smaller and
+     * avoids overflow in {@link #safeMulDivFloor(long, long, long)} without 
changing the result of
+     * any integer floor-division (dividing numerator and denominator by the 
same factor preserves
+     * the quotient). The instance is cached and reused as long as the rate 
and period are unchanged.
+     */
+    static final class RateParameters {
+        final long rate;
+        final long ratePeriodNanos;
+        final long reducedRate;
+        final long reducedRatePeriod;
+
+        RateParameters(long rate, long ratePeriodNanos) {
+            this.rate = rate;
+            this.ratePeriodNanos = ratePeriodNanos;
+            long r = rate;
+            long p = ratePeriodNanos;
+            while (r % 10 == 0 && p % 10 == 0) {
+                r /= 10;
+                p /= 10;
+            }
+            this.reducedRate = r;
+            this.reducedRatePeriod = p;
+        }
+
+        public long calculateTokens(long durationNanos) {
+            return safeMulDivFloor(durationNanos, reducedRate, 
reducedRatePeriod);
+        }
+
+        public long calculateDuration(long tokens) {
+            return safeMulDivFloor(tokens, reducedRatePeriod, reducedRate);
+        }
+
+        private static long safeMulDivFloor(long multiplicand, long 
multiplier, long divisor) {
+            if (multiplicand < 0 || multiplier < 0) {
+                throw new IllegalArgumentException("multiplicand and 
multiplier must be >= 0");
+            }
+            if (divisor <= 0) {
+                throw new IllegalArgumentException("divisor must be > 0");
+            }
+            if (multiplicand == 0 || multiplier == 0) {
+                return 0;
+            }
+            // Fast path
+            // Check if multiplication fits in a 64-bit value
+            // Math.multiplyHigh is intrinsified by the JVM (single mulq/mul 
instruction),
+            // avoiding the cost of a division-based overflow check.
+            // It returns the upper 64 bits of the full 128-bit multiplication 
result.
+            // When the result is 0, the product fits in 64 bits.
+            if (Math.multiplyHigh(multiplicand, multiplier) == 0) {
+                long product = multiplicand * multiplier;
+                if (product >= 0) {
+                    // product fits in signed 64-bit
+                    return product / divisor;
+                }
+                // product is in [2^63, 2^64): fits unsigned but not signed
+                long result = Long.divideUnsigned(product, divisor);
+                // cap at Long.MAX_VALUE if result itself overflows signed long
+                return result >= 0 ? result : Long.MAX_VALUE;
+            }
+            // Fallback to BigInteger division
+            BigInteger result = BigInteger.valueOf(multiplicand)
+                    .multiply(BigInteger.valueOf(multiplier))
+                    .divide(BigInteger.valueOf(divisor));
+            return result.bitLength() < Long.SIZE ? result.longValue() : 
Long.MAX_VALUE;
+        }
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
index f709cb65448..8f47b948aca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
@@ -205,6 +205,8 @@ public class AsyncTokenBucketTest {
                 {1_000_000_000L},
                 {1_500_000_000L},
                 {2_000_000_000L},
+                {100_000_000_000L},
+                {Long.MAX_VALUE / 1_000_000_000L * 1_000_000_000L},
                 {Long.MAX_VALUE / 100L},
                 {Long.MAX_VALUE / 10L},
                 {Long.MAX_VALUE / 9L},

Reply via email to