This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 11506521033 [fix][broker]Leaving orphan schemas and topic-level 
policies after partitioned topic is deleted by GC (#24971)
11506521033 is described below

commit 1150652103374b1407defb6018a94ce575a0af13
Author: fengyubiao <[email protected]>
AuthorDate: Wed Nov 12 05:26:01 2025 +0800

    [fix][broker]Leaving orphan schemas and topic-level policies after 
partitioned topic is deleted by GC (#24971)
    
    (cherry picked from commit 27ce9551c1a06efac422a16f31e1613516377a83)
---
 .../broker/service/persistent/PersistentTopic.java | 33 +++++++++++++++++++++-
 .../ReplicationTopicGcUsingGlobalZKTest.java       |  5 ----
 2 files changed, 32 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2f0255f4f2a..f320f8a6e7b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -99,6 +99,8 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateDataConflictResolver;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import 
org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TopicResources;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -185,6 +187,7 @@ import 
org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.topics.TopicCompactionStrategy;
@@ -3453,6 +3456,34 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         }
     }
 
+    private CompletableFuture<Void> 
deleteSchemaAndPoliciesIfAllPartitionsDeleted() {
+        if (!TopicName.get(topic).isPartitioned()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicName pTopicName = 
TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+        final BrokerService broker = getBrokerService();
+        final PulsarResources pulsarResources = 
broker.pulsar().getPulsarResources();
+        final TopicResources topicResources = 
pulsarResources.getTopicResources();
+        final TopicPoliciesService topicPoliciesService = 
broker.getPulsar().getTopicPoliciesService();
+        final SchemaStorage schemaStorage = 
broker.getPulsar().getSchemaStorage();
+        return 
topicResources.listPersistentTopicsAsync(pTopicName.getNamespaceObject()).thenApply(list
 -> {
+            for (String s : list) {
+                TopicName item = TopicName.get(s);
+                if (item.isPartitioned() && 
item.getPartitionedTopicName().equals(pTopicName.toString())) {
+                    return true;
+                }
+            }
+            return false;
+        }).thenCompose(partitionExists -> {
+            if (partitionExists) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return 
schemaStorage.delete(pTopicName.getSchemaName()).thenCompose(__ -> {
+                return 
topicPoliciesService.deleteTopicPoliciesAsync(pTopicName, false);
+            });
+        });
+    }
+
     private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
         if (TopicName.get(topic).isPartitioned() && 
!deletePartitionedTopicMetadataWhileInactive()) {
             return CompletableFuture.completedFuture(null);
@@ -3464,7 +3495,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return partitionedTopicResources.partitionedTopicExistsAsync(topicName)
                 .thenCompose(partitionedTopicExist -> {
                     if (!partitionedTopicExist) {
-                        return CompletableFuture.completedFuture(null);
+                        return deleteSchemaAndPoliciesIfAllPartitionsDeleted();
                     } else {
                         return 
getBrokerService().pulsar().getPulsarResources().getNamespaceResources()
                                 
.getPartitionedTopicResources().runWithMarkDeleteAsync(topicName, () ->
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
index fabfb3333e9..0857368c3d8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
@@ -43,11 +43,6 @@ public class ReplicationTopicGcUsingGlobalZKTest extends 
ReplicationTopicGcTest
 
     @Test(dataProvider = "topicTypes")
     public void testTopicGC(TopicType topicType) throws Exception {
-        if (topicType.equals(TopicType.PARTITIONED)) {
-            // Pulsar does not support the feature 
"brokerDeleteInactivePartitionedTopicMetadataEnabled" when enabling
-            // Geo-Replication with Global ZK.
-            return;
-        }
         super.testTopicGC(topicType);
     }
 

Reply via email to