gaoran10 commented on code in PR #21729:
URL: https://github.com/apache/pulsar/pull/21729#discussion_r1428379167


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java:
##########
@@ -526,4 +534,58 @@ public void testDynamicConfigurationTopicAutoCreationPartitionedWhenDefaultMoreT
         }
     }
 
+    @Test
+    public void testExtensibleLoadManagerImplInternalTopicAutoCreations()
+            throws PulsarAdminException, PulsarClientException {
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType(TopicType.PARTITIONED);
+        pulsar.getConfiguration().setDefaultNumPartitions(3);
+        pulsar.getConfiguration().setMaxNumPartitionsPerPartitionedTopic(5);
+        final String namespaceName = NamespaceName.SYSTEM_NAMESPACE.toString();
+        TenantInfoImpl tenantInfo = new TenantInfoImpl();
+        tenantInfo.setAllowedClusters(Set.of(configClusterName));
+        admin.tenants().createTenant("pulsar", tenantInfo);
+        admin.namespaces().createNamespace(namespaceName);
+        admin.topics().createNonPartitionedTopic(ServiceUnitStateChannelImpl.TOPIC);
+        admin.topics().createNonPartitionedTopic(ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC);
+        admin.topics().createNonPartitionedTopic(ExtensibleLoadManagerImpl.TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
+
+        // clear the topics to test the auto creation of non-persistent topics.
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
+                pulsar.getBrokerService().getTopics();
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> oldTopics = new ConcurrentOpenHashMap<>();
+        topics.forEach((key, val) -> oldTopics.put(key, val));
+        topics.clear();
+
+        // The created persistent topic correctly can be found by
+        // pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
+        Producer producer = pulsarClient.newProducer().topic(ServiceUnitStateChannelImpl.TOPIC).create();

Review Comment:
   I checked the logic, and it seems that
"pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic)"
is not the reason the producer can be created successfully.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.