315157973 commented on a change in pull request #9042:
URL: https://github.com/apache/pulsar/pull/9042#discussion_r547976954



##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
##########
@@ -3182,5 +3182,26 @@ protected OffloadPolicies internalGetOffloadPolicies() {
         return policies.offload_policies;
     }
 
-    private static final Logger log = 
LoggerFactory.getLogger(NamespacesBase.class);
+    protected int internalGetMaxTopicsPerNamespace() {
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, 
PolicyOperation.READ);
+        return getNamespacePolicies(namespaceName).max_topics_per_namespace;
+    }
+
+   protected void internalRemoveMaxTopicsPerNamespace() {
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, 
PolicyOperation.WRITE);
+        internalSetMaxTopicsPerNamespace(null);
+   }
+
+   protected void internalSetMaxTopicsPerNamespace(Integer 
maxTopicsPerNamespace) {
+        validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, 
PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        if (maxTopicsPerNamespace != null && maxTopicsPerNamespace < 0) {

Review comment:
       A value must be greater than or equal to 0 when it is set, so the 
`< 0` case should never occur.
   

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -811,9 +811,14 @@ protected boolean isNamespaceReplicated(NamespaceName 
namespaceName) {
     }
 
     protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, 
int numPartitions) {
-        final int maxTopicsPerNamespace = 
pulsar().getConfig().getMaxTopicsPerNamespace();
-        if (maxTopicsPerNamespace > 0) {
-            try {
+        Integer maxTopicsPerNamespace;
+        try {
+            maxTopicsPerNamespace = 
getNamespacePolicies(namespaceName).max_topics_per_namespace;
+            if (maxTopicsPerNamespace == null || maxTopicsPerNamespace < 0) {

Review comment:
       A value must be greater than or equal to 0 when it is set, so the 
`< 0` case should never occur.
   

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
##########
@@ -1413,6 +1414,132 @@ public void testRetentionPolicyValidation() throws 
Exception {
         admin.namespaces().deleteNamespace(namespace);
     }
 
+    @Test(timeOut = 30000)
+    public void testMaxTopicsPerNamespace() throws Exception {
+        super.internalCleanup();
+        conf.setMaxTopicsPerNamespace(15);
+        super.internalSetup();
+
+        String namespace = "testTenant/ns1";
+        admin.clusters().createCluster("use", new 
ClusterData(brokerUrl.toString()));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", 
"role2"),
+                Sets.newHashSet("use"));
+        admin.tenants().createTenant("testTenant", tenantInfo);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("use"));
+
+        admin.namespaces().setMaxTopicsPerNamespace(namespace, 10);
+        assertEquals(10, 
admin.namespaces().getMaxTopicsPerNamespace(namespace));
+
+        // check create partitioned/non-partitioned topics using namespace 
policy
+        String topic = "persistent://testTenant/ns1/test_create_topic_v";
+        admin.topics().createPartitionedTopic(topic + "1", 2);
+        admin.topics().createPartitionedTopic(topic + "2", 3);
+        admin.topics().createPartitionedTopic(topic + "3", 4);
+        admin.topics().createNonPartitionedTopic(topic + "4");
+
+        try {
+            admin.topics().createPartitionedTopic(topic + "5", 2);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 412);
+            assertEquals(e.getHttpError(), "Exceed maximum number of topics in 
namespace.");
+        }
+
+        // remove namespace policy limit, use broker configuration instead.
+        admin.namespaces().removeMaxTopicsPerNamespace(namespace);
+        admin.topics().createPartitionedTopic(topic + "6", 4);
+        try {
+            admin.topics().createPartitionedTopic(topic + "7", 3);
+            fail();
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 412);
+            assertEquals(e.getHttpError(), "Exceed maximum number of topics in 
namespace.");
+        }
+
+        admin.namespaces().setMaxTopicsPerNamespace(namespace, 0);
+        // set namespace policy to no limit
+        for (int i = 0; i< 10; ++i) {
+            admin.topics().createPartitionedTopic(topic + "_v" + i, 2);
+            admin.topics().createNonPartitionedTopic(topic + "_vn" + i);
+        }
+
+
+        // check producer/consumer auto create partitioned topic
+        super.internalCleanup();
+        conf.setMaxTopicsPerNamespace(0);
+        conf.setDefaultNumPartitions(3);
+        conf.setAllowAutoTopicCreationType("partitioned");
+        super.internalSetup();
+
+        admin.clusters().createCluster("use", new 
ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("testTenant", tenantInfo);
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("use"));
+        admin.namespaces().setMaxTopicsPerNamespace(namespace, 10);
+
+        pulsarClient.newProducer().topic(topic + "1").create();

Review comment:
       Will there be a problem if the created producer is not closed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to