This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 8259c3f2d6f [fix][broker] Fix the non-persistenttopic's replicator 
always get error "Producer send queue is full" if set a small value of the 
config replicationProducerQueueSize (#24424)
8259c3f2d6f is described below

commit 8259c3f2d6fa2f76e605cea3783d5bcfe3730857
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 7 11:23:49 2025 +0800

    [fix][broker] Fix the non-persistenttopic's replicator always get error 
"Producer send queue is full" if set a small value of the config 
replicationProducerQueueSize (#24424)
    
    Co-authored-by: Penghui Li <[email protected]>
    (cherry picked from commit b41599e43949b23ef4743c89885058b86807f59d)
---
 .../pulsar/broker/service/AbstractReplicator.java  |  3 ++-
 .../nonpersistent/NonPersistentReplicator.java     | 21 +++++++++------
 .../broker/service/OneWayReplicatorTest.java       | 30 ++++++++++++++++++++++
 ...OneWayReplicatorUsingGlobalPartitionedTest.java |  6 +++++
 .../service/OneWayReplicatorUsingGlobalZKTest.java |  6 +++++
 .../impl/conf/ProducerConfigurationData.java       |  4 +++
 6 files changed, 61 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index b438b848b25..8a3b2ea471d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -57,7 +57,8 @@ public abstract class AbstractReplicator implements 
Replicator {
     protected final PulsarClientImpl client;
     protected String replicatorId;
     protected final Topic localTopic;
-
+    @VisibleForTesting
+    @Getter
     protected volatile ProducerImpl producer;
     public static final String REPL_PRODUCER_NAME_DELIMITER = "-->";
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index c857f13c433..79325361f76 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -23,24 +23,25 @@ import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Replicator;
-import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.SendCallback;
 import 
org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
 import org.apache.pulsar.common.stats.Rate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.util.FutureUtil;
 
+@Slf4j
 public class NonPersistentReplicator extends AbstractReplicator implements 
Replicator {
 
     private final Rate msgOut = new Rate();
@@ -52,9 +53,9 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
             BrokerService brokerService, PulsarClientImpl replicationClient) 
throws PulsarServerException {
         super(localCluster, topic, remoteCluster, topic.getName(), 
topic.getReplicatorPrefix(), brokerService,
                 replicationClient);
-
+        // NonPersistentReplicator does not support limitation so far, so 
reset pending queue size to the default value.
+        producerBuilder.maxPendingMessages(1000);
         producerBuilder.blockIfQueueFull(false);
-
         startProducer();
     }
 
@@ -174,7 +175,13 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
         @Override
         public void sendComplete(Exception exception) {
             if (exception != null) {
-                log.error("[{}] Error producing on remote broker", 
replicator.replicatorId, exception);
+                Throwable actEx = 
FutureUtil.unwrapCompletionException(exception);
+                if (actEx instanceof 
PulsarClientException.ProducerQueueIsFullError) {
+                    log.warn("[{}] Discard to replicate non-persistent 
messages to the remote cluster because the"
+                        + " producer pending queue is full", 
replicator.replicatorId);
+                } else {
+                    log.error("[{}] Error producing on remote broker", 
replicator.replicatorId, exception);
+                }
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Message persisted on remote broker", 
replicator.replicatorId);
@@ -238,8 +245,6 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
         }
     }
 
-    private static final Logger log = 
LoggerFactory.getLogger(PersistentReplicator.class);
-
     @Override
     protected Position getReplicatorReadPosition() {
         // No-op
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 57c46aab7ba..4426f0a08f9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -74,6 +74,8 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.resources.ClusterResources;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import 
org.apache.pulsar.broker.service.persistent.BrokerServicePersistInternalMethodInvoker;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
@@ -244,6 +246,34 @@ public class OneWayReplicatorTest extends 
OneWayReplicatorTestBase {
         });
     }
 
+    /**
+     * Since {@link NonPersistentReplicator} never implement the rate 
limitation, the config
+     * "replicationProducerQueueSize" should not affect {@link 
NonPersistentReplicator}.
+     */
+    @Test
+    public void testNonPersistentReplicatorQueueSize() throws Exception {
+        
admin1.brokers().updateDynamicConfiguration("replicationProducerQueueSize", 
"2");
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(pulsar1.getConfig().getReplicationProducerQueueSize(), 2);
+        });
+        final String topicName = 
BrokerTestUtil.newUniqueName("non-persistent://" + replicatedNamespace + 
"/tp_");
+        Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(topicName).create();
+        // Wait for replicator started.
+        Awaitility.await().untilAsserted(() -> {
+            Optional<Topic> topicOptional2 = 
pulsar2.getBrokerService().getTopic(topicName, false).get();
+            assertTrue(topicOptional2.isPresent());
+            NonPersistentTopic persistentTopic2 = (NonPersistentTopic) 
topicOptional2.get();
+            assertFalse(persistentTopic2.getProducers().isEmpty());
+        });
+
+        NonPersistentTopic topic = (NonPersistentTopic) 
broker1.getTopic(topicName, false).get().get();
+        NonPersistentReplicator nonPersistentReplicator = 
topic.getReplicators().get(cluster2);
+        
assertEquals(nonPersistentReplicator.getProducer().getConfiguration().getMaxPendingMessages(),
 1000);
+        // cleanup.
+        producer1.close();
+        
admin1.brokers().updateDynamicConfiguration("replicationProducerQueueSize", 
"1000");
+    }
+
     @Test(timeOut = 45 * 1000)
     public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws 
Exception {
         final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ replicatedNamespace + "/tp_");
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index d69066f8c37..fcc80228bec 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -168,6 +168,12 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
         super.testDifferentTopicCreationRule(replicationMode);
     }
 
+    @Test(enabled = false)
+    @Override
+    public void testNonPersistentReplicatorQueueSize() throws Exception {
+        super.testNonPersistentReplicatorQueueSize();
+    }
+
     @Test(timeOut = 60_000)
     public void testRemoveCluster() throws Exception {
         // Initialize.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index f83c6f2b136..be6aee4c494 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -180,6 +180,12 @@ public class OneWayReplicatorUsingGlobalZKTest extends 
OneWayReplicatorTest {
         super.testDifferentTopicCreationRule(replicationMode);
     }
 
+    @Test
+    @Override
+    public void testNonPersistentReplicatorQueueSize() throws Exception {
+        super.testNonPersistentReplicatorQueueSize();
+    }
+
     @Test
     public void testRemoveCluster() throws Exception {
         // Initialize.
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 93261a3b7f6..a7da8259939 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.conf;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Sets;
 import io.swagger.annotations.ApiModelProperty;
 import java.io.Serializable;
@@ -30,6 +31,7 @@ import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.Getter;
 import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.BatcherBuilder;
@@ -94,6 +96,8 @@ public class ProducerConfigurationData implements 
Serializable, Cloneable {
                     + "By default, when the queue is full, all calls to the 
`Send` and `SendAsync` methods fail"
                     + " **unless** you set `BlockIfQueueFull` to `true`."
     )
+    @VisibleForTesting
+    @Getter
     private int maxPendingMessages = DEFAULT_MAX_PENDING_MESSAGES;
 
     @ApiModelProperty(

Reply via email to