poorbarcode commented on code in PR #24423:
URL: https://github.com/apache/pulsar/pull/24423#discussion_r2374015896


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -438,11 +452,56 @@ public void channelInactive(ChannelHandlerContext ctx) 
throws Exception {
         }
     }
 
+    private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand 
cmd) {
+        if (rateLimitingSecondsAfterResumeFromUnreadable <= 0 || cmd.getType() 
== BaseCommand.Type.PONG
+                || cmd.getType() == BaseCommand.Type.PING) {
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            final ChannelOutboundBuffer outboundBuffer = 
ctx.channel().unsafe().outboundBuffer();
+            if (outboundBuffer != null) {
+                log.debug("Start to handle request [{}], 
totalPendingWriteBytes: {}, channel isWritable: {}",
+                        cmd.getType(), 
outboundBuffer.totalPendingWriteBytes(), ctx.channel().isWritable());
+            } else {
+                log.debug("Start to handle request [{}], channel isWritable: 
{}",
+                        cmd.getType(), ctx.channel().isWritable());
+            }
+        }
+        // "requestRateLimiter" will return the permits that you acquired if 
it is not opening(has been called
+        // "timingOpen(duration)").
+        if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) {
+            log.warn("[{}] Reached rate limitation", this);
+            // Stop receiving requests.
+            pausedDueToRateLimitation = true;
+            ctx.channel().config().setAutoRead(false);
+            // Resume after 1 second.
+            ctx.channel().eventLoop().schedule(() -> {
+                if (pausedDueToRateLimitation) {
+                    log.info("[{}] Resuming connection after rate limitation", 
this);
+                    ctx.channel().config().setAutoRead(true);
+                    pausedDueToRateLimitation = false;
+                }
+            }, 1, TimeUnit.SECONDS);
+        }
+    }
+
     @Override
     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws 
Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("Channel writability has changed to: {}", 
ctx.channel().isWritable());
+        if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) {
+            log.info("[{}] is writable, turn on channel auto-read", this);
+            ctx.channel().config().setAutoRead(true);
+            
requestRateLimiter.timingOpen(rateLimitingSecondsAfterResumeFromUnreadable, 
TimeUnit.SECONDS);
+        } else if (pauseReceivingRequestsIfUnwritable && 
!ctx.channel().isWritable()) {
+            final ChannelOutboundBuffer outboundBuffer = 
ctx.channel().unsafe().outboundBuffer();
+            if (outboundBuffer != null) {
+                log.warn("[{}] is not writable, turn off channel auto-read, 
totalPendingWriteBytes: {}",
+                        this, outboundBuffer.totalPendingWriteBytes());
+            } else {
+                log.warn("[{}] is not writable, turn off channel auto-read", 
this);
+            }

Review Comment:
   Changed the log level to `info`(because it is really helpful for 
troubleshooting). If it really prints too many, I will push a PR to change it 
to the `debug` level



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in 
milliseconds). If the broker rece
     )
     private int brokerMaxConnections = 0;
 
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferHighWaterMark\" of 
Netty Channel Config. If the number of bytes"
+            + " queued in the write buffer exceeds this value, channel 
writable state will start to return \"false\"."
+    )
+    private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferLowWaterMark\" of 
Netty Channel Config. If the number of bytes"
+                + " queued in the write buffer is smaller than this value, 
channel writable state will start to return"
+                + " \"true\"."
+    )
+    private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "If enabled, the broker will pause reading from the channel to 
deal with new request once the writer"
+            + " buffer is full, until it is changed to writable."
+    )
+    private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "After the connection is recovered from an pause receiving 
state, the channel will be rate-limited"
+                + " for a of time window to avoid overwhelming due to the 
backlog of requests. This parameter defines"
+                + " how long the rate limiting should last, in seconds. Once 
the bytes that are waiting to be sent out"
+                + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the 
timer will be reset. Setting a negative"
+                + " value will disable the rate limiting."
+    )
+    private int pulsarChannelPauseReceivingCooldownSeconds = 5;

