315157973 commented on a change in pull request #9042: URL: https://github.com/apache/pulsar/pull/9042#discussion_r547976954
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java ########## @@ -3182,5 +3182,26 @@ protected OffloadPolicies internalGetOffloadPolicies() { return policies.offload_policies; } - private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class); + protected int internalGetMaxTopicsPerNamespace() { + validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.READ); + return getNamespacePolicies(namespaceName).max_topics_per_namespace; + } + + protected void internalRemoveMaxTopicsPerNamespace() { + validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE); + internalSetMaxTopicsPerNamespace(null); + } + + protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) { + validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_TOPICS, PolicyOperation.WRITE); + validatePoliciesReadOnlyAccess(); + + if (maxTopicsPerNamespace != null && maxTopicsPerNamespace < 0) { Review comment: When setting the value, it must be greater than or equal to 0, so there should be no case of <0. ########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java ########## @@ -811,9 +811,14 @@ protected boolean isNamespaceReplicated(NamespaceName namespaceName) { } protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) { - final int maxTopicsPerNamespace = pulsar().getConfig().getMaxTopicsPerNamespace(); - if (maxTopicsPerNamespace > 0) { - try { + Integer maxTopicsPerNamespace; + try { + maxTopicsPerNamespace = getNamespacePolicies(namespaceName).max_topics_per_namespace; + if (maxTopicsPerNamespace == null || maxTopicsPerNamespace < 0) { Review comment: When setting the value, it must be greater than or equal to 0, so there should be no case of <0. ########## File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java ########## @@ -1413,6 +1414,132 @@ public void testRetentionPolicyValidation() throws Exception { admin.namespaces().deleteNamespace(namespace); } + @Test(timeOut = 30000) + public void testMaxTopicsPerNamespace() throws Exception { + super.internalCleanup(); + conf.setMaxTopicsPerNamespace(15); + super.internalSetup(); + + String namespace = "testTenant/ns1"; + admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString())); + TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), + Sets.newHashSet("use")); + admin.tenants().createTenant("testTenant", tenantInfo); + admin.namespaces().createNamespace(namespace, Sets.newHashSet("use")); + + admin.namespaces().setMaxTopicsPerNamespace(namespace, 10); + assertEquals(10, admin.namespaces().getMaxTopicsPerNamespace(namespace)); + + // check create partitioned/non-partitioned topics using namespace policy + String topic = "persistent://testTenant/ns1/test_create_topic_v"; + admin.topics().createPartitionedTopic(topic + "1", 2); + admin.topics().createPartitionedTopic(topic + "2", 3); + admin.topics().createPartitionedTopic(topic + "3", 4); + admin.topics().createNonPartitionedTopic(topic + "4"); + + try { + admin.topics().createPartitionedTopic(topic + "5", 2); + fail(); + } catch (PulsarAdminException e) { + assertEquals(e.getStatusCode(), 412); + assertEquals(e.getHttpError(), "Exceed maximum number of topics in namespace."); + } + + // remove namespace policy limit, use broker configuration instead. + admin.namespaces().removeMaxTopicsPerNamespace(namespace); + admin.topics().createPartitionedTopic(topic + "6", 4); + try { + admin.topics().createPartitionedTopic(topic + "7", 3); + fail(); + } catch (PulsarAdminException e) { + assertEquals(e.getStatusCode(), 412); + assertEquals(e.getHttpError(), "Exceed maximum number of topics in namespace."); + } + + admin.namespaces().setMaxTopicsPerNamespace(namespace, 0); + // set namespace policy to no limit + for (int i = 0; i< 10; ++i) { + admin.topics().createPartitionedTopic(topic + "_v" + i, 2); + admin.topics().createNonPartitionedTopic(topic + "_vn" + i); + } + + + // check producer/consumer auto create partitioned topic + super.internalCleanup(); + conf.setMaxTopicsPerNamespace(0); + conf.setDefaultNumPartitions(3); + conf.setAllowAutoTopicCreationType("partitioned"); + super.internalSetup(); + + admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString())); + admin.tenants().createTenant("testTenant", tenantInfo); + admin.namespaces().createNamespace(namespace, Sets.newHashSet("use")); + admin.namespaces().setMaxTopicsPerNamespace(namespace, 10); + + pulsarClient.newProducer().topic(topic + "1").create(); Review comment: Will there be a problem if the created producer is not closed? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org