This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 8259c3f2d6f [fix][broker] Fix the non-persistent topic's replicator
always get error "Producer send queue is full" if set a small value of the
config replicationProducerQueueSize (#24424)
8259c3f2d6f is described below
commit 8259c3f2d6fa2f76e605cea3783d5bcfe3730857
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 7 11:23:49 2025 +0800
[fix][broker] Fix the non-persistent topic's replicator always get error
"Producer send queue is full" if set a small value of the config
replicationProducerQueueSize (#24424)
Co-authored-by: Penghui Li <[email protected]>
(cherry picked from commit b41599e43949b23ef4743c89885058b86807f59d)
---
.../pulsar/broker/service/AbstractReplicator.java | 3 ++-
.../nonpersistent/NonPersistentReplicator.java | 21 +++++++++------
.../broker/service/OneWayReplicatorTest.java | 30 ++++++++++++++++++++++
...OneWayReplicatorUsingGlobalPartitionedTest.java | 6 +++++
.../service/OneWayReplicatorUsingGlobalZKTest.java | 6 +++++
.../impl/conf/ProducerConfigurationData.java | 4 +++
6 files changed, 61 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index b438b848b25..8a3b2ea471d 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -57,7 +57,8 @@ public abstract class AbstractReplicator implements
Replicator {
protected final PulsarClientImpl client;
protected String replicatorId;
protected final Topic localTopic;
-
+ @VisibleForTesting
+ @Getter
protected volatile ProducerImpl producer;
public static final String REPL_PRODUCER_NAME_DELIMITER = "-->";
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index c857f13c433..79325361f76 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -23,24 +23,25 @@ import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Replicator;
-import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import
org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
import org.apache.pulsar.common.stats.Rate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.util.FutureUtil;
+@Slf4j
public class NonPersistentReplicator extends AbstractReplicator implements
Replicator {
private final Rate msgOut = new Rate();
@@ -52,9 +53,9 @@ public class NonPersistentReplicator extends
AbstractReplicator implements Repli
BrokerService brokerService, PulsarClientImpl replicationClient)
throws PulsarServerException {
super(localCluster, topic, remoteCluster, topic.getName(),
topic.getReplicatorPrefix(), brokerService,
replicationClient);
-
+ // NonPersistentReplicator does not support limitation so far, so
reset pending queue size to the default value.
+ producerBuilder.maxPendingMessages(1000);
producerBuilder.blockIfQueueFull(false);
-
startProducer();
}
@@ -174,7 +175,13 @@ public class NonPersistentReplicator extends
AbstractReplicator implements Repli
@Override
public void sendComplete(Exception exception) {
if (exception != null) {
- log.error("[{}] Error producing on remote broker",
replicator.replicatorId, exception);
+ Throwable actEx =
FutureUtil.unwrapCompletionException(exception);
+ if (actEx instanceof
PulsarClientException.ProducerQueueIsFullError) {
+ log.warn("[{}] Discard to replicate non-persistent
messages to the remote cluster because the"
+ + " producer pending queue is full",
replicator.replicatorId);
+ } else {
+ log.error("[{}] Error producing on remote broker",
replicator.replicatorId, exception);
+ }
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Message persisted on remote broker",
replicator.replicatorId);
@@ -238,8 +245,6 @@ public class NonPersistentReplicator extends
AbstractReplicator implements Repli
}
}
- private static final Logger log =
LoggerFactory.getLogger(PersistentReplicator.class);
-
@Override
protected Position getReplicatorReadPosition() {
// No-op
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 57c46aab7ba..4426f0a08f9 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -74,6 +74,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.resources.ClusterResources;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
import
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
@@ -244,6 +246,34 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
});
}
+ /**
+ * Since {@link NonPersistentReplicator} never implement the rate
limitation, the config
+ * "replicationProducerQueueSize" should not affect {@link
NonPersistentReplicator}.
+ */
+ @Test
+ public void testNonPersistentReplicatorQueueSize() throws Exception {
+
admin1.brokers().updateDynamicConfiguration("replicationProducerQueueSize",
"2");
+ Awaitility.await().untilAsserted(() -> {
+
assertEquals(pulsar1.getConfig().getReplicationProducerQueueSize(), 2);
+ });
+ final String topicName =
BrokerTestUtil.newUniqueName("non-persistent://" + replicatedNamespace +
"/tp_");
+ Producer<String> producer1 =
client1.newProducer(Schema.STRING).topic(topicName).create();
+ // Wait for replicator started.
+ Awaitility.await().untilAsserted(() -> {
+ Optional<Topic> topicOptional2 =
pulsar2.getBrokerService().getTopic(topicName, false).get();
+ assertTrue(topicOptional2.isPresent());
+ NonPersistentTopic persistentTopic2 = (NonPersistentTopic)
topicOptional2.get();
+ assertFalse(persistentTopic2.getProducers().isEmpty());
+ });
+
+ NonPersistentTopic topic = (NonPersistentTopic)
broker1.getTopic(topicName, false).get().get();
+ NonPersistentReplicator nonPersistentReplicator =
topic.getReplicators().get(cluster2);
+
assertEquals(nonPersistentReplicator.getProducer().getConfiguration().getMaxPendingMessages(),
1000);
+ // cleanup.
+ producer1.close();
+
admin1.brokers().updateDynamicConfiguration("replicationProducerQueueSize",
"1000");
+ }
+
@Test(timeOut = 45 * 1000)
public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws
Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ replicatedNamespace + "/tp_");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index d69066f8c37..fcc80228bec 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -168,6 +168,12 @@ public class OneWayReplicatorUsingGlobalPartitionedTest
extends OneWayReplicator
super.testDifferentTopicCreationRule(replicationMode);
}
+ @Test(enabled = false)
+ @Override
+ public void testNonPersistentReplicatorQueueSize() throws Exception {
+ super.testNonPersistentReplicatorQueueSize();
+ }
+
@Test(timeOut = 60_000)
public void testRemoveCluster() throws Exception {
// Initialize.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index f83c6f2b136..be6aee4c494 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -180,6 +180,12 @@ public class OneWayReplicatorUsingGlobalZKTest extends
OneWayReplicatorTest {
super.testDifferentTopicCreationRule(replicationMode);
}
+ @Test
+ @Override
+ public void testNonPersistentReplicatorQueueSize() throws Exception {
+ super.testNonPersistentReplicatorQueueSize();
+ }
+
@Test
public void testRemoveCluster() throws Exception {
// Initialize.
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 93261a3b7f6..a7da8259939 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.conf;
import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
@@ -30,6 +31,7 @@ import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
@@ -94,6 +96,8 @@ public class ProducerConfigurationData implements
Serializable, Cloneable {
+ "By default, when the queue is full, all calls to the
`Send` and `SendAsync` methods fail"
+ " **unless** you set `BlockIfQueueFull` to `true`."
)
+ @VisibleForTesting
+ @Getter
private int maxPendingMessages = DEFAULT_MAX_PENDING_MESSAGES;
@ApiModelProperty(