This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 93518252fe9 [fix][broker] replication does not work due to the mixed 
and repetitive sending of user messages and replication markers (#24453)
93518252fe9 is described below

commit 93518252fe97e37ab6e2dd318b601c83d3df26e6
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 7 17:27:44 2025 +0800

    [fix][broker] replication does not work due to the mixed and repetitive 
sending of user messages and replication markers (#24453)
    
    (cherry picked from commit af27c43a72e7f91f9346c572dab5ec999ea6e553)
---
 .../service/OneWayReplicatorDeduplicationTest.java | 108 ++++++++++++++++++++-
 .../client/impl/GeoReplicationProducerImpl.java    |  25 ++++-
 2 files changed, 128 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
index 48b9adb52a7..237935f05c6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorDeduplicationTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -50,8 +51,10 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import 
org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
@@ -72,6 +75,7 @@ import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse;
 import org.apache.pulsar.common.api.proto.CommandGetSchemaResponse;
+import org.apache.pulsar.common.api.proto.CommandSendReceipt;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
@@ -154,17 +158,117 @@ public class OneWayReplicatorDeduplicationTest extends 
OneWayReplicatorTestBase
         final var replicationClients = brokerService.getReplicationClients();
         PulsarClientImpl internalClient = (PulsarClientImpl) 
replicationClients.get(cluster2);
         PulsarClientImpl injectedClient = 
InjectedClientCnxClientBuilder.create(clientBuilder2, clientCnxFactory);
-        assertTrue(replicationClients.remove(cluster2, internalClient));
+        if (internalClient != null) {
+            assertTrue(replicationClients.remove(cluster2, internalClient));
+        }
         assertNull(replicationClients.putIfAbsent(cluster2, injectedClient));
 
         // Return a cleanup injection task;
         return () -> {
             assertTrue(replicationClients.remove(cluster2, injectedClient));
-            assertNull(replicationClients.putIfAbsent(cluster2, 
internalClient));
+            if (internalClient != null) {
+                assertNull(replicationClients.putIfAbsent(cluster2, 
internalClient));
+            }
             injectedClient.closeAsync();
         };
     }
 
+    /**
+     * Verifies that replication recovers when the replicator's internal producer republishes an interleaved
+     * sequence of user messages and replicated-subscription markers.
+     *
+     * <p>The test discards every send receipt on the injected replication connection, publishes user messages
+     * with pauses in between, then restores receipt handling and closes the channel so the pending messages are
+     * resent. It finally asserts that the replication backlog drains and that the remote cluster delivers the
+     * user messages in order.
+     */
+    @Test
+    public void testRepeatedlyPublishMixedMessageAndReplMarkers() throws Exception {
+        // Inject a mechanism that drops all send receipts, to build the scenario where the messages of the
+        // replicator's internal producer are resent after a disconnection.
+        AtomicBoolean stuckSendReceipt = new AtomicBoolean(true);
+        Runnable cleanInjection = injectReplicatorClientCnx(
+                (conf, eventLoopGroup) -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
+            @Override
+            protected void handleSendReceipt(CommandSendReceipt sendReceipt) {
+                // While the flag is set, silently drop the receipt so the producer has to republish.
+                if (!stuckSendReceipt.get()) {
+                    super.handleSendReceipt(sendReceipt);
+                }
+            }
+        });
+
+        // Create topics.
+        // - Enable deduplication on the remote cluster.
+        // - Enable replicated subscription.
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        final String subscription = "s1";
+        admin1.topics().createNonPartitionedTopic(topicName);
+        Consumer<Integer> consumer1 = client1.newConsumer(Schema.INT32).topic(topicName)
+                .subscriptionName(subscription).replicateSubscriptionState(true).subscribe();
+        admin2.topics().createSubscription(topicName, subscription, MessageId.earliest);
+        consumer1.close();
+        waitReplicatorStarted(topicName);
+        PersistentTopic persistentTopic1 =
+                (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+        PersistentTopic persistentTopic2 =
+                (PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
+        admin2.topicPolicies().setDeduplicationStatus(topicName, true);
+        Awaitility.await().untilAsserted(() -> {
+            MessageDeduplication messageDeduplication2 = persistentTopic2.getMessageDeduplication();
+            assertEquals(String.valueOf(messageDeduplication2.getStatus()), "Enabled");
+        });
+        assertTrue(persistentTopic1.getSubscriptions().get(subscription).isReplicated());
+        assertTrue(persistentTopic1.getReplicatedSubscriptionController().isPresent());
+        ReplicatedSubscriptionsController replicatedSubscriptionsController =
+                persistentTopic1.getReplicatedSubscriptionController().get();
+        admin2.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+
+        // Publish mixed user messages and replicated markers.
+        // - Publish a user msg.
+        // - A replicated marker will be sent internally every second.
+        Producer<Integer> producer = client1.newProducer(Schema.INT32).topic(topicName).create();
+        int messageCount = 15;
+        for (int i = 0; i < messageCount; i++) {
+            producer.send(i);
+            log.info("Sent message: {}", i);
+            // Markers are sent internally every second, so waiting 1.2s lets one be sent between user messages.
+            Thread.sleep(1200);
+            log.info("latest replication snapshot: {}",
+                    replicatedSubscriptionsController.getLastCompletedSnapshotId());
+        }
+        producer.close();
+
+        // Wait until the backlog holds the user messages plus the markers. The backlog must be re-read on every
+        // poll; a value captured before the wait would never change and would make the wait meaningless.
+        GeoPersistentReplicator replicator =
+                (GeoPersistentReplicator) persistentTopic1.getReplicators().get(cluster2);
+        Awaitility.await().untilAsserted(() -> {
+            long backlog = replicator.getCursor().getNumberOfEntriesInBacklog(true);
+            log.info("replication backlog: {}", backlog);
+            assertTrue(backlog >= messageCount * 2);
+        });
+
+        // 1. Stop discarding the send receipts of the replicator's internal producer.
+        // 2. Trigger a reconnection of the replicator's internal producer.
+        // The internal producer will retry to publish.
+        stuckSendReceipt.set(false);
+        replicator.producer.getClientCnx().ctx().channel().close();
+        Producer<Integer> producer2 = client1.newProducer(Schema.INT32).topic(topicName).create();
+        producer2.close();
+        waitReplicatorStarted(topicName);
+
+        // Verify: all messages are sent, and no duplicated messages.
+        Awaitility.await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
+            assertEquals(replicator.getCursor().getNumberOfEntriesInBacklog(true), 0);
+        });
+        Consumer<Integer> consumer2 = client2.newConsumer(Schema.INT32).topic(topicName)
+                .subscriptionName(subscription).replicateSubscriptionState(true).subscribe();
+        for (int i = 0; i < messageCount; i++) {
+            Message<Integer> msg = consumer2.receive(2, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            log.info("Received message: {}", msg.getValue());
+            assertEquals(msg.getValue(), i);
+        }
+        // Release the consumer so it does not outlive the test.
+        consumer2.close();
+
+        // cleanup.
+        cleanInjection.run();
+        admin1.topics().deleteSubscription(topicName, subscription);
+        admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+        admin2.topics().setReplicationClusters(topicName, Arrays.asList(cluster2));
+        admin1.topics().unload(topicName);
+        admin2.topics().unload(topicName);
+    }
+
     @DataProvider(name = "deduplicationArgs")
     public Object[][] deduplicationArgs() {
         return new Object[][] {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
index 2595ce4ba61..b5d6afc5432 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.api.proto.KeyValue;
+import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Markers;
 
@@ -160,9 +161,27 @@ public class GeoReplicationProducerImpl extends 
ProducerImpl{
             return;
         }
 
-        // Case-4: Unexpected
-        //   4-1: got null source cluster's entry position, which is 
unexpected.
-        //   4-2: unknown error, which is unexpected.
+        // Case-4: Received an out-of-order msg send receipt, and the first 
item in pending queue is a marker.
+        if (op.msg.getMessageBuilder().hasMarkerType()
+                && 
Markers.isReplicationMarker(op.msg.getMessageBuilder().getMarkerType())) {
+            log.warn("[{}] [{}] Received an out-of-order msg send receipt 
because enabled replicated subscription,"
+                    + " which is expected, it always happens when repeatedly 
publishing a pair of mixed"
+                    + " replication marker messages and user messages."
+                    + " source position {}:{}, pending send[marker type: {}]: 
{}:{}, latest persisted: {}:{}."
+                    + " Drop the pending publish marker command because it is 
a marker and it almost no effect.",
+                    topic, producerName, sourceLId, sourceEId,
+                    
MarkerType.valueOf(op.msg.getMessageBuilder().getMarkerType()), pendingLId, 
pendingEId,
+                    lastPersistedSourceLedgerId, lastPersistedSourceEntryId);
+            // Drop pending marker. The next ack receipt of this marker message will be dropped after it comes in.
+            ackReceivedReplMarker(cnx, op, op.sequenceId, -1 /*non-batch 
message*/, -1, -1);
+            // Handle the current send receipt.
+            ackReceived(cnx, sourceLId, sourceEId, targetLId, targetEid);
+            return;
+        }
+
+        // Case-5: Unexpected
+        //   5-1: got null source cluster's entry position, which is 
unexpected.
+        //   5-2: unknown error, which is unexpected.
         log.error("[{}] [{}] Received an msg send receipt[error]: source entry 
{}:{}, target entry: {}:{},"
                 + " pending send: {}:{}, latest persisted: {}:{}, queue-size: 
{}",
                 topic, producerName, sourceLId, sourceEId, targetLId, 
targetEid, pendingLId, pendingEId,

Reply via email to