This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c9669bdb194dbfe0de61f5dedbb60706ae6744d3
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Wed Jun 19 15:13:57 2024 +0800

    [fix] [broker] Messages lost on the remote cluster when using topic level replication (#22890)
    
    (cherry picked from commit feae58988d672767c076daa0c7caa5613cbba36e)
---
 .../broker/service/persistent/PersistentTopic.java |  49 +++++-----
 .../broker/service/OneWayReplicatorTest.java       | 103 +++++++++++++++++++++
 .../broker/service/OneWayReplicatorTestBase.java   |  22 +++++
 .../service/OneWayReplicatorUsingGlobalZKTest.java |   5 +
 4 files changed, 153 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5f951e0b32d..f363132f944 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -405,13 +405,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             this.createPersistentSubscriptions();
         }));
 
-        for (ManagedCursor cursor : ledger.getCursors()) {
-            if (cursor.getName().startsWith(replicatorPrefix)) {
-                String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
-                String remoteCluster = 
PersistentReplicator.getRemoteCluster(cursor.getName());
-                futures.add(addReplicationCluster(remoteCluster, cursor, 
localCluster));
-            }
-        }
         return FutureUtil.waitForAll(futures).thenCompose(__ ->
             brokerService.pulsar().getPulsarResources().getNamespaceResources()
                 .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
@@ -442,6 +435,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     isAllowAutoUpdateSchema = 
policies.is_allow_auto_update_schema;
                 }, getOrderedExecutor())
                 .thenCompose(ignore -> initTopicPolicy())
