This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 9ca74fcc844 [improve][broker] System topic writer/reader connection 
not counted (#18603)
9ca74fcc844 is described below

commit 9ca74fcc844aeb51be25d5b64a2d924fee0f6123
Author: Ruguo Yu <[email protected]>
AuthorDate: Mon Nov 28 09:44:20 2022 +0800

    [improve][broker] System topic writer/reader connection not counted (#18603)
    
    This PR is a supplement to #18369.
    - `AbstractTopic.isSameAddressProducersExceeded()`
    - `AbstractBaseDispatcher.isConsumersExceededOnSubscription()`
    
    (cherry picked from commit a2fb5622a11a82bc867083d9b3411567dacf369b)
---
 .../org/apache/pulsar/broker/service/AbstractBaseDispatcher.java | 9 ++++++---
 .../java/org/apache/pulsar/broker/service/AbstractTopic.java     | 3 +++
 .../nonpersistent/NonPersistentDispatcherMultipleConsumers.java  | 2 +-
 .../NonPersistentDispatcherSingleActiveConsumer.java             | 2 +-
 .../persistent/PersistentDispatcherMultipleConsumers.java        | 2 +-
 .../persistent/PersistentDispatcherSingleActiveConsumer.java     | 2 +-
 .../pulsar/broker/systopic/PartitionedSystemTopicTest.java       | 4 +++-
 7 files changed, 16 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 3f5932a2d34..1e3b04da37a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -202,18 +202,21 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
     protected abstract boolean isConsumersExceededOnSubscription();
 
     protected boolean isConsumersExceededOnSubscription(BrokerService 
brokerService,
-                                                        String topic, int 
consumerSize) {
+                                                        AbstractTopic topic, 
int consumerSize) {
+        if (topic.isSystemTopic()) {
+            return false;
+        }
         Policies policies = null;
         Integer maxConsumersPerSubscription = null;
         try {
             maxConsumersPerSubscription = brokerService
-                    .getTopicPolicies(TopicName.get(topic))
+                    .getTopicPolicies(TopicName.get(topic.getName()))
                     .map(TopicPolicies::getMaxConsumersPerSubscription)
                     .orElse(null);
             if (maxConsumersPerSubscription == null) {
                 // Use getDataIfPresent from zk cache to make the call 
non-blocking and prevent deadlocks in addConsumer
                 policies = 
brokerService.pulsar().getPulsarResources().getNamespaceResources()
-                        
.getPoliciesIfCached(TopicName.get(topic).getNamespaceObject()).orElse(null);
+                        
.getPoliciesIfCached(TopicName.get(topic.getName()).getNamespaceObject()).orElse(null);
             }
         } catch (Exception e) {
             log.debug("Get topic or namespace policies fail", e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index eb06a41d42f..00c7a494ed3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -189,6 +189,9 @@ public abstract class AbstractTopic implements Topic {
     }
 
     protected boolean isSameAddressProducersExceeded(Producer producer) {
+        if (isSystemTopic() || producer.isRemote()) {
+            return false;
+        }
         final int maxSameAddressProducers = 
brokerService.pulsar().getConfiguration()
                 .getMaxSameAddressProducersPerTopic();
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 56886833592..185912dda1e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -85,7 +85,7 @@ public class NonPersistentDispatcherMultipleConsumers extends 
AbstractDispatcher
 
     @Override
     protected boolean isConsumersExceededOnSubscription() {
-        return isConsumersExceededOnSubscription(topic.getBrokerService(), 
topic.getName(), consumerList.size());
+        return isConsumersExceededOnSubscription(topic.getBrokerService(), 
topic, consumerList.size());
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 5cdbff17b81..5579e4dd154 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -73,7 +73,7 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
 
     @Override
     protected boolean isConsumersExceededOnSubscription() {
-        return isConsumersExceededOnSubscription(topic.getBrokerService(), 
topic.getName(), consumers.size());
+        return isConsumersExceededOnSubscription(topic.getBrokerService(), 
topic, consumers.size());
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 02fc8050763..bcfab873669 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -168,7 +168,7 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
 
     @Override
     protected boolean isConsumersExceededOnSubscription() {
-        return isConsumersExceededOnSubscription(topic.getBrokerService(), 
topic.getName(), consumerList.size());
+        return isConsumersExceededOnSubscription(topic.getBrokerService(), 
topic, consumerList.size());
     }
 
     @Override
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 727c4f09af9..9a3c9fa3485 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -131,7 +131,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 
     @Override
     protected boolean isConsumersExceededOnSubscription() {
-        return isConsumersExceededOnSubscription(topic.getBrokerService(), 
topic.getName(), consumers.size());
+        return isConsumersExceededOnSubscription(topic.getBrokerService(), 
topic, consumers.size());
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index ce4f44dd339..3a6529f8964 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -244,6 +244,7 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         admin.namespaces().createNamespace(ns, 2);
         admin.topics().createPartitionedTopic(String.format("persistent://%s", 
topic), 1);
 
+        conf.setMaxSameAddressConsumersPerTopic(1);
         admin.namespaces().setMaxConsumersPerTopic(ns, 1);
         admin.topicPolicies().setMaxConsumers(topic, 1);
         NamespaceEventsSystemTopicFactory systemTopicFactory = new 
NamespaceEventsSystemTopicFactory(pulsarClient);
@@ -252,8 +253,9 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         SystemTopicClient.Reader reader1 = 
systemTopicClientForNamespace.newReader();
         SystemTopicClient.Reader reader2 = 
systemTopicClientForNamespace.newReader();
 
+        conf.setMaxSameAddressProducersPerTopic(1);
+        admin.namespaces().setMaxProducersPerTopic(ns, 1);
         admin.topicPolicies().setMaxProducers(topic, 1);
-
         CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer1 = 
systemTopicClientForNamespace.newWriterAsync();
         CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = 
systemTopicClientForNamespace.newWriterAsync();
         CompletableFuture<Void> f1 = 
admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);

Reply via email to