This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 88df040ed34 [fix][broker] Skip topic auto-creation for 
ExtensibleLoadManager internal topics (#21729)
88df040ed34 is described below

commit 88df040ed34e6863f2c255ace1b050030a3d54e7
Author: Heesung Sohn <103456639+heesung...@users.noreply.github.com>
AuthorDate: Fri Dec 15 12:34:59 2023 -0800

    [fix][broker] Skip topic auto-creation for ExtensibleLoadManager internal 
topics (#21729)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |  2 +-
 .../pulsar/broker/service/BrokerService.java       |  8 +--
 .../BrokerServiceAutoTopicCreationTest.java        | 63 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 8e0a58a0f45..ae1a4e606bb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -781,7 +781,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         }
     }
 
-    private boolean isInternalTopic(String topic) {
+    public static boolean isInternalTopic(String topic) {
         return topic.startsWith(ServiceUnitStateChannelImpl.TOPIC)
                 || topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
                 || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b15ffc755a6..4077762bb06 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -104,7 +104,7 @@ import 
org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
-import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
 import org.apache.pulsar.broker.resources.LocalPoliciesResources;
@@ -3288,10 +3288,10 @@ public class BrokerService implements Closeable {
             return CompletableFuture.completedFuture(false);
         }
 
-        // ServiceUnitStateChannelImpl.TOPIC expects to be a 
non-partitioned-topic now.
+        // ExtensibleLoadManagerImpl.internal topics expects to be 
non-partitioned-topics now.
         // We don't allow the auto-creation here.
-        // ServiceUnitStateChannelImpl.start() is responsible to create the 
topic.
-        if (ServiceUnitStateChannelImpl.TOPIC.equals(topicName.toString())) {
+        // ExtensibleLoadManagerImpl.start() is responsible to create the 
internal system topics.
+        if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
             return CompletableFuture.completedFuture(false);
         }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index a28b60bbae3..0a6cffc7685 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -25,18 +25,27 @@ import static org.testng.Assert.fail;
 
 
 import java.util.List;
+import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
+import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
 import org.apache.pulsar.client.admin.ListNamespaceTopicsOptions;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -526,4 +535,58 @@ public class BrokerServiceAutoTopicCreationTest extends 
BrokerTestBase{
         }
     }
 
+    @Test
+    public void testExtensibleLoadManagerImplInternalTopicAutoCreations()
+            throws PulsarAdminException, PulsarClientException {
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        
pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        pulsar.getConfiguration().setDefaultNumPartitions(3);
+        pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(5);
+        final String namespaceName = NamespaceName.SYSTEM_NAMESPACE.toString();
+        TenantInfoImpl tenantInfo = new TenantInfoImpl();
+        tenantInfo.setAllowedClusters(Set.of(configClusterName));
+        admin.tenants().createTenant("pulsar", tenantInfo);
+        admin.namespaces().createNamespace(namespaceName);
+        
admin.topics().createNonPartitionedTopic(ServiceUnitStateChannelImpl.TOPIC);
+        
admin.topics().createNonPartitionedTopic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC);
+        
admin.topics().createNonPartitionedTopic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+
+        // clear the topics to test the auto creation of non-persistent topics.
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
topics =
+                pulsar.getBrokerService().getTopics();
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> 
oldTopics = new ConcurrentOpenHashMap<>();
+        topics.forEach((key, val) -> oldTopics.put(key, val));
+        topics.clear();
+
+        // The created persistent topic correctly can be found by
+        // 
pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
+        Producer producer = 
pulsarClient.newProducer().topic(ServiceUnitStateChannelImpl.TOPIC).create();
+
+        // The created non-persistent topics cannot be found, as we did 
topics.clear()
+        try {
+            
pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC).create();
+            Assert.fail("Create should have failed.");
+        } catch (PulsarClientException.TopicDoesNotExistException e) {
+            // expected
+        }
+        try {
+            
pulsarClient.newProducer().topic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC).create();
+            Assert.fail("Create should have failed.");
+        } catch (PulsarClientException.TopicDoesNotExistException e) {
+            // expected
+        }
+
+        oldTopics.forEach((key, val) -> topics.put(key, val));
+
+        Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+            List<String> partitionedTopicList = 
admin.topics().getPartitionedTopicList(namespaceName);
+            assertEquals(partitionedTopicList.size(), 0);
+        });
+
+        producer.close();
+        admin.namespaces().deleteNamespace(namespaceName);
+        admin.tenants().deleteTenant("pulsar");
+
+    }
+
 }

Reply via email to