This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c1b0454614b [fix][broker]Delete subscription and disconnect replicators after topic migration (#21029)
c1b0454614b is described below

commit c1b0454614b7903913cb0311bdcacf2118893fc9
Author: vineeth1995 <vineethreddypo...@gmail.com>
AuthorDate: Tue Aug 22 09:52:30 2023 -0700

    [fix][broker]Delete subscription and disconnect replicators after topic migration (#21029)
    
    Co-authored-by: Vineeth Polamreddy <vineeth.polamre...@verizonmedia.com>
---
 .../service/nonpersistent/NonPersistentTopic.java  | 21 ++++++++++
 .../broker/service/persistent/PersistentTopic.java | 47 +++++++++++++++++-----
 .../broker/service/ClusterMigrationTest.java       |  9 +++++
 3 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index c764283cb44..836e5655168 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -959,10 +959,31 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                     consumer.topicMigrated(url);
                 });
             });
+            return disconnectReplicators().thenCompose(__ -> checkAndUnsubscribeSubscriptions());
         }
         return CompletableFuture.completedFuture(null);
     }
 
+    private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        subscriptions.forEach((s, subscription) -> {
+            if (subscription.getConsumers().isEmpty()) {
+                futures.add(subscription.delete());
+            }
+        });
+
+        return FutureUtil.waitForAll(futures);
+    }
+
+    private CompletableFuture<Void> disconnectReplicators() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators = getReplicators();
+        replicators.forEach((r, replicator) -> {
+            futures.add(replicator.disconnect());
+        });
+        return FutureUtil.waitForAll(futures);
+    }
+
     @Override
     public void checkGC() {
         if (!isDeleteWhileInactive()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 57a4989b4d3..f5679665d46 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -180,6 +180,7 @@ import org.apache.pulsar.utils.StatsOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {
 
     // Managed ledger associated with the topic
@@ -2575,25 +2576,49 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     @Override
     public CompletableFuture<Void> checkClusterMigration() {
         Optional<ClusterUrl> clusterUrl = getMigratedClusterUrl();
-        if (!isMigrated() && clusterUrl.isPresent()) {
-            return ledger.asyncMigrate().thenApply(__ -> {
-                subscriptions.forEach((name, sub) -> {
-                    if (sub.isSubsciptionMigrated()) {
-                        sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
-                    }
-                });
-                return null;
-            });
-        } else {
+        if (!clusterUrl.isPresent()) {
             return CompletableFuture.completedFuture(null);
         }
+        CompletableFuture<?> migrated = !isMigrated() ? ledger.asyncMigrate() :
+                CompletableFuture.completedFuture(null);
+        return migrated.thenApply(__ -> {
+            subscriptions.forEach((name, sub) -> {
+                if (sub.isSubsciptionMigrated()) {
+                    sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
+                }
+            });
+            return null;
+        }).thenCompose(__ -> checkAndDisconnectReplicators()).thenCompose(__ -> checkAndUnsubscribeSubscriptions());
+    }
+
+    private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        subscriptions.forEach((s, subscription) -> {
+            if (subscription.getNumberOfEntriesInBacklog(true) == 0
+                    && subscription.getConsumers().isEmpty()) {
+                futures.add(subscription.delete());
+            }
+        });
+
+        return FutureUtil.waitForAll(futures);
+    }
+
+    private CompletableFuture<Void> checkAndDisconnectReplicators() {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators();
+        replicators.forEach((r, replicator) -> {
+            if (replicator.getNumberOfEntriesInBacklog() <= 0) {
+                futures.add(replicator.disconnect());
+            }
+        });
+        return FutureUtil.waitForAll(futures);
     }
 
     public boolean isReplicationBacklogExist() {
         ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators();
         if (replicators != null) {
             for (Replicator replicator : replicators.values()) {
-                if (replicator.getNumberOfEntriesInBacklog() != 0) {
+                if (replicator.getNumberOfEntriesInBacklog() > 0) {
                     return true;
                 }
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
index c1807a15661..980a2c01d95 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -312,6 +313,14 @@ public class ClusterMigrationTest {
         retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500);
         assertFalse(topic2.getSubscriptions().isEmpty());
 
+        topic1.checkClusterMigration().get();
+        ConcurrentOpenHashMap<String, ? extends Replicator> replicators = topic1.getReplicators();
+        replicators.forEach((r, replicator) -> {
+            assertFalse(replicator.isConnected());
+        });
+
+        assertTrue(topic1.getSubscriptions().isEmpty());
+
         // not also create a new consumer which should also reconnect to cluster-2
         Consumer<byte[]> consumer2 = client1.newConsumer().topic(topicName).subscriptionType(subType)
                 .subscriptionName("s2").subscribe();

Reply via email to