This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new acad78c10f0 [improve][broker] Part-2 of PIP-434: Use
ServerCnxThrottleTracker, instead of modifying channel.readable directly
(#24799)
acad78c10f0 is described below
commit acad78c10f0c0c4756d2dd73bf729bb2f84f53e7
Author: fengyubiao <[email protected]>
AuthorDate: Thu Oct 16 00:01:00 2025 +0800
[improve][broker] Part-2 of PIP-434: Use ServerCnxThrottleTracker, instead
of modifying channel.readable directly (#24799)
---
.../resourcegroup/ResourceGroupPublishLimiter.java | 9 +-
.../pulsar/broker/service/AbstractTopic.java | 8 +-
.../pulsar/broker/service/BrokerService.java | 8 +-
.../org/apache/pulsar/broker/service/Producer.java | 24 -
.../broker/service/PublishRateLimiterImpl.java | 13 +-
.../apache/pulsar/broker/service/ServerCnx.java | 36 +-
.../broker/service/ServerCnxThrottleTracker.java | 481 +++++++++++++++++----
.../apache/pulsar/broker/service/TransportCnx.java | 17 +-
.../service/PublishRateLimiterDisableTest.java | 23 +-
.../broker/service/PublishRateLimiterTest.java | 111 +++--
.../service/TopicPublishRateThrottleTest.java | 31 ++
11 files changed, 565 insertions(+), 196 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
index fc4514db81f..04f56e0ca69 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.resourcegroup;
import org.apache.pulsar.broker.qos.MonotonicClock;
import
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
+import org.apache.pulsar.broker.service.ServerCnxThrottleTracker;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.ResourceGroup;
@@ -30,7 +31,13 @@ public class ResourceGroupPublishLimiter extends
PublishRateLimiterImpl {
private volatile long publishMaxByteRate;
public ResourceGroupPublishLimiter(ResourceGroup resourceGroup,
MonotonicClock monotonicClock) {
- super(monotonicClock);
+ super(monotonicClock, producer -> {
+ producer.getCnx().getThrottleTracker().markThrottled(
+
ServerCnxThrottleTracker.ThrottleType.ResourceGroupPublishRate);
+ }, producer -> {
+ producer.getCnx().getThrottleTracker().unmarkThrottled(
+
ServerCnxThrottleTracker.ThrottleType.ResourceGroupPublishRate);
+ });
update(resourceGroup);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 3ec6f5a0cd5..24bce1e39bb 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static
org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
+import static
org.apache.pulsar.broker.service.ServerCnxThrottleTracker.ThrottleType;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
@@ -193,7 +194,12 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener {
updateTopicPolicyByBrokerConfig();
this.lastActive = System.nanoTime();
- topicPublishRateLimiter = new
PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicClock());
+ topicPublishRateLimiter = new
PublishRateLimiterImpl(brokerService.getPulsar().getMonotonicClock(),
+ producer -> {
+
producer.getCnx().getThrottleTracker().markThrottled(ThrottleType.TopicPublishRate);
+ }, producer -> {
+
producer.getCnx().getThrottleTracker().unmarkThrottled(ThrottleType.TopicPublishRate);
+ });
updateActiveRateLimiters();
additionalSystemCursorNames =
brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 79dffdf7aad..378fda44c2e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -339,7 +339,13 @@ public class BrokerService implements Closeable {
this.pulsar = pulsar;
this.clock = pulsar.getClock();
this.dynamicConfigurationMap = prepareDynamicConfigurationMap();
- this.brokerPublishRateLimiter = new
PublishRateLimiterImpl(pulsar.getMonotonicClock());
+ this.brokerPublishRateLimiter = new
PublishRateLimiterImpl(pulsar.getMonotonicClock(), producer -> {
+ producer.getCnx().getThrottleTracker().markThrottled(
+ ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate);
+ }, producer -> {
+ producer.getCnx().getThrottleTracker().unmarkThrottled(
+ ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate);
+ });
this.dispatchRateLimiterFactory =
createDispatchRateLimiterFactory(pulsar.getConfig());
this.managedLedgerStorage = pulsar.getManagedLedgerStorage();
this.keepAliveIntervalSeconds =
pulsar.getConfiguration().getKeepAliveIntervalSeconds();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 0784f74591e..9d0c1080254 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -899,30 +899,6 @@ public class Producer {
private static final Logger log = LoggerFactory.getLogger(Producer.class);
- /**
- * This method increments a counter that is used to control the throttling
of a connection.
- * The connection's read operations are paused when the counter's value is
greater than 0, indicating that
- * throttling is in effect.
- * It's important to note that after calling this method, it is the
caller's responsibility to ensure that the
- * counter is decremented by calling the {@link #decrementThrottleCount()}
method when throttling is no longer
- * needed on the connection.
- */
- public void incrementThrottleCount() {
- cnx.incrementThrottleCount();
- }
-
- /**
- * This method decrements a counter that is used to control the throttling
of a connection.
- * The connection's read operations are resumed when the counter's value
is 0, indicating that
- * throttling is no longer in effect.
- * It's important to note that before calling this method, the caller
should have previously
- * incremented the counter by calling the {@link
#incrementThrottleCount()} method when throttling
- * was needed on the connection.
- */
- public void decrementThrottleCount() {
- cnx.decrementThrottleCount();
- }
-
public Attributes getOpenTelemetryAttributes() {
if (attributes != null) {
return attributes;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
index 0015f2675a2..096418191dc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.broker.qos.MonotonicClock;
@@ -42,9 +43,14 @@ public class PublishRateLimiterImpl implements
PublishRateLimiter {
private final AtomicInteger throttledProducersCount = new AtomicInteger(0);
private final AtomicBoolean processingQueuedProducers = new
AtomicBoolean(false);
+ private final Consumer<Producer> throttleAction;
+ private final Consumer<Producer> unthrottleAction;
- public PublishRateLimiterImpl(MonotonicClock monotonicClock) {
+ public PublishRateLimiterImpl(MonotonicClock monotonicClock,
Consumer<Producer> throttleAction,
+ Consumer<Producer> unthrottleAction) {
this.monotonicClock = monotonicClock;
+ this.throttleAction = throttleAction;
+ this.unthrottleAction = unthrottleAction;
}
/**
@@ -68,7 +74,7 @@ public class PublishRateLimiterImpl implements
PublishRateLimiter {
}
if (shouldThrottle) {
// throttle the producer by incrementing the throttle count
- producer.incrementThrottleCount();
+ throttleAction.accept(producer);
// schedule decrementing the throttle count to possibly unthrottle
the producer after the
// throttling period
scheduleDecrementThrottleCount(producer);
@@ -136,7 +142,8 @@ public class PublishRateLimiterImpl implements
PublishRateLimiter {
while ((throttlingDuration = calculateThrottlingDurationNanos())
== 0L
&& (producer = unthrottlingQueue.poll()) != null) {
try {
- producer.decrementThrottleCount();
+ final Producer producerFinal = producer;
+ producer.getCnx().execute(() ->
unthrottleAction.accept(producerFinal));
} catch (Exception e) {
log.error("Failed to unthrottle producer {}", producer, e);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index fbfb8108846..d7010e3cf8c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -25,6 +25,7 @@ import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync;
import static org.apache.pulsar.broker.lookup.TopicLookupBase.lookupTopicAsync;
+import static
org.apache.pulsar.broker.service.ServerCnxThrottleTracker.ThrottleType;
import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.getMigratedClusterUrl;
import static
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.ignoreUnrecoverableBKException;
import static org.apache.pulsar.common.api.proto.ProtocolVersion.v5;
@@ -283,7 +284,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
if (maxPendingBytesPerThread > 0 && pendingBytes >
maxPendingBytesPerThread
&& !limitExceeded) {
limitExceeded = true;
- cnxsPerThread.get().forEach(cnx ->
cnx.throttleTracker.setPublishBufferLimiting(true));
+ cnxsPerThread.get().forEach(cnx ->
cnx.throttleTracker.markThrottled(
+ ThrottleType.IOThreadMaxPendingPublishBytesExceeded));
}
}
@@ -293,7 +295,8 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// we resume all connections sharing the same thread
if (limitExceeded && pendingBytes <=
resumeThresholdPendingBytesPerThread) {
limitExceeded = false;
- cnxsPerThread.get().forEach(cnx ->
cnx.throttleTracker.setPublishBufferLimiting(false));
+ cnxsPerThread.get().forEach(cnx ->
cnx.throttleTracker.unmarkThrottled(
+ ThrottleType.IOThreadMaxPendingPublishBytesExceeded));
}
}
}
@@ -311,6 +314,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
Start, Connected, Failed, Connecting
}
+ @Getter
private final ServerCnxThrottleTracker throttleTracker;
public ServerCnx(PulsarService pulsar) {
@@ -481,12 +485,12 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
log.warn("[{}] Reached rate limitation", this);
// Stop receiving requests.
pausedDueToRateLimitation = true;
- ctx.channel().config().setAutoRead(false);
+
getThrottleTracker().markThrottled(ThrottleType.ConnectionPauseReceivingCooldownRateLimit);
// Resume after 1 second.
ctx.channel().eventLoop().schedule(() -> {
if (pausedDueToRateLimitation) {
log.info("[{}] Resuming connection after rate limitation",
this);
- ctx.channel().config().setAutoRead(true);
+
getThrottleTracker().unmarkThrottled(ThrottleType.ConnectionPauseReceivingCooldownRateLimit);
pausedDueToRateLimitation = false;
}
}, requestRateLimiter.getPeriodAtMs(), TimeUnit.MILLISECONDS);
@@ -497,7 +501,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
Exception {
if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) {
log.info("[{}] is writable, turn on channel auto-read", this);
- ctx.channel().config().setAutoRead(true);
+
getThrottleTracker().unmarkThrottled(ThrottleType.ConnectionOutboundBufferFull);
requestRateLimiter.timingOpen(pauseReceivingCooldownMilliSeconds,
TimeUnit.MILLISECONDS);
} else if (pauseReceivingRequestsIfUnwritable &&
!ctx.channel().isWritable()) {
final ChannelOutboundBuffer outboundBuffer =
ctx.channel().unsafe().outboundBuffer();
@@ -511,7 +515,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
PAUSE_RECEIVING_LOG.debug("[{}] is not writable, turn off
channel auto-read", this);
}
}
- ctx.channel().config().setAutoRead(false);
+
getThrottleTracker().markThrottled(ThrottleType.ConnectionOutboundBufferFull);
}
ctx.fireChannelWritabilityChanged();
}
@@ -3399,7 +3403,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
// or the pending publish bytes
private void increasePendingSendRequestsAndPublishBytes(int msgSize) {
if (++pendingSendRequest == maxPendingSendRequests) {
- throttleTracker.setPendingSendRequestsExceeded(true);
+
throttleTracker.markThrottled(ThrottleType.ConnectionMaxPendingPublishRequestsExceeded);
}
PendingBytesPerThreadTracker.getInstance().incrementPublishBytes(msgSize,
maxPendingBytesPerThread);
}
@@ -3424,7 +3428,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
PendingBytesPerThreadTracker.getInstance().decrementPublishBytes(msgSize,
resumeThresholdPendingBytesPerThread);
if (--pendingSendRequest == resumeReadsThreshold) {
- throttleTracker.setPendingSendRequestsExceeded(false);
+
throttleTracker.unmarkThrottled(ThrottleType.ConnectionMaxPendingPublishRequestsExceeded);
}
if (isNonPersistentTopic) {
@@ -3803,22 +3807,6 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
this.authRole = authRole;
}
- /**
- * {@inheritDoc}
- */
- @Override
- public void incrementThrottleCount() {
- throttleTracker.incrementThrottleCount();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void decrementThrottleCount() {
- throttleTracker.decrementThrottleCount();
- }
-
@VisibleForTesting
void setAuthState(AuthenticationState authState) {
this.authState = authState;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
index 78bac024218..037612d2156 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
@@ -18,125 +18,442 @@
*/
package org.apache.pulsar.broker.service;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
/**
- * Tracks the state of throttling for a connection. The throttling happens by
pausing reads by setting
- * Netty {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} to false
for the channel (connection).
- * <p>
- * There can be multiple rate limiters that can throttle a connection. Each
rate limiter will independently
- * call the {@link #incrementThrottleCount()} and {@link
#decrementThrottleCount()} methods to signal that the
- * connection should be throttled or not. The connection will be throttled if
the counter is greater than 0.
- * <p>
- * Besides the rate limiters, the connection can also be throttled if the
number of pending publish requests exceeds
- * a configured threshold. This throttling is toggled with the {@link
#setPendingSendRequestsExceeded} method.
- * There's also per-thread memory limits which could throttle the connection.
This throttling is toggled with the
- * {@link #setPublishBufferLimiting} method. Internally, these two methods
will call the
- * {@link #incrementThrottleCount()} and {@link #decrementThrottleCount()}
methods when the state changes.
+ * Manages and tracks throttling state for server connections in Apache Pulsar.
+ *
+ * <p>This class provides a centralized mechanism to control connection
throttling by managing
+ * multiple throttling conditions simultaneously. When throttling is active,
it pauses incoming
+ * requests by setting Netty's {@link
io.netty.channel.ChannelConfig#setAutoRead(boolean)} to
+ * {@code false} for the associated channel.
+ *
+ * <h3>Throttling Mechanism</h3>
+ * <p>The tracker maintains independent counters for different types of
throttling conditions
+ * defined in {@link ThrottleType}. A connection is considered throttled if
any of these
+ * conditions are active ({@code counter > 0}). The connection will only resume normal
operation
+ * when all throttling conditions are cleared.
+ *
+ * <h3>Supported Throttling Types</h3>
+ * <ul>
+ * <li><b>Connection-level:</b> Max pending publish requests, outbound
buffer limits</li>
+ * <li><b>Thread-level:</b> IO thread memory limits for in-flight
publishing</li>
+ * <li><b>Topic-level:</b> Topic publish rate limiting</li>
+ * <li><b>Resource Group-level:</b> Resource group publish rate limiting</li>
+ * <li><b>Broker-level:</b> Global broker publish rate limiting</li>
+ * <li><b>Flow Control:</b> Channel writability and cooldown rate
limiting</li>
+ * </ul>
+ *
+ * <h3>Reentrant vs Non-Reentrant Types</h3>
+ * <p>Some throttling types support multiple concurrent activations
(reentrant):
+ * <ul>
+ * <li>{@link ThrottleType#TopicPublishRate} - Reentrant because multiple
producers may share
+ * the same rate limiter which relates to the same topic</li>
+ * <li>{@link ThrottleType#ResourceGroupPublishRate} - Reentrant because
multiple producers may share
+ * the same rate limiter which relates to the same resource group</li>
+ * </ul>
+ * <p>Other types are non-reentrant and can only be activated once at a time.
The reentrant types
+ * use counters to track how many producers are affected by the same shared
rate limiter, while
+ * non-reentrant types use simple boolean states.
+ *
+ * <h3>Thread Safety</h3>
+ * <p>This class is designed to be used from a single thread (the connection's
IO thread)
+ * and is not thread-safe for concurrent access from multiple threads.
+ *
+ * <h3>Usage Example</h3>
+ * <pre>{@code
+ * ServerCnxThrottleTracker tracker = new ServerCnxThrottleTracker(serverCnx);
+ *
+ * // Mark connection as throttled due to rate limiting
+ * tracker.markThrottled(ThrottleType.TopicPublishRate);
+ *
+ * // Later, when rate limiting condition is cleared
+ * tracker.unmarkThrottled(ThrottleType.TopicPublishRate);
+ * }</pre>
+ *
+ * @see ThrottleType
+ * @see ThrottleRes
+ * @see ServerCnx
*/
@Slf4j
-final class ServerCnxThrottleTracker {
-
- private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
THROTTLE_COUNT_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(
- ServerCnxThrottleTracker.class, "throttleCount");
-
- private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
- PENDING_SEND_REQUESTS_EXCEEDED_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(
- ServerCnxThrottleTracker.class,
"pendingSendRequestsExceeded");
- private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
PUBLISH_BUFFER_LIMITING_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(
- ServerCnxThrottleTracker.class, "publishBufferLimiting");
+public final class ServerCnxThrottleTracker {
+
private final ServerCnx serverCnx;
- private volatile int throttleCount;
- private volatile int pendingSendRequestsExceeded;
- private volatile int publishBufferLimiting;
+ private final int[] states = new int[ThrottleType.values().length];
+ /**
+ * Enumeration of different throttling conditions that can be applied to a
server connection.
+ *
+ * <p>Each type represents a specific resource constraint or rate limiting
condition
+ * that may require throttling the connection to maintain system stability
and fairness.
+ *
+ * <p>Some types support reentrant behavior (can be activated multiple
times concurrently),
+ * while others are non-reentrant (single activation only).
+ */
+ public enum ThrottleType {
- public ServerCnxThrottleTracker(ServerCnx serverCnx) {
- this.serverCnx = serverCnx;
+ /**
+ * Throttling due to excessive pending publish requests on the
connection.
+ *
+ * <p>This throttling is activated when the number of in-flight
publish requests
+ * exceeds the configured limit. It helps prevent memory exhaustion
and ensures
+ * fair resource allocation across connections.
+ *
+ * <p><b>Type:</b> Non-reentrant
+ * <p><b>Configuration:</b> {@link
ServiceConfiguration#getMaxPendingPublishRequestsPerConnection()}
+ */
+ ConnectionMaxPendingPublishRequestsExceeded(false),
+
+ /**
+ * Throttling due to excessive memory usage by in-flight publish
operations on the IO thread.
+ *
+ * <p>This throttling is activated when the total memory used by
pending publish operations
+ * on a shared IO thread exceeds the configured limit. Multiple
connections may share the
+ * same IO thread, so this limit applies across all connections on
that thread.
+ *
+ * <p><b>Type:</b> Non-reentrant
+ * <p><b>Configuration:</b> {@link
ServiceConfiguration#getMaxMessagePublishBufferSizeInMB()}
+ */
+ IOThreadMaxPendingPublishBytesExceeded(false),
+
+ /**
+ * Throttling due to topic-level publish rate limiting.
+ *
+ * <p>This throttling is activated when publish operations exceed the
configured
+ * rate limits for a specific topic. Multiple producers on the same
topic may
+ * contribute to triggering this throttling condition.
+ *
+ * <p><b>Type:</b> Reentrant (supports multiple concurrent activations)
+ * <br><b>Reason for reentrancy:</b> Multiple producers may share the
same rate limiter
+ * which relates to the same topic. Each producer can independently
trigger throttling
+ * when the shared topic rate limiter becomes active, requiring a
counter to track
+ * how many producers are affected by the same rate limiter.
+ *
+ * <p><b>Configuration:</b> Topic-level publish rate policies
+ */
+ TopicPublishRate(true),
+
+ /**
+ * Throttling due to resource group-level publish rate limiting.
+ *
+ * <p>This throttling is activated when publish operations exceed the
configured
+ * rate limits for a resource group. Resource groups allow
fine-grained control
+ * over resource allocation across multiple topics and tenants.
+ *
+ * <p><b>Type:</b> Reentrant (supports multiple concurrent activations)
+ * <br><b>Reason for reentrancy:</b> Multiple producers may share the
same rate limiter
+ * which relates to the same resource group. Each producer can
independently trigger
+ * throttling when the shared resource group rate limiter becomes
active, requiring
+ * a counter to track how many producers are affected by the same rate
limiter.
+ *
+ * <p><b>Configuration:</b> Resource group publish rate policies
+ */
+ ResourceGroupPublishRate(true),
+
+ /**
+ * Throttling due to broker-level publish rate limiting.
+ *
+ * <p>This throttling is activated when publish operations exceed the
global
+ * broker-level rate limits. This provides a safety mechanism to
prevent
+ * the entire broker from being overwhelmed by publish traffic.
+ *
+ * <p><b>Type:</b> Non-reentrant
+ * <p><b>Configuration:</b> {@link
ServiceConfiguration#getBrokerPublisherThrottlingMaxMessageRate()}
+ * and {@link
ServiceConfiguration#getBrokerPublisherThrottlingMaxByteRate()}
+ */
+ BrokerPublishRate(false),
+ /**
+ * Throttling due to channel outbound buffer being full.
+ *
+ * <p>This throttling is activated when the Netty channel's outbound
buffer
+ * reaches its high water mark, indicating that the client cannot keep
up
+ * with the rate of outgoing messages. This prevents memory exhaustion
+ * and provides backpressure to publishers.
+ *
+ * <p><b>Type:</b> Non-reentrant
+ * <p><b>Reference:</b> PIP-434: Expose Netty channel configuration
WRITE_BUFFER_WATER_MARK
+ */
+ ConnectionOutboundBufferFull(false),
+
+ /**
+ * Throttling due to connection pause/resume cooldown rate limiting.
+ *
+ * <p>This throttling is activated during cooldown periods after a
connection
+ * has been resumed from a throttled state. It prevents rapid
oscillation
+ * between throttled and unthrottled states.
+ *
+ * <p><b>Type:</b> Non-reentrant
+ */
+ ConnectionPauseReceivingCooldownRateLimit(false);
+
+ @Getter
+ final boolean reentrant;
+
+ ThrottleType(boolean reentrant) {
+ this.reentrant = reentrant;
+ }
}
/**
- * See {@link Producer#incrementThrottleCount()} for documentation.
+ * Enumeration representing the result of a throttling state change
operation.
+ *
+ * <p>This enum indicates what happened when a throttling condition was
marked or unmarked,
+ * helping callers understand whether the overall connection state changed
or if the
+ * operation was ignored.
*/
- public void incrementThrottleCount() {
- int currentThrottleCount =
THROTTLE_COUNT_UPDATER.incrementAndGet(this);
- if (currentThrottleCount == 1) {
- changeAutoRead(false);
+ enum ThrottleRes {
+ /**
+ * The operation resulted in a change to the overall connection
throttling state.
+ *
+ * <p>This occurs when:
+ * <ul>
+ * <li>The connection transitions from unthrottled to throttled
(first throttle type activated)</li>
+ * <li>The connection transitions from throttled to unthrottled
(last throttle type deactivated)</li>
+ * </ul>
+ *
+ * <p>When this result is returned, the connection's auto-read setting
will be updated
+ * accordingly to pause or resume request processing.
+ */
+ ConnectionStateChanged,
+
+ /**
+ * The operation changed the state of the specific throttle type but
did not affect
+ * the overall connection throttling state.
+ *
+ * <p>This occurs when:
+ * <ul>
+ * <li>A throttle type is activated, but the connection was already
throttled by other types</li>
+ * <li>A throttle type is deactivated, but the connection remains
throttled by other types</li>
+ * <li>A reentrant throttle type's counter is incremented or
decremented</li>
+ * </ul>
+ */
+ TypeStateChanged,
+
+ /**
+ * The operation was dropped because it would violate the throttle
type's constraints.
+ *
+ * <p>This occurs when:
+ * <ul>
+ * <li>Attempting to mark a non-reentrant throttle type that is
already active</li>
+ * <li>Attempting to unmark a throttle type that is not currently
active</li>
+ * <li>Attempting to unmark a reentrant throttle type with an
invalid counter state</li>
+ * </ul>
+ */
+ Dropped
+ }
+
+ /**
+ * Checks if the connection is currently throttled by any throttle type.
+ *
+ * <p>This method examines all throttle type states and returns {@code
true}
+ * if any of them are active ({@code counter > 0}).
+ *
+ * @return {@code true} if any throttling condition is active, {@code
false} otherwise
+ */
+ private boolean hasThrottled() {
+ for (int stat : states) {
+ if (stat > 0) {
+ return true;
+ }
}
+ return false;
}
/**
- * See {@link Producer#decrementThrottleCount()} for documentation.
+ * Returns the total count of active throttling conditions across all
types.
+ *
+ * <p>This method sums up all the individual counters for each throttle
type,
+ * providing a measure of the overall throttling pressure on the
connection.
+ * For reentrant types, this includes the full counter value (not just 0
or 1).
+ *
+ * @return the total number of active throttling conditions
*/
- public void decrementThrottleCount() {
- int currentThrottleCount =
THROTTLE_COUNT_UPDATER.decrementAndGet(this);
- if (currentThrottleCount == 0) {
- changeAutoRead(true);
+ @VisibleForTesting
+ public int throttledCount() {
+ int i = 0;
+ for (int stat : states) {
+ i += stat;
}
+ return i;
}
- private void changeAutoRead(boolean autoRead) {
- if (isChannelActive()) {
+ /**
+ * Marks the connection as throttled for the specified throttle type.
+ *
+ * <p>This method activates throttling for the given type and may pause
the connection's
+ * request processing if this is the first active throttling condition.
For reentrant
+ * types ({@link ThrottleType#TopicPublishRate} and {@link
ThrottleType#ResourceGroupPublishRate}),
+ * this increments the counter. For non-reentrant types, this sets the
state to active.
+ *
+ * <p>If the connection transitions from unthrottled to throttled, this
method will
+ * set the Netty channel's auto-read to {@code false}, effectively pausing
incoming
+ * request processing.
+ *
+ * <p>Metrics are automatically recorded to track throttling events and
connection state changes.
+ *
+ * @param type the type of throttling condition to activate
+ * @throws NullPointerException if type is null
+ *
+ * @see #unmarkThrottled(ThrottleType)
+ * @see ThrottleType
+ */
+ public void markThrottled(ThrottleType type) {
+ assert serverCnx.ctx().executor().inEventLoop() : "This method should
be called in serverCnx.ctx().executor()";
+ ThrottleRes res = doMarkThrottled(type);
+ recordMetricsAfterThrottling(type, res);
+ if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) {
if (log.isDebugEnabled()) {
- log.debug("[{}] Setting auto read to {}",
serverCnx.toString(), autoRead);
+ log.debug("[{}] Setting auto read to false",
serverCnx.toString());
}
- // change the auto read flag on the channel
- serverCnx.ctx().channel().config().setAutoRead(autoRead);
- }
- // update the metrics that track throttling
- if (autoRead) {
- serverCnx.getBrokerService().recordConnectionResumed();
- } else if (isChannelActive()) {
- serverCnx.increasePublishLimitedTimesForTopics();
- serverCnx.getBrokerService().recordConnectionPaused();
+ serverCnx.ctx().channel().config().setAutoRead(false);
}
}
- private boolean isChannelActive() {
- return serverCnx.isActive() && serverCnx.ctx() != null &&
serverCnx.ctx().channel().isActive();
+ /**
+ * Unmarks the connection as throttled for the specified throttle type.
+ *
+ * <p>This method deactivates throttling for the given type and may resume
the connection's
+ * request processing if this was the last active throttling condition.
For reentrant
+ * types ({@link ThrottleType#TopicPublishRate} and {@link
ThrottleType#ResourceGroupPublishRate}),
+ * this decrements the counter. For non-reentrant types, this clears the
active state.
+ *
+ * <p>If the connection transitions from throttled to unthrottled, this
method will
+ * set the Netty channel's auto-read to {@code true}, effectively resuming
incoming
+ * request processing.
+ *
+ * <p>Metrics are automatically recorded to track unthrottling events and
connection state changes.
+ *
+ * @param type the type of throttling condition to deactivate
+ * @throws IllegalArgumentException if type is null
+ *
+ * @see #markThrottled(ThrottleType)
+ * @see ThrottleType
+ */
+ public void unmarkThrottled(ThrottleType type) {
+ assert serverCnx.ctx().executor().inEventLoop() : "This method should
be called in serverCnx.ctx().executor()";
+ ThrottleRes res = doUnmarkThrottled(type);
+ recordMetricsAfterUnthrottling(type, res);
+ if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Setting auto read to true",
serverCnx.toString());
+ }
+ serverCnx.ctx().channel().config().setAutoRead(true);
+ }
}
- public void setPublishBufferLimiting(boolean throttlingEnabled) {
- changeThrottlingFlag(PUBLISH_BUFFER_LIMITING_UPDATER,
throttlingEnabled);
+ /**
+ * Internal method to mark a throttle type as active without side effects.
+ *
+ * <p>This method updates the internal state for the specified throttle
type
+ * and returns the result of the operation. It handles both reentrant and
+ * non-reentrant throttle types appropriately:
+ *
+ * <ul>
+ * <li><b>Reentrant types:</b> Increment the counter</li>
+ * <li><b>Non-reentrant types:</b> Set to active (1) if not already
active</li>
+ * </ul>
+ *
+ * @param throttleType the type of throttling to mark as active
+ * @return the result of the marking operation
+ * @see ThrottleRes
+ */
+ private ThrottleRes doMarkThrottled(ThrottleType throttleType) {
+ // There are two reentrant types: "TopicPublishRate" and
"ResourceGroupPublishRate".
+ boolean throttled = hasThrottled();
+ int value = states[throttleType.ordinal()];
+ if (throttleType.isReentrant()) {
+ states[throttleType.ordinal()] = value + 1;
+ } else {
+ states[throttleType.ordinal()] = 1;
+ if (value != 0) {
+ return ThrottleRes.Dropped;
+ }
+ }
+ return throttled ? ThrottleRes.TypeStateChanged :
ThrottleRes.ConnectionStateChanged;
}
- public void setPendingSendRequestsExceeded(boolean throttlingEnabled) {
- boolean changed =
changeThrottlingFlag(PENDING_SEND_REQUESTS_EXCEEDED_UPDATER, throttlingEnabled);
- if (changed) {
- // update the metrics that track throttling due to pending send
requests
- if (throttlingEnabled) {
- serverCnx.getBrokerService().recordConnectionThrottled();
- } else {
- serverCnx.getBrokerService().recordConnectionUnthrottled();
+ /**
+ * Internal method to mark a throttle type as inactive without side
effects.
+ *
+ * <p>This method updates the internal state for the specified throttle
type
+ * and returns the result of the operation. It handles both reentrant and
+ * non-reentrant throttle types appropriately:
+ *
+ * <ul>
+ * <li><b>Reentrant types:</b> Decrement the counter</li>
+ * <li><b>Non-reentrant types:</b> Clear active state if currently
active</li>
+ * </ul>
+ *
+ * @param throttleType the type of throttling to mark as inactive
+ * @return the result of the unmarking operation
+ * @see ThrottleRes
+ */
+ private ThrottleRes doUnmarkThrottled(ThrottleType throttleType) {
+ int value = states[throttleType.ordinal()];
+ if (throttleType.isReentrant()) {
+ states[throttleType.ordinal()] = value - 1;
+ } else {
+ if (value != 1) {
+ return ThrottleRes.Dropped;
}
+ states[throttleType.ordinal()] = 0;
+ }
+ return hasThrottled() ? ThrottleRes.TypeStateChanged :
ThrottleRes.ConnectionStateChanged;
+ }
+
+ /**
+ * Records metrics after a throttling operation.
+ *
+ * <p>This method updates various broker metrics to track throttling
events:
+ * <ul>
+ * <li>Connection-specific throttling metrics for in-flight publishing
limits</li>
+ * <li>Connection pause metrics when the overall connection state
changes</li>
+ * <li>Topic-level publish limiting counters</li>
+ * </ul>
+ *
+ * @param type the throttle type that was activated
+ * @param res the result of the throttling operation
+ */
+ private void recordMetricsAfterThrottling(ThrottleType type, ThrottleRes
res) {
+ if (type == ThrottleType.ConnectionMaxPendingPublishRequestsExceeded
&& res != ThrottleRes.Dropped) {
+ serverCnx.getBrokerService().recordConnectionThrottled();
+ }
+ if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) {
+ serverCnx.increasePublishLimitedTimesForTopics();
+ serverCnx.getBrokerService().recordConnectionPaused();
}
}
- private boolean
changeThrottlingFlag(AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
throttlingFlagFieldUpdater,
- boolean throttlingEnabled) {
- // don't change a throttling flag if the channel is not active
- if (!isChannelActive()) {
- return false;
+ /**
+ * Records metrics after an unthrottling operation.
+ *
+ * <p>This method updates various broker metrics to track unthrottling
events:
+ * <ul>
+ * <li>Connection-specific unthrottling metrics for in-flight publishing
limits</li>
+ * <li>Connection resume metrics when the overall connection state
changes</li>
+ * </ul>
+ *
+ * @param type the throttle type that was deactivated
+ * @param res the result of the unthrottling operation
+ */
+ private void recordMetricsAfterUnthrottling(ThrottleType type, ThrottleRes
res) {
+ if (type == ThrottleType.ConnectionMaxPendingPublishRequestsExceeded
&& res != ThrottleRes.Dropped) {
+ serverCnx.getBrokerService().recordConnectionUnthrottled();
}
- if (throttlingFlagFieldUpdater.compareAndSet(this,
booleanToInt(!throttlingEnabled),
- booleanToInt(throttlingEnabled))) {
- if (throttlingEnabled) {
- incrementThrottleCount();
- } else {
- decrementThrottleCount();
- }
- return true;
- } else {
- return false;
+ if (res == ThrottleRes.ConnectionStateChanged && isChannelActive()) {
+ serverCnx.getBrokerService().recordConnectionResumed();
}
}
- private static int booleanToInt(boolean value) {
- return value ? 1 : 0;
+ public ServerCnxThrottleTracker(ServerCnx serverCnx) {
+ this.serverCnx = serverCnx;
+ }
+
+ private boolean isChannelActive() {
+ return serverCnx.isActive() && serverCnx.ctx() != null &&
serverCnx.ctx().channel().isActive();
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
index 63599f09eef..2c0b247a94b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java
@@ -88,22 +88,9 @@ public interface TransportCnx {
CompletableFuture<Optional<Boolean>> checkConnectionLiveness();
/**
- * Increments the counter that controls the throttling of the connection
by pausing reads.
- * The connection will be throttled while the counter is greater than 0.
- * <p>
- * The caller is responsible for decrementing the counter by calling
{@link #decrementThrottleCount()} when the
- * connection should no longer be throttled.
+ * Get the throttle tracker for this connection.
*/
- void incrementThrottleCount();
-
- /**
- * Decrements the counter that controls the throttling of the connection
by pausing reads.
- * The connection will be throttled while the counter is greater than 0.
- * <p>
- * This method should be called when the connection should no longer be
throttled. However, the caller should have
- * previously called {@link #incrementThrottleCount()}.
- */
- void decrementThrottleCount();
+ ServerCnxThrottleTracker getThrottleTracker();
FeatureFlags getFeatures();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java
index ec952a7ca77..3e6edb04932 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterDisableTest.java
@@ -18,10 +18,11 @@
*/
package org.apache.pulsar.broker.service;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class PublishRateLimiterDisableTest {
@@ -29,9 +30,23 @@ public class PublishRateLimiterDisableTest {
// GH issue #10603
@Test
void shouldAlwaysAllowAcquire() {
- PublishRateLimiter publishRateLimiter = new
PublishRateLimiterImpl(AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK);
+ PublishRateLimiter publishRateLimiter = new
PublishRateLimiterImpl(AsyncTokenBucket.DEFAULT_SNAPSHOT_CLOCK,
+ producer -> {
+ producer.getCnx().getThrottleTracker().markThrottled(
+
ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate);
+ }, producer -> {
+ producer.getCnx().getThrottleTracker().unmarkThrottled(
+
ServerCnxThrottleTracker.ThrottleType.BrokerPublishRate);
+ });
Producer producer = mock(Producer.class);
+ ServerCnx serverCnx = mock(ServerCnx.class);
+ doAnswer(a -> serverCnx).when(producer).getCnx();
+ ServerCnxThrottleTracker throttleTracker = new
ServerCnxThrottleTracker(serverCnx);
+ doAnswer(a -> throttleTracker).when(serverCnx).getThrottleTracker();
+ when(producer.getCnx()).thenReturn(serverCnx);
+ BrokerService brokerService = mock(BrokerService.class);
+ when(serverCnx.getBrokerService()).thenReturn(brokerService);
publishRateLimiter.handlePublishThrottling(producer,
Integer.MAX_VALUE, Long.MAX_VALUE);
- verify(producer, never()).incrementThrottleCount();
+ Assert.assertEquals(throttleTracker.throttledCount(), 0);
}
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
index 20c1ad0a412..573e3980c73 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java
@@ -19,18 +19,18 @@
package org.apache.pulsar.broker.service;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
@@ -47,36 +47,40 @@ public class PublishRateLimiterTest {
private AtomicLong manualClockSource;
private Producer producer;
+ private ServerCnx serverCnx;
private PublishRateLimiterImpl publishRateLimiter;
-
- private AtomicInteger throttleCount = new AtomicInteger(0);
+ private ServerCnxThrottleTracker throttleTracker;
+ private DefaultThreadFactory threadFactory = new
DefaultThreadFactory("pulsar-io");
+ private EventLoop eventLoop = new DefaultEventLoop(threadFactory);
@BeforeMethod
public void setup() throws Exception {
policies.publishMaxMessageRate = new HashMap<>();
policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate);
manualClockSource = new AtomicLong(TimeUnit.SECONDS.toNanos(100));
- publishRateLimiter = new PublishRateLimiterImpl(() ->
manualClockSource.get());
+ publishRateLimiter = new PublishRateLimiterImpl(() ->
manualClockSource.get(),
+ producer -> {
+ producer.getCnx().getThrottleTracker().markThrottled(
+
ServerCnxThrottleTracker.ThrottleType.TopicPublishRate);
+ }, producer -> {
+ producer.getCnx().getThrottleTracker().unmarkThrottled(
+ ServerCnxThrottleTracker.ThrottleType.TopicPublishRate);
+ });
publishRateLimiter.update(policies, CLUSTER_NAME);
producer = mock(Producer.class);
- throttleCount.set(0);
- doAnswer(a -> {
- throttleCount.incrementAndGet();
- return null;
- }).when(producer).incrementThrottleCount();
- doAnswer(a -> {
- throttleCount.decrementAndGet();
- return null;
- }).when(producer).decrementThrottleCount();
- TransportCnx transportCnx = mock(TransportCnx.class);
- when(producer.getCnx()).thenReturn(transportCnx);
+ serverCnx = mock(ServerCnx.class);
+ ChannelHandlerContext channelHandlerContext =
mock(ChannelHandlerContext.class);
+ doAnswer(a -> eventLoop).when(channelHandlerContext).executor();
+ doAnswer(a -> channelHandlerContext).when(serverCnx).ctx();
+ doAnswer(a -> this.serverCnx).when(producer).getCnx();
+ throttleTracker = new ServerCnxThrottleTracker(this.serverCnx);
+ doAnswer(a ->
throttleTracker).when(this.serverCnx).getThrottleTracker();
+ when(producer.getCnx()).thenReturn(serverCnx);
BrokerService brokerService = mock(BrokerService.class);
- when(transportCnx.getBrokerService()).thenReturn(brokerService);
+ when(serverCnx.getBrokerService()).thenReturn(brokerService);
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
when(brokerService.executor()).thenReturn(eventLoopGroup);
- EventLoop eventLoop = mock(EventLoop.class);
when(eventLoopGroup.next()).thenReturn(eventLoop);
- doReturn(null).when(eventLoop).schedule(any(Runnable.class),
anyLong(), any());
incrementSeconds(1);
}
@@ -86,38 +90,63 @@ public class PublishRateLimiterTest {
policies.publishMaxMessageRate = null;
}
+ @AfterMethod
+ public void tearDown() throws Exception {
+ eventLoop.shutdownGracefully();
+ }
+
private void incrementSeconds(int seconds) {
manualClockSource.addAndGet(TimeUnit.SECONDS.toNanos(seconds));
}
@Test
public void testPublishRateLimiterImplExceed() throws Exception {
- // increment not exceed
- publishRateLimiter.handlePublishThrottling(producer, 5, 50);
- assertEquals(throttleCount.get(), 0);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ eventLoop.execute(() -> {
+ try {
+ // increment not exceed
+ publishRateLimiter.handlePublishThrottling(producer, 5, 50);
+ assertEquals(throttleTracker.throttledCount(), 0);
- incrementSeconds(1);
+ incrementSeconds(1);
- // numOfMessages increment exceeded
- publishRateLimiter.handlePublishThrottling(producer, 11, 100);
- assertEquals(throttleCount.get(), 1);
+ // numOfMessages increment exceeded
+ publishRateLimiter.handlePublishThrottling(producer, 11, 100);
+ assertEquals(throttleTracker.throttledCount(), 1);
- incrementSeconds(1);
+ incrementSeconds(1);
+
+ // msgSizeInBytes increment exceeded
+ publishRateLimiter.handlePublishThrottling(producer, 9, 110);
+ assertEquals(throttleTracker.throttledCount(), 2);
- // msgSizeInBytes increment exceeded
- publishRateLimiter.handlePublishThrottling(producer, 9, 110);
- assertEquals(throttleCount.get(), 2);
+ future.complete(null);
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ });
+ future.get(5, TimeUnit.SECONDS);
}
@Test
- public void testPublishRateLimiterImplUpdate() {
- publishRateLimiter.handlePublishThrottling(producer, 11, 110);
- assertEquals(throttleCount.get(), 1);
-
- // update
- throttleCount.set(0);
- publishRateLimiter.update(newPublishRate);
- publishRateLimiter.handlePublishThrottling(producer, 11, 110);
- assertEquals(throttleCount.get(), 0);
+ public void testPublishRateLimiterImplUpdate() throws Exception {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ eventLoop.execute(() -> {
+ try {
+ publishRateLimiter.handlePublishThrottling(producer, 11, 110);
+ assertEquals(throttleTracker.throttledCount(), 1);
+
+ // update
+ throttleTracker = new ServerCnxThrottleTracker(serverCnx);
+ publishRateLimiter.update(newPublishRate);
+ publishRateLimiter.handlePublishThrottling(producer, 11, 110);
+ assertEquals(throttleTracker.throttledCount(), 0);
+
+ future.complete(null);
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ });
+ future.get(5, TimeUnit.SECONDS);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
index 40bcb19ab0c..929350b599e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPublishRateThrottleTest.java
@@ -73,6 +73,37 @@ public class TopicPublishRateThrottleTest extends
BrokerTestBase{
pulsarClient.close();
}
+ @Test
+ public void testResumeEvenProducerClosed() throws Exception {
+ PublishRate publishRate = new PublishRate(1, 10);
+ conf.setMaxPendingPublishRequestsPerConnection(0);
+ super.baseSetup();
+ admin.namespaces().setPublishRate("prop/ns-abc", publishRate);
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
+ org.apache.pulsar.client.api.Producer<byte[]> producer =
pulsarClient.newProducer()
+ .topic(topic).create();
+
+ Topic topicRef =
pulsar.getBrokerService().getTopicReference(topic).get();
+ Assert.assertNotNull(topicRef);
+ MessageId messageId = null;
+ // The first send will succeed; the second will be throttled, which sets auto read
to false.
+ messageId = producer.sendAsync(new byte[10]).get(500,
TimeUnit.MILLISECONDS);
+ Assert.assertNotNull(messageId);
+ // second will be blocked
+ producer.sendAsync(new byte[10]);
+
+ // Verify: even though the producer was closed before the unblock,
the state should be unblocked at the next
+ // period of the rate limiter.
+ producer.close();
+ Thread.sleep(3000);
+ org.apache.pulsar.client.api.Producer<byte[]> producer2 =
pulsarClient.newProducer()
+ .topic(topic).create();
+ producer2.sendAsync(new byte[2]).get(500, TimeUnit.MILLISECONDS);
+
+ // Close the PulsarClient gracefully to avoid ByteBuf leak
+ pulsarClient.close();
+ }
+
@Test
public void testSystemTopicPublishNonBlock() throws Exception {
super.baseSetup();