This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new d2560f36e6d [fix][broker]Leaving orphan schemas and topic-level
policies after partitioned topic is deleted by GC (#24971)
d2560f36e6d is described below
commit d2560f36e6d09992a54b4fcf66b22c83d40e4b1c
Author: fengyubiao <[email protected]>
AuthorDate: Wed Nov 12 05:26:01 2025 +0800
[fix][broker]Leaving orphan schemas and topic-level policies after
partitioned topic is deleted by GC (#24971)
(cherry picked from commit 27ce9551c1a06efac422a16f31e1613516377a83)
---
.../broker/service/persistent/PersistentTopic.java | 33 +++++++++++++++++++++-
.../ReplicationTopicGcUsingGlobalZKTest.java | 5 ----
2 files changed, 32 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 931b4cdefc8..bd503f896bf 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -100,6 +100,8 @@ import
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateDataConflictResolver;
import org.apache.pulsar.broker.namespace.NamespaceService;
import
org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
@@ -186,6 +188,7 @@ import
org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
@@ -3407,6 +3410,34 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
}
+ private CompletableFuture<Void> // cleanup step: once no partition of the parent partitioned topic is listed, delete that topic's schema and topic policies
deleteSchemaAndPoliciesIfAllPartitionsDeleted() {
+ if (!TopicName.get(topic).isPartitioned()) { // this topic's name is not reported as partitioned: nothing to clean up
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicName pTopicName = // name of the partitioned topic that this topic's name maps to
TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+ final BrokerService broker = getBrokerService();
+ final PulsarResources pulsarResources =
broker.pulsar().getPulsarResources();
+ final TopicResources topicResources =
pulsarResources.getTopicResources();
+ final TopicPoliciesService topicPoliciesService =
broker.getPulsar().getTopicPoliciesService();
+ final SchemaStorage schemaStorage =
broker.getPulsar().getSchemaStorage();
+ return // step 1: list the persistent topics of pTopicName's namespace and look for a remaining partition
topicResources.listPersistentTopicsAsync(pTopicName.getNamespaceObject()).thenApply(list
-> {
+ for (String s : list) {
+ TopicName item = TopicName.get(s);
+ if (item.isPartitioned() && // a listed topic counts only if it is partitioned and maps to the same partitioned topic name
item.getPartitionedTopicName().equals(pTopicName.toString())) {
+ return true;
+ }
+ }
+ return false; // no listed topic maps to pTopicName
+ }).thenCompose(partitionExists -> { // step 2: act on the result of the scan
+ if (partitionExists) { // a matching partition is still listed: leave schema and policies untouched
+ return CompletableFuture.completedFuture(null);
+ }
+ return // delete the schema stored under pTopicName's schema name first, then its topic policies
schemaStorage.delete(pTopicName.getSchemaName()).thenCompose(__ -> {
+ return // NOTE(review): the meaning of the 'false' argument is not visible here; confirm against TopicPoliciesService
topicPoliciesService.deleteTopicPoliciesAsync(pTopicName, false);
+ });
+ });
+ }
+
private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
if (TopicName.get(topic).isPartitioned() &&
!deletePartitionedTopicMetadataWhileInactive()) {
return CompletableFuture.completedFuture(null);
@@ -3418,7 +3449,7 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
return partitionedTopicResources.partitionedTopicExistsAsync(topicName)
.thenCompose(partitionedTopicExist -> {
if (!partitionedTopicExist) {
- return CompletableFuture.completedFuture(null);
+ return deleteSchemaAndPoliciesIfAllPartitionsDeleted();
} else {
return
getBrokerService().pulsar().getPulsarResources().getNamespaceResources()
.getPartitionedTopicResources().runWithMarkDeleteAsync(topicName, () ->
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
index fabfb3333e9..0857368c3d8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicationTopicGcUsingGlobalZKTest.java
@@ -43,11 +43,6 @@ public class ReplicationTopicGcUsingGlobalZKTest extends
ReplicationTopicGcTest
@Test(dataProvider = "topicTypes")
public void testTopicGC(TopicType topicType) throws Exception {
- if (topicType.equals(TopicType.PARTITIONED)) {
- // Pulsar does not support the feature
"brokerDeleteInactivePartitionedTopicMetadataEnabled" when enabling
- // Geo-Replication with Global ZK.
- return;
- }
super.testTopicGC(topicType);
}