This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fb863a4150fcbe7da10f5554b6aabb889b69b65d
Author: fengyubiao <[email protected]>
AuthorDate: Wed Apr 30 09:43:51 2025 +0800

    [fix][broker] Orphan schema after disabled a cluster for a namespace 
(#24223)
    
    (cherry picked from commit 2d78cbddbcf921fed9649203a32d98839346ff60)
---
 .../broker/service/persistent/PersistentTopic.java | 54 +++++++++++++++++++++-
 ...OneWayReplicatorUsingGlobalPartitionedTest.java | 45 ++++++++++++++----
 2 files changed, 90 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 306f7a08fc1..5dd2909def8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -52,6 +52,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -1928,7 +1929,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             if (!success) {
                 // if local cluster is removed from global namespace 
cluster-list : then delete topic forcefully
                 // because pulsar doesn't serve global topic without local 
repl-cluster configured.
-                return deleteForcefully();
+                return deleteForcefully().thenCompose(ignore -> {
+                    return deleteSchemaAndPoliciesIfClusterRemoved();
+                });
             }
 
             int newMessageTTLInSeconds = 
topicPolicies.getMessageTTLInSeconds().get();
@@ -1971,6 +1974,55 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         });
     }
 
+    CompletableFuture<Void> deleteSchemaAndPoliciesIfClusterRemoved() {
+        TopicName tName = TopicName.get(topic);
+        if (!tName.isPartitioned()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicName partitionedName = 
TopicName.get(tName.getPartitionedTopicName());
+        return 
brokerService.getPulsar().getPulsarResources().getNamespaceResources()
+            .getPartitionedTopicResources()
+            .getPartitionedTopicMetadataAsync(partitionedName)
+            .thenApply(metadataOp -> {
+                if (metadataOp.isEmpty()) {
+                    return null;
+                }
+                AtomicInteger checkedCounter = new 
AtomicInteger(metadataOp.get().partitions);
+                for (int i = 0; i < metadataOp.get().partitions; i++) {
+                    
brokerService.getPulsar().getPulsarResources().getTopicResources()
+                        
.persistentTopicExists(partitionedName.getPartition(i)).thenAccept(b -> {
+                            if (!b) {
+                                int leftPartitions = 
checkedCounter.decrementAndGet();
+                                log.info("[{}] partitions: {}, left: {}", 
tName, metadataOp.get().partitions,
+                                    leftPartitions);
+                                if (leftPartitions == 0) {
+                                    
brokerService.getPulsar().getSchemaStorage()
+                                        
.delete(partitionedName.getSchemaName())
+                                        .whenComplete((schemaVersion, ex) -> {
+                                            if (ex == null) {
+                                                log.info("Deleted schema[{}] 
after all partitions[{}] were removed"
+                                                    + " because the current 
cluster has bee removed from"
+                                                    + " topic/namespace 
policies",
+                                                    partitionedName, 
metadataOp.get().partitions);
+                                            } else {
+                                                log.error("Failed to delete 
schema[{}] after all partitions[{}] were"
+                                                    + " removed,  when the 
current cluster has bee removed from"
+                                                    + " topic/namespace 
policies",
+                                                    partitionedName, 
metadataOp.get().partitions, ex);
+                                            }
+
+                                    });
+                                    // TODO regarding the topic level 
policies, it will be deleted at a seperate PR.
+                                    //   Because there is an issue related to 
Global policies has not been solved so
+                                    //   far.
+                                }
+                            }
+                        });
+                }
+                return null;
+            });
+    }
+
     private CompletableFuture<Boolean> checkAllowedCluster(String 
localCluster) {
         List<String> replicationClusters = 
topicPolicies.getReplicationClusters().get();
         return 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
index a22067101c3..cc8be021a8f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -33,6 +35,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.awaitility.Awaitility;
@@ -174,17 +177,38 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
         // Initialize.
         final String ns1 = defaultTenant + "/" + 
"ns_73b1a31afce34671a5ddc48fe5ad7fc8";
         final String topic = "persistent://" + ns1 + 
"/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
+        final String topicP0 = TopicName.get(topic).getPartition(0).toString();
+        final String topicP1 = TopicName.get(topic).getPartition(1).toString();
         final String topicChangeEvents = "persistent://" + ns1 + 
"/__change_events-partition-0";
         admin1.namespaces().createNamespace(ns1);
         admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster1, cluster2)));
-        admin1.topics().createNonPartitionedTopic(topic);
+        admin1.topics().createPartitionedTopic(topic, 2);
+        Awaitility.await().untilAsserted(() -> {
+           
assertTrue(pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                   .partitionedTopicExists(TopicName.get(topic)));
+            List<CompletableFuture<StoredSchema>> schemaList11
+                    = 
pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
+            assertEquals(schemaList11.size(), 0);
+            List<CompletableFuture<StoredSchema>> schemaList21
+                    = 
pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
+            assertEquals(schemaList21.size(), 0);
+        });
 
-        // Wait for loading topic up.
+        // Wait for copying messages.
         Producer<String> p = 
client1.newProducer(Schema.STRING).topic(topic).create();
+        p.send("msg-1");
+        p.close();
         Awaitility.await().untilAsserted(() -> {
             Map<String, CompletableFuture<Optional<Topic>>> tps = 
pulsar1.getBrokerService().getTopics();
-            assertTrue(tps.containsKey(topic));
+            assertTrue(tps.containsKey(topicP0));
+            assertTrue(tps.containsKey(topicP1));
             assertTrue(tps.containsKey(topicChangeEvents));
+            List<CompletableFuture<StoredSchema>> schemaList12
+                    = 
pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
+            assertEquals(schemaList12.size(), 1);
+            List<CompletableFuture<StoredSchema>> schemaList22
+                    = 
pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
+            assertEquals(schemaList12.size(), 1);
         });
 
         // The topics under the namespace of the cluster-1 will be deleted.
@@ -192,18 +216,23 @@ public class OneWayReplicatorUsingGlobalPartitionedTest 
extends OneWayReplicator
         admin1.namespaces().setNamespaceReplicationClusters(ns1, new 
HashSet<>(Arrays.asList(cluster2)));
         
Awaitility.await().atMost(Duration.ofSeconds(60)).ignoreExceptions().untilAsserted(()
 -> {
             Map<String, CompletableFuture<Optional<Topic>>> tps = 
pulsar1.getBrokerService().getTopics();
-            assertFalse(tps.containsKey(topic));
+            assertFalse(tps.containsKey(topicP0));
+            assertFalse(tps.containsKey(topicP1));
             assertFalse(tps.containsKey(topicChangeEvents));
-            
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic))
-                    .get(5, TimeUnit.SECONDS).isExists());
             assertFalse(pulsar1.getNamespaceService()
                     .checkTopicExists(TopicName.get(topicChangeEvents))
                     .get(5, TimeUnit.SECONDS).isExists());
+            // Verify: schema will be removed in local cluster, and remote 
cluster will not.
+            List<CompletableFuture<StoredSchema>> schemaList13
+                    = 
pulsar1.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
+            assertEquals(schemaList13.size(), 0);
+            List<CompletableFuture<StoredSchema>> schemaList23
+                    = 
pulsar2.getSchemaStorage().getAll(TopicName.get(topic).getSchemaName()).get();
+            assertEquals(schemaList23.size(), 1);
         });
 
         // cleanup.
-        p.close();
-        admin2.topics().delete(topic);
+        admin2.topics().deletePartitionedTopic(topic);
         admin2.namespaces().deleteNamespace(ns1);
     }
 }

Reply via email to