This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b74fe07bd2 [improve][broker] Part-1 of PIP-434: Expose Netty channel 
configuration WRITE_BUFFER_WATER_MARK to pulsar conf and pause receive requests 
when channel is unwritable (#24423)
1b74fe07bd2 is described below

commit 1b74fe07bd24e079434769de2c52222761d88ca1
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 25 03:45:17 2025 +0800

    [improve][broker] Part-1 of PIP-434: Expose Netty channel configuration 
WRITE_BUFFER_WATER_MARK to pulsar conf and pause receive requests when channel 
is unwritable (#24423)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  52 ++++
 .../broker/service/PulsarChannelInitializer.java   |   2 +
 .../apache/pulsar/broker/service/ServerCnx.java    |  76 ++++-
 .../pulsar/utils/TimedSingleThreadRateLimiter.java |  83 +++++
 .../broker/service/utils/ClientChannelHelper.java  |   3 +-
 .../pulsar/client/api/MockBrokerService.java       |   3 +-
 .../api/PatternConsumerBackPressureTest.java       |  99 ++++++
 .../utils/TimedSingleThreadRateLimiterTest.java    | 337 +++++++++++++++++++++
 .../pulsar/common/protocol/PulsarDecoder.java      |   4 +-
 .../pulsar/common/protocol/PulsarHandler.java      |   3 +-
 .../pulsar/common/protocol/PulsarDecoderTest.java  |   3 +-
 .../pulsar/proxy/server/DirectProxyHandler.java    |   3 +-
 12 files changed, 657 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5d2d94ea477..a59cf07075a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -927,6 +927,58 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private int brokerMaxConnections = 0;
 
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferHighWaterMark\" of 
Netty Channel Config. If the number of bytes"
+            + " queued in the write buffer exceeds this value, channel 
writable state will start to return \"false\"."
+    )
+    private int pulsarChannelWriteBufferHighWaterMark = 64 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "It relates to configuration \"WriteBufferLowWaterMark\" of 
Netty Channel Config. If the number of bytes"
+                + " queued in the write buffer is smaller than this value, 
channel writable state will start to return"
+                + " \"true\"."
+    )
+    private int pulsarChannelWriteBufferLowWaterMark = 32 * 1024;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "If enabled, the broker will pause reading from the channel to 
deal with new requests once the write"
+            + " buffer is full, until the channel becomes writable again."
+    )
+    private boolean pulsarChannelPauseReceivingRequestsIfUnwritable = false;
+
+    @FieldContext(
+            category = CATEGORY_POLICIES,
+            doc = "After the connection is recovered from a pause receiving 
state, the channel will be rate-limited"
+                + " for a period of time to avoid overwhelming due to the 
backlog of requests. This parameter defines"
+                + " how long the rate limiting should last, in millis. Once 
the bytes that are waiting to be sent out"
+                + " reach the \"pulsarChannelWriteBufferHighWaterMark\", the 
timer will be reset. Setting a zero or"
+                + " negative value will disable the rate limiting."
+    )
+    private int pulsarChannelPauseReceivingCooldownMs = 5000;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, 
the channel will be rate-limited for a"
+            + " period of time to avoid overwhelming due to the backlog of 
requests. This parameter defines how"
+            + " many requests should be allowed in the rate limiting period."
+
+    )
+    private int pulsarChannelPauseReceivingCooldownRateLimitPermits = 5;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "After the connection is recovered from a pause receiving state, 
the channel will be rate-limited for a"
+            + " period of time defined by 
pulsarChannelPauseReceivingCooldownMs to avoid overwhelming due to the"
+            + " backlog of requests. This parameter defines the period of the 
rate limiter in milliseconds. If the rate"
+            + " limit period is set to 1000, then the unit is requests per 
1000 milliseconds. When it's 10, the unit"
+            + " is requests per 10ms."
+
+    )
+    private int pulsarChannelPauseReceivingCooldownRateLimitPeriodMs = 10;
+
     @FieldContext(
         category = CATEGORY_POLICIES,
         doc = "The maximum number of connections per IP. If it exceeds, new 
connections are rejected."
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index 68da1083d22..dffd8260d46 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -81,6 +81,8 @@ public class PulsarChannelInitializer extends 
ChannelInitializer<SocketChannel>
         // disable auto read explicitly so that requests aren't served until 
auto read is enabled
         // ServerCnx must enable auto read in channelActive after 
PulsarService is ready to accept incoming requests
         ch.config().setAutoRead(false);
+        
ch.config().setWriteBufferHighWaterMark(pulsar.getConfig().getPulsarChannelWriteBufferHighWaterMark());
+        
ch.config().setWriteBufferLowWaterMark(pulsar.getConfig().getPulsarChannelWriteBufferLowWaterMark());
         ch.pipeline().addLast("consolidation", new 
FlushConsolidationHandler(1024, true));
         if (this.enableTls) {
             ch.pipeline().addLast(TLS_HANDLER, new 
SslHandler(this.sslFactory.createServerSslEngine(ch.alloc())));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1c61d4c467f..011b54c0b0a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -37,6 +37,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelOutboundBuffer;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.FastThreadLocal;
@@ -182,6 +183,7 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
+import org.apache.pulsar.utils.TimedSingleThreadRateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -192,6 +194,8 @@ import org.slf4j.LoggerFactory;
  * parameter instance lifecycle.
  */
 public class ServerCnx extends PulsarHandler implements TransportCnx {
+    private static final Logger PAUSE_RECEIVING_LOG = 
LoggerFactory.getLogger(ServerCnx.class.getName()
+            + ".pauseReceiving");
     private final BrokerService service;
     private final SchemaRegistryService schemaService;
     private final String listenerName;
@@ -251,6 +255,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
     private final long connectionLivenessCheckTimeoutMillis;
     private final TopicsPattern.RegexImplementation 
topicsPatternImplementation;
+    private final boolean pauseReceivingRequestsIfUnwritable;
+    private final TimedSingleThreadRateLimiter requestRateLimiter;
+    private final int pauseReceivingCooldownMilliSeconds;
+    private boolean pausedDueToRateLimitation = false;
 
     // Tracks and limits number of bytes pending to be published from a single 
specific IO thread.
     static final class PendingBytesPerThreadTracker {
@@ -314,6 +322,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         // the null check is a workaround for #13620
         super(pulsar.getBrokerService() != null ? 
pulsar.getBrokerService().getKeepAliveIntervalSeconds() : 0,
                 TimeUnit.SECONDS);
+        this.pauseReceivingRequestsIfUnwritable =
+                
pulsar.getConfig().isPulsarChannelPauseReceivingRequestsIfUnwritable();
+        this.requestRateLimiter = new TimedSingleThreadRateLimiter(
+                
pulsar.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPermits(),
+                
pulsar.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(),
+                TimeUnit.MILLISECONDS);
+        this.pauseReceivingCooldownMilliSeconds =
+                pulsar.getConfig().getPulsarChannelPauseReceivingCooldownMs();
         this.service = pulsar.getBrokerService();
         this.schemaService = pulsar.getSchemaRegistryService();
         this.listenerName = listenerName;
@@ -442,11 +458,62 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
     }
 
+    private void checkPauseReceivingRequestsAfterResumeRateLimit(BaseCommand 
cmd) {
+        if (!pauseReceivingRequestsIfUnwritable
+                || pauseReceivingCooldownMilliSeconds <= 0 || cmd.getType() == 
BaseCommand.Type.PONG
+                || cmd.getType() == BaseCommand.Type.PING) {
+            return;
+        }
+        if (PAUSE_RECEIVING_LOG.isDebugEnabled()) {
+            final ChannelOutboundBuffer outboundBuffer = 
ctx.channel().unsafe().outboundBuffer();
+            if (outboundBuffer != null) {
+                PAUSE_RECEIVING_LOG.debug("Start to handle request [{}], 
totalPendingWriteBytes: {}, channel"
+                    + " isWritable: {}", cmd.getType(), 
outboundBuffer.totalPendingWriteBytes(),
+                    ctx.channel().isWritable());
+            } else {
+                PAUSE_RECEIVING_LOG.debug("Start to handle request [{}], 
channel isWritable: {}",
+                        cmd.getType(), ctx.channel().isWritable());
+            }
+        }
+        // "requestRateLimiter" grants every requested permit unless a 
rate-limited window has been opened by
+        // "timingOpen(duration)" and has not elapsed yet.
+        if (requestRateLimiter.acquire(1) == 0 && !pausedDueToRateLimitation) {
+            log.warn("[{}] Reached rate limitation", this);
+            // Stop receiving requests.
+            pausedDueToRateLimitation = true;
+            ctx.channel().config().setAutoRead(false);
+            // Resume after 1 second.
+            ctx.channel().eventLoop().schedule(() -> {
+                if (pausedDueToRateLimitation) {
+                    log.info("[{}] Resuming connection after rate limitation", 
this);
+                    ctx.channel().config().setAutoRead(true);
+                    pausedDueToRateLimitation = false;
+                }
+            }, 1, TimeUnit.SECONDS);
+        }
+    }
+
     @Override
     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws 
Exception {
-        if (log.isDebugEnabled()) {
-            log.debug("Channel writability has changed to: {}", 
ctx.channel().isWritable());
+        if (pauseReceivingRequestsIfUnwritable && ctx.channel().isWritable()) {
+            log.info("[{}] is writable, turn on channel auto-read", this);
+            ctx.channel().config().setAutoRead(true);
+            requestRateLimiter.timingOpen(pauseReceivingCooldownMilliSeconds, 
TimeUnit.MILLISECONDS);
+        } else if (pauseReceivingRequestsIfUnwritable && 
!ctx.channel().isWritable()) {
+            final ChannelOutboundBuffer outboundBuffer = 
ctx.channel().unsafe().outboundBuffer();
+            if (outboundBuffer != null) {
+                if (PAUSE_RECEIVING_LOG.isDebugEnabled()) {
+                    PAUSE_RECEIVING_LOG.debug("[{}] is not writable, turn off 
channel auto-read,"
+                        + " totalPendingWriteBytes: {}", this, 
outboundBuffer.totalPendingWriteBytes());
+                }
+            } else {
+                if (PAUSE_RECEIVING_LOG.isDebugEnabled()) {
+                    PAUSE_RECEIVING_LOG.debug("[{}] is not writable, turn off 
channel auto-read", this);
+                }
+            }
+            ctx.channel().config().setAutoRead(false);
         }
+        ctx.fireChannelWritabilityChanged();
     }
 
     @Override
@@ -3652,8 +3719,9 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
     }
 
     @Override
-    protected void messageReceived() {
-        super.messageReceived();
+    protected void messageReceived(BaseCommand cmd) {
+        checkPauseReceivingRequestsAfterResumeRateLimit(cmd);
+        super.messageReceived(cmd);
         if (connectionCheckInProgress != null && 
!connectionCheckInProgress.isDone()) {
             connectionCheckInProgress.complete(Optional.of(true));
             connectionCheckInProgress = null;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiter.java
new file mode 100644
index 00000000000..80043fb67f8
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class TimedSingleThreadRateLimiter {
+
+    @Getter
+    private final int rate;
+    @Getter
+    private final long periodAtMs;
+    private long lastTimeReset;
+    @Getter
+    private int remaining;
+    private long closeAfterAtMs;
+
+    public TimedSingleThreadRateLimiter(final int rate, final long period, 
final TimeUnit unit) {
+        this.rate = rate;
+        this.periodAtMs = unit.toMillis(period);
+        this.lastTimeReset = System.currentTimeMillis();
+        this.remaining = rate;
+    }
+
+    public int acquire(int permits) {
+        final long now = System.currentTimeMillis();
+        if (permits < 0) {
+            return 0;
+        }
+        if (now > closeAfterAtMs) {
+            return permits;
+        }
+        mayRenew(now);
+        if (remaining > permits) {
+            remaining -= permits;
+            if (log.isDebugEnabled()) {
+                log.debug("acquired: {}, remaining:{}", permits, remaining);
+            }
+            return permits;
+        } else {
+            int acquired = remaining;
+            remaining = 0;
+            if (log.isDebugEnabled()) {
+                log.debug("acquired: {}, remaining:{}", acquired, remaining);
+            }
+            return acquired;
+        }
+    }
+
+    public void timingOpen(long closeAfter, final TimeUnit unit) {
+        if (closeAfter <= 0) {
+            this.closeAfterAtMs = 0;
+        } else {
+            this.closeAfterAtMs = System.currentTimeMillis() + 
unit.toMillis(closeAfter);
+        }
+    }
+
+    private void mayRenew(long now) {
+        if (now > lastTimeReset + periodAtMs) {
+            remaining = rate;
+            lastTimeReset = now;
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
index 0ad8ca8f1c7..d6210767ed1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.embedded.EmbeddedChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import java.util.Queue;
+import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
 import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
@@ -71,7 +72,7 @@ public class ClientChannelHelper {
     private final PulsarDecoder decoder = new PulsarDecoder() {
 
         @Override
-        protected void messageReceived() {
+        protected void messageReceived(BaseCommand cmd) {
         }
 
         @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index e66880738cf..b7e0ee42903 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -46,6 +46,7 @@ import 
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandSendHook;
 import 
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandSubscribeHook;
 import 
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandTopicLookupHook;
 import 
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandUnsubscribeHook;
+import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.CommandCloseProducer;
@@ -132,7 +133,7 @@ public class MockBrokerService {
         }
 
         @Override
-        protected void messageReceived() {
+        protected void messageReceived(BaseCommand cmd) {
         }
 
         @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureTest.java
new file mode 100644
index 00000000000..aa3e17c2ea0
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerBackPressureTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class PatternConsumerBackPressureTest extends 
MockedPulsarServiceBaseTest {
+
+    @Override
+    @BeforeMethod
+    protected void setup() throws Exception {
+        isTcpLookup = true;
+        conf.setEnableBrokerSideSubscriptionPatternEvaluation(false);
+        super.internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        conf.setPulsarChannelPauseReceivingRequestsIfUnwritable(true);
+        // 1m.
+        conf.setPulsarChannelWriteBufferHighWaterMark(1 * 1024 * 1024);
+        // 32k.
+        conf.setPulsarChannelWriteBufferLowWaterMark(32 * 1024);
+    }
+
+    @Test(timeOut = 60 * 1000)
+    public void testInfiniteGetThousandsTopics() throws PulsarAdminException, 
InterruptedException {
+        final int topicCount = 8192;
+        final int requests = 2048;
+        final String topicName = UUID.randomUUID().toString();
+        admin.topics().createPartitionedTopic(topicName, topicCount);
+        final ExecutorService executorService = 
Executors.newFixedThreadPool(Runtime.getRuntime()
+                .availableProcessors());
+
+        final PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) 
pulsarClient;
+        final AtomicInteger success = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(requests);
+        for (int i = 0; i < requests; i++) {
+            executorService.execute(() -> {
+                pulsarClientImpl.getLookup()
+                    .getTopicsUnderNamespace(NamespaceName.get("public", 
"default"),
+                            CommandGetTopicsOfNamespace.Mode.PERSISTENT, ".*", 
"")
+                    .whenComplete((result, ex) -> {
+                        if (ex == null) {
+                            success.incrementAndGet();
+                        } else {
+                            log.error("Failed to get topic list.", ex);
+                        }
+                        log.info("latch-count: {}, succeed: {}", 
latch.getCount(), success.get());
+                        latch.countDown();
+                    });
+            });
+        }
+        latch.await();
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(success.get(), requests);
+        });
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiterTest.java
new file mode 100644
index 00000000000..183d68e81cc
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TimedSingleThreadRateLimiterTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import static org.testng.Assert.assertEquals;
+import java.util.concurrent.TimeUnit;
+import org.testng.annotations.Test;
+
+/**
+ * Comprehensive test suite for TimedSingleThreadRateLimiter class.
+ */
+public class TimedSingleThreadRateLimiterTest {
+
+    @Test
+    public void testConstructorAndGetters() {
+        int rate = 100;
+        long period = 5;
+        TimeUnit unit = TimeUnit.SECONDS;
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(rate, period, unit);
+        assertEquals(limiter.getRate(), rate);
+        assertEquals(limiter.getPeriodAtMs(), unit.toMillis(period));
+        assertEquals(limiter.getRemaining(), rate); // Initially should have 
all permits
+    }
+
+    @Test
+    public void testConstructorWithDifferentTimeUnits() {
+        // Test with milliseconds
+        TimedSingleThreadRateLimiter limiterMs = new 
TimedSingleThreadRateLimiter(50, 1000, TimeUnit.MILLISECONDS);
+        assertEquals(limiterMs.getPeriodAtMs(), 1000);
+        // Test with seconds
+        TimedSingleThreadRateLimiter limiterSec = new 
TimedSingleThreadRateLimiter(50, 2, TimeUnit.SECONDS);
+        assertEquals(limiterSec.getPeriodAtMs(), 2000);
+        // Test with minutes
+        TimedSingleThreadRateLimiter limiterMin = new 
TimedSingleThreadRateLimiter(50, 1, TimeUnit.MINUTES);
+        assertEquals(limiterMin.getPeriodAtMs(), 60000);
+    }
+
+    @Test
+    public void testBasicAcquire() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(100, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Test acquiring single permit
+        int acquired = limiter.acquire(1);
+        assertEquals(acquired, 1);
+        assertEquals(limiter.getRemaining(), 99);
+        // Test acquiring multiple permits
+        acquired = limiter.acquire(10);
+        assertEquals(acquired, 10);
+        assertEquals(limiter.getRemaining(), 89);
+    }
+
+    @Test
+    public void testAcquireMoreThanRemaining() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Acquire most permits
+        int acquired = limiter.acquire(8);
+        assertEquals(acquired, 8);
+        assertEquals(limiter.getRemaining(), 2);
+        // Try to acquire more than remaining
+        acquired = limiter.acquire(5);
+        assertEquals(acquired, 2); // Should only get remaining permits
+        assertEquals(limiter.getRemaining(), 0);
+    }
+
+    @Test
+    public void testAcquireWhenNoPermitsRemaining() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(5, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Exhaust all permits
+        limiter.acquire(5);
+        assertEquals(limiter.getRemaining(), 0);
+        // Try to acquire when no permits left
+        int acquired = limiter.acquire(3);
+        assertEquals(acquired, 0);
+        assertEquals(limiter.getRemaining(), 0);
+    }
+
+    @Test
+    public void testAcquireZeroPermits() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        int acquired = limiter.acquire(0);
+        assertEquals(acquired, 0);
+        assertEquals(limiter.getRemaining(), 10); // Should remain unchanged
+    }
+
+    @Test
+    public void testPermitRenewalAfterPeriod() throws InterruptedException {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(10, 100, TimeUnit.MILLISECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Exhaust all permits
+        limiter.acquire(10);
+        assertEquals(limiter.getRemaining(), 0);
+        // Wait for period to pass
+        Thread.sleep(150);
+        // Acquire should trigger renewal
+        int acquired = limiter.acquire(5);
+        assertEquals(acquired, 5);
+        assertEquals(limiter.getRemaining(), 5);
+    }
+
+    @Test
+    public void testNoRenewalBeforePeriodExpires() throws InterruptedException 
{
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Exhaust all permits
+        limiter.acquire(10);
+        assertEquals(limiter.getRemaining(), 0);
+        // Should not renew yet
+        int acquired = limiter.acquire(5);
+        assertEquals(acquired, 0);
+        assertEquals(limiter.getRemaining(), 0);
+    }
+
+    @Test
+    public void testTimingOpen() throws Exception {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+        // Set timing to open for 500ms
+        limiter.timingOpen(500, TimeUnit.MILLISECONDS);
+        // During open period.
+        int acquired = limiter.acquire(15);
+        assertEquals(acquired, 10);
+        assertEquals(limiter.getRemaining(), 0);
+        // Closed.
+        Thread.sleep(1000);
+        int acquired2 = limiter.acquire(1000);
+        assertEquals(acquired2, 1000);
+    }
+
+    @Test
+    public void testTimingOpenWithZeroDuration() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+        // Set timing to open for 0 duration.
+        limiter.timingOpen(0, TimeUnit.MILLISECONDS);
+        // Closed.
+        int acquired = limiter.acquire(7000);
+        assertEquals(acquired, 7000);
+    }
+
+    @Test
+    public void testHighRateAcquisition() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(1000, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Acquire permits in chunks
+        int totalAcquired = 0;
+        for (int i = 0; i < 10; i++) {
+            totalAcquired += limiter.acquire(100);
+        }
+        assertEquals(totalAcquired, 1000);
+        assertEquals(limiter.getRemaining(), 0);
+    }
+
+    @Test
+    public void testLowRateAcquisition() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(3, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Acquire all permits one by one
+        assertEquals(limiter.acquire(1), 1);
+        assertEquals(limiter.getRemaining(), 2);
+        assertEquals(limiter.acquire(1), 1);
+        assertEquals(limiter.getRemaining(), 1);
+        assertEquals(limiter.acquire(1), 1);
+        assertEquals(limiter.getRemaining(), 0);
+        // No more permits available
+        assertEquals(limiter.acquire(1), 0);
+        assertEquals(limiter.getRemaining(), 0);
+    }
+
+    /**
+     * Verifies renewal after a period in which only part of the budget was used.
+     * With a rate of 10 per 100 ms, 6 permits are taken (4 left), then the test sleeps past
+     * the period. The following request for 8 is granted in full and leaves 2, so the
+     * assertions show the 4 unused permits are not carried over into the new period.
+     *
+     * @throws InterruptedException if the sleep between periods is interrupted
+     */
+    @Test
+    public void testRenewalWithPartialAcquisition() throws 
InterruptedException {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(10, 100, TimeUnit.MILLISECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Acquire some permits
+        limiter.acquire(6);
+        assertEquals(limiter.getRemaining(), 4);
+        // Wait for renewal (150 ms > the 100 ms period)
+        Thread.sleep(150);
+        // After renewal, should have full rate again
+        int acquired = limiter.acquire(8);
+        assertEquals(acquired, 8);
+        assertEquals(limiter.getRemaining(), 2);
+    }
+
+    /**
+     * Verifies back-to-back acquisitions followed by a renewal.
+     * Despite the name, every call is made sequentially from the test thread; no additional
+     * threads are started. Five requests of 5 permits (25 in total) are issued against a rate
+     * of 20 per 100 ms, so only 20 are expected to be granted. After sleeping past the period,
+     * a request for 10 is granted in full and 10 remain.
+     *
+     * @throws InterruptedException if the sleep between periods is interrupted
+     */
+    @Test
+    public void testConcurrentBehaviorSimulation() throws InterruptedException 
{
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(20, 100, TimeUnit.MILLISECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Simulate rapid acquisitions: 5 x 5 = 25 requested, which exceeds the rate of 20
+        int totalAcquired = 0;
+        for (int i = 0; i < 5; i++) {
+            totalAcquired += limiter.acquire(5);
+        }
+        assertEquals(totalAcquired, 20);
+        assertEquals(limiter.getRemaining(), 0);
+        // Wait for renewal (150 ms > the 100 ms period)
+        Thread.sleep(150);
+        // Should be able to acquire again
+        int newAcquired = limiter.acquire(10);
+        assertEquals(newAcquired, 10);
+        assertEquals(limiter.getRemaining(), 10);
+    }
+
+    /**
+     * Verifies renewal with a 10 ms period: after the 5-permit budget is exhausted and the
+     * test sleeps 20 ms, a request for 3 permits is granted and 2 remain.
+     *
+     * @throws InterruptedException if the sleep between periods is interrupted
+     */
+    @Test
+    public void testVeryShortPeriod() throws InterruptedException {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(5, 10, TimeUnit.MILLISECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Exhaust permits
+        limiter.acquire(5);
+        assertEquals(limiter.getRemaining(), 0);
+        // Wait for very short period (20 ms is twice the configured 10 ms period)
+        Thread.sleep(20);
+        // Should renew quickly
+        int acquired = limiter.acquire(3);
+        assertEquals(acquired, 3);
+        assertEquals(limiter.getRemaining(), 2);
+    }
+
+    /**
+     * Verifies a limiter with a 1-hour period: {@code getPeriodAtMs()} reports the period in
+     * milliseconds, and two consecutive requests (7, then 5) are served from the same
+     * 10-permit budget, so the second one only receives the 3 permits that are left.
+     */
+    @Test
+    public void testVeryLongPeriod() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(10, 1, TimeUnit.HOURS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        assertEquals(limiter.getPeriodAtMs(), TimeUnit.HOURS.toMillis(1));
+        // Acquire some permits
+        int acquired = limiter.acquire(7);
+        assertEquals(acquired, 7);
+        assertEquals(limiter.getRemaining(), 3);
+        // The test does not wait here; with a 1-hour period no renewal is expected between calls
+        int acquired2 = limiter.acquire(5);
+        assertEquals(acquired2, 3); // Only remaining permits
+        assertEquals(limiter.getRemaining(), 0);
+    }
+
+    /**
+     * Verifies the smallest budget, a rate of 1 per second: the getters report 1 before any
+     * acquisition, the first request takes the only permit, and a second request within the
+     * same period returns 0.
+     */
+    @Test
+    public void testSinglePermitRate() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(1, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        assertEquals(limiter.getRate(), 1);
+        assertEquals(limiter.getRemaining(), 1);
+        // Acquire the only permit
+        int acquired = limiter.acquire(1);
+        assertEquals(acquired, 1);
+        assertEquals(limiter.getRemaining(), 0);
+        // Try to acquire more
+        acquired = limiter.acquire(1);
+        assertEquals(acquired, 0);
+        assertEquals(limiter.getRemaining(), 0);
+    }
+
+    /**
+     * Verifies that a request far larger than the budget (1000 against a rate of 10) is
+     * capped: the call returns the 10 available permits and leaves none remaining.
+     */
+    @Test
+    public void testLargePermitRequest() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Request much more than available
+        int acquired = limiter.acquire(1000);
+        assertEquals(acquired, 10); // Should get all available permits
+        assertEquals(limiter.getRemaining(), 0);
+    }
+
+    /**
+     * Pins the behavior for a negative permit request: {@code acquire(-5)} is expected to
+     * return 0 and to leave the full budget of 10 untouched.
+     */
+    @Test
+    public void testNegativePermitRequest() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(10, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Request negative permits (edge case)
+        int acquired = limiter.acquire(-5);
+        // A negative request must grant nothing (never a negative count) ...
+        assertEquals(acquired, 0);
+        // ... and must not change the remaining budget in either direction.
+        assertEquals(limiter.getRemaining(), 10);
+    }
+
+    /**
+     * Verifies that the budget is restored on each of three consecutive renewals.
+     * The limiter allows 5 permits per 50 ms; the test sleeps 60 ms between cycles and checks
+     * that each cycle starts from a full budget regardless of how much the previous one used
+     * (5, then 3, then 4, then 5).
+     *
+     * @throws InterruptedException if a sleep between periods is interrupted
+     */
+    @Test
+    public void testMultipleRenewalCycles() throws InterruptedException {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(5, 50, TimeUnit.MILLISECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // First cycle
+        limiter.acquire(5);
+        assertEquals(limiter.getRemaining(), 0);
+        // Wait for first renewal
+        Thread.sleep(60);
+        limiter.acquire(3);
+        assertEquals(limiter.getRemaining(), 2);
+        // Wait for second renewal
+        Thread.sleep(60);
+        int acquired = limiter.acquire(4);
+        assertEquals(acquired, 4);
+        assertEquals(limiter.getRemaining(), 1);
+        // Wait for third renewal
+        Thread.sleep(60);
+        acquired = limiter.acquire(5);
+        assertEquals(acquired, 5);
+        assertEquals(limiter.getRemaining(), 0);
+    }
+
+    /**
+     * Verifies many small back-to-back requests: fifty requests of 2 permits against a rate
+     * of 100 per second are expected to drain the budget exactly.
+     * NOTE(review): relies on the whole loop completing within one 1-second period.
+     */
+    @Test
+    public void testRapidAcquisitionPattern() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(100, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Simulate rapid small acquisitions
+        int totalAcquired = 0;
+        for (int i = 0; i < 50; i++) {
+            totalAcquired += limiter.acquire(2);
+        }
+        // 50 x 2 requested == rate, so every request is expected to be granted in full.
+        assertEquals(totalAcquired, 100);
+        assertEquals(limiter.getRemaining(), 0);
+    }
+
+    /**
+     * Verifies two large requests within one period against a rate of 50 per second: the
+     * first (30) is granted in full, the second (25) is capped at the 20 permits left.
+     */
+    @Test
+    public void testBurstAcquisitionPattern() {
+        TimedSingleThreadRateLimiter limiter = new 
TimedSingleThreadRateLimiter(50, 1, TimeUnit.SECONDS);
+        limiter.timingOpen(10, TimeUnit.SECONDS);
+        // Large burst acquisition
+        int acquired1 = limiter.acquire(30);
+        assertEquals(acquired1, 30);
+        assertEquals(limiter.getRemaining(), 20);
+        // Another burst
+        int acquired2 = limiter.acquire(25);
+        assertEquals(acquired2, 20); // Only remaining permits
+        assertEquals(limiter.getRemaining(), 0);
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index c05b1d796df..b61664d9571 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -124,7 +124,7 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Received cmd {}", ctx.channel(), 
cmd.getType());
             }
-            messageReceived();
+            messageReceived(cmd);
 
             switch (cmd.getType()) {
             case PARTITIONED_METADATA:
@@ -486,7 +486,7 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
         }
     }
 
-    protected abstract void messageReceived();
+    protected abstract void messageReceived(BaseCommand cmd);
 
     private ServerError getServerError(int errorCode) {
         ServerError serverError = ServerError.valueOf(errorCode);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index 020b753086f..e8010ea1a51 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -26,6 +26,7 @@ import io.netty.util.concurrent.ScheduledFuture;
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
 import lombok.Setter;
+import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandPing;
 import org.apache.pulsar.common.api.proto.CommandPong;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
@@ -61,7 +62,7 @@ public abstract class PulsarHandler extends PulsarDecoder {
     }
 
     @Override
-    protected void messageReceived() {
+    protected void messageReceived(BaseCommand cmd) {
         waitingForPingResponse = false;
     }
 
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
index 6fd77c16624..af68f6cd28b 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/PulsarDecoderTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
+import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange;
 import org.testng.annotations.Test;
 
@@ -43,7 +44,7 @@ public class PulsarDecoderTest {
             }
 
             @Override
-            protected void messageReceived() {
+            protected void messageReceived(BaseCommand cmd) {
             }
         });
         decoder.channelRead(mock(ChannelHandlerContext.class), cmdBuf);
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
index 775108a75e6..5f4456d356e 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java
@@ -55,6 +55,7 @@ import 
org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.AuthData;
+import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandAuthChallenge;
 import org.apache.pulsar.common.api.proto.CommandConnected;
 import org.apache.pulsar.common.api.proto.FeatureFlags;
@@ -395,7 +396,7 @@ public class DirectProxyHandler {
         }
 
         @Override
-        protected void messageReceived() {
+        protected void messageReceived(BaseCommand cmd) {
             // no-op
         }
 


Reply via email to