lhotari commented on code in PR #24799:
URL: https://github.com/apache/pulsar/pull/24799#discussion_r2428087569
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java:
##########
@@ -18,125 +18,442 @@
*/
package org.apache.pulsar.broker.service;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
/**
- * Tracks the state of throttling for a connection. The throttling happens by
pausing reads by setting
- * Netty {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} to false
for the channel (connection).
- * <p>
- * There can be multiple rate limiters that can throttle a connection. Each
rate limiter will independently
- * call the {@link #incrementThrottleCount()} and {@link
#decrementThrottleCount()} methods to signal that the
- * connection should be throttled or not. The connection will be throttled if
the counter is greater than 0.
- * <p>
- * Besides the rate limiters, the connection can also be throttled if the
number of pending publish requests exceeds
- * a configured threshold. This throttling is toggled with the {@link
#setPendingSendRequestsExceeded} method.
- * There's also per-thread memory limits which could throttle the connection.
This throttling is toggled with the
- * {@link #setPublishBufferLimiting} method. Internally, these two methods
will call the
- * {@link #incrementThrottleCount()} and {@link #decrementThrottleCount()}
methods when the state changes.
+ * Manages and tracks throttling state for server connections in Apache Pulsar.
+ *
+ * <p>This class provides a centralized mechanism to control connection
throttling by managing
+ * multiple throttling conditions simultaneously. When throttling is active,
it pauses incoming
+ * requests by setting Netty's {@link
io.netty.channel.ChannelConfig#setAutoRead(boolean)} to
+ * {@code false} for the associated channel.
+ *
+ * <h3>Throttling Mechanism</h3>
+ * <p>The tracker maintains independent counters for different types of
throttling conditions
+ * defined in {@link ThrottleType}. A connection is considered throttled if
any of these
+ * conditions are active (counter > 0). The connection will only resume normal
operation
+ * when all throttling conditions are cleared.
+ *
+ * <h3>Supported Throttling Types</h3>
+ * <ul>
+ * <li><b>Connection-level:</b> Max pending publish requests, outbound
buffer limits</li>
+ * <li><b>Thread-level:</b> IO thread memory limits for in-flight
publishing</li>
+ * <li><b>Topic-level:</b> Topic publish rate limiting</li>
+ * <li><b>Resource Group-level:</b> Resource group publish rate limiting</li>
+ * <li><b>Broker-level:</b> Global broker publish rate limiting</li>
+ * <li><b>Flow Control:</b> Channel writability and cooldown rate
limiting</li>
+ * </ul>
+ *
+ * <h3>Reentrant vs Non-Reentrant Types</h3>
+ * <p>Some throttling types support multiple concurrent activations
(reentrant):
+ * <ul>
+ * <li>{@link ThrottleType#TopicPublishRate} - Reentrant because multiple
producers may share
+ * the same rate limiter which relates to the same topic</li>
+ * <li>{@link ThrottleType#ResourceGroupPublishRate} - Reentrant because
multiple producers may share
+ * the same rate limiter which relates to the same resource group</li>
+ * </ul>
+ * <p>Other types are non-reentrant and can only be activated once at a time.
The reentrant types
+ * use counters to track how many producers are affected by the same shared
rate limiter, while
+ * non-reentrant types use simple boolean states.
+ *
+ * <h3>Thread Safety</h3>
+ * <p>This class is designed to be used from a single thread (the connection's
IO thread)
+ * and is not thread-safe for concurrent access from multiple threads.
+ *
+ * <h3>Usage Example</h3>
+ * <pre>{@code
+ * ServerCnxThrottleTracker tracker = new ServerCnxThrottleTracker(serverCnx);
+ *
+ * // Mark connection as throttled due to rate limiting
+ * tracker.markThrottled(ThrottleType.TopicPublishRate);
+ *
+ * // Later, when rate limiting condition is cleared
+ * tracker.unmarkThrottled(ThrottleType.TopicPublishRate);
+ * }</pre>
+ *
+ * @see ThrottleType
+ * @see ThrottleRes
+ * @see ServerCnx
*/
@Slf4j
-final class ServerCnxThrottleTracker {
-
- private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
THROTTLE_COUNT_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(
- ServerCnxThrottleTracker.class, "throttleCount");
-
- private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
- PENDING_SEND_REQUESTS_EXCEEDED_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(
- ServerCnxThrottleTracker.class,
"pendingSendRequestsExceeded");
- private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
PUBLISH_BUFFER_LIMITING_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(
- ServerCnxThrottleTracker.class, "publishBufferLimiting");
+public final class ServerCnxThrottleTracker {
+
private final ServerCnx serverCnx;
- private volatile int throttleCount;
- private volatile int pendingSendRequestsExceeded;
- private volatile int publishBufferLimiting;
+ private final int[] states = new int[ThrottleType.values().length];
+ /**
+ * Enumeration of different throttling conditions that can be applied to a
server connection.
+ *
+ * <p>Each type represents a specific resource constraint or rate limiting
condition
+ * that may require throttling the connection to maintain system stability
and fairness.
+ *
+ * <p>Some types support reentrant behavior (can be activated multiple
times concurrently),
+ * while others are non-reentrant (single activation only).
+ */
+ public static enum ThrottleType {
- public ServerCnxThrottleTracker(ServerCnx serverCnx) {
- this.serverCnx = serverCnx;
+ /**
+ * Throttling due to excessive pending publish requests on the
connection.
+ *
+ * <p>This throttling is activated when the number of in-flight
publish requests
+ * exceeds the configured limit. It helps prevent memory exhaustion
and ensures
+ * fair resource allocation across connections.
+ *
+ * <p><b>Type:</b> Non-reentrant
+ * <p><b>Configuration:</b> {@link
ServiceConfiguration#getMaxPendingPublishRequestsPerConnection()}
+ */
+ ConnectionMaxQuantityOfInFlightPublishing(false),
Review Comment:
the name `ConnectionMaxQuantityOfInFlightPublishing` is not great. It's
better to align more closely with `maxPendingPublishRequestsPerConnection`
configuration setting.
For example, `ConnectionMaxPendingPublishRequestsExceeded` (could also omit
`Exceeded` suffix).
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java:
##########
@@ -18,125 +18,442 @@
*/
package org.apache.pulsar.broker.service;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.ServiceConfiguration;
/**
- * Tracks the state of throttling for a connection. The throttling happens by
pausing reads by setting
- * Netty {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} to false
for the channel (connection).
- * <p>
- * There can be multiple rate limiters that can throttle a connection. Each
rate limiter will independently
- * call the {@link #incrementThrottleCount()} and {@link
#decrementThrottleCount()} methods to signal that the
- * connection should be throttled or not. The connection will be throttled if
the counter is greater than 0.
- * <p>
- * Besides the rate limiters, the connection can also be throttled if the
number of pending publish requests exceeds
- * a configured threshold. This throttling is toggled with the {@link
#setPendingSendRequestsExceeded} method.
- * There's also per-thread memory limits which could throttle the connection.
This throttling is toggled with the
- * {@link #setPublishBufferLimiting} method. Internally, these two methods
will call the
- * {@link #incrementThrottleCount()} and {@link #decrementThrottleCount()}
methods when the state changes.
+ * Manages and tracks throttling state for server connections in Apache Pulsar.
+ *
+ * <p>This class provides a centralized mechanism to control connection
throttling by managing
+ * multiple throttling conditions simultaneously. When throttling is active,
it pauses incoming
+ * requests by setting Netty's {@link
io.netty.channel.ChannelConfig#setAutoRead(boolean)} to
+ * {@code false} for the associated channel.
+ *
+ * <h3>Throttling Mechanism</h3>
+ * <p>The tracker maintains independent counters for different types of
throttling conditions
+ * defined in {@link ThrottleType}. A connection is considered throttled if
any of these
+ * conditions are active (counter > 0). The connection will only resume normal
operation
+ * when all throttling conditions are cleared.
+ *
+ * <h3>Supported Throttling Types</h3>
+ * <ul>
+ * <li><b>Connection-level:</b> Max pending publish requests, outbound
buffer limits</li>
+ * <li><b>Thread-level:</b> IO thread memory limits for in-flight
publishing</li>
+ * <li><b>Topic-level:</b> Topic publish rate limiting</li>
+ * <li><b>Resource Group-level:</b> Resource group publish rate limiting</li>
+ * <li><b>Broker-level:</b> Global broker publish rate limiting</li>
+ * <li><b>Flow Control:</b> Channel writability and cooldown rate
limiting</li>
+ * </ul>
+ *
+ * <h3>Reentrant vs Non-Reentrant Types</h3>
+ * <p>Some throttling types support multiple concurrent activations
(reentrant):
+ * <ul>
+ * <li>{@link ThrottleType#TopicPublishRate} - Reentrant because multiple
producers may share
+ * the same rate limiter which relates to the same topic</li>
+ * <li>{@link ThrottleType#ResourceGroupPublishRate} - Reentrant because
multiple producers may share
+ * the same rate limiter which relates to the same resource group</li>
+ * </ul>
+ * <p>Other types are non-reentrant and can only be activated once at a time.
The reentrant types
+ * use counters to track how many producers are affected by the same shared
rate limiter, while
+ * non-reentrant types use simple boolean states.
+ *
+ * <h3>Thread Safety</h3>
+ * <p>This class is designed to be used from a single thread (the connection's
IO thread)
+ * and is not thread-safe for concurrent access from multiple threads.
+ *
+ * <h3>Usage Example</h3>
+ * <pre>{@code
+ * ServerCnxThrottleTracker tracker = new ServerCnxThrottleTracker(serverCnx);
+ *
+ * // Mark connection as throttled due to rate limiting
+ * tracker.markThrottled(ThrottleType.TopicPublishRate);
+ *
+ * // Later, when rate limiting condition is cleared
+ * tracker.unmarkThrottled(ThrottleType.TopicPublishRate);
+ * }</pre>
+ *
+ * @see ThrottleType
+ * @see ThrottleRes
+ * @see ServerCnx
*/
@Slf4j
-final class ServerCnxThrottleTracker {
-
- private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
THROTTLE_COUNT_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(
- ServerCnxThrottleTracker.class, "throttleCount");
-
- private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
- PENDING_SEND_REQUESTS_EXCEEDED_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(
- ServerCnxThrottleTracker.class,
"pendingSendRequestsExceeded");
- private static final AtomicIntegerFieldUpdater<ServerCnxThrottleTracker>
PUBLISH_BUFFER_LIMITING_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(
- ServerCnxThrottleTracker.class, "publishBufferLimiting");
+public final class ServerCnxThrottleTracker {
+
private final ServerCnx serverCnx;
- private volatile int throttleCount;
- private volatile int pendingSendRequestsExceeded;
- private volatile int publishBufferLimiting;
+ private final int[] states = new int[ThrottleType.values().length];
+ /**
+ * Enumeration of different throttling conditions that can be applied to a
server connection.
+ *
+ * <p>Each type represents a specific resource constraint or rate limiting
condition
+ * that may require throttling the connection to maintain system stability
and fairness.
+ *
+ * <p>Some types support reentrant behavior (can be activated multiple
times concurrently),
+ * while others are non-reentrant (single activation only).
+ */
+ public static enum ThrottleType {
- public ServerCnxThrottleTracker(ServerCnx serverCnx) {
- this.serverCnx = serverCnx;
+ /**
+ * Throttling due to excessive pending publish requests on the
connection.
+ *
+ * <p>This throttling is activated when the number of in-flight
publish requests
+ * exceeds the configured limit. It helps prevent memory exhaustion
and ensures
+ * fair resource allocation across connections.
+ *
+ * <p><b>Type:</b> Non-reentrant
+ * <p><b>Configuration:</b> {@link
ServiceConfiguration#getMaxPendingPublishRequestsPerConnection()}
+ */
+ ConnectionMaxQuantityOfInFlightPublishing(false),
+
+ /**
+ * Throttling due to excessive memory usage by in-flight publish
operations on the IO thread.
+ *
+ * <p>This throttling is activated when the total memory used by
pending publish operations
+ * on a shared IO thread exceeds the configured limit. Multiple
connections may share the
+ * same IO thread, so this limit applies across all connections on
that thread.
+ *
+ * <p><b>Type:</b> Non-reentrant
+ * <p><b>Configuration:</b> {@link
ServiceConfiguration#getMaxMessagePublishBufferSizeInMB()}
+ */
+ IOThreadMaxBytesOfInFlightPublishing(false),
Review Comment:
The name `IOThreadMaxBytesOfInFlightPublishing` isn't great. It's better to
more closely alight with the configuration setting
`maxMessagePublishBufferSizeInMB`.
For example `IOThreadMaxPendingPublishBytesExceeded`. This would also be
aligned with the possible new name
`ConnectionMaxPendingPublishRequestsExceeded`.
(could also omit `Exceeded` suffixes)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]