lhotari commented on code in PR #24423:
URL: https://github.com/apache/pulsar/pull/24423#discussion_r2373384002
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -438,11 +452,56 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
}
+ private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) {
+ if (rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG
Review Comment:
The later comment about "Please ensure that this is a no-op when this feature is disabled" hasn't been addressed yet. I would assume that it could be addressed by adding `!pulsarChannelPauseReceivingRequestsIfUnwritable ||` here.
```suggestion
if (!pulsarChannelPauseReceivingRequestsIfUnwritable || rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG
```
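A minimal sketch of what the suggested guard achieves, assuming the feature flag and the cooldown value are plain fields (in the `channelWritabilityChanged` hunk further down, the flag is named `pauseReceivingRequestsIfUnwritable`). `CommandType`, `RateLimitGuard`, and `shouldSkip` are hypothetical stand-ins, not broker code; the point is only the ordering: the feature flag is tested first, so nothing else (debug logging, `requestRateLimiter.acquire(1)`) runs when the feature is disabled.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for BaseCommand.Type; only a few values are modeled.
enum CommandType { PING, PONG, SEND, SUBSCRIBE }

// Hypothetical helper; field names mirror the ones visible in the ServerCnx hunks.
final class RateLimitGuard {
    // Keep-alive commands are never rate limited.
    private static final Set<CommandType> EXEMPT = EnumSet.of(CommandType.PING, CommandType.PONG);

    private final boolean pauseReceivingRequestsIfUnwritable;       // feature flag
    private final int rateLimitingSecondsAfterResumeFromUnreadable; // <= 0 disables the cooldown

    RateLimitGuard(boolean pauseReceivingRequestsIfUnwritable, int rateLimitingSecondsAfterResumeFromUnreadable) {
        this.pauseReceivingRequestsIfUnwritable = pauseReceivingRequestsIfUnwritable;
        this.rateLimitingSecondsAfterResumeFromUnreadable = rateLimitingSecondsAfterResumeFromUnreadable;
    }

    // True when the whole rate-limit check should be skipped. The feature flag is
    // evaluated first, so the check is a no-op when the feature is disabled.
    boolean shouldSkip(CommandType type) {
        return !pauseReceivingRequestsIfUnwritable
                || rateLimitingSecondsAfterResumeFromUnreadable <= 0
                || EXEMPT.contains(type);
    }
}
```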
##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private int brokerMaxConnections = 0;
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "It relates to configuration \"WriteBufferHighWaterMark\" of Netty Channel Config. If the number of bytes"
+ + " queued in the write buffer exceeds this value, channel writable state will start to return \"false\"."
+ )
+ private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "It relates to configuration \"WriteBufferLowWaterMark\" of Netty Channel Config. If the number of bytes"
+ + " queued in the write buffer is smaller than this value, channel writable state will start to return"
+ + " \"true\"."
+ )
+ private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "If enabled, the broker will pause reading from the channel to deal with new request once the writer"
+ + " buffer is full, until it is changed to writable."
+ )
+ private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited"
+ + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines"
+ + " how long the rate limiting should last, in seconds. Once the bytes that are waiting to be sent out"
+ + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative"
+ + " value will disable the rate limiting."
+ )
+ private int pulsarChannelPauseReceivingCooldownSeconds = 5;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a"
+ + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines how"
+ + " many requests should be allowed in the rate limiting period."
+
+ )
+ private int pulsarChannelPauseReceivingCooldownLimitRate = 5;
+
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a"
+ + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines the"
Review Comment:
"period of time" -> "period of time defined by pulsarChannelPauseReceivingCooldownSeconds"
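The two water-mark settings in the hunk above work as a pair with hysteresis: per their doc strings, the channel stops being writable once queued bytes exceed the high water mark and becomes writable again only after they drop below the low water mark (in Netty both are set together through `WriteBufferWaterMark`). The hypothetical `WaterMarkModel` below is a plain-Java model of just that rule with the proposed 64 KiB / 32 KiB defaults; it is not Netty's `ChannelOutboundBuffer`.

```java
// Hypothetical model of write-buffer water-mark hysteresis (no Netty dependency).
final class WaterMarkModel {
    private final long lowWaterMark;
    private final long highWaterMark;
    private long pendingBytes;
    private boolean writable = true;
    private int transitions; // number of writable <-> unwritable flips

    WaterMarkModel(long lowWaterMark, long highWaterMark) {
        if (lowWaterMark > highWaterMark) {
            throw new IllegalArgumentException("low water mark must not exceed high water mark");
        }
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
    }

    // Bytes queued for writing: becomes unwritable only when strictly above the high mark.
    void enqueue(long bytes) {
        pendingBytes += bytes;
        if (writable && pendingBytes > highWaterMark) {
            writable = false;
            transitions++;
        }
    }

    // Bytes flushed to the socket: becomes writable only when strictly below the low mark.
    void flush(long bytes) {
        pendingBytes = Math.max(0, pendingBytes - bytes);
        if (!writable && pendingBytes < lowWaterMark) {
            writable = true;
            transitions++;
        }
    }

    boolean isWritable() { return writable; }
    long pendingBytes() { return pendingBytes; }
    int transitions() { return transitions; }
}
```

Between the two marks the state depends on history: with 40 KiB queued the channel is still unwritable if it came down from above 64 KiB, but writable if it came up from below.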
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -438,11 +450,37 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
}
+ protected void checkRateLimit(BaseCommand cmd) {
+ if (cmd.getType() == BaseCommand.Type.PONG && cmd.getType() == BaseCommand.Type.PING) {
+ return;
+ }
+ if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) {
Review Comment:
The comment about "Please ensure that this is a no-op when this feature is disabled" hasn't been addressed yet. I also added a new comment about 15 lines above.
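Separately from the no-op question, the early-return condition in this earlier revision, `cmd.getType() == BaseCommand.Type.PONG && cmd.getType() == BaseCommand.Type.PING`, can never be true because a command has exactly one type, so PING and PONG are not actually exempted; the revision quoted in the other `ServerCnx` hunk uses `||`. A small sketch contrasting the two forms, with a hypothetical `CommandKind` enum standing in for `BaseCommand.Type`:

```java
// Hypothetical stand-in for BaseCommand.Type.
enum CommandKind { PING, PONG, SEND }

final class KeepAliveCheck {
    // Condition as written in the earlier revision: a single value can never equal
    // two different constants, so this always returns false.
    static boolean isKeepAliveBuggy(CommandKind type) {
        return type == CommandKind.PONG && type == CommandKind.PING;
    }

    // Intended condition (as in the later revision): exempt either keep-alive command.
    static boolean isKeepAlive(CommandKind type) {
        return type == CommandKind.PONG || type == CommandKind.PING;
    }
}
```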
##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "After the connection is recovered from an pause receiving state, the channel will be rate-limited"
+ + " for a of time window to avoid overwhelming due to the backlog of requests. This parameter defines"
+ + " how long the rate limiting should last, in seconds. Once the bytes that are waiting to be sent out"
+ + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the timer will be reset. Setting a negative"
+ + " value will disable the rate limiting."
+ )
+ private int pulsarChannelPauseReceivingCooldownSeconds = 5;
Review Comment:
I think I added a comment about the granularity of this setting, but I no longer see the previous comment due to changes. Seconds might be too coarse a unit for tuning purposes. Using milliseconds from the beginning would resolve that.
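To make the granularity point concrete: with an `int` number of seconds, the shortest non-zero cooldown is one second, while a millisecond value can express, say, 250 ms. The sketch assumes a hypothetical `pulsarChannelPauseReceivingCooldownMillis` setting and a hypothetical `CooldownWindow` helper that is reset whenever the channel resumes reading; neither exists in the PR.

```java
import java.time.Duration;

// Hypothetical helper: a cooldown window configured in milliseconds
// (e.g. from a hypothetical pulsarChannelPauseReceivingCooldownMillis setting).
final class CooldownWindow {
    private final long cooldownNanos;
    private long deadlineNanos;
    private boolean started; // false until the first reset()

    CooldownWindow(long cooldownMillis) {
        // Non-positive values disable the window, matching "a negative value disables" in the doc.
        this.cooldownNanos = Duration.ofMillis(cooldownMillis).toNanos();
    }

    // Call when the channel resumes reading; (re)starts the cooldown from nowNanos.
    void reset(long nowNanos) {
        deadlineNanos = nowNanos + cooldownNanos;
        started = true;
    }

    // True while requests should still be rate limited.
    boolean isActive(long nowNanos) {
        // Compare by subtraction, the overflow-safe way to compare System.nanoTime() values.
        return started && cooldownNanos > 0 && nowNanos - deadlineNanos < 0;
    }
}
```

In a broker the timestamps would come from `System.nanoTime()`; passing them in explicitly keeps the sketch deterministic.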
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -438,11 +452,56 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
}
+ private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand cmd) {
+ if (rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() == BaseCommand.Type.PONG
+ || cmd.getType() == BaseCommand.Type.PING) {
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer();
+ if (outboundBuffer != null) {
+ log.debug("Start to handle request [{}], totalPendingWriteBytes: {}, channel isWritable: {}",
+ cmd.getType(), outboundBuffer.totalPendingWriteBytes(), ctx.channel().isWritable());
+ } else {
+ log.debug("Start to handle request [{}], channel isWritable: {}",
+ cmd.getType(), ctx.channel().isWritable());
+ }
+ }
+ // "requestRateLimiter" will return the permits that you acquired if it is not opening(has been called
+ // "timingOpen(duration)").
+ if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) {
+ log.warn("[{}] Reached rate limitation", this);
+ // Stop receiving requests.
+ pausedDueToRateLimitation = true;
+ ctx.channel().config().setAutoRead(false);
+ // Resume after 1 second.
+ ctx.channel().eventLoop().schedule(() -> {
+ if (pausedDueToRateLimitation) {
+ log.info("[{}] Resuming connection after rate limitation", this);
+ ctx.channel().config().setAutoRead(true);
+ pausedDueToRateLimitation = false;
+ }
+ }, 1, TimeUnit.SECONDS);
+ }
+ }
+
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
- if (log.isDebugEnabled()) {
- log.debug("Channel writability has changed to: {}", ctx.channel().isWritable());
+ if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) {
+ log.info("[{}] is writable, turn on channel auto-read", this);
+ ctx.channel().config().setAutoRead(true);
+ requestRateLimiter.timingOpen(rateLimitingSecondsAfterResumeFromUnreadable, TimeUnit.SECONDS);
+ } else if (pauseReceivingRequestsIfUnwritable && !ctx.channel().isWritable()) {
+ final ChannelOutboundBuffer outboundBuffer = ctx.channel().unsafe().outboundBuffer();
+ if (outboundBuffer != null) {
+ log.warn("[{}] is not writable, turn off channel auto-read, totalPendingWriteBytes: {}",
+ this, outboundBuffer.totalPendingWriteBytes());
+ } else {
+ log.warn("[{}] is not writable, turn off channel auto-read", this);
+ }
Review Comment:
I think that this will cause very verbose logging.
Channel writability changes are completely normal in a Netty application. No warning should be printed when the writability changes, since it's not an error. There will always be cases where the broker fills the outbound buffer with very large amounts of data, at least until there's a different type of flow control for dispatching to consumers. The dispatcher doesn't currently care whether the channel is writable or not, and that's also why Pulsar is really fast at the moment: it buffers data and can utilize the network very efficiently because of this. Flow control is currently based on `managedLedgerMaxReadsInFlightSizeInMB`. The main weakness is that this solution doesn't have a concept of "fairness": a single slow consumer can hog the resources even when it's not making progress. That's why writability should also be used in dispatching, but the high and low water marks should be relatively high so that sufficient buffering happens. Since water marks can be adjusted separately for each channel instance, this could later become some sort of policy setting so that client connections that require high throughput can be configured with more buffering.
This is why I think that adding PIP-434 is useful, and we will learn its impacts along the way as we start experimenting with how it behaves in practice. At this moment, I don't believe that the cooldown period is useful, since channel writability can switch back and forth constantly in a completely healthy situation. Flow control in Netty works like that when the server is pushing a lot of data (and the client cannot keep up).
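A minimal sketch of the handler with the warnings demoted, assuming the same auto-read behavior as the hunk above: every writability flip is logged at debug level only, and auto-read simply follows writability. `WritabilityHandler` and its `Consumer<Boolean>` parameter (standing in for `ctx.channel().config().setAutoRead(...)`) are hypothetical, `java.util.logging` with `Level.FINE` stands in for the SLF4J-style `log.debug` used in the hunk, and the rate-limiter cooldown is left out on purpose, in line with the doubts above.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; java.util.logging stands in for SLF4J, and Level.FINE for debug.
final class WritabilityHandler {
    private static final Logger LOG = Logger.getLogger(WritabilityHandler.class.getName());

    private final boolean pauseReceivingRequestsIfUnwritable;
    // Stand-in for ctx.channel().config().setAutoRead(boolean).
    private final Consumer<Boolean> setAutoRead;

    WritabilityHandler(boolean pauseReceivingRequestsIfUnwritable, Consumer<Boolean> setAutoRead) {
        this.pauseReceivingRequestsIfUnwritable = pauseReceivingRequestsIfUnwritable;
        this.setAutoRead = setAutoRead;
    }

    static Logger logger() {
        return LOG;
    }

    // Mirrors channelWritabilityChanged: flips are normal, so they are logged at debug level only.
    void onWritabilityChanged(boolean writable) {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Channel writability changed to " + writable);
        }
        if (pauseReceivingRequestsIfUnwritable) {
            // Stop reading new requests while unwritable; resume as soon as it is writable again.
            setAutoRead.accept(writable);
        }
    }
}
```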
##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a"
+ + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines how"
+ + " many requests should be allowed in the rate limiting period."
+
+ )
+ private int pulsarChannelPauseReceivingCooldownLimitRate = 5;
Review Comment:
Why is this called "*LimitRate" when its matching pair is called "*RateLimitPeriod"?
"*RateLimitPermits" and "*RateLimitPeriod" could be more descriptive.
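The suggested names read naturally as a pair: at most *RateLimitPermits requests per *RateLimitPeriod. A minimal fixed-window sketch using hypothetical `rateLimitPermits` / `rateLimitPeriodMillis` parameters, with the clock passed in so the behavior is deterministic. It is not the PR's `requestRateLimiter`, whose implementation isn't part of the quoted hunks.

```java
// Hypothetical fixed-window limiter: at most rateLimitPermits requests per rateLimitPeriodMillis.
final class FixedWindowRateLimiter {
    private final int rateLimitPermits;       // hypothetical *RateLimitPermits
    private final long rateLimitPeriodMillis; // hypothetical *RateLimitPeriod, in milliseconds
    private long windowStartMillis;
    private int used;

    FixedWindowRateLimiter(int rateLimitPermits, long rateLimitPeriodMillis, long nowMillis) {
        if (rateLimitPermits <= 0 || rateLimitPeriodMillis <= 0) {
            throw new IllegalArgumentException("permits and period must be positive");
        }
        this.rateLimitPermits = rateLimitPermits;
        this.rateLimitPeriodMillis = rateLimitPeriodMillis;
        this.windowStartMillis = nowMillis;
    }

    // Returns true if a request may proceed at nowMillis, consuming one permit.
    boolean tryAcquire(long nowMillis) {
        long elapsed = nowMillis - windowStartMillis;
        if (elapsed >= rateLimitPeriodMillis) {
            // Move to the window containing nowMillis, aligned to period boundaries.
            windowStartMillis += (elapsed / rateLimitPeriodMillis) * rateLimitPeriodMillis;
            used = 0;
        }
        if (used < rateLimitPermits) {
            used++;
            return true;
        }
        return false;
    }
}
```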
##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ @FieldContext(
+ category = CATEGORY_POLICIES,
+ doc = "After the connection is recovered from a pause receiving state, the channel will be rate-limited for a"
+ + " period of time to avoid overwhelming due to the backlog of requests. This parameter defines the"
+ + " period of the rate limiter in milliseconds."
Review Comment:
"period of the rate limiter" remains a concept that is hard to understand, and there should be a way to explain it. One possibility is to provide an example to clarify it: "If the rate limit period is set to 1000, then the unit is requests per second. When it's 10, the unit is requests per 10 ms."
Cross-referencing the "*RateLimitPermits" config (I suggested a rename) could also be useful since it's a "pair" for this config.
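The example in this comment reduces to simple arithmetic, assuming fixed permits-per-period semantics: requests per second = permits * 1000 / periodMillis. With 5 permits (the PR's default for `pulsarChannelPauseReceivingCooldownLimitRate`), a 1000 ms period gives 5 requests/s and a 10 ms period gives 500 requests/s; the period's own default isn't visible in the quoted hunk. The hypothetical `RateLimitDescription` helper below produces that kind of wording for a doc string.

```java
import java.util.Locale;

// Hypothetical helper for wording the permits/period pair in documentation.
final class RateLimitDescription {
    // Effective rate in requests per second for a permits-per-period pair.
    static double requestsPerSecond(int permits, long periodMillis) {
        if (periodMillis <= 0) {
            throw new IllegalArgumentException("periodMillis must be > 0");
        }
        return permits * 1000.0 / periodMillis;
    }

    // Human-readable form, e.g. "5 requests per 10 ms (500 requests/s)".
    static String describe(int permits, long periodMillis) {
        double perSecond = requestsPerSecond(permits, periodMillis);
        // Print whole numbers without a trailing ".0"; otherwise round to two decimals.
        String rate = perSecond == Math.rint(perSecond)
                ? Long.toString((long) perSecond)
                : String.format(Locale.ROOT, "%.2f", perSecond);
        return String.format(Locale.ROOT, "%d requests per %d ms (%s requests/s)", permits, periodMillis, rate);
    }
}
```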