This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f8e4c11b5cd [fix][broker] Fix rate limiter token bucket and clock
consistency issues causing excessive throttling and connection timeouts (#23930)
f8e4c11b5cd is described below
commit f8e4c11b5cd94382a3493b3e129e46bfc6a0621d
Author: Lari Hotari <[email protected]>
AuthorDate: Sat Feb 8 19:01:07 2025 +0200
[fix][broker] Fix rate limiter token bucket and clock consistency issues
causing excessive throttling and connection timeouts (#23930)
---
microbench/README.md | 26 +++
.../broker/qos/AsyncTokenBucketBenchmark.java | 17 +-
...=> DefaultMonotonicSnapshotClockBenchmark.java} | 55 +++--
.../apache/pulsar/broker/qos/AsyncTokenBucket.java | 127 ++++++----
.../pulsar/broker/qos/AsyncTokenBucketBuilder.java | 34 +++
.../broker/qos/DefaultMonotonicSnapshotClock.java | 260 ++++++++++++++++++---
.../broker/qos/DynamicRateAsyncTokenBucket.java | 7 +-
.../qos/DynamicRateAsyncTokenBucketBuilder.java | 6 +-
.../broker/qos/FinalRateAsyncTokenBucket.java | 7 +-
.../qos/FinalRateAsyncTokenBucketBuilder.java | 2 +-
.../pulsar/broker/service/BrokerService.java | 6 +
.../broker/service/PublishRateLimiterImpl.java | 11 +-
.../service/persistent/DispatchRateLimiter.java | 39 +++-
.../service/persistent/SubscribeRateLimiter.java | 8 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 46 ++--
.../pulsar/broker/qos/AsyncTokenBucketTest.java | 143 +++++++++++-
.../qos/DefaultMonotonicSnapshotClockTest.java | 185 +++++++++++++++
.../RGUsageMTAggrWaitForAllMsgsTest.java | 5 +-
.../broker/service/PublishRateLimiterTest.java | 5 +-
.../api/AbstractMessageDispatchThrottlingTest.java | 116 +++++++++
.../client/api/MessageDispatchThrottlingTest.java | 166 ++++---------
.../SubscriptionMessageDispatchThrottlingTest.java | 57 ++---
.../client/impl/MessagePublishThrottlingTest.java | 2 +-
23 files changed, 1026 insertions(+), 304 deletions(-)
diff --git a/microbench/README.md b/microbench/README.md
index 780e3a5a1d3..f50c3036ff4 100644
--- a/microbench/README.md
+++ b/microbench/README.md
@@ -41,3 +41,29 @@ For fast recompiling of the benchmarks (without compiling
Pulsar modules) and cr
mvn -Pmicrobench -pl microbench clean package
```
+### Running specific benchmarks
+
+Display help:
+
+```shell
+java -jar microbench/target/microbenchmarks.jar -h
+```
+
+Listing all benchmarks:
+
+```shell
+java -jar microbench/target/microbenchmarks.jar -l
+```
+
+Running specific benchmarks:
+
+```shell
+java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*"
+```
+
+Checking what benchmarks match the pattern:
+
+```shell
+java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*" -lp
+```
+
diff --git
a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
b/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
index 4c069e72ea3..1b210258f13 100644
---
a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
+++
b/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
@@ -33,6 +33,7 @@ import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
@Fork(3)
@BenchmarkMode(Mode.Throughput)
@@ -59,23 +60,29 @@ public class AsyncTokenBucketBenchmark {
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
- public void consumeTokensBenchmark001Threads() {
- asyncTokenBucket.consumeTokens(1);
+ public void consumeTokensBenchmark001Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole);
}
@Threads(10)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
- public void consumeTokensBenchmark010Threads() {
- asyncTokenBucket.consumeTokens(1);
+ public void consumeTokensBenchmark010Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole);
}
@Threads(100)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
- public void consumeTokensBenchmark100Threads() {
+ public void consumeTokensBenchmark100Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole);
+ }
+
+ private void consumeTokenAndGetTokens(Blackhole blackhole) {
asyncTokenBucket.consumeTokens(1);
+ // blackhole is used to ensure that the compiler doesn't do dead code
elimination
+ blackhole.consume(asyncTokenBucket.getTokens());
}
}
diff --git
a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
b/microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java
similarity index 58%
copy from
microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
copy to
microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java
index 4c069e72ea3..d9054b8fe4b 100644
---
a/microbench/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBenchmark.java
+++
b/microbench/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockBenchmark.java
@@ -28,27 +28,19 @@ import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
@Fork(3)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Thread)
-public class AsyncTokenBucketBenchmark {
- private AsyncTokenBucket asyncTokenBucket;
+public class DefaultMonotonicSnapshotClockBenchmark {
private DefaultMonotonicSnapshotClock monotonicSnapshotClock =
- new
DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(8),
System::nanoTime);
-
- @Setup(Level.Iteration)
- public void setup() {
- long ratePerSecond = 100_000_000;
- asyncTokenBucket =
AsyncTokenBucket.builder().rate(ratePerSecond).clock(monotonicSnapshotClock)
- .initialTokens(2 * ratePerSecond).capacity(2 *
ratePerSecond).build();
- }
+ new
DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(1),
System::nanoTime);
@TearDown(Level.Iteration)
public void teardown() {
@@ -59,23 +51,52 @@ public class AsyncTokenBucketBenchmark {
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
- public void consumeTokensBenchmark001Threads() {
- asyncTokenBucket.consumeTokens(1);
+ public void getTickNanos001Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, false);
+ }
+
+ @Threads(10)
+ @Benchmark
+ @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ public void getTickNanos010Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, false);
+ }
+
+ @Threads(100)
+ @Benchmark
+ @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ public void getTickNanos100Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, false);
+ }
+
+ @Threads(1)
+ @Benchmark
+ @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
+ public void getTickNanosRequestSnapshot001Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, true);
}
@Threads(10)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
- public void consumeTokensBenchmark010Threads() {
- asyncTokenBucket.consumeTokens(1);
+ public void getTickNanosRequestSnapshot010Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, true);
}
@Threads(100)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
- public void consumeTokensBenchmark100Threads() {
- asyncTokenBucket.consumeTokens(1);
+ public void getTickNanosRequestSnapshot100Threads(Blackhole blackhole) {
+ consumeTokenAndGetTokens(blackhole, true);
+ }
+
+ private void consumeTokenAndGetTokens(Blackhole blackhole, boolean
requestSnapshot) {
+ // blackhole is used to ensure that the compiler doesn't do dead code
elimination
+
blackhole.consume(monotonicSnapshotClock.getTickNanos(requestSnapshot));
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
index ac9a1f03e59..8c43fa0a816 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
@@ -42,6 +42,10 @@ import java.util.concurrent.atomic.LongAdder;
* connection or client from the throttling queue to unthrottle. Before
unthrottling, the application should check
* for available tokens. If tokens are still not available, the application
should continue with throttling and
* repeat the throttling loop.
+ * <p>By default, the AsyncTokenBucket is eventually consistent. This means
that the token balance is updated
+ * with added tokens and consumed tokens at most once during each "increment",
when time advances more than the
+ * configured resolution. There are settings for configuring consistency,
please see {@link AsyncTokenBucketBuilder}
+ * for details.
* <p>This class does not produce side effects outside its own scope. It
functions similarly to a stateful function,
* akin to a counter function. In essence, it is a sophisticated counter. It
can serve as a foundational component for
* constructing higher-level asynchronous rate limiter implementations, which
require side effects for throttling.
@@ -119,9 +123,28 @@ public abstract class AsyncTokenBucket {
*/
private final LongAdder pendingConsumedTokens = new LongAdder();
- protected AsyncTokenBucket(MonotonicSnapshotClock clockSource, long
resolutionNanos) {
+ /**
+ * By default, AsyncTokenBucket is eventually consistent. This means that
the consumed tokens are subtracted from
+ * the total amount of tokens at most once during each "increment", when
time advances more than the configured
+ * resolution. This setting determines if the consumed tokens are
subtracted from tokens balance consistently.
+ * For high performance, it is recommended to keep this setting as false.
+ */
+ private final boolean consistentConsumedTokens;
+ /**
+ * By default, AsyncTokenBucket is eventually consistent. This means that
the added tokens are calculated based
+ * on elapsed time at most once during each "increment", when time
advances more than the configured
+ * resolution. This setting determines if the added tokens are calculated
and added to tokens balance consistently.
+ * For high performance, it is recommended to keep this setting as false.
+ */
+ private final boolean consistentAddedTokens;
+
+ protected AsyncTokenBucket(MonotonicSnapshotClock clockSource, long
resolutionNanos,
+ boolean consistentConsumedTokens, boolean
consistentAddedTokens) {
this.clockSource = clockSource;
this.resolutionNanos = resolutionNanos;
+ this.lastNanos = Long.MIN_VALUE;
+ this.consistentConsumedTokens = consistentConsumedTokens;
+ this.consistentAddedTokens = consistentAddedTokens;
}
public static FinalRateAsyncTokenBucketBuilder builder() {
@@ -139,36 +162,46 @@ public abstract class AsyncTokenBucket {
/**
* Consumes tokens and possibly updates the tokens balance. New tokens are
calculated and added to the current
* tokens balance each time the update takes place. The update takes place
once in every interval of the configured
- * resolutionNanos or when the forceUpdateTokens parameter is true.
+ * resolutionNanos or when the forceConsistentTokens parameter is true.
* When the tokens balance isn't updated, the consumed tokens are added to
the pendingConsumedTokens LongAdder
* counter which gets flushed the next time the tokens are updated. This
makes the tokens balance
* eventually consistent. The reason for this design choice is to optimize
performance by preventing CAS loop
* contention which could cause excessive CPU consumption.
*
* @param consumeTokens number of tokens to consume, can be 0 to
update the tokens balance
- * @param forceUpdateTokens if true, the tokens are updated even if the
configured resolution hasn't passed
+ * @param forceConsistentTokens if true, the token balance is updated
consistently
* @return the current number of tokens in the bucket or Long.MIN_VALUE
when the number of tokens is unknown due
* to eventual consistency
*/
- private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens,
boolean forceUpdateTokens) {
+ private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens,
boolean forceConsistentTokens) {
if (consumeTokens < 0) {
throw new IllegalArgumentException("consumeTokens must be >= 0");
}
- long currentNanos = clockSource.getTickNanos(forceUpdateTokens);
+ boolean requestConsistentTickNanosSnapshot =
+ consistentAddedTokens || consistentConsumedTokens ||
forceConsistentTokens || resolutionNanos == 0;
+ long currentNanos =
clockSource.getTickNanos(requestConsistentTickNanosSnapshot);
+ long newTokens = 0;
// check if the tokens should be updated immediately
- if (shouldUpdateTokensImmediately(currentNanos, forceUpdateTokens)) {
+ if (shouldAddTokensImmediately(currentNanos, forceConsistentTokens)) {
// calculate the number of new tokens since the last update
- long newTokens = calculateNewTokensSinceLastUpdate(currentNanos);
- // calculate the total amount of tokens to consume in this update
+ newTokens = calculateNewTokensSinceLastUpdate(currentNanos,
forceConsistentTokens);
+ }
+ // update tokens if there are new tokens or if resolutionNanos is set
to 0 which is currently used for testing
+ if (newTokens > 0 || resolutionNanos == 0 || consistentConsumedTokens
|| forceConsistentTokens) {
// flush the pendingConsumedTokens by calling "sumThenReset"
- long totalConsumedTokens = consumeTokens +
pendingConsumedTokens.sumThenReset();
- // update the tokens and return the current token value
- return TOKENS_UPDATER.updateAndGet(this,
- currentTokens ->
- // after adding new tokens, limit the tokens to
the capacity
- Math.min(currentTokens + newTokens, getCapacity())
- // subtract the consumed tokens
- - totalConsumedTokens);
+ long currentPendingConsumedTokens =
pendingConsumedTokens.sumThenReset();
+ // calculate the token delta by subtracting the consumed tokens
from the new tokens
+ long tokenDelta = newTokens - currentPendingConsumedTokens;
+ if (tokenDelta != 0 || consumeTokens != 0) {
+ // update the tokens and return the current token value
+ return TOKENS_UPDATER.updateAndGet(this,
+ // limit the tokens to the capacity of the bucket
+ currentTokens -> Math.min(currentTokens + tokenDelta,
getCapacity())
+ // subtract the consumed tokens from the
capped tokens
+ - consumeTokens);
+ } else {
+ return tokens;
+ }
} else {
// eventual consistent fast path, tokens are not updated
immediately
@@ -187,19 +220,19 @@ public abstract class AsyncTokenBucket {
*
* The tokens will be updated once every resolutionNanos nanoseconds.
* This method checks if the configured resolutionNanos has passed since
the last update.
- * If the forceUpdateTokens is true, the tokens will be updated
immediately.
+ * If the forceConsistentTokens is true, the tokens will be updated
immediately.
*
- * @param currentNanos the current monotonic clock time in nanoseconds
- * @param forceUpdateTokens if true, the tokens will be updated immediately
+ * @param currentNanos the current monotonic clock time in nanoseconds
+ * @param forceConsistentTokens if true, the tokens are added even if the
configured resolution hasn't fully passed
* @return true if the tokens should be updated immediately, false
otherwise
*/
- private boolean shouldUpdateTokensImmediately(long currentNanos, boolean
forceUpdateTokens) {
+ private boolean shouldAddTokensImmediately(long currentNanos, boolean
forceConsistentTokens) {
long currentIncrement = resolutionNanos != 0 ? currentNanos /
resolutionNanos : 0;
long currentLastIncrement = lastIncrement;
return currentIncrement == 0
|| (currentIncrement > currentLastIncrement
&& LAST_INCREMENT_UPDATER.compareAndSet(this,
currentLastIncrement, currentIncrement))
- || forceUpdateTokens;
+ || consistentAddedTokens || forceConsistentTokens;
}
/**
@@ -209,10 +242,22 @@ public abstract class AsyncTokenBucket {
* @param currentNanos the current monotonic clock time in nanoseconds
* @return the number of new tokens to add since the last update
*/
- private long calculateNewTokensSinceLastUpdate(long currentNanos) {
+ private long calculateNewTokensSinceLastUpdate(long currentNanos, boolean
forceConsistentTokens) {
+ long previousLastNanos = lastNanos;
+ long newLastNanos;
+ // update lastNanos only if at least resolutionNanos/2 nanoseconds has
passed since the last update
+ // unless consistency is needed
+ long minimumIncrementNanos = forceConsistentTokens ||
consistentAddedTokens ? 0L : resolutionNanos / 2;
+ if (currentNanos > previousLastNanos + minimumIncrementNanos) {
+ newLastNanos = currentNanos;
+ } else {
+ newLastNanos = previousLastNanos;
+ }
long newTokens;
- long previousLastNanos = LAST_NANOS_UPDATER.getAndSet(this,
currentNanos);
- if (previousLastNanos == 0) {
+ if (newLastNanos == previousLastNanos
+ // prevent races with a CAS update of lastNanos
+ || !LAST_NANOS_UPDATER.compareAndSet(this, previousLastNanos,
newLastNanos)
+ || previousLastNanos == Long.MIN_VALUE) {
newTokens = 0;
} else {
long durationNanos = currentNanos - previousLastNanos +
REMAINDER_NANOS_UPDATER.getAndSet(this, 0);
@@ -267,15 +312,14 @@ public abstract class AsyncTokenBucket {
}
/**
- * Returns the current token balance. When forceUpdateTokens is true, the
tokens balance is updated before
- * returning. If forceUpdateTokens is false, the tokens balance could be
updated if the last updated happened
+ * Returns the current token balance. When forceConsistentTokens is true,
the tokens balance is updated before
+ * returning. If forceConsistentTokens is false, the tokens balance could
be updated if the last update happened
* more than resolutionNanos nanoseconds ago.
*
- * @param forceUpdateTokens if true, the tokens balance is updated before
returning
* @return the current token balance
*/
- protected long tokens(boolean forceUpdateTokens) {
- long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0,
forceUpdateTokens);
+ private long tokens() {
+ long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0,
false);
if (currentTokens != Long.MIN_VALUE) {
// when currentTokens isn't Long.MIN_VALUE, the current tokens
balance is known
return currentTokens;
@@ -295,7 +339,7 @@ public abstract class AsyncTokenBucket {
long currentTokens = consumeTokensAndMaybeUpdateTokensBalance(0, true);
if (currentTokens == Long.MIN_VALUE) {
throw new IllegalArgumentException(
- "Unexpected result from updateAndConsumeTokens with
forceUpdateTokens set to true");
+ "Unexpected result from updateAndConsumeTokens with
forceConsistentTokens set to true");
}
if (currentTokens > 0) {
return 0L;
@@ -309,10 +353,11 @@ public abstract class AsyncTokenBucket {
/**
* Returns the current number of tokens in the bucket.
- * The token balance is updated if the configured resolutionNanos has
passed since the last update.
+ * The token balance is updated if the configured resolutionNanos has
passed since the last update unless
+ * consistentConsumedTokens is true.
*/
public final long getTokens() {
- return tokens(false);
+ return tokens();
}
public abstract long getRate();
@@ -320,25 +365,13 @@ public abstract class AsyncTokenBucket {
/**
* Checks if the bucket contains tokens.
* The token balance is updated before the comparison if the configured
resolutionNanos has passed since the last
- * update. It's possible that the returned result is not definite since
the token balance is eventually consistent.
+ * update. It's possible that the returned result is not definite since
the token balance is eventually consistent
+ * if consistentConsumedTokens is false.
*
* @return true if the bucket contains tokens, false otherwise
*/
public boolean containsTokens() {
- return containsTokens(false);
- }
-
- /**
- * Checks if the bucket contains tokens.
- * The token balance is updated before the comparison if the configured
resolutionNanos has passed since the last
- * update. The token balance is also updated when forceUpdateTokens is
true.
- * It's possible that the returned result is not definite since the token
balance is eventually consistent.
- *
- * @param forceUpdateTokens if true, the token balance is updated before
the comparison
- * @return true if the bucket contains tokens, false otherwise
- */
- public boolean containsTokens(boolean forceUpdateTokens) {
- return tokens(forceUpdateTokens) > 0;
+ return tokens() > 0;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java
index ee256d5a37d..1c05f1a213e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucketBuilder.java
@@ -23,6 +23,8 @@ package org.apache.pulsar.broker.qos;
public abstract class AsyncTokenBucketBuilder<SELF extends
AsyncTokenBucketBuilder<SELF>> {
protected MonotonicSnapshotClock clock =
AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK;
protected long resolutionNanos = AsyncTokenBucket.defaultResolutionNanos;
+ protected boolean consistentConsumedTokens;
+ protected boolean consistentAddedTokens;
protected AsyncTokenBucketBuilder() {
}
@@ -31,15 +33,47 @@ public abstract class AsyncTokenBucketBuilder<SELF extends
AsyncTokenBucketBuild
return (SELF) this;
}
+ /**
+ * Set the clock source for the token bucket. It's recommended to use the
{@link DefaultMonotonicSnapshotClock}
+ * for most use cases.
+ */
public SELF clock(MonotonicSnapshotClock clock) {
this.clock = clock;
return self();
}
+ /**
+ * By default, AsyncTokenBucket is eventually consistent. This means that
the token balance is updated when time
+ * advances more than the configured resolution. This setting determines
the duration of the increment.
+ * Setting this value to 0 will make the token balance fully consistent.
There's a performance trade-off
+ * when setting this value to 0.
+ */
public SELF resolutionNanos(long resolutionNanos) {
this.resolutionNanos = resolutionNanos;
return self();
}
+ /**
+ * By default, AsyncTokenBucket is eventually consistent. This means that
the consumed tokens are subtracted from
+ * the total amount of tokens at most once during each "increment", when
time advances more than the configured
+ * resolution. This setting determines if the consumed tokens are
subtracted from tokens balance consistently.
+ * For high performance, it is recommended to keep this setting as false.
+ */
+ public SELF consistentConsumedTokens(boolean consistentConsumedTokens) {
+ this.consistentConsumedTokens = consistentConsumedTokens;
+ return self();
+ }
+
+ /**
+ * By default, AsyncTokenBucket is eventually consistent. This means that
the added tokens are calculated based
+ * on elapsed time at most once during each "increment", when time
advances more than the configured
+ * resolution. This setting determines if the added tokens are calculated
and added to tokens balance consistently.
+ * For high performance, it is recommended to keep this setting as false.
+ */
+ public SELF consistentAddedTokens(boolean consistentAddedTokens) {
+ this.consistentAddedTokens = consistentAddedTokens;
+ return self();
+ }
+
public abstract AsyncTokenBucket build();
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java
index df3843921ed..23b9359c804 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClock.java
@@ -19,71 +19,269 @@
package org.apache.pulsar.broker.qos;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Default implementation of {@link MonotonicSnapshotClock}.
+ * Default implementation of {@link MonotonicSnapshotClock} optimized for use
with {@link AsyncTokenBucket}.
*
- * Starts a daemon thread that updates the snapshot value periodically with a
configured interval. The close method
- * should be called to stop the thread.
+ * <p>
+ * This class provides a monotonic snapshot value that consistently increases,
ensuring reliable behavior
+ * even in environments where the underlying clock source may not be strictly
monotonic across all CPUs,
+ * such as certain virtualized platforms.
+ * </p>
+ *
+ * <p>
+ * Upon instantiation, a daemon thread is launched to periodically update the
snapshot value at a configured
+ * interval. It is essential to invoke the {@link #close()} method to
gracefully terminate this thread when it is
+ * no longer needed.
+ * </p>
+ *
+ * <p>
+ * The {@link AsyncTokenBucket} utilizes this clock to obtain tick values. It
does not require a consistent value on
+ * every retrieval. However, when a consistent snapshot is necessary, the
{@link #getTickNanos(boolean)} method
+ * is called with the {@code requestSnapshot} parameter set to {@code true}.
+ * </p>
+ *
+ * <p>
+ * By employing a single thread to update the monotonic clock value, this
implementation ensures that the snapshot
+ * value remains strictly increasing. This approach mitigates potential
inconsistencies that may arise from clock
+ * source discrepancies across different CPUs.
+ * </p>
*/
public class DefaultMonotonicSnapshotClock implements MonotonicSnapshotClock,
AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultMonotonicSnapshotClock.class);
- private final long sleepMillis;
- private final int sleepNanos;
- private final LongSupplier clockSource;
- private final Thread thread;
+ private final TickUpdaterThread tickUpdaterThread;
private volatile long snapshotTickNanos;
public DefaultMonotonicSnapshotClock(long snapshotIntervalNanos,
LongSupplier clockSource) {
if (snapshotIntervalNanos < TimeUnit.MILLISECONDS.toNanos(1)) {
throw new IllegalArgumentException("snapshotIntervalNanos must be
at least 1 millisecond");
}
- this.sleepMillis =
TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
- this.sleepNanos = (int) (snapshotIntervalNanos -
TimeUnit.MILLISECONDS.toNanos(sleepMillis));
- this.clockSource = clockSource;
- updateSnapshotTickNanos();
- thread = new Thread(this::snapshotLoop, getClass().getSimpleName() +
"-update-loop");
- thread.setDaemon(true);
- thread.start();
+ tickUpdaterThread = new TickUpdaterThread(snapshotIntervalNanos,
+ Objects.requireNonNull(clockSource, "clockSource must not be
null"), this::setSnapshotTickNanos);
+ tickUpdaterThread.start();
+ }
+
+ private void setSnapshotTickNanos(long snapshotTickNanos) {
+ this.snapshotTickNanos = snapshotTickNanos;
}
/** {@inheritDoc} */
@Override
public long getTickNanos(boolean requestSnapshot) {
if (requestSnapshot) {
- updateSnapshotTickNanos();
+ tickUpdaterThread.requestUpdateAndWait();
}
return snapshotTickNanos;
}
- private void updateSnapshotTickNanos() {
- snapshotTickNanos = clockSource.getAsLong();
+ @Override
+ public void close() {
+ tickUpdaterThread.interrupt();
}
- private void snapshotLoop() {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- updateSnapshotTickNanos();
+ /**
+ * A thread that updates snapshotTickNanos value periodically with a
configured interval.
+ * The thread is started when the DefaultMonotonicSnapshotClock is created
and runs until the close method is
+ * called.
+ * A single thread is used to read the clock source value since on some
hardware or virtualized platforms,
+ * System.nanoTime() isn't strictly monotonic across all CPUs. Reading by
a single thread will improve the
+ * stability of the read value since a single thread is scheduled on a
single CPU. If the thread is migrated
+ * to another CPU, the clock source value might leap backward or forward,
but logic in this class will handle it.
+ */
+ private static class TickUpdaterThread extends Thread {
+ private final Object tickUpdateDelayMonitor = new Object();
+ private final Object tickUpdatedMonitor = new Object();
+ private final MonotonicLeapDetectingTickUpdater tickUpdater;
+ private volatile boolean running;
+ private boolean tickUpdateDelayMonitorNotified;
+ private AtomicLong requestCount = new AtomicLong();
+ private final long sleepMillis;
+ private final int sleepNanos;
+
+ TickUpdaterThread(long snapshotIntervalNanos, LongSupplier
clockSource, LongConsumer setSnapshotTickNanos) {
+ super(DefaultMonotonicSnapshotClock.class.getSimpleName() +
"-update-loop");
+ // set as daemon thread so that it doesn't prevent the JVM from
exiting
+ setDaemon(true);
+ // set the highest priority
+ setPriority(MAX_PRIORITY);
+ this.sleepMillis =
TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
+ this.sleepNanos = (int) (snapshotIntervalNanos -
TimeUnit.MILLISECONDS.toNanos(sleepMillis));
+ tickUpdater = new MonotonicLeapDetectingTickUpdater(clockSource,
setSnapshotTickNanos,
+ snapshotIntervalNanos);
+ }
+
+ @Override
+ public void run() {
+ try {
+ running = true;
+ long updatedForRequestCount = -1;
+ while (!isInterrupted()) {
+ try {
+ // track if the thread has waited for the whole
duration of the snapshot interval
+ // before updating the tick value
+ boolean waitedSnapshotInterval = false;
+ // sleep for the configured interval on a monitor that
can be notified to stop the sleep
+ // and update the tick value immediately. This is used
in the requestUpdateAndWait method.
+ synchronized (tickUpdateDelayMonitor) {
+ tickUpdateDelayMonitorNotified = false;
+ // only wait if no explicit request has been made
since the last update
+ if (requestCount.get() == updatedForRequestCount) {
+ // if no request has been made, sleep for the
configured interval
+ tickUpdateDelayMonitor.wait(sleepMillis,
sleepNanos);
+ waitedSnapshotInterval =
!tickUpdateDelayMonitorNotified;
+ }
+ }
+ updatedForRequestCount = requestCount.get();
+ // update the tick value using the tick updater which
will tolerate leaps backward
+ tickUpdater.update(waitedSnapshotInterval);
+ notifyAllTickUpdated();
+ } catch (InterruptedException e) {
+ interrupt();
+ break;
+ }
+ }
+ } catch (Throwable t) {
+ // report unexpected error since this would be a fatal error
when the clock doesn't progress anymore
+ // this is very unlikely to happen, but it's better to log it
in any case
+ LOG.error("Unexpected fatal error that stopped the clock.", t);
+ } finally {
+ LOG.info("DefaultMonotonicSnapshotClock's TickUpdaterThread
stopped. {},tid={}", this, getId());
+ running = false;
+ notifyAllTickUpdated();
+ }
+ }
+
+ private void notifyAllTickUpdated() {
+ synchronized (tickUpdatedMonitor) {
+ // notify all threads that are waiting for the tick value to
be updated
+ tickUpdatedMonitor.notifyAll();
+ }
+ }
+
+ public void requestUpdateAndWait() {
+ if (!running) {
+ synchronized (tickUpdater) {
+ // thread has stopped running, fallback to update the
value directly without optimizations
+ tickUpdater.update(false);
+ }
+ return;
+ }
+ // Increment the request count. This ensures that the thread will
update the tick value after this request
+ // was made, even when there's a race condition between the request
and the update.
+ // This solution doesn't prevent all races, and it's not
guaranteed that the tick value is always updated.
+ // It does prevent the request from having to wait for the delayed
update cycle. This is sufficient for the
+ // use case.
+ requestCount.incrementAndGet();
+ synchronized (tickUpdatedMonitor) {
+ // notify the thread to stop waiting and update the tick value
+ synchronized (tickUpdateDelayMonitor) {
+ tickUpdateDelayMonitorNotified = true;
+ tickUpdateDelayMonitor.notify();
+ }
+ // wait until the tick value has been updated
try {
- Thread.sleep(sleepMillis, sleepNanos);
+ tickUpdatedMonitor.wait();
} catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
+ currentThread().interrupt();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void start() {
+ // wait until the thread is started and the tick value has been
updated
+ synchronized (tickUpdatedMonitor) {
+ super.start();
+ try {
+ tickUpdatedMonitor.wait();
+ } catch (InterruptedException e) {
+ currentThread().interrupt();
}
}
- } catch (Throwable t) {
- // report unexpected error since this would be a fatal error when
the clock doesn't progress anymore
- // this is very unlikely to happen, but it's better to log it in
any case
- LOG.error("Unexpected fatal error that stopped the clock.", t);
}
}
- @Override
- public void close() {
- thread.interrupt();
+ /**
+ * Handles updating the tick value in a monotonic way so that the value is
always increasing,
+ * regardless of leaps backward in the clock source value.
+ */
+ static class MonotonicLeapDetectingTickUpdater {
+ private final LongSupplier clockSource;
+ private final long snapshotInternalNanos;
+ private final long maxDeltaNanosForLeapDetection;
+ private final LongConsumer tickUpdatedCallback;
+ private long referenceClockSourceValue = Long.MIN_VALUE;
+ private long baseSnapshotTickNanos;
+ private long previousSnapshotTickNanos;
+
+ MonotonicLeapDetectingTickUpdater(LongSupplier clockSource,
LongConsumer tickUpdatedCallback,
+ long snapshotInternalNanos) {
+ this.clockSource = clockSource;
+ this.snapshotInternalNanos = snapshotInternalNanos;
+ this.maxDeltaNanosForLeapDetection = 2 * snapshotInternalNanos;
+ this.tickUpdatedCallback = tickUpdatedCallback;
+ }
+
+ /**
+ * Updates the snapshot tick value. The tickUpdatedCallback is called
if the value has changed.
+ * The value is updated in a monotonic way so that the value is always
increasing, regardless of leaps backward
+ * in the clock source value.
+ * Leap detection is done by comparing the new value with the previous
value and the maximum delta value.
+ *
+ * @param waitedSnapshotInterval if true, the method has waited for
the snapshot interval since the previous
+ * call.
+ */
+ public void update(boolean waitedSnapshotInterval) {
+ // get the current clock source value
+ long clockValue = clockSource.getAsLong();
+
+ // Initialization on first call
+ if (referenceClockSourceValue == Long.MIN_VALUE) {
+ referenceClockSourceValue = clockValue;
+ baseSnapshotTickNanos = clockValue;
+ previousSnapshotTickNanos = clockValue;
+ // update the tick value using the callback
+ tickUpdatedCallback.accept(clockValue);
+ return;
+ }
+
+ // calculate the duration since the reference clock source value
+ // so that the snapshot value is always increasing and tolerates
it when the clock source is not strictly
+ // monotonic across all CPUs and leaps backward
+ long durationSinceReference = clockValue -
referenceClockSourceValue;
+ // calculate the new snapshot tick value as a duration since the
reference clock source value
+ // and add it to the base snapshot tick value
+ long newSnapshotTickNanos = baseSnapshotTickNanos +
durationSinceReference;
+
+ // reset the reference clock source value if the clock source
value leaps backward
+ // more than the maximum delta value
+ if (newSnapshotTickNanos < previousSnapshotTickNanos -
maxDeltaNanosForLeapDetection) {
+ // when the clock source value leaps backward, reset the
reference value to the new value
+ // for future duration calculations
+ referenceClockSourceValue = clockValue;
+ // if the updater thread has waited for the snapshot interval
since the previous call,
+ // increment the base snapshot tick value by the snapshot
interval value
+ long incrementWhenLeapDetected = waitedSnapshotInterval ?
snapshotInternalNanos : 0;
+ // set the base snapshot tick value to the new value
+ baseSnapshotTickNanos = previousSnapshotTickNanos +
incrementWhenLeapDetected;
+ // set the new snapshot tick value to the base value
+ newSnapshotTickNanos = baseSnapshotTickNanos;
+ }
+
+ // update snapshotTickNanos value if the new value is greater than
the previous value
+ if (newSnapshotTickNanos > previousSnapshotTickNanos) {
+ // store the previous value
+ previousSnapshotTickNanos = newSnapshotTickNanos;
+ // update the tick value using the callback
+ tickUpdatedCallback.accept(newSnapshotTickNanos);
+ }
+ }
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java
index 8edc73d1f51..f2eae8aed8d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucket.java
@@ -34,15 +34,16 @@ public class DynamicRateAsyncTokenBucket extends
AsyncTokenBucket {
protected DynamicRateAsyncTokenBucket(double capacityFactor, LongSupplier
rateFunction,
MonotonicSnapshotClock clockSource,
LongSupplier ratePeriodNanosFunction,
- long resolutionNanos, double
initialTokensFactor,
+ long resolutionNanos, boolean
consistentConsumedTokens,
+ boolean consistentAddedTokens,
double initialTokensFactor,
double
targetFillFactorAfterThrottling) {
- super(clockSource, resolutionNanos);
+ super(clockSource, resolutionNanos, consistentConsumedTokens,
consistentAddedTokens);
this.capacityFactor = capacityFactor;
this.rateFunction = rateFunction;
this.ratePeriodNanosFunction = ratePeriodNanosFunction;
this.targetFillFactorAfterThrottling = targetFillFactorAfterThrottling;
this.tokens = (long) (rateFunction.getAsLong() * initialTokensFactor);
- tokens(false);
+ getTokens();
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java
index 22270484c72..8aebecddf90 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/DynamicRateAsyncTokenBucketBuilder.java
@@ -64,9 +64,7 @@ public class DynamicRateAsyncTokenBucketBuilder
@Override
public AsyncTokenBucket build() {
return new DynamicRateAsyncTokenBucket(this.capacityFactor,
this.rateFunction,
- this.clock,
- this.ratePeriodNanosFunction, this.resolutionNanos,
- this.initialFillFactor,
- targetFillFactorAfterThrottling);
+ this.clock, this.ratePeriodNanosFunction,
this.resolutionNanos, this.consistentConsumedTokens,
+ this.consistentAddedTokens, this.initialFillFactor,
targetFillFactorAfterThrottling);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java
index 627c5ee1334..d83290b723f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucket.java
@@ -30,15 +30,16 @@ class FinalRateAsyncTokenBucket extends AsyncTokenBucket {
private final long targetAmountOfTokensAfterThrottling;
protected FinalRateAsyncTokenBucket(long capacity, long rate,
MonotonicSnapshotClock clockSource,
- long ratePeriodNanos, long
resolutionNanos, long initialTokens) {
- super(clockSource, resolutionNanos);
+ long ratePeriodNanos, long
resolutionNanos, boolean consistentConsumedTokens,
+ boolean consistentAddedTokens, long
initialTokens) {
+ super(clockSource, resolutionNanos, consistentConsumedTokens,
consistentAddedTokens);
this.capacity = capacity;
this.rate = rate;
this.ratePeriodNanos = ratePeriodNanos != -1 ? ratePeriodNanos :
ONE_SECOND_NANOS;
// The target amount of tokens is the amount of tokens made available
in the resolution duration
this.targetAmountOfTokensAfterThrottling =
Math.max(this.resolutionNanos * rate / ratePeriodNanos, 1);
this.tokens = initialTokens;
- tokens(false);
+ getTokens();
}
@Override
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java
index ff4ed53c6c7..a292000eaa8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/FinalRateAsyncTokenBucketBuilder.java
@@ -55,7 +55,7 @@ public class FinalRateAsyncTokenBucketBuilder
public AsyncTokenBucket build() {
return new FinalRateAsyncTokenBucket(this.capacity != null ?
this.capacity : this.rate, this.rate,
this.clock,
- this.ratePeriodNanos, this.resolutionNanos,
+ this.ratePeriodNanos, this.resolutionNanos,
this.consistentConsumedTokens, this.consistentAddedTokens,
this.initialTokens != null ? this.initialTokens : this.rate
);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ddd436b0854..413b1b79d7a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2479,6 +2479,12 @@ public class BrokerService implements Closeable {
private void handleMetadataChanges(Notification n) {
+ if (!pulsar.isRunning()) {
+ // Ignore metadata changes when broker is not running
+ log.info("Ignoring metadata change since broker is not running
(id={}, state={}) {}", pulsar.getBrokerId(),
+ pulsar.getState(), n);
+ return;
+ }
if (n.getType() == NotificationType.Modified &&
NamespaceResources.pathIsFromNamespace(n.getPath())) {
NamespaceName ns =
NamespaceResources.namespaceFromPath(n.getPath());
handlePoliciesUpdates(ns);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
index 8255d9b6931..90c8de5f97a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
@@ -20,11 +20,11 @@
package org.apache.pulsar.broker.service;
import com.google.common.annotations.VisibleForTesting;
-import io.netty.channel.EventLoopGroup;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.broker.qos.MonotonicSnapshotClock;
import org.apache.pulsar.common.policies.data.Policies;
@@ -32,6 +32,7 @@ import org.apache.pulsar.common.policies.data.PublishRate;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedArrayQueue;
+@Slf4j
public class PublishRateLimiterImpl implements PublishRateLimiter {
private volatile AsyncTokenBucket tokenBucketOnMessage;
private volatile AsyncTokenBucket tokenBucketOnByte;
@@ -80,7 +81,7 @@ public class PublishRateLimiterImpl implements
PublishRateLimiter {
// schedule unthrottling when the throttling count is incremented to 1
// this is to avoid scheduling unthrottling multiple times for
concurrent producers
if (throttledProducersCount.incrementAndGet() == 1) {
- EventLoopGroup executor =
producer.getCnx().getBrokerService().executor();
+ ScheduledExecutorService executor =
producer.getCnx().getBrokerService().executor().next();
scheduleUnthrottling(executor, calculateThrottlingDurationNanos());
}
}
@@ -134,7 +135,11 @@ public class PublishRateLimiterImpl implements
PublishRateLimiter {
// unthrottle as many producers as possible while there are token
available
while ((throttlingDuration = calculateThrottlingDurationNanos())
== 0L
&& (producer = unthrottlingQueue.poll()) != null) {
- producer.decrementThrottleCount();
+ try {
+ producer.decrementThrottleCount();
+ } catch (Exception e) {
+ log.error("Failed to unthrottle producer {}", producer, e);
+ }
throttledProducersCount.decrementAndGet();
}
// if there are still producers to be unthrottled, schedule
unthrottling again
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index b29cbcd660d..f43b134eb12 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
+import org.apache.pulsar.broker.qos.AsyncTokenBucketBuilder;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -76,7 +77,9 @@ public class DispatchRateLimiter {
* @return
*/
public long getAvailableDispatchRateLimitOnMsg() {
- return dispatchRateLimiterOnMessage == null ? -1 :
Math.max(dispatchRateLimiterOnMessage.getTokens(), 0);
+ AsyncTokenBucket localDispatchRateLimiterOnMessage =
dispatchRateLimiterOnMessage;
+ return localDispatchRateLimiterOnMessage == null ? -1 :
+ Math.max(localDispatchRateLimiterOnMessage.getTokens(), 0);
}
/**
@@ -85,7 +88,8 @@ public class DispatchRateLimiter {
* @return
*/
public long getAvailableDispatchRateLimitOnByte() {
- return dispatchRateLimiterOnByte == null ? -1 :
Math.max(dispatchRateLimiterOnByte.getTokens(), 0);
+ AsyncTokenBucket localDispatchRateLimiterOnByte =
dispatchRateLimiterOnByte;
+ return localDispatchRateLimiterOnByte == null ? -1 :
Math.max(localDispatchRateLimiterOnByte.getTokens(), 0);
}
/**
@@ -95,11 +99,13 @@ public class DispatchRateLimiter {
* @param byteSize
*/
public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
- if (numberOfMessages > 0 && dispatchRateLimiterOnMessage != null) {
- dispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
+ AsyncTokenBucket localDispatchRateLimiterOnMessage =
dispatchRateLimiterOnMessage;
+ if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null)
{
+ localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
}
- if (byteSize > 0 && dispatchRateLimiterOnByte != null) {
- dispatchRateLimiterOnByte.consumeTokens(byteSize);
+ AsyncTokenBucket localDispatchRateLimiterOnByte =
dispatchRateLimiterOnByte;
+ if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
+ localDispatchRateLimiterOnByte.consumeTokens(byteSize);
}
}
@@ -221,13 +227,14 @@ public class DispatchRateLimiter {
if (msgRate > 0) {
if (dispatchRate.isRelativeToPublishRate()) {
this.dispatchRateLimiterOnMessage =
- AsyncTokenBucket.builderForDynamicRate()
+
configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate())
.rateFunction(() ->
getRelativeDispatchRateInMsg(dispatchRate))
.ratePeriodNanosFunction(() -> ratePeriodNanos)
.build();
} else {
this.dispatchRateLimiterOnMessage =
-
AsyncTokenBucket.builder().rate(msgRate).ratePeriodNanos(ratePeriodNanos)
+ configureAsyncTokenBucket(AsyncTokenBucket.builder())
+ .rate(msgRate).ratePeriodNanos(ratePeriodNanos)
.build();
}
} else {
@@ -238,13 +245,14 @@ public class DispatchRateLimiter {
if (byteRate > 0) {
if (dispatchRate.isRelativeToPublishRate()) {
this.dispatchRateLimiterOnByte =
- AsyncTokenBucket.builderForDynamicRate()
+
configureAsyncTokenBucket(AsyncTokenBucket.builderForDynamicRate())
.rateFunction(() ->
getRelativeDispatchRateInByte(dispatchRate))
.ratePeriodNanosFunction(() -> ratePeriodNanos)
.build();
} else {
this.dispatchRateLimiterOnByte =
-
AsyncTokenBucket.builder().rate(byteRate).ratePeriodNanos(ratePeriodNanos)
+ configureAsyncTokenBucket(AsyncTokenBucket.builder())
+
.rate(byteRate).ratePeriodNanos(ratePeriodNanos)
.build();
}
} else {
@@ -252,6 +260,11 @@ public class DispatchRateLimiter {
}
}
+ private <T extends AsyncTokenBucketBuilder<T>> T
configureAsyncTokenBucket(T builder) {
+ builder.clock(brokerService.getPulsar().getMonotonicSnapshotClock());
+ return builder;
+ }
+
private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) {
return (topic != null && dispatchRate != null)
? (long) topic.getLastUpdatedAvgPublishRateInMsg() +
dispatchRate.getDispatchThrottlingRateInMsg()
@@ -270,7 +283,8 @@ public class DispatchRateLimiter {
* @return
*/
public long getDispatchRateOnMsg() {
- return dispatchRateLimiterOnMessage != null ?
dispatchRateLimiterOnMessage.getRate() : -1;
+ AsyncTokenBucket localDispatchRateLimiterOnMessage =
dispatchRateLimiterOnMessage;
+ return localDispatchRateLimiterOnMessage != null ?
localDispatchRateLimiterOnMessage.getRate() : -1;
}
/**
@@ -279,7 +293,8 @@ public class DispatchRateLimiter {
* @return
*/
public long getDispatchRateOnByte() {
- return dispatchRateLimiterOnByte != null ?
dispatchRateLimiterOnByte.getRate() : -1;
+ AsyncTokenBucket localDispatchRateLimiterOnByte =
dispatchRateLimiterOnByte;
+ return localDispatchRateLimiterOnByte != null ?
localDispatchRateLimiterOnByte.getRate() : -1;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index b1de10e73b7..0f98ab94142 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -70,7 +70,7 @@ public class SubscribeRateLimiter {
if (tokenBucket == null) {
return true;
}
- if (!tokenBucket.containsTokens(true)) {
+ if (!tokenBucket.containsTokens()) {
return false;
}
tokenBucket.consumeTokens(1);
@@ -117,7 +117,11 @@ public class SubscribeRateLimiter {
// update subscribe-rateLimiter
if (ratePerConsumer > 0) {
AsyncTokenBucket tokenBucket =
-
AsyncTokenBucket.builder().rate(ratePerConsumer).ratePeriodNanos(ratePeriodNanos).build();
+ AsyncTokenBucket.builder()
+ .consistentAddedTokens(true)
+ .consistentConsumedTokens(true)
+
.clock(brokerService.getPulsar().getMonotonicSnapshotClock())
+
.rate(ratePerConsumer).ratePeriodNanos(ratePeriodNanos).build();
this.subscribeRateLimiter.put(consumerIdentifier, tokenBucket);
} else {
// subscribe-rate should be disable and close
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 8dd2fc1c3c2..42e2c00f73a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -170,20 +170,26 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
protected final void internalSetup() throws Exception {
init();
- lookupUrl = new URI(brokerUrl.toString());
- if (isTcpLookup) {
- lookupUrl = new URI(pulsar.getBrokerServiceUrl());
-
+ lookupUrl = resolveLookupUrl();
+ if (isTcpLookup && enableBrokerGateway) {
// setup port forwarding from the advertised port to the listen
port
- if (enableBrokerGateway) {
- InetSocketAddress gatewayAddress = new
InetSocketAddress(lookupUrl.getHost(), lookupUrl.getPort());
- InetSocketAddress brokerAddress = new
InetSocketAddress("127.0.0.1", pulsar.getBrokerListenPort().get());
- brokerGateway = new PortForwarder(gatewayAddress,
brokerAddress);
- }
+ InetSocketAddress gatewayAddress = new
InetSocketAddress(lookupUrl.getHost(), lookupUrl.getPort());
+ InetSocketAddress brokerAddress = new
InetSocketAddress("127.0.0.1", pulsar.getBrokerListenPort().get());
+ brokerGateway = new PortForwarder(gatewayAddress, brokerAddress);
}
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
}
+ private URI resolveLookupUrl() {
+ if (isTcpLookup) {
+ return URI.create(pulsar.getBrokerServiceUrl());
+ } else {
+ return URI.create(brokerUrl != null
+ ? brokerUrl.toString()
+ : brokerUrlTls.toString());
+ }
+ }
+
protected final void internalSetup(ServiceConfiguration
serviceConfiguration) throws Exception {
this.conf = serviceConfiguration;
internalSetup();
@@ -228,11 +234,10 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
protected final void internalSetupForStatsTest() throws Exception {
init();
- String lookupUrl = brokerUrl.toString();
- if (isTcpLookup) {
- lookupUrl = new URI(pulsar.getBrokerServiceUrl()).toString();
+ if (pulsarClient != null) {
+ pulsarClient.shutdown();
}
- pulsarClient = newPulsarClient(lookupUrl, 1);
+ pulsarClient = newPulsarClient(resolveLookupUrl().toString(), 1);
}
protected void doInitConf() throws Exception {
@@ -360,6 +365,9 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
protected void restartBroker() throws Exception {
stopBroker();
startBroker();
+ if (pulsarClient == null) {
+ pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+ }
}
protected void stopBroker() throws Exception {
@@ -384,12 +392,16 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
brokerUrl = pulsar.getWebServiceAddress() != null ? new
URL(pulsar.getWebServiceAddress()) : null;
brokerUrlTls = pulsar.getWebServiceAddressTls() != null ? new
URL(pulsar.getWebServiceAddressTls()) : null;
- if (admin != null) {
- admin.close();
- if (MockUtil.isMock(admin)) {
- Mockito.reset(admin);
+ URI newLookupUrl = resolveLookupUrl();
+ if (lookupUrl == null || !newLookupUrl.equals(lookupUrl)) {
+ lookupUrl = newLookupUrl;
+ if (pulsarClient != null) {
+ pulsarClient.shutdown();
+ pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
}
}
+
+ closeAdmin();
PulsarAdminBuilder pulsarAdminBuilder =
PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null
? brokerUrl.toString()
: brokerUrlTls.toString());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
index b446f9e902f..82793f2748d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/AsyncTokenBucketTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.qos;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -50,7 +51,8 @@ public class AsyncTokenBucketTest {
@Test
void shouldAddTokensWithConfiguredRate() {
asyncTokenBucket =
-
AsyncTokenBucket.builder().capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+ AsyncTokenBucket.builder().consistentConsumedTokens(true)
+
.capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
incrementSeconds(5);
assertEquals(asyncTokenBucket.getTokens(), 50);
incrementSeconds(1);
@@ -64,7 +66,7 @@ public class AsyncTokenBucketTest {
// Consume all and verify none available and then wait 1 period and
check replenished
asyncTokenBucket.consumeTokens(100);
- assertEquals(asyncTokenBucket.tokens(true), 0);
+ assertEquals(asyncTokenBucket.getTokens(), 0);
incrementSeconds(1);
assertEquals(asyncTokenBucket.getTokens(), 10);
}
@@ -91,13 +93,148 @@ public class AsyncTokenBucketTest {
@Test
void shouldSupportFractionsAndRetainLeftoverWhenUpdatingTokens() {
asyncTokenBucket =
-
AsyncTokenBucket.builder().capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+ AsyncTokenBucket.builder().capacity(100)
+ .resolutionNanos(TimeUnit.MILLISECONDS.toNanos(1))
+ .rate(10)
+ .initialTokens(0)
+ .clock(clockSource)
+ .build();
for (int i = 0; i < 150; i++) {
incrementMillis(1);
}
assertEquals(asyncTokenBucket.getTokens(), 1);
incrementMillis(150);
assertEquals(asyncTokenBucket.getTokens(), 3);
+ incrementMillis(1);
+ assertEquals(asyncTokenBucket.getTokens(), 3);
+ incrementMillis(99);
+ assertEquals(asyncTokenBucket.getTokens(), 4);
+ }
+
+ @Test
+ void shouldSupportFractionsAndRetainLeftoverWhenUpdatingTokens2() {
+ asyncTokenBucket =
+ AsyncTokenBucket.builder().capacity(100)
+ .resolutionNanos(TimeUnit.MILLISECONDS.toNanos(1))
+ .rate(1)
+ .initialTokens(0)
+ .clock(clockSource)
+ .build();
+ for (int i = 0; i < 150; i++) {
+ incrementMillis(1);
+ assertEquals(asyncTokenBucket.getTokens(), 0);
+ }
+ incrementMillis(150);
+ assertEquals(asyncTokenBucket.getTokens(), 0);
+ incrementMillis(699);
+ assertEquals(asyncTokenBucket.getTokens(), 0);
+ incrementMillis(1);
+ assertEquals(asyncTokenBucket.getTokens(), 1);
+ incrementMillis(1000);
+ assertEquals(asyncTokenBucket.getTokens(), 2);
+ }
+
+ @Test
+ void shouldHandleNegativeBalanceWithEventuallyConsistentTokenUpdates() {
+ asyncTokenBucket =
+ AsyncTokenBucket.builder()
+ // intentionally pick a coarse resolution
+ .resolutionNanos(TimeUnit.SECONDS.toNanos(51))
+
.capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+ // assert that the token balance is 0 initially
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(0);
+
+ // consume tokens without exceeding the rate
+ for (int i = 0; i < 10000; i++) {
+ asyncTokenBucket.consumeTokens(500);
+ incrementSeconds(50);
+ }
+
+ // let 9 seconds pass
+ incrementSeconds(9);
+
+ // there should be 90 tokens available
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(90);
}
+ @Test
+ void shouldNotExceedTokenBucketSizeWithNegativeTokens() {
+ asyncTokenBucket =
+ AsyncTokenBucket.builder()
+ // intentionally pick a coarse resolution
+ .resolutionNanos(TimeUnit.SECONDS.toNanos(51))
+
.capacity(100).rate(10).initialTokens(0).clock(clockSource).build();
+ // assert that the token balance is 0 initially
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(0);
+
+ // consume tokens without exceeding the rate
+ for (int i = 0; i < 100; i++) {
+ asyncTokenBucket.consumeTokens(600);
+ incrementSeconds(50);
+ // let tokens accumulate back to 0 every 10 iterations
+ if ((i + 1) % 10 == 0) {
+ incrementSeconds(100);
+ }
+ }
+
+ // let 9 seconds pass
+ incrementSeconds(9);
+
+ // there should be 90 tokens available
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(90);
+ }
+
+ @Test
+ void
shouldAccuratelyCalculateTokensWhenTimeIsLaggingBehindInInconsistentUpdates() {
+ clockSource = requestSnapshot -> {
+ if (requestSnapshot) {
+ return manualClockSource.get();
+ } else {
+ // let the clock lag behind
+ return manualClockSource.get() - TimeUnit.SECONDS.toNanos(52);
+ }
+ };
+ incrementSeconds(1);
+ asyncTokenBucket =
+
AsyncTokenBucket.builder().resolutionNanos(TimeUnit.SECONDS.toNanos(51))
+
.capacity(100).rate(10).initialTokens(100).clock(clockSource).build();
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(100);
+
+ // consume tokens without exceeding the rate
+ for (int i = 0; i < 10000; i++) {
+ asyncTokenBucket.consumeTokens(500);
+ incrementSeconds(i == 0 ? 40 : 50);
+ }
+
+ // let 9 seconds pass
+ incrementSeconds(9);
+
+ // there should be 90 tokens available
+ assertThat(asyncTokenBucket.getTokens()).isEqualTo(90);
+ }
+
+ @Test
+ void shouldHandleEventualConsistency() {
+ AtomicLong offset = new AtomicLong(0);
+ long resolutionNanos = TimeUnit.MILLISECONDS.toNanos(1);
+ DefaultMonotonicSnapshotClock monotonicSnapshotClock =
+ new DefaultMonotonicSnapshotClock(resolutionNanos,
+ () -> offset.get() + manualClockSource.get());
+ long initialTokens = 500L;
+ asyncTokenBucket =
+ AsyncTokenBucket.builder()
+ .consistentConsumedTokens(true)
+ .resolutionNanos(resolutionNanos)
+
.capacity(100000).rate(1000).initialTokens(initialTokens).clock(monotonicSnapshotClock).build();
+ for (int i = 0; i < 100000; i++) {
+ // increment the clock by 1ms, since rate is 1000 tokens/s, this
should make 1 token available
+ incrementMillis(1);
+ // consume 1 token
+ asyncTokenBucket.consumeTokens(1);
+ }
+ assertThat(asyncTokenBucket.getTokens())
+ // since the rate is 1/ms and the test increments the clock by
1ms and consumes 1 token in each
+ // iteration, the tokens should be equal to the initial tokens
+ .isEqualTo(initialTokens);
+ }
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java
new file mode 100644
index 00000000000..0820b439915
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/qos/DefaultMonotonicSnapshotClockTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.qos;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.assertj.core.data.Offset;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class DefaultMonotonicSnapshotClockTest {
+ /** Supplies both boolean values so that a test runs once with {@code true} and once with {@code false}. */
+ @DataProvider
+ private static Object[] booleanValues() {
+     Object[] bothValues = {Boolean.TRUE, Boolean.FALSE};
+     return bothValues;
+ }
+
+ /**
+  * Verifies that the tick value never decreases, and stays close to the previous tick, while the
+  * clock source repeatedly leaps 5 minutes backwards.
+  *
+  * @param requestSnapshot value passed to {@code getTickNanos}; also selects the allowed tolerance
+  */
+ @Test(dataProvider = "booleanValues")
+ void testClockHandlesTimeLeapsBackwards(boolean requestSnapshot) throws InterruptedException {
+     long snapshotIntervalMillis = 5;
+     AtomicLong clockValue = new AtomicLong(1);
+     @Cleanup
+     DefaultMonotonicSnapshotClock clock =
+             new DefaultMonotonicSnapshotClock(Duration.ofMillis(snapshotIntervalMillis).toNanos(),
+                     clockValue::get);
+
+     long previousTick = -1;
+     for (int i = 0; i < 10000; i++) {
+         clockValue.addAndGet(TimeUnit.MILLISECONDS.toNanos(1));
+         long tick = clock.getTickNanos(requestSnapshot);
+         if ((i + 1) % 5 == 0) {
+             // every fifth iteration the clock source leaps 5 minutes backwards
+             clockValue.addAndGet(-Duration.ofMinutes(5).toNanos());
+         }
+         if (previousTick != -1) {
+             assertThat(tick)
+                     .describedAs("i = %d, tick = %d, previousTick = %d", i, tick, previousTick)
+                     .isGreaterThanOrEqualTo(previousTick)
+                     .isCloseTo(previousTick,
+                             // when a snapshot is requested, the time difference between the two ticks is
+                             // accurate, otherwise allow a time difference of at most 4 times the snapshot
+                             // interval since the clock is updated periodically by a background thread
+                             Offset.offset(TimeUnit.MILLISECONDS.toNanos(
+                                     requestSnapshot ? 1 : 4 * snapshotIntervalMillis)));
+         }
+         previousTick = tick;
+     }
+ }
+
+ /**
+  * Checks that a tick obtained with a snapshot request is greater than a tick read just before
+  * without one, using a 5 second snapshot interval.
+  */
+ @Test
+ void testRequestUpdate() throws InterruptedException {
+     long snapshotIntervalNanos = Duration.ofSeconds(5).toNanos();
+     @Cleanup
+     DefaultMonotonicSnapshotClock snapshotClock =
+             new DefaultMonotonicSnapshotClock(snapshotIntervalNanos, System::nanoTime);
+     long tickWithoutRequest = snapshotClock.getTickNanos(false);
+     long tickWithRequest = snapshotClock.getTickNanos(true);
+     assertThat(tickWithRequest).isGreaterThan(tickWithoutRequest);
+ }
+
+ /**
+  * Checks that requesting snapshots on an already closed clock still yields increasing ticks:
+  * the tick read after a 10 ms sleep must be greater than the one read before it.
+  */
+ @Test
+ void testRequestingSnapshotAfterClosed() throws InterruptedException {
+     long snapshotIntervalNanos = Duration.ofSeconds(5).toNanos();
+     DefaultMonotonicSnapshotClock closedClock =
+             new DefaultMonotonicSnapshotClock(snapshotIntervalNanos, System::nanoTime);
+     closedClock.close();
+     long tickBeforeSleep = closedClock.getTickNanos(true);
+     Thread.sleep(10);
+     long tickAfterSleep = closedClock.getTickNanos(true);
+     assertThat(tickAfterSleep).isGreaterThan(tickBeforeSleep);
+ }
+
+ /**
+  * Checks the constructor argument validation: a zero or negative snapshot interval is rejected
+  * with an {@link IllegalArgumentException} and a null clock source with a {@link NullPointerException}.
+  */
+ @Test
+ void testConstructorValidation() {
+     for (long invalidInterval : new long[] {0, -1}) {
+         assertThatThrownBy(() -> new DefaultMonotonicSnapshotClock(invalidInterval, System::nanoTime))
+                 .isInstanceOf(IllegalArgumentException.class)
+                 .hasMessage("snapshotIntervalNanos must be at least 1 millisecond");
+     }
+     long validInterval = TimeUnit.MILLISECONDS.toNanos(1);
+     assertThatThrownBy(() -> new DefaultMonotonicSnapshotClock(validInterval, null))
+             .isInstanceOf(NullPointerException.class)
+             .hasMessage("clockSource must not be null");
+ }
+
+ /**
+  * Checks that a {@link RuntimeException} thrown by the clock source surfaces from
+  * {@code getTickNanos(true)} with its original message.
+  */
+ @Test
+ void testFailureHandlingInClockSource() {
+     long snapshotIntervalNanos = Duration.ofSeconds(5).toNanos();
+     @Cleanup
+     DefaultMonotonicSnapshotClock failingClock =
+             new DefaultMonotonicSnapshotClock(snapshotIntervalNanos, () -> {
+                 throw new RuntimeException("Test clock failure");
+             });
+     // the failure of the clock source must reach the caller
+     assertThatThrownBy(() -> failingClock.getTickNanos(true))
+             .isInstanceOf(RuntimeException.class)
+             .hasMessage("Test clock failure");
+ }
+
+ /**
+  * Drives a {@code MonotonicLeapDetectingTickUpdater} directly with a manually controlled clock
+  * source and checks the published tick value after forward steps, backward leaps and a forward leap.
+  */
+ @Test
+ void testLeapDetectionIndependently() {
+     AtomicLong sourceNanos = new AtomicLong(0);
+     AtomicLong publishedTick = new AtomicLong(0);
+     long expectedTick = 0;
+     long interval = TimeUnit.MILLISECONDS.toNanos(1);
+     DefaultMonotonicSnapshotClock.MonotonicLeapDetectingTickUpdater leapDetector =
+             new DefaultMonotonicSnapshotClock.MonotonicLeapDetectingTickUpdater(sourceNanos::get,
+                     publishedTick::set, interval);
+
+     leapDetector.update(true);
+
+     // step 1: the source moves forward by one interval
+     sourceNanos.addAndGet(interval);
+     expectedTick += interval;
+     leapDetector.update(true);
+     assertThat(publishedTick.get()).isEqualTo(expectedTick);
+
+     // step 2: the source jumps 10 intervals backwards, the tick is expected to grow by one interval
+     sourceNanos.addAndGet(-10 * interval);
+     expectedTick += interval;
+     leapDetector.update(true);
+     assertThat(publishedTick.get()).isEqualTo(expectedTick);
+
+     // step 3: the source moves forward by one interval
+     sourceNanos.addAndGet(interval);
+     expectedTick += interval;
+     leapDetector.update(true);
+     assertThat(publishedTick.get()).isEqualTo(expectedTick);
+
+     // step 4: backward jump of 10 intervals, updated without waiting a full snapshot interval;
+     // the tick is expected to stay unchanged
+     sourceNanos.addAndGet(-10 * interval);
+     leapDetector.update(false);
+     assertThat(publishedTick.get()).isEqualTo(expectedTick);
+
+     // step 5: the source moves forward by one interval
+     sourceNanos.addAndGet(interval);
+     expectedTick += interval;
+     leapDetector.update(true);
+     assertThat(publishedTick.get()).isEqualTo(expectedTick);
+
+     // step 6: a small backward jump (one interval) which isn't detected, updated without waiting
+     // a full snapshot interval
+     sourceNanos.addAndGet(-1 * interval);
+     leapDetector.update(false);
+     assertThat(publishedTick.get()).isEqualTo(expectedTick);
+     // the tick doesn't advance for one snapshot interval
+     sourceNanos.addAndGet(interval);
+     leapDetector.update(false);
+     assertThat(publishedTick.get()).isEqualTo(expectedTick);
+     // after that the tick advances again
+     sourceNanos.addAndGet(interval);
+     expectedTick += interval;
+     leapDetector.update(false);
+     assertThat(publishedTick.get()).isEqualTo(expectedTick);
+
+     // step 7: a forward jump of 10 intervals gets no special handling
+     sourceNanos.addAndGet(10 * interval);
+     expectedTick += 10 * interval;
+     leapDetector.update(true);
+     assertThat(publishedTick.get()).isEqualTo(expectedTick);
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
index 392ec0d3ff4..8343680f9bf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMsgsTest.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupUsageStatsType;
@@ -58,9 +59,10 @@ import org.testng.annotations.Test;
@Slf4j
@Test(groups = "flaky")
public class RGUsageMTAggrWaitForAllMsgsTest extends ProducerConsumerBase {
- @BeforeClass
+ @BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
+ AsyncTokenBucket.switchToConsistentTokensView();
super.internalSetup();
this.prepareForOps();
@@ -91,6 +93,7 @@ public class RGUsageMTAggrWaitForAllMsgsTest extends
ProducerConsumerBase {
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
+ AsyncTokenBucket.resetToDefaultEventualConsistentTokensView();
}
@Test
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
index 2c44ba7e230..5c149d4e1e7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
@@ -73,7 +74,9 @@ public class PublishRateLimiterTest {
when(transportCnx.getBrokerService()).thenReturn(brokerService);
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
when(brokerService.executor()).thenReturn(eventLoopGroup);
- doReturn(null).when(eventLoopGroup).schedule(any(Runnable.class),
anyLong(), any());
+ EventLoop eventLoop = mock(EventLoop.class);
+ when(eventLoopGroup.next()).thenReturn(eventLoop);
+ doReturn(null).when(eventLoop).schedule(any(Runnable.class),
anyLong(), any());
incrementSeconds(1);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java
new file mode 100644
index 00000000000..31c628b2bc4
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AbstractMessageDispatchThrottlingTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.qos.AsyncTokenBucket;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+
+public abstract class AbstractMessageDispatchThrottlingTest extends
ProducerConsumerBase {
+ /**
+  * Concatenates two arrays into a newly allocated array.
+  *
+  * @param first the elements placed at the beginning of the result
+  * @param last the elements appended after the elements of {@code first}
+  * @param <T> the element type
+  * @return a new array of length {@code first.length + last.length}
+  */
+ public static <T> T[] merge(T[] first, T[] last) {
+     int totalLength = first.length + last.length;
+     T[] result = Arrays.copyOf(first, totalLength);
+     int offset = first.length;
+     // copy last.length elements: using first.length here dropped the tail of a longer "last"
+     // and threw an ArrayIndexOutOfBoundsException for a shorter one
+     System.arraycopy(last, 0, result, offset, last.length);
+     return result;
+ }
+
+ /**
+  * Class-level fixture: switches {@link AsyncTokenBucket} to the consistent tokens view, names
+  * the cluster "test" and then runs the inherited internal and producer base setup.
+  */
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+     AsyncTokenBucket.switchToConsistentTokensView();
+     conf.setClusterName("test");
+     this.internalSetup();
+     this.producerBaseSetup();
+ }
+
+ /**
+  * Class-level teardown: runs the inherited internal cleanup and then resets
+  * {@link AsyncTokenBucket} to its default eventually consistent tokens view.
+  */
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+     this.internalCleanup();
+     AsyncTokenBucket.resetToDefaultEventualConsistentTokensView();
+ }
+
+ /**
+  * Removes every namespace, tenant and cluster after each test method and then re-runs
+  * {@code producerBaseSetup()}.
+  *
+  * <p>Force deletion is enabled only for the duration of the cleanup. The flags are restored in
+  * a {@code finally} block so that a failing admin call cannot leave force deletion enabled on
+  * the broker configuration shared by the following test methods.
+  *
+  * @throws Exception if any admin operation or the base setup fails
+  */
+ @AfterMethod(alwaysRun = true)
+ protected void reset() throws Exception {
+     pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
+     pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
+     try {
+         for (String tenant : admin.tenants().getTenants()) {
+             for (String namespace : admin.namespaces().getNamespaces(tenant)) {
+                 admin.namespaces().deleteNamespace(namespace, true);
+             }
+             admin.tenants().deleteTenant(tenant, true);
+         }
+
+         for (String cluster : admin.clusters().getClusters()) {
+             admin.clusters().deleteCluster(cluster);
+         }
+     } finally {
+         pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
+         pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
+     }
+
+     producerBaseSetup();
+ }
+
+ @DataProvider(name = "subscriptions")
+ public Object[][] subscriptionsProvider() {
+ return new Object[][]{new Object[]{SubscriptionType.Shared},
{SubscriptionType.Exclusive}};
+ }
+
+ @DataProvider(name = "dispatchRateType")
+ public Object[][] dispatchRateProvider() {
+ return new Object[][]{{DispatchRateType.messageRate},
{DispatchRateType.byteRate}};
+ }
+
+ /**
+  * Supplies every combination of a row from {@link #subscriptionsProvider()} with a row from
+  * {@link #dispatchRateProvider()}, each combination merged into a single parameter row.
+  */
+ @DataProvider(name = "subscriptionAndDispatchRateType")
+ public Object[][] subDisTypeProvider() {
+     List<Object[]> combinedRows = new LinkedList<>();
+     Object[][] subscriptionRows = subscriptionsProvider();
+     for (Object[] subscriptionRow : subscriptionRows) {
+         for (Object[] dispatchRateRow : dispatchRateProvider()) {
+             Object[] combinedRow = merge(subscriptionRow, dispatchRateRow);
+             combinedRows.add(combinedRow);
+         }
+     }
+     return combinedRows.toArray(new Object[0][0]);
+ }
+
+ /**
+  * Shuts down the broker service's {@code statsUpdater} executor, which is read reflectively
+  * from {@link BrokerService}, and then deactivates every cursor of the given ledger.
+  *
+  * @param ledger the managed ledger whose cursors are deactivated
+  * @throws Exception if the {@code statsUpdater} field cannot be accessed reflectively
+  */
+ protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
+     Field updaterField = BrokerService.class.getDeclaredField("statsUpdater");
+     updaterField.setAccessible(true);
+     Object updater = updaterField.get(pulsar.getBrokerService());
+     ((ScheduledExecutorService) updater).shutdownNow();
+     ledger.getCursors().forEach(cursor -> ledger.deactivateCursor(cursor));
+ }
+
+ enum DispatchRateType {
+ messageRate, byteRate;
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index a544c7e13bc..5d6f0c519ab 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
@@ -27,15 +28,11 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
@@ -43,7 +40,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
-import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -52,93 +49,17 @@ import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
-import org.apache.pulsar.broker.qos.AsyncTokenBucket;
+import org.assertj.core.data.Offset;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-@Test(groups = "flaky")
-public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
+@Test(groups = "broker-api")
+public class MessageDispatchThrottlingTest extends
AbstractMessageDispatchThrottlingTest {
private static final Logger log =
LoggerFactory.getLogger(MessageDispatchThrottlingTest.class);
- @BeforeClass
- @Override
- protected void setup() throws Exception {
- AsyncTokenBucket.switchToConsistentTokensView();
- this.conf.setClusterName("test");
- super.internalSetup();
- super.producerBaseSetup();
- }
-
- @AfterClass(alwaysRun = true)
- @Override
- protected void cleanup() throws Exception {
- super.internalCleanup();
- AsyncTokenBucket.resetToDefaultEventualConsistentTokensView();
- }
-
- @AfterMethod(alwaysRun = true)
- protected void reset() throws Exception {
- pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
- pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
-
- for (String tenant : admin.tenants().getTenants()) {
- for (String namespace : admin.namespaces().getNamespaces(tenant)) {
- admin.namespaces().deleteNamespace(namespace, true);
- }
- admin.tenants().deleteTenant(tenant, true);
- }
-
- for (String cluster : admin.clusters().getClusters()) {
- admin.clusters().deleteCluster(cluster);
- }
-
- pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
- pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
-
- super.producerBaseSetup();
- }
-
-
- @DataProvider(name = "subscriptions")
- public Object[][] subscriptionsProvider() {
- return new Object[][] { new Object[] { SubscriptionType.Shared }, {
SubscriptionType.Exclusive } };
- }
-
- @DataProvider(name = "dispatchRateType")
- public Object[][] dispatchRateProvider() {
- return new Object[][] { { DispatchRateType.messageRate }, {
DispatchRateType.byteRate } };
- }
-
- @DataProvider(name = "subscriptionAndDispatchRateType")
- public Object[][] subDisTypeProvider() {
- List<Object[]> mergeList = new LinkedList<>();
- for (Object[] sub : subscriptionsProvider()) {
- for (Object[] dispatch : dispatchRateProvider()) {
- mergeList.add(merge(sub, dispatch));
- }
- }
- return mergeList.toArray(new Object[0][0]);
- }
-
- public static <T> T[] merge(T[] first, T[] last) {
- int totalLength = first.length + last.length;
- T[] result = Arrays.copyOf(first, totalLength);
- int offset = first.length;
- System.arraycopy(last, 0, result, offset, first.length);
- return result;
- }
-
- enum DispatchRateType {
- messageRate, byteRate;
- }
-
/**
* verifies: message-rate change gets reflected immediately into topic at
runtime
*
@@ -150,7 +71,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/throttlingBlock";
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
@@ -220,7 +141,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
@SuppressWarnings("deprecation")
@Test
public void testSystemTopicDeliveryNonBlock() throws Exception {
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
final String topicName = "persistent://" + namespace + "/" +
UUID.randomUUID().toString().replaceAll("-", "");
admin.topics().createNonPartitionedTopic(topicName);
@@ -264,7 +185,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
DispatchRateType dispatchRateType) throws Exception {
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/throttlingBlock";
final int messageRate = 100;
@@ -332,7 +253,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
public void testClusterMsgByteRateLimitingClusterConfig() throws Exception
{
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/throttlingBlock";
final int messageRate = 5;
final long byteRate = 1024 * 1024;// 1MB rate enough to let all msg to
be delivered
@@ -407,7 +328,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
throws Exception {
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/throttlingAll";
final int messageRate = 10;
@@ -475,7 +396,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/throttlingAll";
final String subscriptionName = "my-subscriber-name";
@@ -528,8 +449,9 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
public void testRateLimitingMultipleConsumers() throws Exception {
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/throttlingMultipleConsumers";
+ conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
final int messageRate = 5;
DispatchRate dispatchRate = DispatchRate.builder()
@@ -540,7 +462,8 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setDispatchRate(namespace, dispatchRate);
// create producer and topic
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ @Cleanup
+ Producer<byte[]> producer =
pulsarClient.newProducer().enableBatching(false).topic(topicName).create();
PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
Awaitility.await()
@@ -566,10 +489,15 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
throw new RuntimeException(e);
}
});
+ @Cleanup
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
+ @Cleanup
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+ @Cleanup
Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
+ @Cleanup
Consumer<byte[]> consumer4 = consumerBuilder.subscribe();
+ @Cleanup
Consumer<byte[]> consumer5 = consumerBuilder.subscribe();
// deactive cursors
@@ -585,15 +513,10 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
Thread.sleep(1000);
// rate limiter should have limited messages with at least 10%
accuracy (or 2 messages if messageRate is low)
- Assert.assertEquals(totalReceived.get(), messageRate,
Math.max(messageRate / 10, 2));
+ assertThat(totalReceived.get()).isCloseTo(messageRate,
Offset.offset(Math.max(messageRate / 10, 2)));
- consumer1.close();
- consumer2.close();
- consumer3.close();
- consumer4.close();
- consumer5.close();
- producer.close();
log.info("-- Exiting {} test --", methodName);
+ conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false);
}
@Test
@@ -602,7 +525,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
conf.setDispatchThrottlingOnBatchMessageEnabled(true);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/throttlingMultipleConsumers";
final int messageRate = 5;
@@ -614,6 +537,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
final int messagesPerBatch = 100;
final int numProducedMessages = messageRate * messagesPerBatch;
// create producer and topic
+ @Cleanup
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).enableBatching(true)
.batchingMaxPublishDelay(1,
TimeUnit.SECONDS).batchingMaxMessages(messagesPerBatch).create();
PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
@@ -634,10 +558,15 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
log.debug("Received message [{}] in the listener",
receivedMessage);
totalReceived.incrementAndGet();
});
+ @Cleanup
Consumer<byte[]> consumer1 = consumerBuilder.subscribe();
+ @Cleanup
Consumer<byte[]> consumer2 = consumerBuilder.subscribe();
+ @Cleanup
Consumer<byte[]> consumer3 = consumerBuilder.subscribe();
+ @Cleanup
Consumer<byte[]> consumer4 = consumerBuilder.subscribe();
+ @Cleanup
Consumer<byte[]> consumer5 = consumerBuilder.subscribe();
// deactive cursors
@@ -657,12 +586,6 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
// consumer should not have received all published message due to
message-rate throttling
Assert.assertEquals(totalReceived.get(), numProducedMessages);
- consumer1.close();
- consumer2.close();
- consumer3.close();
- consumer4.close();
- consumer5.close();
- producer.close();
log.info("-- Exiting {} test --", methodName);
}
@@ -670,7 +593,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
public void testClusterRateLimitingConfiguration(SubscriptionType
subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/throttlingBlock";
final int messageRate = 5;
@@ -688,12 +611,14 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
// create producer and topic
+ @Cleanup
Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
PersistentTopic topic = (PersistentTopic)
pulsar.getBrokerService().getOrCreateTopic(topicName).get();
int numMessages = 500;
final AtomicInteger totalReceived = new AtomicInteger(0);
+ @Cleanup
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscriptionType(subscription).messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
@@ -716,8 +641,6 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
// consumer should not have received all published message due to
message-rate throttling
Assert.assertNotEquals(totalReceived.get(), numMessages);
- consumer.close();
- producer.close();
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg",
Integer.toString(initValue));
log.info("-- Exiting {} test --", methodName);
@@ -733,7 +656,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
public void testMessageByteRateThrottlingCombined(SubscriptionType
subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/throttlingAll";
final int messageRate = 5; // 5 msgs per second
@@ -803,7 +726,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
public void testGlobalNamespaceThrottling() throws Exception {
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/throttlingBlock";
final int messageRate = 5;
@@ -869,7 +792,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType
subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/throttlingBlock";
final int messageRate = 10;
@@ -948,7 +871,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
public void testClusterPolicyOverrideConfiguration() throws Exception {
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName1 = "persistent://" + namespace +
"/throttlingOverride1";
final String topicName2 = "persistent://" + namespace +
"/throttlingOverride2";
final int clusterMessageRate = 100;
@@ -1018,7 +941,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
public void testClosingRateLimiter(SubscriptionType subscription) throws
Exception {
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = "persistent://" + namespace +
"/closingRateLimiter" + subscription.name();
final String subName = "mySubscription" + subscription.name();
@@ -1066,7 +989,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
@SuppressWarnings("deprecation")
@Test
public void testDispatchRateCompatibility2() throws Exception {
- final String namespace = "my-property/dispatch-rate-compatibility";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/dispatch-rate-compatibility");
final String topicName = "persistent://" + namespace + "/t1";
final String cluster = "test";
admin.namespaces().createNamespace(namespace,
Sets.newHashSet(cluster));
@@ -1112,17 +1035,6 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
topic.close().get();
}
- protected void deactiveCursors(ManagedLedgerImpl ledger) throws Exception {
- Field statsUpdaterField =
BrokerService.class.getDeclaredField("statsUpdater");
- statsUpdaterField.setAccessible(true);
- ScheduledExecutorService statsUpdater = (ScheduledExecutorService)
statsUpdaterField
- .get(pulsar.getBrokerService());
- statsUpdater.shutdownNow();
- ledger.getCursors().forEach(cursor -> {
- ledger.deactivateCursor(cursor);
- });
- }
-
/**
* It verifies that relative throttling at least dispatch messages as
publish-rate.
*
@@ -1133,7 +1045,7 @@ public class MessageDispatchThrottlingTest extends
ProducerConsumerBase {
public void testRelativeMessageRateLimitingThrottling(SubscriptionType
subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
- final String namespace = "my-property/relative_throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/relative_throttling_ns");
final String topicName = "persistent://" + namespace +
"/relative-throttle" + subscription;
final int messageRate = 1;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index ce554ab2d9c..db40ec644e9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -18,8 +18,10 @@
*/
package org.apache.pulsar.client.api;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import com.google.common.collect.Sets;
+import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -30,6 +32,7 @@ import
org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
@@ -37,8 +40,8 @@ import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
-@Test(groups = "flaky")
-public class SubscriptionMessageDispatchThrottlingTest extends
MessageDispatchThrottlingTest {
+@Test(groups = "broker-api")
+public class SubscriptionMessageDispatchThrottlingTest extends
AbstractMessageDispatchThrottlingTest {
private static final Logger log =
LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class);
/**
@@ -241,7 +244,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
admin.namespaces().setSubscriptionDispatchRate(namespace,
subscriptionDispatchRate);
admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
long initBytes =
pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
-
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" +
brokerRate);
+ updateBrokerDispatchThrottlingRateInBytes(brokerRate);
final int numProducedMessages = 30;
final CountDownLatch latch = new CountDownLatch(numProducedMessages);
@@ -272,10 +275,11 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
Assert.fail("Should only have PersistentDispatcher in this test");
}
final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
- Awaitility.await().untilAsserted(() -> {
+ Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
DispatchRateLimiter brokerDispatchRateLimiter =
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
- Assert.assertTrue(brokerDispatchRateLimiter != null
- && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+ assertThat(brokerDispatchRateLimiter)
+ .isNotNull()
+ .satisfies(l ->
assertThat(l.getDispatchRateOnByte()).isEqualTo(brokerRate));
DispatchRateLimiter topicDispatchRateLimiter =
topic.getDispatchRateLimiter().orElse(null);
Assert.assertTrue(topicDispatchRateLimiter != null
&& topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
@@ -301,10 +305,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
consumer.close();
producer.close();
-
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte",
Long.toString(initBytes));
-
- admin.topics().delete(topicName, true);
- admin.namespaces().deleteNamespace(namespace);
+ updateBrokerDispatchThrottlingRateInBytes(initBytes);
}
/**
@@ -401,7 +402,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
private void testDispatchRate(SubscriptionType subscription,
int brokerRate, int topicRate, int subRate,
int expectRate) throws Exception {
- final String namespace = "my-property/throttling_ns";
+ final String namespace =
BrokerTestUtil.newUniqueName("my-property/throttling_ns");
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ namespace + "/throttlingAll");
final String subName = "my-subscriber-name-" + subscription;
@@ -419,7 +420,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
admin.namespaces().setSubscriptionDispatchRate(namespace,
subscriptionDispatchRate);
admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
long initBytes =
pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
-
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" +
brokerRate);
+ updateBrokerDispatchThrottlingRateInBytes(brokerRate);
final int numProducedMessages = 30;
final CountDownLatch latch = new CountDownLatch(numProducedMessages);
@@ -450,10 +451,11 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
Assert.fail("Should only have PersistentDispatcher in this test");
}
final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
- Awaitility.await().untilAsserted(() -> {
+ Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
DispatchRateLimiter brokerDispatchRateLimiter =
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
- Assert.assertTrue(brokerDispatchRateLimiter != null
- && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+ assertThat(brokerDispatchRateLimiter)
+ .isNotNull()
+ .satisfies(l ->
assertThat(l.getDispatchRateOnByte()).isEqualTo(brokerRate));
DispatchRateLimiter topicDispatchRateLimiter =
topic.getDispatchRateLimiter().orElse(null);
Assert.assertTrue(topicDispatchRateLimiter != null
&& topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
@@ -482,9 +484,18 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
consumer.close();
producer.close();
-
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte",
Long.toString(initBytes));
- admin.topics().delete(topicName, true);
- admin.namespaces().deleteNamespace(namespace);
+ updateBrokerDispatchThrottlingRateInBytes(initBytes);
+ }
+
+ private void updateBrokerDispatchThrottlingRateInBytes(long bytes) throws
PulsarAdminException {
+
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte",
Long.toString(bytes));
+ long expectedBytes = bytes > 0L ? bytes : -1L;
+ await().untilAsserted(() -> {
+ DispatchRateLimiter brokerDispatchRateLimiter =
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+ assertThat(brokerDispatchRateLimiter)
+ .isNotNull()
+ .satisfies(l ->
assertThat(l.getDispatchRateOnByte()).isEqualTo(expectedBytes));
+ });
}
/**
@@ -537,7 +548,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
long initBytes =
pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
final int byteRate = 1000;
-
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" +
byteRate);
+ updateBrokerDispatchThrottlingRateInBytes(byteRate);
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(pulsar.getConfiguration().getDispatchThrottlingRateInByte(),
byteRate);
@@ -576,12 +587,6 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
Producer<byte[]> producer1 =
pulsarClient.newProducer().topic(topicName1).create();
Producer<byte[]> producer2 =
pulsarClient.newProducer().topic(topicName2).create();
- Awaitility.await().untilAsserted(() -> {
- DispatchRateLimiter rateLimiter =
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
- Assert.assertTrue(rateLimiter != null
- && rateLimiter.getDispatchRateOnByte() > 0);
- });
-
long start = System.currentTimeMillis();
// Asynchronously produce messages
for (int i = 0; i < numProducedMessagesEachTopic; i++) {
@@ -600,7 +605,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
consumer2.close();
producer1.close();
producer2.close();
-
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte",
Long.toString(initBytes));
+ updateBrokerDispatchThrottlingRateInBytes(initBytes);
log.info("-- Exiting {} test --", methodName);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java
index 1c0ae5547d5..a848d68f37f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java
@@ -41,7 +41,7 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-@Test
+@Test(groups = "broker-api")
public class MessagePublishThrottlingTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(MessagePublishThrottlingTest.class);