merlimat closed pull request #1234: Make max clients per topic/subscription 
configurable
URL: https://github.com/apache/incubator-pulsar/pull/1234
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index d18ebac88..dbea41db3 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -168,6 +168,21 @@ enableRunBookieTogether=false
 # Enable to run bookie autorecovery along with broker
 enableRunBookieAutoRecoveryTogether=false
 
+// Max number of producers allowed to connect to topic. Once this limit 
reaches, Broker will reject new producers
+// until the number of connected producers decrease.
+// Using a value of 0, is disabling maxProducersPerTopic-limit check.
+maxProducersPerTopic=0
+
+// Max number of consumers allowed to connect to topic. Once this limit 
reaches, Broker will reject new consumers
+// until the number of connected consumers decrease.
+// Using a value of 0, is disabling maxConsumersPerTopic-limit check.
+maxConsumersPerTopic=0
+
+// Max number of consumers allowed to connect to subscription. Once this limit 
reaches, Broker will reject new consumers
+// until the number of connected consumers decrease.
+// Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
+maxConsumersPerSubscription=0
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request 
with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 1201dce71..a6c915e7a 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -151,6 +151,21 @@ enablePersistentTopics=true
 # Enable broker to load non-persistent topics
 enableNonPersistentTopics=true
 
+// Max number of producers allowed to connect to topic. Once this limit 
reaches, Broker will reject new producers
+// until the number of connected producers decrease.
+// Using a value of 0, is disabling maxProducersPerTopic-limit check.
+maxProducersPerTopic=0
+
+// Max number of consumers allowed to connect to topic. Once this limit 
reaches, Broker will reject new consumers
+// until the number of connected consumers decrease.
+// Using a value of 0, is disabling maxConsumersPerTopic-limit check.
+maxConsumersPerTopic=0
+
+// Max number of consumers allowed to connect to subscription. Once this limit 
reaches, Broker will reject new consumers
+// until the number of connected consumers decrease.
+// Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
+maxConsumersPerSubscription=0
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request 
with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 74077f48e..6d4389990 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -169,6 +169,21 @@
     // Enable to run bookie autorecovery along with broker
     private boolean enableRunBookieAutoRecoveryTogether = false;
 
+    // Max number of producers allowed to connect to topic. Once this limit 
reaches, Broker will reject new producers
+    // until the number of connected producers decrease.
+    // Using a value of 0, is disabling maxProducersPerTopic-limit check.
+    private int maxProducersPerTopic = 0;
+
+    // Max number of consumers allowed to connect to topic. Once this limit 
reaches, Broker will reject new consumers
+    // until the number of connected consumers decrease.
+    // Using a value of 0, is disabling maxConsumersPerTopic-limit check.
+    private int maxConsumersPerTopic = 0;
+
+    // Max number of consumers allowed to connect to subscription. Once this 
limit reaches, Broker will reject new consumers
+    // until the number of connected consumers decrease.
+    // Using a value of 0, is disabling maxConsumersPerSubscription-limit 
check.
+    private int maxConsumersPerSubscription = 0;
+
     /***** --- TLS --- ****/
     // Enable TLS
     private boolean tlsEnabled = false;
@@ -402,7 +417,6 @@
     // If true, export topic level metrics otherwise namespace level
     private boolean exposeTopicLevelMetricsInPrometheus = true;
 
-
     public String getZookeeperServers() {
         return zookeeperServers;
     }
@@ -738,6 +752,30 @@ public void setEnableRunBookieAutoRecoveryTogether(boolean 
enableRunBookieAutoRe
         this.enableRunBookieAutoRecoveryTogether = 
enableRunBookieAutoRecoveryTogether;
     }
 
