This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new c1b0454614b [fix][broker]Delete subscription and disconnect replicators after topic migration (#21029) c1b0454614b is described below commit c1b0454614b7903913cb0311bdcacf2118893fc9 Author: vineeth1995 <vineethreddypo...@gmail.com> AuthorDate: Tue Aug 22 09:52:30 2023 -0700 [fix][broker]Delete subscription and disconnect replicators after topic migration (#21029) Co-authored-by: Vineeth Polamreddy <vineeth.polamre...@verizonmedia.com> --- .../service/nonpersistent/NonPersistentTopic.java | 21 ++++++++++ .../broker/service/persistent/PersistentTopic.java | 47 +++++++++++++++++----- .../broker/service/ClusterMigrationTest.java | 9 +++++ 3 files changed, 66 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index c764283cb44..836e5655168 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -959,10 +959,31 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol consumer.topicMigrated(url); }); }); + return disconnectReplicators().thenCompose(__ -> checkAndUnsubscribeSubscriptions()); } return CompletableFuture.completedFuture(null); } + private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() { + List<CompletableFuture<Void>> futures = new ArrayList<>(); + subscriptions.forEach((s, subscription) -> { + if (subscription.getConsumers().isEmpty()) { + futures.add(subscription.delete()); + } + }); + + return FutureUtil.waitForAll(futures); + } + + private CompletableFuture<Void> disconnectReplicators() { + List<CompletableFuture<Void>> futures = new ArrayList<>(); + ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators = getReplicators(); + replicators.forEach((r, replicator) -> { + futures.add(replicator.disconnect()); + }); + return FutureUtil.waitForAll(futures); + } + @Override public void checkGC() { if (!isDeleteWhileInactive()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 57a4989b4d3..f5679665d46 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -180,6 +180,7 @@ import org.apache.pulsar.utils.StatsOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback { // Managed ledger associated with the topic @@ -2575,25 +2576,49 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public CompletableFuture<Void> checkClusterMigration() { Optional<ClusterUrl> clusterUrl = getMigratedClusterUrl(); - if (!isMigrated() && clusterUrl.isPresent()) { - return ledger.asyncMigrate().thenApply(__ -> { - subscriptions.forEach((name, sub) -> { - if (sub.isSubsciptionMigrated()) { - sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration); - } - }); - return null; - }); - } else { + if (!clusterUrl.isPresent()) { return CompletableFuture.completedFuture(null); } + CompletableFuture<?> migrated = !isMigrated() ? ledger.asyncMigrate() : + CompletableFuture.completedFuture(null); + return migrated.thenApply(__ -> { + subscriptions.forEach((name, sub) -> { + if (sub.isSubsciptionMigrated()) { + sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration); + } + }); + return null; + }).thenCompose(__ -> checkAndDisconnectReplicators()).thenCompose(__ -> checkAndUnsubscribeSubscriptions()); + } + + private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() { + List<CompletableFuture<Void>> futures = new ArrayList<>(); + subscriptions.forEach((s, subscription) -> { + if (subscription.getNumberOfEntriesInBacklog(true) == 0 + && subscription.getConsumers().isEmpty()) { + futures.add(subscription.delete()); + } + }); + + return FutureUtil.waitForAll(futures); + } + + private CompletableFuture<Void> checkAndDisconnectReplicators() { + List<CompletableFuture<Void>> futures = new ArrayList<>(); + ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators(); + replicators.forEach((r, replicator) -> { + if (replicator.getNumberOfEntriesInBacklog() <= 0) { + futures.add(replicator.disconnect()); + } + }); + return FutureUtil.waitForAll(futures); } public boolean isReplicationBacklogExist() { ConcurrentOpenHashMap<String, Replicator> replicators = getReplicators(); if (replicators != null) { for (Replicator replicator : replicators.values()) { - if (replicator.getNumberOfEntriesInBacklog() != 0) { + if (replicator.getNumberOfEntriesInBacklog() > 0) { return true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index c1807a15661..980a2c01d95 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -312,6 +313,14 @@ public class ClusterMigrationTest { retryStrategically((test) -> !topic2.getSubscriptions().isEmpty(), 10, 500); assertFalse(topic2.getSubscriptions().isEmpty()); + topic1.checkClusterMigration().get(); + ConcurrentOpenHashMap<String, ? extends Replicator> replicators = topic1.getReplicators(); + replicators.forEach((r, replicator) -> { + assertFalse(replicator.isConnected()); + }); + + assertTrue(topic1.getSubscriptions().isEmpty()); + // not also create a new consumer which should also reconnect to cluster-2 Consumer<byte[]> consumer2 = client1.newConsumer().topic(topicName).subscriptionType(subType) .subscriptionName("s2").subscribe();