This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b219ccac334 [improve][broker] System topic writer/reader connection
not counted. (#18369)
b219ccac334 is described below
commit b219ccac334c4b57df48d07e78ea381f54db1a7d
Author: Jiwei Guo <[email protected]>
AuthorDate: Wed Nov 23 14:23:57 2022 +0800
[improve][broker] System topic writer/reader connection not counted.
(#18369)
---
.../pulsar/broker/service/AbstractTopic.java | 24 ++++++++++++---
.../broker/service/persistent/PersistentTopic.java | 3 ++
.../pulsar/broker/service/ReplicatorTest.java | 33 ++++++++++++++++++++
.../systopic/PartitionedSystemTopicTest.java | 36 +++++++++++++++++++++-
4 files changed, 90 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 864fe7f5d65..75b15c15df2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -438,14 +438,21 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
return new PublishRate(config.getMaxPublishRatePerTopicInMessages(),
config.getMaxPublishRatePerTopicInBytes());
}
- protected boolean isProducersExceeded() {
+ protected boolean isProducersExceeded(Producer producer) {
+ if (isSystemTopic() || producer.isRemote()) {
+ return false;
+ }
Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get();
- if (maxProducers > 0 && maxProducers <= producers.size()) {
+ if (maxProducers != null && maxProducers > 0 && maxProducers <=
getUserCreatedProducersSize()) {
return true;
}
return false;
}
+ private long getUserCreatedProducersSize() {
+ return producers.values().stream().filter(p -> !p.isRemote()).count();
+ }
+
protected void registerTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&&
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
@@ -487,14 +494,21 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
}
protected boolean isConsumersExceededOnTopic() {
- int maxConsumersPerTopic =
topicPolicies.getMaxConsumerPerTopic().get();
- if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <=
getNumberOfConsumers()) {
+ if (isSystemTopic()) {
+ return false;
+ }
+ Integer maxConsumersPerTopic =
topicPolicies.getMaxConsumerPerTopic().get();
+ if (maxConsumersPerTopic != null && maxConsumersPerTopic > 0
+ && maxConsumersPerTopic <= getNumberOfConsumers()) {
return true;
}
return false;
}
protected boolean isSameAddressConsumersExceededOnTopic(Consumer consumer)
{
+ if (isSystemTopic()) {
+ return false;
+ }
final int maxSameAddressConsumers =
brokerService.pulsar().getConfiguration()
.getMaxSameAddressConsumersPerTopic();
@@ -951,7 +965,7 @@ public abstract class AbstractTopic implements Topic,
TopicPolicyListener<TopicP
}
protected void internalAddProducer(Producer producer) throws
BrokerServiceException {
- if (isProducersExceeded()) {
+ if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached
max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic
reached max producers limit");
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e4bcb92b58d..ea20d413484 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3321,6 +3321,9 @@ public class PersistentTopic extends AbstractTopic
implements Topic, AddEntryCal
}
private boolean checkMaxSubscriptionsPerTopicExceed(String
subscriptionName) {
+ if (isSystemTopic()) {
+ return false;
+ }
//Existing subscriptions are not affected
if (StringUtils.isNotEmpty(subscriptionName) &&
getSubscription(subscriptionName) != null) {
return false;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index b8f8abc9a62..7f31ce39c96 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -75,6 +75,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
@@ -1505,4 +1506,36 @@ public class ReplicatorTest extends ReplicatorTestBase {
assertTrue(topic.getReplicators().isEmpty());
});
}
+
+ @Test
+ public void testReplicatorProducerNotExceed() throws Exception {
+ log.info("--- testReplicatorProducerNotExceed ---");
+ String namespace1 = "pulsar/ns11";
+ admin1.namespaces().createNamespace(namespace1);
+ admin1.namespaces().setNamespaceReplicationClusters(namespace1,
Sets.newHashSet("r1", "r2"));
+ final TopicName dest1 = TopicName.get(
+ BrokerTestUtil.newUniqueName("persistent://" + namespace1 +
"/testReplicatorProducerNotExceed1"));
+ String namespace2 = "pulsar/ns22";
+ admin2.namespaces().createNamespace(namespace2);
+ admin2.namespaces().setNamespaceReplicationClusters(namespace2,
Sets.newHashSet("r1", "r2"));
+ final TopicName dest2 = TopicName.get(
+ BrokerTestUtil.newUniqueName("persistent://" + namespace1 +
"/testReplicatorProducerNotExceed2"));
+ admin1.topics().createPartitionedTopic(dest1.toString(), 1);
+ admin1.topicPolicies().setMaxProducers(dest1.toString(), 1);
+ admin2.topics().createPartitionedTopic(dest2.toString(), 1);
+ admin2.topicPolicies().setMaxProducers(dest2.toString(), 1);
+ @Cleanup
+ MessageProducer producer1 = new MessageProducer(url1, dest1);
+ log.info("--- Starting producer1 --- " + url1);
+
+ producer1.produce(1);
+
+ @Cleanup
+ MessageProducer producer2 = new MessageProducer(url2, dest2);
+ log.info("--- Starting producer2 --- " + url2);
+
+ producer2.produce(1);
+
+ Assert.assertThrows(PulsarClientException.ProducerBusyException.class,
() -> new MessageProducer(url2, dest2));
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index 9ab32d5ffa7..e79197bb1b6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
@@ -202,7 +203,7 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
}
@Test
- private void testSetBacklogCausedCreatingProducerFailure() throws
Exception {
+ public void testSetBacklogCausedCreatingProducerFailure() throws Exception
{
final String ns = "prop/ns-test";
final String topic = ns + "/topic-1";
@@ -260,4 +261,37 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
Assert.fail("failed to create producer");
}
}
+
+ @Test
+ public void testSystemTopicNotCheckExceed() throws Exception {
+ final String ns = "prop/ns-test";
+ final String topic = ns + "/topic-1";
+
+ admin.namespaces().createNamespace(ns, 2);
+ admin.topics().createPartitionedTopic(String.format("persistent://%s",
topic), 1);
+
+ admin.namespaces().setMaxConsumersPerTopic(ns, 1);
+ admin.topicPolicies().setMaxConsumers(topic, 1);
+ NamespaceEventsSystemTopicFactory systemTopicFactory = new
NamespaceEventsSystemTopicFactory(pulsarClient);
+ TopicPoliciesSystemTopicClient systemTopicClientForNamespace =
systemTopicFactory
+ .createTopicPoliciesSystemTopicClient(NamespaceName.get(ns));
+ SystemTopicClient.Reader reader1 =
systemTopicClientForNamespace.newReader();
+ SystemTopicClient.Reader reader2 =
systemTopicClientForNamespace.newReader();
+
+ admin.topicPolicies().setMaxProducers(topic, 1);
+
+ CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 =
systemTopicClientForNamespace.newWriterAsync();
+ CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 =
systemTopicClientForNamespace.newWriterAsync();
+ CompletableFuture<Void> f1 =
admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
+
+ FutureUtil.waitForAll(List.of(writer1, writer2, f1)).join();
+ Assert.assertTrue(reader1.hasMoreEvents());
+ Assert.assertNotNull(reader1.readNext());
+ Assert.assertTrue(reader2.hasMoreEvents());
+ Assert.assertNotNull(reader2.readNext());
+ reader1.close();
+ reader2.close();
+ writer1.get().close();
+ writer2.get().close();
+ }
}