This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4cbe124841f [improve][broker] Optimize AsyncTokenBucket overflow
solution further to reduce fallback to BigInteger (#25269)
4cbe124841f is described below
commit 4cbe124841fdc92ab671b1e438a86522a70bd622
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Feb 26 21:14:48 2026 +0200
[improve][broker] Optimize AsyncTokenBucket overflow solution further to
reduce fallback to BigInteger (#25269)
---
.../apache/pulsar/broker/qos/AsyncTokenBucket.java | 130 ++++++++++++++-------
.../pulsar/broker/qos/AsyncTokenBucketTest.java | 2 +
2 files changed, 93 insertions(+), 39 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
index f7fc0031ccd..37b7d090522 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
@@ -103,6 +103,11 @@ public abstract class AsyncTokenBucket {
* which has a complex solution to prevent the CAS loop content problem.
*/
private final LongAdder pendingConsumedTokens = new LongAdder();
+ /**
+ * Cached pre-reduced rate parameters. Invalidated whenever {@link
#getRate()} or
+ * {@link #getRatePeriodNanos()} returns a different value (relevant for
dynamic-rate buckets).
+ */
+ private volatile RateParameters rateParameters;
protected AsyncTokenBucket(MonotonicClock clockSource, long
addTokensResolutionNanos) {
this.clockSource = clockSource;
@@ -122,6 +127,18 @@ public abstract class AsyncTokenBucket {
protected abstract long getTargetAmountOfTokensAfterThrottling();
+ /**
+  * Returns the {@link RateParameters} matching the values currently reported by
+  * {@link #getRate()} and {@link #getRatePeriodNanos()}. The instance held in the
+  * {@code rateParameters} field is returned when its rate and period are equal to those
+  * values; otherwise a new instance is created, stored in the field and returned.
+  */
+ private RateParameters resolveRateParameters() {
+     final long currentRate = getRate();
+     final long currentPeriodNanos = getRatePeriodNanos();
+     RateParameters cached = rateParameters;
+     final boolean stale = cached == null
+             || cached.rate != currentRate
+             || cached.ratePeriodNanos != currentPeriodNanos;
+     if (stale) {
+         // rate or period changed (or nothing cached yet): rebuild and publish the new snapshot
+         cached = new RateParameters(currentRate, currentPeriodNanos);
+         rateParameters = cached;
+     }
+     return cached;
+ }
+
/**
* Consumes tokens and possibly updates the token balance. New tokens are
calculated if the last new token
* calculation occurred more than addTokensResolutionNanos nanoseconds
ago. When new tokens are added, the
@@ -203,13 +220,12 @@ public abstract class AsyncTokenBucket {
newTokens = 0;
} else {
long durationNanos = currentNanos - previousLastNanos +
REMAINDER_NANOS_UPDATER.getAndSet(this, 0);
- long currentRate = getRate();
- long currentRatePeriodNanos = getRatePeriodNanos();
+ RateParameters rp = resolveRateParameters();
// new tokens is the amount of tokens that are created in the
duration since the last update
// with the configured rate
- newTokens = safeMulDivFloor(durationNanos, currentRate,
currentRatePeriodNanos);
+ newTokens = rp.calculateTokens(durationNanos);
// carry forward the remainder nanos so that the rounding error is
eliminated
- long consumedNanos = safeMulDivFloor(newTokens,
currentRatePeriodNanos, currentRate);
+ long consumedNanos = rp.calculateDuration(newTokens);
long remainderNanos = durationNanos >= consumedNanos ?
durationNanos - consumedNanos : 0;
if (remainderNanos > 0) {
REMAINDER_NANOS_UPDATER.addAndGet(this, remainderNanos);
@@ -277,41 +293,8 @@ public abstract class AsyncTokenBucket {
} catch (ArithmeticException e) {
needTokens = Long.MAX_VALUE;
}
- return safeMulDivFloor(needTokens, getRatePeriodNanos(), getRate());
- }
-
- private static long safeMulDivFloor(long multiplicand, long multiplier,
long divisor) {
- if (multiplicand < 0 || multiplier < 0) {
- throw new IllegalArgumentException("multiplicand and multiplier
must be >= 0");
- }
- if (divisor <= 0) {
- throw new IllegalArgumentException("divisor must be > 0");
- }
- if (multiplicand == 0 || multiplier == 0) {
- return 0;
- }
- // Fast path
- // Check if multiplication fits in a 64-bit value
- // Math.multiplyHigh is intrinsified by the JVM (single mulq/mul
instruction),
- // avoiding the cost of a division-based overflow check.
- // It returns the upper 64 bits of the full 128-bit multiplication
result.
- // When the result is 0, the product fits in 64 bits.
- if (Math.multiplyHigh(multiplicand, multiplier) == 0) {
- long product = multiplicand * multiplier;
- if (product >= 0) {
- // product fits in signed 64-bit
- return product / divisor;
- }
- // product is in [2^63, 2^64): fits unsigned but not signed
- long result = Long.divideUnsigned(product, divisor);
- // cap at Long.MAX_VALUE if result itself overflows signed long
- return result >= 0 ? result : Long.MAX_VALUE;
- }
- // Fallback to BigInteger division
- BigInteger result = BigInteger.valueOf(multiplicand)
- .multiply(BigInteger.valueOf(multiplier))
- .divide(BigInteger.valueOf(divisor));
- return result.bitLength() < Long.SIZE ? result.longValue() :
Long.MAX_VALUE;
+ RateParameters rp = resolveRateParameters();
+ return rp.calculateDuration(needTokens);
}
/**
@@ -342,4 +325,73 @@ public abstract class AsyncTokenBucket {
return tokens() > 0;
}
+ /**
+  * Immutable snapshot of a rate configuration together with a reduced form of it.
+  *
+  * <p>{@code reducedRate} and {@code reducedRatePeriod} are {@code rate} and
+  * {@code ratePeriodNanos} divided by their highest common power of ten. Dividing numerator and
+  * denominator by the same factor does not change the result of an integer floor-division, but
+  * the smaller operands make it less likely that {@link #safeMulDivFloor(long, long, long)}
+  * overflows 64 bits and has to fall back to {@link BigInteger}.
+  */
+ static final class RateParameters {
+     /** The rate exactly as passed to the constructor. */
+     final long rate;
+     /** The rate period exactly as passed to the constructor. */
+     final long ratePeriodNanos;
+     /** {@link #rate} divided by the highest power of ten it shares with {@link #ratePeriodNanos}. */
+     final long reducedRate;
+     /** {@link #ratePeriodNanos} divided by the highest power of ten it shares with {@link #rate}. */
+     final long reducedRatePeriod;
+
+     /**
+      * Stores the given values and derives their reduced form.
+      *
+      * @param rate the rate; kept unmodified in {@link #rate}
+      * @param ratePeriodNanos the rate period; kept unmodified in {@link #ratePeriodNanos}
+      */
+     RateParameters(long rate, long ratePeriodNanos) {
+         this.rate = rate;
+         this.ratePeriodNanos = ratePeriodNanos;
+         long r = rate;
+         long p = ratePeriodNanos;
+         // 0 % 10 == 0 and 0 / 10 == 0, so when both operands are zero the reduction would
+         // never make progress. Skip it in that case; every other input reduces as before,
+         // because a non-zero operand eventually stops being divisible by ten.
+         while ((r != 0 || p != 0) && r % 10 == 0 && p % 10 == 0) {
+             r /= 10;
+             p /= 10;
+         }
+         this.reducedRate = r;
+         this.reducedRatePeriod = p;
+     }
+
+     /**
+      * Computes {@code floor(durationNanos * rate / ratePeriodNanos)} using the reduced operands.
+      *
+      * @param durationNanos the duration to convert; must be {@code >= 0}
+      * @return the resulting amount of tokens, capped at {@link Long#MAX_VALUE}
+      * @throws IllegalArgumentException if {@code durationNanos} or the reduced rate is negative,
+      *         or if the reduced rate period is not positive
+      */
+     public long calculateTokens(long durationNanos) {
+         return safeMulDivFloor(durationNanos, reducedRate, reducedRatePeriod);
+     }
+
+     /**
+      * Computes {@code floor(tokens * ratePeriodNanos / rate)} using the reduced operands.
+      *
+      * @param tokens the amount of tokens to convert; must be {@code >= 0}
+      * @return the resulting duration, capped at {@link Long#MAX_VALUE}
+      * @throws IllegalArgumentException if {@code tokens} or the reduced rate period is negative,
+      *         or if the reduced rate is not positive
+      */
+     public long calculateDuration(long tokens) {
+         return safeMulDivFloor(tokens, reducedRatePeriod, reducedRate);
+     }
+
+     /**
+      * Returns {@code floor(multiplicand * multiplier / divisor)} without losing precision to an
+      * intermediate 64-bit overflow; results that do not fit in a {@code long} are capped at
+      * {@link Long#MAX_VALUE}.
+      *
+      * @throws IllegalArgumentException if {@code multiplicand} or {@code multiplier} is negative,
+      *         or if {@code divisor} is not positive
+      */
+     private static long safeMulDivFloor(long multiplicand, long multiplier, long divisor) {
+         if (multiplicand < 0 || multiplier < 0) {
+             throw new IllegalArgumentException("multiplicand and multiplier must be >= 0");
+         }
+         if (divisor <= 0) {
+             throw new IllegalArgumentException("divisor must be > 0");
+         }
+         if (multiplicand == 0 || multiplier == 0) {
+             return 0;
+         }
+         // Fast path
+         // Check if multiplication fits in a 64-bit value
+         // Math.multiplyHigh is intrinsified by the JVM (single mulq/mul instruction),
+         // avoiding the cost of a division-based overflow check.
+         // It returns the upper 64 bits of the full 128-bit multiplication result.
+         // When the result is 0, the product fits in 64 bits.
+         if (Math.multiplyHigh(multiplicand, multiplier) == 0) {
+             long product = multiplicand * multiplier;
+             if (product >= 0) {
+                 // product fits in signed 64-bit
+                 return product / divisor;
+             }
+             // product is in [2^63, 2^64): fits unsigned but not signed
+             long result = Long.divideUnsigned(product, divisor);
+             // cap at Long.MAX_VALUE if result itself overflows signed long
+             return result >= 0 ? result : Long.MAX_VALUE;
+         }
+         // Fallback to BigInteger division
+         BigInteger result = BigInteger.valueOf(multiplicand)
+                 .multiply(BigInteger.valueOf(multiplier))
+                 .divide(BigInteger.valueOf(divisor));
+         return result.bitLength() < Long.SIZE ? result.longValue() : Long.MAX_VALUE;
+     }
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
index f709cb65448..8f47b948aca 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
@@ -205,6 +205,8 @@ public class AsyncTokenBucketTest {
{1_000_000_000L},
{1_500_000_000L},
{2_000_000_000L},
+ {100_000_000_000L},
+ {Long.MAX_VALUE / 1_000_000_000L * 1_000_000_000L},
{Long.MAX_VALUE / 100L},
{Long.MAX_VALUE / 10L},
{Long.MAX_VALUE / 9L},