Demogorgon314 commented on code in PR #22378:
URL: https://github.com/apache/pulsar/pull/22378#discussion_r1619707648


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -1857,52 +1857,76 @@ public CompletableFuture<Void> checkReplication() {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Checking replication status", name);
         }
-
         List<String> configuredClusters = 
topicPolicies.getReplicationClusters().get();
         if (CollectionUtils.isEmpty(configuredClusters)) {
             log.warn("[{}] No replication clusters configured", name);
             return CompletableFuture.completedFuture(null);
         }
 
-        int newMessageTTLInSeconds = 
topicPolicies.getMessageTTLInSeconds().get();
-
         String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
 
-        // if local cluster is removed from global namespace cluster-list : then delete topic forcefully
-        // because pulsar doesn't serve global topic without local repl-cluster configured.
-        if (TopicName.get(topic).isGlobal() && 
!configuredClusters.contains(localCluster)) {
-            log.info("Deleting topic [{}] because local cluster is not part of 
"
-                    + " global namespace repl list {}", topic, 
configuredClusters);
-            return deleteForcefully();
-        }
-
-        removeTerminatedReplicators(replicators);
-        List<CompletableFuture<Void>> futures = new ArrayList<>();
-
-        // Check for missing replicators
-        for (String cluster : configuredClusters) {
-            if (cluster.equals(localCluster)) {
-                continue;
-            }
-            if (!replicators.containsKey(cluster)) {
-                futures.add(startReplicator(cluster));
-            }
-        }
-
-        // Check for replicators to be stopped
-        replicators.forEach((cluster, replicator) -> {
-            // Update message TTL
-            ((PersistentReplicator) 
replicator).updateMessageTTL(newMessageTTLInSeconds);
-            if (!cluster.equals(localCluster)) {
-                if (!configuredClusters.contains(cluster)) {
-                    futures.add(removeReplicator(cluster));
+        return checkAllowedCluster(localCluster).thenCompose(__ -> {

Review Comment:
   Good point, updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to