This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new eef82db59f3 [fix][broker]Failed to create partitions after the 
partitions were deleted because topic GC (#24651)
eef82db59f3 is described below

commit eef82db59f37e0911c507171d2316c9685caca51
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Thu Aug 28 16:04:29 2025 +0800

    [fix][broker]Failed to create partitions after the partitions were deleted 
because topic GC (#24651)
    
    (cherry picked from commit 7c576832631525e6baee264bb485e7fa625db2e3)
---
 .../pulsar/broker/service/BrokerService.java       |  15 +++-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |   2 +-
 .../BrokerServiceAutoTopicCreationTest.java        | 100 +++++++++++++++++----
 .../nonpersistent/NonPersistentTopicTest.java      |   2 +-
 .../service/persistent/PersistentTopicTest.java    |   2 +-
 5 files changed, 100 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2bdad2bcc7b..75da0b16d93 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -3593,11 +3593,20 @@ public class BrokerService implements Closeable {
             allowed = pulsar.getConfiguration().isAllowAutoTopicCreation();
         }
 
-        if (allowed && topicName.isPartitioned()) {
+        if (topicName.isPartitioned()) {
+            TopicName partitionedTopic = 
TopicName.get(topicName.getPartitionedTopicName());
             // cannot re-create topic while it is being deleted
             return 
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-                    .isPartitionedTopicBeingDeletedAsync(topicName)
-                    .thenApply(beingDeleted -> !beingDeleted);
+                .getPartitionedTopicMetadataAsync(partitionedTopic, true)
+                .thenApply(partitionedTopicMetadata -> {
+                    if (partitionedTopicMetadata.isEmpty()) {
+                        return allowed;
+                    }
+                    if (partitionedTopicMetadata.get().deleted) {
+                        return false;
+                    }
+                    return partitionedTopicMetadata.get().partitions > 
topicName.getPartitionIndex();
+                });
         } else {
             return CompletableFuture.completedFuture(allowed);
         }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 801b888f91c..38a8a374e2f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3273,7 +3273,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
             admin.topics().createSubscription(partitionedTopicName + 
"-partition-" + startPartitions, subName1,
                     MessageId.earliest);
             fail("Unexpected behaviour");
-        } catch (PulsarAdminException.ConflictException ex) {
+        } catch (PulsarAdminException.PreconditionFailedException ex) {
             // OK
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index a88e4b66d87..89f30d68e21 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -30,7 +30,9 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
@@ -41,6 +43,8 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.awaitility.Awaitility;
@@ -434,15 +438,19 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
     }
 
     @Test
-    public void testDynamicConfigurationTopicAutoCreationDisable() throws 
PulsarAdminException {
+    public void testDynamicConfigurationTopicAutoCreationDisable() throws 
Exception {
         // test disable AllowAutoTopicCreation
         pulsar.getConfiguration().setAllowAutoTopicCreation(true);
         admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", 
"false");
         final String namespaceName = "prop/ns-abc";
         final String topic = "persistent://" + namespaceName + 
"/test-dynamicConfiguration-topic-auto-creation-"
                 + UUID.randomUUID();
-        Assert.assertThrows(PulsarClientException.NotFoundException.class,
-                ()-> pulsarClient.newProducer().topic(topic).create());
+        try {
+            pulsarClient.newProducer().topic(topic).create();
+            fail("expected a topic not found exception");
+        } catch (PulsarClientException.NotFoundException e) {
+            // expected.
+        }
     }
 
     @Test
@@ -459,8 +467,12 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
         Producer<byte[]> producer = pulsarClient.newProducer()
                 .topic(topic)
                 .create();
-        List<String> topics = admin.topics().getList(namespaceName);
-        List<String> partitionedTopicList = 
admin.topics().getPartitionedTopicList(namespaceName);
+        List<String> topics = 
admin.topics().getList(namespaceName).stream().filter(tp -> {
+            return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+        }).toList();
+        List<String> partitionedTopicList = 
admin.topics().getPartitionedTopicList(namespaceName).stream().filter(tp -> {
+            return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+        }).toList();
         assertEquals(topics.size(), 1);
         assertEquals(partitionedTopicList.size(), 0);
         producer.close();
@@ -489,8 +501,12 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
 
         });
         Producer<byte[]> producer  = 
pulsarClient.newProducer().topic(topic).create();
-        List<String> topics = admin.topics().getList(namespaceName);
-        List<String> partitionedTopicList = 
admin.topics().getPartitionedTopicList(namespaceName);
+        List<String> topics = 
admin.topics().getList(namespaceName).stream().filter(tp -> {
+            return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+        }).toList();;
+        List<String> partitionedTopicList = 
admin.topics().getPartitionedTopicList(namespaceName).stream().filter(tp -> {
+            return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+        }).toList();;
         PartitionedTopicMetadata partitionedTopicMetadata = 
admin.topics().getPartitionedTopicMetadata(topic);
         assertEquals(topics.size(), 4);
         assertEquals(partitionedTopicList.size(), 1);
@@ -508,15 +524,19 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
         
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
         pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0);
         final String namespaceName = "prop/ns-abc";
-        String topic = "persistent://" + namespaceName + 
"/test-dynamicConfiguration-topic-auto-creation-"
+        final String topic = "persistent://" + namespaceName + 
"/test-dynamicConfiguration-topic-auto-creation-"
                 + UUID.randomUUID();
         // test enable AllowAutoTopicCreation,
         // partitioned when maxNumPartitionsPerPartitionedTopic < 
