This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 524db74c58f [improve][broker] PIP-434: add configurations to
broker.conf (#24800)
524db74c58f is described below
commit 524db74c58f86691d85f50b032dd0b0d32abe06b
Author: fengyubiao <[email protected]>
AuthorDate: Thu Oct 9 12:30:37 2025 +0800
[improve][broker] PIP-434: add configurations to broker.conf (#24800)
---
conf/broker.conf | 31 ++++++++++++++++++++++
.../apache/pulsar/broker/ServiceConfiguration.java | 6 ++---
.../apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../pulsar/broker/service/StandaloneTest.java | 6 +++++
.../common/naming/ServiceConfigurationTest.java | 7 ++++-
.../configurations/pulsar_broker_test.conf | 7 +++++
.../pulsar_broker_test_standalone.conf | 8 ++++++
7 files changed, 62 insertions(+), 5 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index a1f59bd3eed..8fd0e18af36 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -330,6 +330,37 @@ maxTopicsPerNamespace=0
# The maximum number of connections in the broker. If it exceeds, new
connections are rejected.
brokerMaxConnections=0
+# It relates to configuration "WriteBufferHighWaterMark" of Netty Channel
Config. If the number of bytes
+# queued in the write buffer exceeds this value, channel writable state will
start to return "false".
+pulsarChannelWriteBufferHighWaterMark=65536
+
+# It relates to configuration "WriteBufferLowWaterMark" of Netty Channel
Config. If the number of bytes"
+# queued in the write buffer is smaller than this value, channel writable
state will start to return "true".
+pulsarChannelWriteBufferLowWaterMark=32768
+
+# If enabled, the broker will pause reading from the channel to deal with new
request once the writer
+# buffer is full, until it is changed to writable.
+pulsarChannelPauseReceivingRequestsIfUnwritable=false
+
+# After the connection is recovered from a pause receiving state, the channel
will be rate-limited
+# for a time window to avoid overwhelming due to the backlog of requests. This
parameter defines
+# how long the rate limiting should last, in millis. Once the bytes that are
waiting to be sent out
+# reach the "pulsarChannelWriteBufferHighWaterMark"? the timer will be reset.
Setting a negative
+# value will disable the rate limiting.
+pulsarChannelPauseReceivingCooldownMs=5000
+
+# After the connection is recovered from a pause receiving state, the channel
will be rate-limited for a
+# period of time to avoid overwhelming due to the backlog of requests. This
parameter defines how
+# many requests should be allowed in the rate limiting period.
+pulsarChannelPauseReceivingCooldownRateLimitPermits=5
+
+# After the connection is recovered from a pause receiving state, the channel
will be rate-limited for a
+# period of time defined by pulsarChannelPauseReceivingCooldownMs to avoid
overwhelming due to the
+# backlog of requests. This parameter defines the period of the rate limiter
in milliseconds. If the rate
+# limit period is set to 1000, then the unit is requests per 1000
milliseconds. When it's 10, the unit
+# is requests per every 10ms.
+pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=10
+
# The maximum number of connections per IP. If it exceeds, new connections are
rejected.
brokerMaxConnectionsPerIp=0
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a59cf07075a..5ca0db944a4 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -951,8 +951,8 @@ public class ServiceConfiguration implements
PulsarConfiguration {
@FieldContext(
category = CATEGORY_POLICIES,
- doc = "After the connection is recovered from an pause receiving
state, the channel will be rate-limited"
- + " for a of time window to avoid overwhelming due to the
backlog of requests. This parameter defines"
+ doc = "After the connection is recovered from a pause receiving
state, the channel will be rate-limited"
+ + " for a time window to avoid overwhelming due to the backlog
of requests. This parameter defines"
+ " how long the rate limiting should last, in millis. Once
the bytes that are waiting to be sent out"
+ " reach the \"pulsarChannelWriteBufferHighWaterMark\", the
timer will be reset. Setting a negative"
+ " value will disable the rate limiting."
@@ -973,7 +973,7 @@ public class ServiceConfiguration implements
PulsarConfiguration {
doc = "After the connection is recovered from a pause receiving state,
the channel will be rate-limited for a"
+ " period of time defined by
pulsarChannelPauseReceivingCooldownMs to avoid overwhelming due to the"
+ " backlog of requests. This parameter defines the period of the
rate limiter in milliseconds. If the rate"
- + " limit period is set to 1000, then the unit is requests per
1000 milli seconds. When it's 10, the unit"
+ + " limit period is set to 1000, then the unit is requests per
1000 milliseconds. When it's 10, the unit"
+ " is requests per every 10ms."
)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 011b54c0b0a..4d21b2810cd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -489,7 +489,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
ctx.channel().config().setAutoRead(true);
pausedDueToRateLimitation = false;
}
- }, 1, TimeUnit.SECONDS);
+ }, requestRateLimiter.getPeriodAtMs(), TimeUnit.MILLISECONDS);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
index dc35e2d382d..9ab972e6be5 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/StandaloneTest.java
@@ -65,5 +65,11 @@ public class StandaloneTest {
assertEquals(standalone.getConfig().getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(standalone.getConfig().getTopicNameCacheMaxCapacity(),
200);
assertEquals(standalone.getConfig().isCreateTopicToRemoteClusterForReplication(),
true);
+
assertEquals(standalone.getConfig().getPulsarChannelWriteBufferHighWaterMark(),
60000);
+
assertEquals(standalone.getConfig().getPulsarChannelWriteBufferLowWaterMark(),
120000);
+
assertEquals(standalone.getConfig().isPulsarChannelPauseReceivingRequestsIfUnwritable(),
true);
+
assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownMs(),
10_000);
+
assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPermits(),
100);
+
assertEquals(standalone.getConfig().getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(),
200);
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
index 1802bd6f59c..ed551c8c1bb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java
@@ -78,7 +78,12 @@ public class ServiceConfigurationTest {
assertEquals(config.isDispatcherPauseOnAckStatePersistentEnabled(),
true);
assertEquals(config.getMaxSecondsToClearTopicNameCache(), 1);
assertEquals(config.getTopicNameCacheMaxCapacity(), 200);
- assertEquals(config.isCreateTopicToRemoteClusterForReplication(),
false);
+ assertEquals(config.getPulsarChannelWriteBufferHighWaterMark(), 60000);
+ assertEquals(config.getPulsarChannelWriteBufferLowWaterMark(), 120000);
+
assertEquals(config.isPulsarChannelPauseReceivingRequestsIfUnwritable(), true);
+ assertEquals(config.getPulsarChannelPauseReceivingCooldownMs(),
10_000);
+
assertEquals(config.getPulsarChannelPauseReceivingCooldownRateLimitPermits(),
100);
+
assertEquals(config.getPulsarChannelPauseReceivingCooldownRateLimitPeriodMs(),
200);
OffloadPoliciesImpl offloadPolicies =
OffloadPoliciesImpl.create(config.getProperties());
assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().getValue(),
"bookkeeper-first");
}
diff --git
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index 0fdb29e0686..5ce477550e5 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -95,6 +95,13 @@
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
dispatcherPauseOnAckStatePersistentEnabled=true
+pulsarChannelWriteBufferHighWaterMark=60000
+pulsarChannelWriteBufferLowWaterMark=120000
+pulsarChannelPauseReceivingRequestsIfUnwritable=true
+pulsarChannelPauseReceivingCooldownMs=10000
+pulsarChannelPauseReceivingCooldownRateLimitPermits=100
+pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=200
+
### --- Transaction config variables --- ###
transactionLogBatchedWriteEnabled=true
transactionLogBatchedWriteMaxRecords=11
diff --git
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
index d3f9430f29b..0c393b1db62 100644
---
a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
+++
b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone.conf
@@ -95,6 +95,14 @@
supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
dispatcherPauseOnAckStatePersistentEnabled=true
+
+pulsarChannelWriteBufferHighWaterMark=60000
+pulsarChannelWriteBufferLowWaterMark=120000
+pulsarChannelPauseReceivingRequestsIfUnwritable=true
+pulsarChannelPauseReceivingCooldownMs=10000
+pulsarChannelPauseReceivingCooldownRateLimitPermits=100
+pulsarChannelPauseReceivingCooldownRateLimitPeriodMs=200
+
topicNameCacheMaxCapacity=200
maxSecondsToClearTopicNameCache=1
createTopicToRemoteClusterForReplication=true