+    public int getMaxProducersPerTopic() {
+        return maxProducersPerTopic;
+    }
+
+    public void setMaxProducersPerTopic(int maxProducersPerTopic) {
+        this.maxProducersPerTopic = maxProducersPerTopic;
+    }
+
+    public int getMaxConsumersPerTopic() {
+        return maxConsumersPerTopic;
+    }
+
+    public void setMaxConsumersPerTopic(int maxConsumersPerTopic) {
+        this.maxConsumersPerTopic = maxConsumersPerTopic;
+    }
+
+    public int getMaxConsumersPerSubscription() {
+        return maxConsumersPerSubscription;
+    }
+
+    public void setMaxConsumersPerSubscription(int 
maxConsumersPerSubscription) {
+        this.maxConsumersPerSubscription = maxConsumersPerSubscription;
+    }
+
     public boolean isTlsEnabled() {
         return tlsEnabled;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index d869fc599..5181df177 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -19,29 +19,16 @@
 package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
-import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
-import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
-import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 import org.slf4j.Logger;
@@ -81,6 +68,10 @@ public AbstractDispatcherSingleActiveConsumer(SubType 
subscriptionType, int part
 
     protected abstract void cancelPendingRead();
 
+    protected abstract boolean isConsumersExceededOnTopic();
+
+    protected abstract boolean isConsumersExceededOnSubscription();
+
     protected void pickAndScheduleActiveConsumer() {
         checkArgument(!consumers.isEmpty());
 
@@ -106,6 +97,16 @@ public synchronized void addConsumer(Consumer consumer) 
throws BrokerServiceExce
             throw new ConsumerBusyException("Exclusive consumer is already 
connected");
         }
 
+        if (isConsumersExceededOnTopic()) {
+            log.warn("[{}] Attempting to add consumer to topic which reached 
max consumers limit", this.topicName);
+            throw new ConsumerBusyException("Topic reached max consumers 
limit");
+        }
+
+        if (subscriptionType == SubType.Failover && 
isConsumersExceededOnSubscription()) {
+            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit", this.topicName);
+            throw new ConsumerBusyException("Subscription reached max 
consumers limit");
+        }
+
         consumers.add(consumer);
 
         // Pick an active consumer and start it
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index a8b4f35c7..1e72bf6c3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -41,6 +41,12 @@ public ConsumerBusyException(String msg) {
         }
     }
 
+    public static class ProducerBusyException extends BrokerServiceException {
+        public ProducerBusyException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class ServiceUnitNotReadyException extends 
BrokerServiceException {
         public ServiceUnitNotReadyException(String msg) {
             super(msg);
@@ -155,4 +161,4 @@ public TooManyRequestsException(String msg) {
             return PulsarApi.ServerError.UnknownError;
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 5fc7676cf..f40564bf6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -26,8 +26,10 @@
 
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.util.Rate;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -40,6 +42,9 @@
 public class NonPersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMultipleConsumers
         implements NonPersistentDispatcher {
 
+    private final NonPersistentTopic topic;
+    private final Subscription subscription;
+
     private CompletableFuture<Void> closeFuture = null;
     private final String name;
     private final Rate msgDrop;
@@ -48,26 +53,54 @@
     @SuppressWarnings("unused")
     private volatile int totalAvailablePermits = 0;
 
-    private final Subscription subscription;
+    private final ServiceConfiguration serviceConfig;
 
     public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, 
Subscription subscription) {
+        this.topic = topic;
+        this.subscription = subscription;
         this.name = topic.getName() + " / " + subscription.getName();
         this.msgDrop = new Rate();
-        this.subscription = subscription;
+        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
     }
 
     @Override
-    public synchronized void addConsumer(Consumer consumer) {
+    public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
         if (IS_CLOSED_UPDATER.get(this) == TRUE) {
             log.warn("[{}] Dispatcher is already closed. Closing consumer ", 
name, consumer);
             consumer.disconnect();
             return;
         }
 
+        if (isConsumersExceededOnTopic()) {
+            log.warn("[{}] Attempting to add consumer to topic which reached 
max consumers limit", name);
+            throw new ConsumerBusyException("Topic reached max consumers 
limit");
+        }
+
+        if (isConsumersExceededOnSubscription()) {
+            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit", name);
+            throw new ConsumerBusyException("Subscription reached max 
consumers limit");
+        }
+
         consumerList.add(consumer);
         consumerSet.add(consumer);
     }
 
+    private boolean isConsumersExceededOnTopic() {
+        final int maxConsumersPerTopic = 
serviceConfig.getMaxConsumersPerTopic();
+        if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= 
topic.getNumberOfConsumers()) {
+            return true;
+        }
+        return false;
+    }
+
+    private boolean isConsumersExceededOnSubscription() {
+        final int maxConsumersPerSubscription = 
serviceConfig.getMaxConsumersPerSubscription();
+        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= 
consumerList.size()) {
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws 
BrokerServiceException {
         if (consumerSet.removeAll(consumer) == 1) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 107c6d6cb..5b68c673f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -24,6 +24,7 @@
 
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.util.Rate;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
@@ -31,14 +32,18 @@
 
 public final class NonPersistentDispatcherSingleActiveConsumer extends 
AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
 
+    private final NonPersistentTopic topic;
     private final Rate msgDrop;
     private final Subscription subscription;
+    private final ServiceConfiguration serviceConfig;
 
     public NonPersistentDispatcherSingleActiveConsumer(SubType 
subscriptionType, int partitionIndex,
             NonPersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName());
+        this.topic = topic;
         this.subscription = subscription;
         this.msgDrop = new Rate();
+        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
     }
 
     @Override
@@ -57,6 +62,22 @@ public void sendMessages(List<Entry> entries) {
         }
     }
 
+    protected boolean isConsumersExceededOnTopic() {
+        final int maxConsumersPerTopic = 
serviceConfig.getMaxConsumersPerTopic();
+        if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= 
topic.getNumberOfConsumers()) {
+            return true;
+        }
+        return false;
+    }
+
+    protected boolean isConsumersExceededOnSubscription() {
+        final int maxConsumersPerSubscription = 
serviceConfig.getMaxConsumersPerSubscription();
+        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= 
consumers.size()) {
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public Rate getMesssageDropRate() {
         return msgDrop;
@@ -86,5 +107,4 @@ protected void readMoreEntries(Consumer consumer) {
     protected void cancelPendingRead() {
         // No-op
     }
-
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 44a9e1443..4a5f4175c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -42,6 +42,7 @@
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
@@ -234,6 +235,11 @@ public void addProducer(Producer producer) throws 
BrokerServiceException {
                 throw new TopicFencedException("Topic is temporarily 
unavailable");
             }
 
+            if (isProducersExceeded()) {
+                log.warn("[{}] Attempting to add producer to topic which 
reached max producers limit", topic);
+                throw new ProducerBusyException("Topic reached max producers 
limit");
+            }
+
             if (log.isDebugEnabled()) {
                 log.debug("[{}] {} Got request to create producer ", topic, 
producer.getProducerName());
             }
@@ -254,6 +260,14 @@ public void addProducer(Producer producer) throws 
BrokerServiceException {
         }
     }
 
+    private boolean isProducersExceeded() {
+        final int maxProducers = 
brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+        if (maxProducers > 0 && maxProducers <= producers.size()) {
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public void checkMessageDeduplicationInfo() {
         // No-op
@@ -591,6 +605,14 @@ public String toString() {
         return producers;
     }
 
+    public int getNumberOfConsumers() {
+        int count = 0;
+        for (NonPersistentSubscription subscription : subscriptions.values()) {
+            count += subscription.getConsumers().size();
+        }
+        return count;
+    }
+
     @Override
     public ConcurrentOpenHashMap<String, NonPersistentSubscription> 
getSubscriptions() {
         return subscriptions;
@@ -914,5 +936,4 @@ public void markBatchMessagePublished() {
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(NonPersistentTopic.class);
-
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 359466be4..518946564 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -41,6 +41,7 @@
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
 import org.apache.pulsar.broker.service.Dispatcher;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.Codec;
@@ -98,7 +99,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic 
topic, ManagedCurso
     }
 
     @Override
-    public synchronized void addConsumer(Consumer consumer) {
+    public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
         if (IS_CLOSED_UPDATER.get(this) == TRUE) {
             log.warn("[{}] Dispatcher is already closed. Closing consumer ", 
name, consumer);
             consumer.disconnect();
@@ -115,11 +116,37 @@ public synchronized void addConsumer(Consumer consumer) {
             messagesToReplay.clear();
         }
 
+        if (isConsumersExceededOnTopic()) {
+            log.warn("[{}] Attempting to add consumer to topic which reached 
max consumers limit", name);
+            throw new ConsumerBusyException("Topic reached max consumers 
limit");
+        }
+
+        if (isConsumersExceededOnSubscription()) {
+            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit", name);
+            throw new ConsumerBusyException("Subscription reached max 
consumers limit");
+        }
+
         consumerList.add(consumer);
         consumerList.sort((c1, c2) -> c1.getPriorityLevel() - 
c2.getPriorityLevel());
         consumerSet.add(consumer);
     }
 
+    private boolean isConsumersExceededOnTopic() {
+        final int maxConsumersPerTopic = 
serviceConfig.getMaxConsumersPerTopic();
+        if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= 
topic.getNumberOfConsumers()) {
+            return true;
+        }
+        return false;
+    }
+
+    private boolean isConsumersExceededOnSubscription() {
+        final int maxConsumersPerSubscription = 
serviceConfig.getMaxConsumersPerSubscription();
+        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= 
consumerList.size()) {
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws 
BrokerServiceException {
         // decrement unack-message count for removed consumer
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4e3fd00b5..716e3325e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -107,6 +107,22 @@ protected void scheduleReadOnActiveConsumer() {
         }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
     }
 
+    protected boolean isConsumersExceededOnTopic() {
+        final int maxConsumersPerTopic = 
serviceConfig.getMaxConsumersPerTopic();
+        if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= 
topic.getNumberOfConsumers()) {
+            return true;
+        }
+        return false;
+    }
+
+    protected boolean isConsumersExceededOnSubscription() {
+        final int maxConsumersPerSubscription = 
serviceConfig.getMaxConsumersPerSubscription();
+        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= 
consumers.size()) {
+            return true;
+        }
+        return false;
+    }
+
     protected void cancelPendingRead() {
         if (havePendingRead && cursor.cancelPendingReadRequest()) {
             havePendingRead = false;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 20ae52726..05c0b18cb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -59,6 +59,7 @@
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
@@ -315,6 +316,11 @@ public void addProducer(Producer producer) throws 
BrokerServiceException {
                 throw new TopicTerminatedException("Topic was already 
terminated");
             }
 
+            if (isProducersExceeded()) {
+                log.warn("[{}] Attempting to add producer to topic which 
reached max producers limit", topic);
+                throw new ProducerBusyException("Topic reached max producers 
limit");
+            }
+
             if (log.isDebugEnabled()) {
                 log.debug("[{}] {} Got request to create producer ", topic, 
producer.getProducerName());
             }
@@ -338,6 +344,14 @@ public void addProducer(Producer producer) throws 
BrokerServiceException {
         }
     }
 
+    private boolean isProducersExceeded() {
+        final int maxProducers = 
brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+        if (maxProducers > 0 && maxProducers <= producers.size()) {
+            return true;
+        }
+        return false;
+    }
+
     private boolean hasLocalProducers() {
         AtomicBoolean foundLocal = new AtomicBoolean(false);
         producers.forEach(producer -> {
@@ -942,6 +956,14 @@ public String toString() {
         return producers;
     }
 
+    public int getNumberOfConsumers() {
+        int count = 0;
+        for (PersistentSubscription subscription : subscriptions.values()) {
+            count += subscription.getConsumers().size();
+        }
+        return count;
+    }
+
     @Override
     public ConcurrentOpenHashMap<String, PersistentSubscription> 
getSubscriptions() {
         return subscriptions;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index f76bd9074..55019a0d6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -39,7 +39,7 @@
 import static org.testng.Assert.fail;
 
 import com.google.common.collect.ImmutableMap;
-
+import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.URL;
@@ -362,6 +362,36 @@ public void testAddRemoveProducer() throws Exception {
         topic.removeProducer(producer); /* noop */
     }
 
+    @Test
+    public void testMaxProducers() throws Exception {
+        // set max clients
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(2).when(svcConfig).getMaxProducersPerTopic();
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+
+        String role = "appid1";
+        // 1. add producer1
+        Producer producer = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name1", role, false, null);
+        topic.addProducer(producer);
+        assertEquals(topic.getProducers().size(), 1);
+
+        // 2. add producer2
+        Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id 
*/, "prod-name2", role, false, null);
+        topic.addProducer(producer2);
+        assertEquals(topic.getProducers().size(), 2);
+
+        // 3. add producer3 but reached maxProducersPerTopic
+        try {
+            Producer producer3 = new Producer(topic, serverCnx, 3 /* producer 
id */, "prod-name3", role, false, null);
+            topic.addProducer(producer3);
+            fail("should have failed");
+        } catch (BrokerServiceException e) {
+            assertTrue(e instanceof 
BrokerServiceException.ProducerBusyException);
+        }
+    }
+
     @Test
     public void testSubscribeFail() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
@@ -444,6 +474,146 @@ public void testAddRemoveConsumer() throws Exception {
         }
     }
 
+    @Test
+    public void testMaxConsumersShared() throws Exception {
+        // set max clients
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
+        doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock);
+        PersistentSubscription sub2 = new PersistentSubscription(topic, 
"sub-2", cursorMock);
+
+        // for count consumers on topic
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = 
new ConcurrentOpenHashMap<>(16, 1);
+        subscriptions.put("sub-1", sub);
+        subscriptions.put("sub-2", sub2);
+        Field field = topic.getClass().getDeclaredField("subscriptions");
+        field.setAccessible(true);
+        field.set(topic, subscriptions);
+
+        // 1. add consumer1
+        Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(), 
1 /* consumer id */, 0,
+                "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                false /* read compacted */);
+        sub.addConsumer(consumer);
+        assertEquals(sub.getConsumers().size(), 1);
+
+        // 2. add consumer2
+        Consumer consumer2 = new Consumer(sub, SubType.Shared, 
topic.getName(), 2 /* consumer id */, 0,
+                "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                false /* read compacted */);
+        sub.addConsumer(consumer2);
+        assertEquals(sub.getConsumers().size(), 2);
+
+        // 3. add consumer3 but reach maxConsumersPerSubscription
+        try {
+            Consumer consumer3 = new Consumer(sub, SubType.Shared, 
topic.getName(), 3 /* consumer id */, 0,
+                    "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                    false /* read compacted */);
+            sub.addConsumer(consumer3);
+            fail("should have failed");
+        } catch (BrokerServiceException e) {
+            assertTrue(e instanceof 
BrokerServiceException.ConsumerBusyException);
+        }
+
+        // check number of consumers on topic
+        assertEquals(topic.getNumberOfConsumers(), 2);
+
+        // 4. add consumer4 to sub2
+        Consumer consumer4 = new Consumer(sub2, SubType.Shared, 
topic.getName(), 4 /* consumer id */, 0,
+                "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                false /* read compacted */);
+        sub2.addConsumer(consumer4);
+        assertEquals(sub2.getConsumers().size(), 1);
+
+        // check number of consumers on topic
+        assertEquals(topic.getNumberOfConsumers(), 3);
+
+        // 5. add consumer5 to sub2 but reach maxConsumersPerTopic
+        try {
+            Consumer consumer5 = new Consumer(sub2, SubType.Shared, 
topic.getName(), 5 /* consumer id */, 0,
+                    "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                    false /* read compacted */);
+            sub2.addConsumer(consumer5);
+            fail("should have failed");
+        } catch (BrokerServiceException e) {
+            assertTrue(e instanceof 
BrokerServiceException.ConsumerBusyException);
+        }
+    }
+
+    @Test
+    public void testMaxConsumersFailover() throws Exception {
+        // set max clients
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
+        doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+        PersistentSubscription sub = new PersistentSubscription(topic, 
"sub-1", cursorMock);
+        PersistentSubscription sub2 = new PersistentSubscription(topic, 
"sub-2", cursorMock);
+
+        // for count consumers on topic
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = 
new ConcurrentOpenHashMap<>(16, 1);
+        subscriptions.put("sub-1", sub);
+        subscriptions.put("sub-2", sub2);
+        Field field = topic.getClass().getDeclaredField("subscriptions");
+        field.setAccessible(true);
+        field.set(topic, subscriptions);
+
+        // 1. add consumer1
+        Consumer consumer = new Consumer(sub, SubType.Failover, 
topic.getName(), 1 /* consumer id */, 0,
+                "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                false /* read compacted */);
+        sub.addConsumer(consumer);
+        assertEquals(sub.getConsumers().size(), 1);
+
+        // 2. add consumer2
+        Consumer consumer2 = new Consumer(sub, SubType.Failover, 
topic.getName(), 2 /* consumer id */, 0,
+                "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                false /* read compacted */);
+        sub.addConsumer(consumer2);
+        assertEquals(sub.getConsumers().size(), 2);
+
+        // 3. add consumer3 but reach maxConsumersPerSubscription
+        try {
+            Consumer consumer3 = new Consumer(sub, SubType.Failover, 
topic.getName(), 3 /* consumer id */, 0,
+                    "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                    false /* read compacted */);
+            sub.addConsumer(consumer3);
+            fail("should have failed");
+        } catch (BrokerServiceException e) {
+            assertTrue(e instanceof 
BrokerServiceException.ConsumerBusyException);
+        }
+
+        // check number of consumers on topic
+        assertEquals(topic.getNumberOfConsumers(), 2);
+
+        // 4. add consumer4 to sub2
+        Consumer consumer4 = new Consumer(sub2, SubType.Failover, 
topic.getName(), 4 /* consumer id */, 0,
+                "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                false /* read compacted */);
+        sub2.addConsumer(consumer4);
+        assertEquals(sub2.getConsumers().size(), 1);
+
+        // check number of consumers on topic
+        assertEquals(topic.getNumberOfConsumers(), 3);
+
+        // 5. add consumer5 to sub2 but reach maxConsumersPerTopic
+        try {
+            Consumer consumer5 = new Consumer(sub2, SubType.Failover, 
topic.getName(), 5 /* consumer id */, 0,
+                    "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", 
Collections.emptyMap(),
+                    false /* read compacted */);
+            sub2.addConsumer(consumer5);
+            fail("should have failed");
+        } catch (BrokerServiceException e) {
+            assertTrue(e instanceof 
BrokerServiceException.ConsumerBusyException);
+        }
+    }
+
     @Test
     public void testUbsubscribeRaceConditions() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to