+                .thenCompose(ignore -> removeOrphanReplicationCursors())
                 .exceptionally(ex -> {
                     log.warn("[{}] Error getting policies {} and 
isEncryptionRequired will be set to false",
                             topic, ex.getMessage());
@@ -519,6 +513,34 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         checkReplicatedSubscriptionControllerState();
     }
 
+    /**
+     * Removes the replicators of clusters that no longer appear in this topic's replication clusters.
+     *
+     * <p>Every managed-ledger cursor whose name starts with {@code replicatorPrefix} is treated as a
+     * replication cursor, and its remote cluster is derived with
+     * {@code PersistentReplicator.getRemoteCluster}. When that cluster is not contained in
+     * {@code topicPolicies.getReplicationClusters().get()}, {@code removeReplicator} is invoked for it.
+     *
+     * @return a future that completes when all triggered removals have completed
+     */
+    private CompletableFuture<Void> removeOrphanReplicationCursors() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        List<String> replicationClusters = topicPolicies.getReplicationClusters().get();
+        for (ManagedCursor cursor : ledger.getCursors()) {
+            if (cursor.getName().startsWith(replicatorPrefix)) {
+                String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
+                if (!replicationClusters.contains(remoteCluster)) {
+                    // Prefix with the topic name, like the other log lines of this class, so the warning
+                    // can be traced back to the topic whose replicator is being removed.
+                    log.warn("[{}] Remove the orphan replicator because the cluster '{}' does not exist",
+                            topic, remoteCluster);
+                    futures.add(removeReplicator(remoteCluster));
+                }
+            }
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+
     /**
      * Unload a subscriber.
      * @throws SubscriptionNotFoundException If subscription not founded.
@@ -2001,30 +2010,18 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return future;
     }
 
-    private CompletableFuture<Boolean> checkReplicationCluster(String 
remoteCluster) {
-        return 
brokerService.getPulsar().getPulsarResources().getNamespaceResources()
-                .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
-                .thenApply(optPolicies -> optPolicies.map(policies -> 
policies.replication_clusters)
-                        .orElse(Collections.emptySet()).contains(remoteCluster)
-                        || 
topicPolicies.getReplicationClusters().get().contains(remoteCluster));
-    }
-
     protected CompletableFuture<Void> addReplicationCluster(String 
remoteCluster, ManagedCursor cursor,
             String localCluster) {
         return 
AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(),
 brokerService)
-                .thenCompose(__ -> checkReplicationCluster(remoteCluster))
-                .thenCompose(clusterExists -> {
-                    if (!clusterExists) {
-                        log.warn("Remove the replicator because the cluster 
'{}' does not exist", remoteCluster);
-                        return removeReplicator(remoteCluster).thenApply(__ -> 
null);
-                    }
-                    return 
brokerService.pulsar().getPulsarResources().getClusterResources()
-                            .getClusterAsync(remoteCluster)
-                            .thenApply(clusterData ->
-                                    
brokerService.getReplicationClient(remoteCluster, clusterData));
-                })
+                .thenCompose(__ -> 
brokerService.pulsar().getPulsarResources().getClusterResources()
+                        .getClusterAsync(remoteCluster)
+                        .thenApply(clusterData ->
+                                
brokerService.getReplicationClient(remoteCluster, clusterData)))
                 .thenAccept(replicationClient -> {
                     if (replicationClient == null) {
+                        log.error("[{}] Can not create replicator because the 
remote client can not be created."
+                                        + " remote cluster: {}. State of 
transferring : {}",
+                                topic, remoteCluster, transferring);
                         return;
                     }
                     lock.readLock().lock();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 9b3e9d81d84..364afcda7f3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
+import java.util.function.Supplier;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.SneakyThrows;
@@ -55,7 +56,10 @@ import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -75,6 +79,7 @@ import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -774,4 +779,124 @@ public class OneWayReplicatorTest extends OneWayReplicatorTestBase {
             admin2.topics().deletePartitionedTopic(topicName, false);
         });
     }
+
+    /**
+     * Reads {@code topic} from the earliest position through a temporary subscription and returns the value
+     * of the last message received before a 2-second receive timeout, or {@code null} if none was received.
+     * The temporary subscription is removed afterwards, even if receiving fails.
+     */
+    private String getTheLatestMessage(String topic, PulsarClient client, PulsarAdmin admin) throws Exception {
+        String dummySubscription = "s_" + UUID.randomUUID().toString().replace("-", "");
+        admin.topics().createSubscription(topic, dummySubscription, MessageId.earliest);
+        Consumer<String> c = client.newConsumer(Schema.STRING).topic(topic).subscriptionName(dummySubscription)
+                .subscribe();
+        String lastMsgValue = null;
+        try {
+            while (true) {
+                Message<String> msg = c.receive(2, TimeUnit.SECONDS);
+                if (msg == null) {
+                    break;
+                }
+                lastMsgValue = msg.getValue();
+            }
+        } finally {
+            // Always drop the temporary subscription so it is not left behind on the topic.
+            c.unsubscribe();
+        }
+        return lastMsgValue;
+    }
+
+    enum ReplicationLevel {
+        TOPIC_LEVEL,
+        NAMESPACE_LEVEL;
+    }
+
+    @DataProvider(name = "replicationLevels")
+    public Object[][] replicationLevels() {
+        return new Object[][]{
+            {ReplicationLevel.TOPIC_LEVEL},
+            {ReplicationLevel.NAMESPACE_LEVEL}
+        };
+    }
+
+    /**
+     * Verifies that a message left unreplicated by a terminated replicator is replicated once the topic is
+     * unloaded and its replicator re-created, for both topic-level and namespace-level replication.
+     */
+    @Test(dataProvider = "replicationLevels")
+    public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
+        final String topicName = ((Supplier<String>) () -> {
+            if (replicationLevel.equals(ReplicationLevel.TOPIC_LEVEL)) {
+                return BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
+            } else {
+                return BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+            }
+        }).get();
+        admin1.topics().createNonPartitionedTopic(topicName);
+        admin2.topics().createNonPartitionedTopic(topicName);
+        admin2.topics().createSubscription(topicName, "s1", MessageId.earliest);
+        if (replicationLevel.equals(ReplicationLevel.TOPIC_LEVEL)) {
+            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
+        } else {
+            pulsar1.getConfig().setTopicLevelPoliciesEnabled(false);
+        }
+        try {
+            verifyReplicationWorks(topicName);
+
+            /*
+             * Verify:
+             * 1. Inject an error so that the replicator can no longer work.
+             * 2. Send one message; since the replicator does not work anymore, it will not be replicated.
+             * 3. Unload the topic; the replicator will be re-created.
+             * 4. Verify: the message can be replicated to the remote cluster.
+             */
+            // Step 1: Inject an error so that the replicator can no longer work.
+            Replicator replicator = broker1.getTopic(topicName, false).join().get().getReplicators().get(cluster2);
+            replicator.terminate();
+
+            // Step 2: Send one message; since the replicator does not work anymore, it will not be replicated.
+            String msg = UUID.randomUUID().toString();
+            try (Producer<String> p1 = client1.newProducer(Schema.STRING).topic(topicName).create()) {
+                p1.send(msg);
+            }
+            // Give replication a moment: the latest message readable on cluster2 must still not be the one
+            // just sent, because the replicator was terminated.
+            Thread.sleep(3000);
+            assertNotEquals(getTheLatestMessage(topicName, client2, admin2), msg);
+            assertEquals(admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog(),
+                    1);
+
+            // Step 3: Unload the topic; the replicator will be re-created.
+            admin1.topics().unload(topicName);
+
+            // Step 4: Verify: the message can be replicated to the remote cluster.
+            Awaitility.await().atMost(Duration.ofSeconds(300)).untilAsserted(() -> {
+                log.info("replication backlog: {}",
+                        admin1.topics().getStats(topicName).getReplication().get(cluster2).getReplicationBacklog());
+                assertEquals(admin1.topics().getStats(topicName).getReplication().get(cluster2)
+                        .getReplicationBacklog(), 0);
+                assertEquals(getTheLatestMessage(topicName, client2, admin2), msg);
+            });
+        } finally {
+            if (replicationLevel.equals(ReplicationLevel.NAMESPACE_LEVEL)) {
+                // Restore the broker config even when an assertion above failed, so other tests are not affected.
+                pulsar1.getConfig().setTopicLevelPoliciesEnabled(true);
+            }
+        }
+
+        // Cleanup.
+        if (replicationLevel.equals(ReplicationLevel.TOPIC_LEVEL)) {
+            admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
+            Awaitility.await().untilAsserted(() -> {
+                assertEquals(broker1.getTopic(topicName, false).join().get().getReplicators().size(), 0);
+            });
+            admin1.topics().delete(topicName, false);
+            admin2.topics().delete(topicName, false);
+        } else {
+            cleanupTopics(() -> {
+                admin1.topics().delete(topicName);
+                admin2.topics().delete(topicName);
+            });
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 7372b2e4784..ffe6147412e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -350,6 +350,28 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
     }
 
     protected void verifyReplicationWorks(String topic) throws Exception {
+        // Wait for replicator starting.
+        Awaitility.await().until(() -> {
+            try {
+                PersistentTopic persistentTopic = (PersistentTopic) 
pulsar1.getBrokerService()
+                                .getTopic(topic, false).join().get();
+                if (persistentTopic.getReplicators().size() > 0) {
+                    return true;
+                }
+            } catch (Exception ex) {}
+
+            try {
+                String partition0 = 
TopicName.get(topic).getPartition(0).toString();
+                PersistentTopic persistentTopic = (PersistentTopic) 
pulsar1.getBrokerService()
+                        .getTopic(partition0, false).join().get();
+                if (persistentTopic.getReplicators().size() > 0) {
+                    return true;
+                }
+            } catch (Exception ex) {}
+
+            return false;
+        });
+        // Verify: pub & sub.
         final String subscription = "__subscribe_1";
         final String msgValue = "__msg1";
         Producer<String> producer1 = 
client1.newProducer(Schema.STRING).topic(topic).create();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
index b4747a8bd0e..b8f8edce247 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -104,4 +104,12 @@ public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest {
     public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
         super.testExpandTopicPartitionsOnNamespaceLevelReplication();
     }
+
+    // Disabled in this global-ZK variant (reason not recorded in this change). The data provider is declared
+    // so the parameterized override still has an argument source if it is re-enabled.
+    @Override
+    @Test(enabled = false, dataProvider = "replicationLevels")
+    public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
+        super.testReloadWithTopicLevelGeoReplication(replicationLevel);
+    }
 }

Reply via email to