This is an automated email from the ASF dual-hosted git repository. heesung pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 88df040ed34 [fix][broker] Skip topic auto-creation for ExtensibleLoadManager internal topics (#21729) 88df040ed34 is described below commit 88df040ed34e6863f2c255ace1b050030a3d54e7 Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com> AuthorDate: Fri Dec 15 12:34:59 2023 -0800 [fix][broker] Skip topic auto-creation for ExtensibleLoadManager internal topics (#21729) --- .../extensions/ExtensibleLoadManagerImpl.java | 2 +- .../pulsar/broker/service/BrokerService.java | 8 +-- .../BrokerServiceAutoTopicCreationTest.java | 63 ++++++++++++++++++++++ 3 files changed, 68 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 8e0a58a0f45..ae1a4e606bb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -781,7 +781,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { } } - private boolean isInternalTopic(String topic) { + public static boolean isInternalTopic(String topic) { return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC) || topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC) || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b15ffc755a6..4077762bb06 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -104,7 +104,7 @@ import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl; import org.apache.pulsar.broker.loadbalance.LoadManager; -import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.DynamicConfigurationResources; import org.apache.pulsar.broker.resources.LocalPoliciesResources; @@ -3288,10 +3288,10 @@ public class BrokerService implements Closeable { return CompletableFuture.completedFuture(false); } - // ServiceUnitStateChannelImpl.TOPIC expects to be a non-partitioned-topic now. + // ExtensibleLoadManagerImpl.internal topics expects to be non-partitioned-topics now. // We don't allow the auto-creation here. - // ServiceUnitStateChannelImpl.start() is responsible to create the topic. - if (ServiceUnitStateChannelImpl.TOPIC.equals(topicName.toString())) { + // ExtensibleLoadManagerImpl.start() is responsible to create the internal system topics. + if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) { return CompletableFuture.completedFuture(false); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java index a28b60bbae3..0a6cffc7685 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java @@ -25,18 +25,27 @@ import static org.testng.Assert.fail; import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -526,4 +535,58 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{ } } + @Test + public void testExtensibleLoadManagerImplInternalTopicAutoCreations() + throws PulsarAdminException, PulsarClientException { + pulsar.getConfiguration().setAllowAutoTopicCreation(true); + pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED); + pulsar.getConfiguration().setDefaultNumPartitions(3); + pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(5); + final String namespaceName = NamespaceName.SYSTEM_NAMESPACE.toString(); + TenantInfoImpl tenantInfo = new TenantInfoImpl(); + tenantInfo.setAllowedClusters(Set.of(configClusterName)); + admin.tenants().createTenant("pulsar", tenantInfo); + admin.namespaces().createNamespace(namespaceName); + admin.topics().createNonPartitionedTopic(ServiceUnitStateChannelImpl.TOPIC); + admin.topics().createNonPartitionedTopic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC); + admin.topics().createNonPartitionedTopic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC); + + // clear the topics to test the auto creation of non-persistent topics. + ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics = + pulsar.getBrokerService().getTopics(); + ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> oldTopics = new ConcurrentOpenHashMap<>(); + topics.forEach((key, val) -> oldTopics.put(key, val)); + topics.clear(); + + // The created persistent topic correctly can be found by + // pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); + Producer producer = pulsarClient.newProducer().topic(ServiceUnitStateChannelImpl.TOPIC).create(); + + // The created non-persistent topics cannot be found, as we did topics.clear() + try { + pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC).create(); + Assert.fail("Create should have failed."); + } catch (PulsarClientException.TopicDoesNotExistException e) { + // expected + } + try { + pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC).create(); + Assert.fail("Create should have failed."); + } catch (PulsarClientException.TopicDoesNotExistException e) { + // expected + } + + oldTopics.forEach((key, val) -> topics.put(key, val)); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + List<String> partitionedTopicList = admin.topics().getPartitionedTopicList(namespaceName); + assertEquals(partitionedTopicList.size(), 0); + }); + + producer.close(); + admin.namespaces().deleteNamespace(namespaceName); + admin.tenants().deleteTenant("pulsar"); + + } + }