This is an automated email from the ASF dual-hosted git repository. yubiao pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push: new eef82db59f3 [fix][broker]Failed to create partitions after the partitions were deleted because topic GC (#24651) eef82db59f3 is described below commit eef82db59f37e0911c507171d2316c9685caca51 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Thu Aug 28 16:04:29 2025 +0800 [fix][broker]Failed to create partitions after the partitions were deleted because topic GC (#24651) (cherry picked from commit 7c576832631525e6baee264bb485e7fa625db2e3) --- .../pulsar/broker/service/BrokerService.java | 15 +++- .../apache/pulsar/broker/admin/AdminApi2Test.java | 2 +- .../BrokerServiceAutoTopicCreationTest.java | 100 +++++++++++++++++---- .../nonpersistent/NonPersistentTopicTest.java | 2 +- .../service/persistent/PersistentTopicTest.java | 2 +- 5 files changed, 100 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2bdad2bcc7b..75da0b16d93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3593,11 +3593,20 @@ public class BrokerService implements Closeable { allowed = pulsar.getConfiguration().isAllowAutoTopicCreation(); } - if (allowed && topicName.isPartitioned()) { + if (topicName.isPartitioned()) { + TopicName partitionedTopic = TopicName.get(topicName.getPartitionedTopicName()); // cannot re-create topic while it is being deleted return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() - .isPartitionedTopicBeingDeletedAsync(topicName) - .thenApply(beingDeleted -> !beingDeleted); + .getPartitionedTopicMetadataAsync(partitionedTopic, true) + .thenApply(partitionedTopicMetadata -> { + if (partitionedTopicMetadata.isEmpty()) { + return allowed; + } + if (partitionedTopicMetadata.get().deleted) { + return false; + } + return partitionedTopicMetadata.get().partitions > topicName.getPartitionIndex(); + }); } else { return CompletableFuture.completedFuture(allowed); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 801b888f91c..38a8a374e2f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -3273,7 +3273,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1, MessageId.earliest); fail("Unexpected behaviour"); - } catch (PulsarAdminException.ConflictException ex) { + } catch (PulsarAdminException.PreconditionFailedException ex) { // OK } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java index a88e4b66d87..89f30d68e21 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java @@ -30,7 +30,9 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; @@ -41,6 +43,8 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; import org.awaitility.Awaitility; @@ -434,15 +438,19 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{ } @Test - public void testDynamicConfigurationTopicAutoCreationDisable() throws PulsarAdminException { + public void testDynamicConfigurationTopicAutoCreationDisable() throws Exception { // test disable AllowAutoTopicCreation pulsar.getConfiguration().setAllowAutoTopicCreation(true); admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", "false"); final String namespaceName = "prop/ns-abc"; final String topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-" + UUID.randomUUID(); - Assert.assertThrows(PulsarClientException.NotFoundException.class, - ()-> pulsarClient.newProducer().topic(topic).create()); + try { + pulsarClient.newProducer().topic(topic).create(); + fail("expected a topic not found exception"); + } catch (PulsarClientException.NotFoundException e) { + // expected. + } } @Test @@ -459,8 +467,12 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{ Producer<byte[]> producer = pulsarClient.newProducer() .topic(topic) .create(); - List<String> topics = admin.topics().getList(namespaceName); - List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(namespaceName); + List<String> topics = admin.topics().getList(namespaceName).stream().filter(tp -> { + return TopicName.get(tp).getPartitionedTopicName().equals(topic); + }).toList(); + List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(namespaceName).stream().filter(tp -> { + return TopicName.get(tp).getPartitionedTopicName().equals(topic); + }).toList(); assertEquals(topics.size(), 1); assertEquals(partitionedTopicList.size(), 0); producer.close(); @@ -489,8 +501,12 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{ }); Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); - List<String> topics = admin.topics().getList(namespaceName); - List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(namespaceName); + List<String> topics = admin.topics().getList(namespaceName).stream().filter(tp -> { + return TopicName.get(tp).getPartitionedTopicName().equals(topic); + }).toList();; + List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(namespaceName).stream().filter(tp -> { + return TopicName.get(tp).getPartitionedTopicName().equals(topic); + }).toList();; PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic); assertEquals(topics.size(), 4); assertEquals(partitionedTopicList.size(), 1); @@ -508,15 +524,19 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{ pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED); pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(0); final String namespaceName = "prop/ns-abc"; - String topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-" + final String topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-" + UUID.randomUUID(); // test enable AllowAutoTopicCreation, // partitioned when maxNumPartitionsPerPartitionedTopic < defaultNumPartitions admin.brokers().updateDynamicConfiguration("maxNumPartitionsPerPartitionedTopic", "2"); admin.brokers().updateDynamicConfiguration("defaultNumPartitions", "6"); Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create(); - List<String> topics = admin.topics().getList(namespaceName); - List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(namespaceName); + List<String> topics = admin.topics().getList(namespaceName).stream().filter(tp -> { + return TopicName.get(tp).getPartitionedTopicName().equals(topic); + }).toList(); + List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(namespaceName).stream().filter(tp -> { + return TopicName.get(tp).getPartitionedTopicName().equals(topic); + }).toList(); PartitionedTopicMetadata partitionedTopicMetadata = admin.topics().getPartitionedTopicMetadata(topic); assertEquals(topics.size(), 2); assertEquals(partitionedTopicList.size(), 1); @@ -532,13 +552,15 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{ Awaitility.await().untilAsserted(() -> assertEquals(admin.brokers().getAllDynamicConfigurations() .get("maxNumPartitionsPerPartitionedTopic"), "1")); - topic = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-" + final String topic2 = "persistent://" + namespaceName + "/test-dynamicConfiguration-topic-auto-creation-" + UUID.randomUUID(); - producer = pulsarClient.newProducer().topic(topic).create(); - topics = admin.topics().getList(namespaceName); - assertEquals(topics.size(), 1); + producer = pulsarClient.newProducer().topic(topic2).create(); + List<String> topics2 = admin.topics().getList(namespaceName).stream().filter(tp -> { + return TopicName.get(tp).getPartitionedTopicName().equals(topic2); + }).toList(); + assertEquals(topics2.size(), 1); producer.close(); - for (String t : topics) { + for (String t : topics2) { admin.topics().delete(t); } } @@ -596,4 +618,52 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{ } + @Test + public void testCreateTopicAfterGC() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp"); + final String topicP0 = TopicName.get(topic).getPartition(0).toString(); + // Disable topic auto-creation. + boolean originalAllowAutoTopicCreation = pulsar.getConfiguration().isAllowAutoTopicCreation(); + boolean originalDeleteInactiveTopics = + pulsar.getConfiguration().isBrokerDeleteInactiveTopicsEnabled(); + boolean originalDeleteInactivePartitionedTopicMetadataE = + pulsar.getConfiguration().isBrokerDeleteInactivePartitionedTopicMetadataEnabled(); + pulsar.getConfiguration().setAllowAutoTopicCreation(false); + pulsar.getConfiguration().setBrokerDeleteInactiveTopicsEnabled(true); + pulsar.getConfiguration().setBrokerDeleteInactivePartitionedTopicMetadataEnabled(false); + // Create partitioned topic. + Assert.assertThrows(PulsarClientException.NotFoundException.class, + () -> pulsarClient.newProducer().topic(topic).create()); + admin.topics().createPartitionedTopic(topic, 1); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicP0, false).join().get(); + // Enable topic GC. + InactiveTopicPolicies inactiveTopicPolicies = + new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true); + admin.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies); + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue() + .getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions); + assertEquals(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue() + .getMaxInactiveDurationSeconds(), 1); + assertTrue(persistentTopic.topicPolicies.getInactiveTopicPolicies().getTopicValue() + .isDeleteWhileInactive()); + }); + // Trigger topic GC. + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + persistentTopic.checkGC(); + assertFalse(pulsar.getPulsarResources().getTopicResources() + .persistentTopicExists(TopicName.get(topicP0)).get()); + }); + + // Verify: the missed partition can be loaded up since the partitioned topic metadata exists. + pulsarClient.newProducer().topic(topic).create().close(); + + // cleanup. + admin.topics().deletePartitionedTopic(topic); + pulsar.getConfiguration().setAllowAutoTopicCreation(originalAllowAutoTopicCreation); + pulsar.getConfiguration().setBrokerDeleteInactiveTopicsEnabled(originalDeleteInactiveTopics); + pulsar.getConfiguration().setBrokerDeleteInactivePartitionedTopicMetadataEnabled( + originalDeleteInactivePartitionedTopicMetadataE); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java index 54d411314f8..0acd574bb09 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java @@ -122,7 +122,7 @@ public class NonPersistentTopicTest extends BrokerTestBase { .topic(partition.toString()) .create(); fail("unexpected behaviour"); - } catch (PulsarClientException.TopicDoesNotExistException ignored) { + } catch (PulsarClientException.NotFoundException ignored) { } assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index bc33e6d6f39..462a5002bd3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -575,7 +575,7 @@ public class PersistentTopicTest extends BrokerTestBase { .topic(partition.toString()) .create(); fail("unexpected behaviour"); - } catch (PulsarClientException.NotAllowedException ex) { + } catch (PulsarClientException.NotFoundException ex) { } Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); }