Review Comment:
   Improved



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in 
milliseconds). If the broker rece
     )
     private int brokerMaxConnections = 0;
 
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferHighWaterMark\" of 
Netty Channel Config. If the number of bytes"
+            + " queued in the write buffer exceeds this value, channel 
writable state will start to return \"false\"."
+    )
+    private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferLowWaterMark\" of 
Netty Channel Config. If the number of bytes"
+                + " queued in the write buffer is smaller than this value, 
channel writable state will start to return"
+                + " \"true\"."
+    )
+    private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "If enabled, the broker will pause reading from the channel to 
deal with new request once the writer"
+            + " buffer is full, until it is changed to writable."
+    )
+    private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "After the connection is recovered from an pause receiving 
state, the channel will be rate-limited"
+                + " for a of time window to avoid overwhelming due to the 
backlog of requests. This parameter defines"
+                + " how long the rate limiting should last, in seconds. Once 
the bytes that are waiting to be sent out"
+                + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the 
timer will be reset. Setting a negative"
+                + " value will disable the rate limiting."
+    )
+    private int pulsarChannelPauseReceivingCooldownSeconds = 5;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, 
the channel will be rate-limited for a"
+            + " period of time to avoid overwhelming due to the backlog of 
requests. This parameter defines how"
+            + " many requests should be allowed in the rate limiting period."
+
+    )
+    private int pulsarChannelPauseReceivingCooldownLimitRate = 5;

Review Comment:
   Renamed



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in 
milliseconds). If the broker rece
     )
     private int brokerMaxConnections = 0;
 
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferHighWaterMark\" of 
Netty Channel Config. If the number of bytes"
+            + " queued in the write buffer exceeds this value, channel 
writable state will start to return \"false\"."
+    )
+    private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferLowWaterMark\" of 
Netty Channel Config. If the number of bytes"
+                + " queued in the write buffer is smaller than this value, 
channel writable state will start to return"
+                + " \"true\"."
+    )
+    private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "If enabled, the broker will pause reading from the channel to 
deal with new request once the writer"
+            + " buffer is full, until it is changed to writable."
+    )
+    private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "After the connection is recovered from an pause receiving 
state, the channel will be rate-limited"
+                + " for a of time window to avoid overwhelming due to the 
backlog of requests. This parameter defines"
+                + " how long the rate limiting should last, in seconds. Once 
the bytes that are waiting to be sent out"
+                + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the 
timer will be reset. Setting a negative"
+                + " value will disable the rate limiting."
+    )
+    private int pulsarChannelPauseReceivingCooldownSeconds = 5;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, 
the channel will be rate-limited for a"
+            + " period of time to avoid overwhelming due to the backlog of 
requests. This parameter defines how"
+            + " many requests should be allowed in the rate limiting period."
+
+    )
+    private int pulsarChannelPauseReceivingCooldownLimitRate = 5;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, 
the channel will be rate-limited for a"
+            + " period of time to avoid overwhelming due to the backlog of 
requests. This parameter defines the"
+            + " period of the rate limiter in milliseconds."

Review Comment:
   Improved



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -919,6 +919,56 @@ The max allowed delay for delayed delivery (in 
milliseconds). If the broker rece
     )
     private int brokerMaxConnections = 0;
 
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferHighWaterMark\" of 
Netty Channel Config. If the number of bytes"
+            + " queued in the write buffer exceeds this value, channel 
writable state will start to return \"false\"."
+    )
+    private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferLowWaterMark\" of 
Netty Channel Config. If the number of bytes"
+                + " queued in the write buffer is smaller than this value, 
channel writable state will start to return"
+                + " \"true\"."
+    )
+    private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "If enabled, the broker will pause reading from the channel to 
deal with new request once the writer"
+            + " buffer is full, until it is changed to writable."
+    )
+    private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "After the connection is recovered from an pause receiving 
state, the channel will be rate-limited"
+                + " for a of time window to avoid overwhelming due to the 
backlog of requests. This parameter defines"
+                + " how long the rate limiting should last, in seconds. Once 
the bytes that are waiting to be sent out"
+                + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the 
timer will be reset. Setting a negative"
+                + " value will disable the rate limiting."
+    )
+    private int pulsarChannelPauseReceivingCooldownSeconds = 5;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, 
the channel will be rate-limited for a"
+            + " period of time to avoid overwhelming due to the backlog of 
requests. This parameter defines how"
+            + " many requests should be allowed in the rate limiting period."
+
+    )
+    private int pulsarChannelPauseReceivingCooldownLimitRate = 5;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, 
the channel will be rate-limited for a"
+            + " period of time to avoid overwhelming due to the backlog of 
requests. This parameter defines the"

Review Comment:
   Improved



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to