defaultNumPartitions
         
admin.brokers().updateDynamicConfiguration("maxNumPartitionsPerPartitionedTopic",
 "2");
         admin.brokers().updateDynamicConfiguration("defaultNumPartitions", 
"6");
         Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
-        List<String> topics = admin.topics().getList(namespaceName);
-        List<String> partitionedTopicList = 
admin.topics().getPartitionedTopicList(namespaceName);
+        List<String> topics = 
admin.topics().getList(namespaceName).stream().filter(tp -> {
+            return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+        }).toList();
+        List<String> partitionedTopicList = 
admin.topics().getPartitionedTopicList(namespaceName).stream().filter(tp -> {
+            return TopicName.get(tp).getPartitionedTopicName().equals(topic);
+        }).toList();
         PartitionedTopicMetadata partitionedTopicMetadata = 
admin.topics().getPartitionedTopicMetadata(topic);
         assertEquals(topics.size(), 2);
         assertEquals(partitionedTopicList.size(), 1);
@@ -532,13 +552,15 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
         Awaitility.await().untilAsserted(() ->
                 assertEquals(admin.brokers().getAllDynamicConfigurations()
                         .get("maxNumPartitionsPerPartitionedTopic"), "1"));
-        topic = "persistent://" + namespaceName + 
"/test-dynamicConfiguration-topic-auto-creation-"
+        final String topic2 = "persistent://" + namespaceName + 
"/test-dynamicConfiguration-topic-auto-creation-"
                 + UUID.randomUUID();
-        producer = pulsarClient.newProducer().topic(topic).create();
-        topics = admin.topics().getList(namespaceName);
-        assertEquals(topics.size(), 1);
+        producer = pulsarClient.newProducer().topic(topic2).create();
+        List<String> topics2 = 
admin.topics().getList(namespaceName).stream().filter(tp -> {
+            return TopicName.get(tp).getPartitionedTopicName().equals(topic2);
+        }).toList();
+        assertEquals(topics2.size(), 1);
         producer.close();
-        for (String t : topics) {
+        for (String t : topics2) {
             admin.topics().delete(t);
         }
     }
@@ -596,4 +618,52 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
 
     }
 
+    @Test
+    public void testCreateTopicAfterGC() throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
+        final String topicP0 = TopicName.get(topic).getPartition(0).toString();
+        // Disable topic auto-creation.
+        boolean originalAllowAutoTopicCreation = 
pulsar.getConfiguration().isAllowAutoTopicCreation();
+        boolean originalDeleteInactiveTopics =
+                
pulsar.getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
+        boolean originalDeleteInactivePartitionedTopicMetadataE =
+                
pulsar.getConfiguration().isBrokerDeleteInactivePartitionedTopicMetadataEnabled();
+        pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+        pulsar.getConfiguration().setBrokerDeleteInactiveTopicsEnabled(true);
+        
pulsar.getConfiguration().setBrokerDeleteInactivePartitionedTopicMetadataEnabled(false);
+        // Create partitioned topic.
+        Assert.assertThrows(PulsarClientException.NotFoundException.class,
+                () -> pulsarClient.newProducer().topic(topic).create());
+        admin.topics().createPartitionedTopic(topic, 1);
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topicP0, 
false).join().get();
+        // Enable topic GC.
+        InactiveTopicPolicies inactiveTopicPolicies =
+                new 
InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, 
true);
+        admin.topicPolicies().setInactiveTopicPolicies(topic, 
inactiveTopicPolicies);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            
assertEquals(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+                    .getInactiveTopicDeleteMode(), 
InactiveTopicDeleteMode.delete_when_no_subscriptions);
+            
assertEquals(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+                    .getMaxInactiveDurationSeconds(), 1);
+            
assertTrue(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue()
+                    .isDeleteWhileInactive());
+        });
+        // Trigger topic GC.
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            persistentTopic.checkGC();
+            assertFalse(pulsar.getPulsarResources().getTopicResources()
+                    .persistentTopicExists(TopicName.get(topicP0)).get());
+        });
+
+        // Verify: the missed partition can be loaded up since the partitioned 
topic metadata exists.
+        pulsarClient.newProducer().topic(topic).create().close();
+
+        // cleanup.
+        admin.topics().deletePartitionedTopic(topic);
+        
pulsar.getConfiguration().setAllowAutoTopicCreation(originalAllowAutoTopicCreation);
+        
pulsar.getConfiguration().setBrokerDeleteInactiveTopicsEnabled(originalDeleteInactiveTopics);
+        
pulsar.getConfiguration().setBrokerDeleteInactivePartitionedTopicMetadataEnabled(
+                originalDeleteInactivePartitionedTopicMetadataE);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
index 54d411314f8..0acd574bb09 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java
@@ -122,7 +122,7 @@ public class NonPersistentTopicTest extends BrokerTestBase {
                     .topic(partition.toString())
                     .create();
             fail("unexpected behaviour");
-        } catch (PulsarClientException.TopicDoesNotExistException ignored) {
+        } catch (PulsarClientException.NotFoundException ignored) {
 
         }
         
assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 
4);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index bc33e6d6f39..462a5002bd3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -575,7 +575,7 @@ public class PersistentTopicTest extends BrokerTestBase {
                     .topic(partition.toString())
                     .create();
             fail("unexpected behaviour");
-        } catch (PulsarClientException.NotAllowedException ex) {
+        } catch (PulsarClientException.NotFoundException ex) {
         }
         
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions,
 4);
     }

Reply via email to