This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5613c2b  Make max clients per topic/subscription configurable (#1234)
5613c2b is described below

commit 5613c2b92f6b7989d6a4c65e259713953e933e8f
Author: hrsakai <hsa...@yahoo-corp.jp>
AuthorDate: Thu Feb 15 08:00:23 2018 +0900

    Make max clients per topic/subscription configurable (#1234)
---
 conf/broker.conf                                   |  15 ++
 conf/standalone.conf                               |  15 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  40 ++++-
 .../AbstractDispatcherSingleActiveConsumer.java    |  27 ++--
 .../broker/service/BrokerServiceException.java     |   8 +-
 .../NonPersistentDispatcherMultipleConsumers.java  |  39 ++++-
 ...onPersistentDispatcherSingleActiveConsumer.java |  22 ++-
 .../service/nonpersistent/NonPersistentTopic.java  |  23 ++-
 .../PersistentDispatcherMultipleConsumers.java     |  29 +++-
 .../PersistentDispatcherSingleActiveConsumer.java  |  16 ++
 .../broker/service/persistent/PersistentTopic.java |  22 +++
 .../pulsar/broker/service/PersistentTopicTest.java | 172 ++++++++++++++++++++-
 12 files changed, 406 insertions(+), 22 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index d18ebac..dbea41d 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -168,6 +168,21 @@ enableRunBookieTogether=false
 # Enable to run bookie autorecovery along with broker
 enableRunBookieAutoRecoveryTogether=false
 
+# Max number of producers allowed to connect to topic. Once this limit is reached,
+# Broker will reject new producers until the number of connected producers decreases.
+# Using a value of 0 disables the maxProducersPerTopic limit check.
+maxProducersPerTopic=0
+
+# Max number of consumers allowed to connect to topic. Once this limit is reached,
+# Broker will reject new consumers until the number of connected consumers decreases.
+# Using a value of 0 disables the maxConsumersPerTopic limit check.
+maxConsumersPerTopic=0
+
+# Max number of consumers allowed to connect to subscription. Once this limit is reached,
+# Broker will reject new consumers until the number of connected consumers decreases.
+# Using a value of 0 disables the maxConsumersPerSubscription limit check.
+maxConsumersPerSubscription=0
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request 
with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 1201dce..a6c915e 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -151,6 +151,21 @@ enablePersistentTopics=true
 # Enable broker to load non-persistent topics
 enableNonPersistentTopics=true
 
+# Max number of producers allowed to connect to topic. Once this limit is reached,
+# Broker will reject new producers until the number of connected producers decreases.
+# Using a value of 0 disables the maxProducersPerTopic limit check.
+maxProducersPerTopic=0
+
+# Max number of consumers allowed to connect to topic. Once this limit is reached,
+# Broker will reject new consumers until the number of connected consumers decreases.
+# Using a value of 0 disables the maxConsumersPerTopic limit check.
+maxConsumersPerTopic=0
+
+# Max number of consumers allowed to connect to subscription. Once this limit is reached,
+# Broker will reject new consumers until the number of connected consumers decreases.
+# Using a value of 0 disables the maxConsumersPerSubscription limit check.
+maxConsumersPerSubscription=0
+
 ### --- Authentication --- ###
 # Role names that are treated as "proxy roles". If the broker sees a request 
with
 #role as proxyRoles - it will demand to see a valid original principal.
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fcd9a92..4be2195 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -169,6 +169,21 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     // Enable to run bookie autorecovery along with broker
     private boolean enableRunBookieAutoRecoveryTogether = false;
 
+    // Max number of producers allowed to connect to topic. Once this limit is reached,
+    // Broker will reject new producers until the number of connected producers decreases.
+    // Using a value of 0 disables the maxProducersPerTopic limit check.
+    private int maxProducersPerTopic = 0;
+
+    // Max number of consumers allowed to connect to topic. Once this limit is reached,
+    // Broker will reject new consumers until the number of connected consumers decreases.
+    // Using a value of 0 disables the maxConsumersPerTopic limit check.
+    private int maxConsumersPerTopic = 0;
+
+    // Max number of consumers allowed to connect to subscription. Once this limit is reached,
+    // Broker will reject new consumers until the number of connected consumers decreases.
+    // Using a value of 0 disables the maxConsumersPerSubscription limit check.
+    private int maxConsumersPerSubscription = 0;
+
     /***** --- TLS --- ****/
     // Enable TLS
     private boolean tlsEnabled = false;
@@ -408,7 +423,6 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     // If true, export topic level metrics otherwise namespace level
     private boolean exposeTopicLevelMetricsInPrometheus = true;
 
-
     public String getZookeeperServers() {
         return zookeeperServers;
     }
@@ -744,6 +758,30 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
         this.enableRunBookieAutoRecoveryTogether = 
enableRunBookieAutoRecoveryTogether;
     }
 
+    public int getMaxProducersPerTopic() {
+        return maxProducersPerTopic;
+    }
+
+    public void setMaxProducersPerTopic(int maxProducersPerTopic) {
+        this.maxProducersPerTopic = maxProducersPerTopic;
+    }
+
+    public int getMaxConsumersPerTopic() {
+        return maxConsumersPerTopic;
+    }
+
+    public void setMaxConsumersPerTopic(int maxConsumersPerTopic) {
+        this.maxConsumersPerTopic = maxConsumersPerTopic;
+    }
+
+    public int getMaxConsumersPerSubscription() {
+        return maxConsumersPerSubscription;
+    }
+
+    public void setMaxConsumersPerSubscription(int 
maxConsumersPerSubscription) {
+        this.maxConsumersPerSubscription = maxConsumersPerSubscription;
+    }
+
     public boolean isTlsEnabled() {
         return tlsEnabled;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index d869fc5..5181df1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -19,29 +19,16 @@
 package org.apache.pulsar.broker.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import 
org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
-import 
org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
-import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
-import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
-import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 import org.slf4j.Logger;
@@ -81,6 +68,10 @@ public abstract class AbstractDispatcherSingleActiveConsumer 
{
 
     protected abstract void cancelPendingRead();
 
+    protected abstract boolean isConsumersExceededOnTopic();
+
+    protected abstract boolean isConsumersExceededOnSubscription();
+
     protected void pickAndScheduleActiveConsumer() {
         checkArgument(!consumers.isEmpty());
 
@@ -106,6 +97,16 @@ public abstract class 
AbstractDispatcherSingleActiveConsumer {
             throw new ConsumerBusyException("Exclusive consumer is already 
connected");
         }
 
+        if (isConsumersExceededOnTopic()) {
+            log.warn("[{}] Attempting to add consumer to topic which reached 
max consumers limit", this.topicName);
+            throw new ConsumerBusyException("Topic reached max consumers 
limit");
+        }
+
+        if (subscriptionType == SubType.Failover && 
isConsumersExceededOnSubscription()) {
+            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit", this.topicName);
+            throw new ConsumerBusyException("Subscription reached max 
consumers limit");
+        }
+
         consumers.add(consumer);
 
         // Pick an active consumer and start it
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index a8b4f35..1e72bf6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -41,6 +41,12 @@ public class BrokerServiceException extends Exception {
         }
     }
 
+    public static class ProducerBusyException extends BrokerServiceException {
+        public ProducerBusyException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class ServiceUnitNotReadyException extends 
BrokerServiceException {
         public ServiceUnitNotReadyException(String msg) {
             super(msg);
@@ -155,4 +161,4 @@ public class BrokerServiceException extends Exception {
             return PulsarApi.ServerError.UnknownError;
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 5fc7676..f40564b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -26,8 +26,10 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.util.Rate;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -40,6 +42,9 @@ import org.slf4j.LoggerFactory;
 public class NonPersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMultipleConsumers
         implements NonPersistentDispatcher {
 
+    private final NonPersistentTopic topic;
+    private final Subscription subscription;
+
     private CompletableFuture<Void> closeFuture = null;
     private final String name;
     private final Rate msgDrop;
@@ -48,26 +53,54 @@ public class NonPersistentDispatcherMultipleConsumers 
extends AbstractDispatcher
     @SuppressWarnings("unused")
     private volatile int totalAvailablePermits = 0;
 
-    private final Subscription subscription;
+    private final ServiceConfiguration serviceConfig;
 
     public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, 
Subscription subscription) {
+        this.topic = topic;
+        this.subscription = subscription;
         this.name = topic.getName() + " / " + subscription.getName();
         this.msgDrop = new Rate();
-        this.subscription = subscription;
+        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
     }
 
     @Override
-    public synchronized void addConsumer(Consumer consumer) {
+    public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
         if (IS_CLOSED_UPDATER.get(this) == TRUE) {
             log.warn("[{}] Dispatcher is already closed. Closing consumer ", 
name, consumer);
             consumer.disconnect();
             return;
         }
 
+        if (isConsumersExceededOnTopic()) {
+            log.warn("[{}] Attempting to add consumer to topic which reached 
max consumers limit", name);
+            throw new ConsumerBusyException("Topic reached max consumers 
limit");
+        }
+
+        if (isConsumersExceededOnSubscription()) {
+            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit", name);
+            throw new ConsumerBusyException("Subscription reached max 
consumers limit");
+        }
+
         consumerList.add(consumer);
         consumerSet.add(consumer);
     }
 
+    private boolean isConsumersExceededOnTopic() {
+        final int maxConsumersPerTopic = 
serviceConfig.getMaxConsumersPerTopic();
+        if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= 
topic.getNumberOfConsumers()) {
+            return true;
+        }
+        return false;
+    }
+
+    private boolean isConsumersExceededOnSubscription() {
+        final int maxConsumersPerSubscription = 
serviceConfig.getMaxConsumersPerSubscription();
+        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= 
consumerList.size()) {
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws 
BrokerServiceException {
         if (consumerSet.removeAll(consumer) == 1) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 107c6d6..5b68c67 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.util.Rate;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
@@ -31,14 +32,18 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 
 public final class NonPersistentDispatcherSingleActiveConsumer extends 
AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
 
+    private final NonPersistentTopic topic;
     private final Rate msgDrop;
     private final Subscription subscription;
+    private final ServiceConfiguration serviceConfig;
 
     public NonPersistentDispatcherSingleActiveConsumer(SubType 
subscriptionType, int partitionIndex,
             NonPersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName());
+        this.topic = topic;
         this.subscription = subscription;
         this.msgDrop = new Rate();
+        this.serviceConfig = 
topic.getBrokerService().pulsar().getConfiguration();
     }
 
     @Override
@@ -57,6 +62,22 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
         }
     }
 
+    protected boolean isConsumersExceededOnTopic() {
+        final int maxConsumersPerTopic = 
serviceConfig.getMaxConsumersPerTopic();
+        if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= 
topic.getNumberOfConsumers()) {
+            return true;
+        }
+        return false;
+    }
+
+    protected boolean isConsumersExceededOnSubscription() {
+        final int maxConsumersPerSubscription = 
serviceConfig.getMaxConsumersPerSubscription();
+        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= 
consumers.size()) {
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public Rate getMesssageDropRate() {
         return msgDrop;
@@ -86,5 +107,4 @@ public final class 
NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     protected void cancelPendingRead() {
         // No-op
     }
-
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 588c494..fdbadc6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -43,6 +43,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
@@ -235,6 +236,11 @@ public class NonPersistentTopic implements Topic {
                 throw new TopicFencedException("Topic is temporarily 
unavailable");
             }
 
+            if (isProducersExceeded()) {
+                log.warn("[{}] Attempting to add producer to topic which 
reached max producers limit", topic);
+                throw new ProducerBusyException("Topic reached max producers 
limit");
+            }
+
             if (log.isDebugEnabled()) {
                 log.debug("[{}] {} Got request to create producer ", topic, 
producer.getProducerName());
             }
@@ -255,6 +261,14 @@ public class NonPersistentTopic implements Topic {
         }
     }
 
+    private boolean isProducersExceeded() {
+        final int maxProducers = 
brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+        if (maxProducers > 0 && maxProducers <= producers.size()) {
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public void checkMessageDeduplicationInfo() {
         // No-op
@@ -592,6 +606,14 @@ public class NonPersistentTopic implements Topic {
         return producers;
     }
 
+    public int getNumberOfConsumers() {
+        int count = 0;
+        for (NonPersistentSubscription subscription : subscriptions.values()) {
+            count += subscription.getConsumers().size();
+        }
+        return count;
+    }
+
     @Override
     public ConcurrentOpenHashMap<String, NonPersistentSubscription> 
getSubscriptions() {
         return subscriptions;
@@ -920,5 +942,4 @@ public class NonPersistentTopic implements Topic {
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(NonPersistentTopic.class);
-
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 359466b..5189465 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -41,6 +41,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
 import org.apache.pulsar.broker.service.Dispatcher;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.Codec;
@@ -98,7 +99,7 @@ public class PersistentDispatcherMultipleConsumers  extends 
AbstractDispatcherMu
     }
 
     @Override
-    public synchronized void addConsumer(Consumer consumer) {
+    public synchronized void addConsumer(Consumer consumer) throws 
BrokerServiceException {
         if (IS_CLOSED_UPDATER.get(this) == TRUE) {
             log.warn("[{}] Dispatcher is already closed. Closing consumer ", 
name, consumer);
             consumer.disconnect();
@@ -115,11 +116,37 @@ public class PersistentDispatcherMultipleConsumers  
extends AbstractDispatcherMu
             messagesToReplay.clear();
         }
 
+        if (isConsumersExceededOnTopic()) {
+            log.warn("[{}] Attempting to add consumer to topic which reached 
max consumers limit", name);
+            throw new ConsumerBusyException("Topic reached max consumers 
limit");
+        }
+
+        if (isConsumersExceededOnSubscription()) {
+            log.warn("[{}] Attempting to add consumer to subscription which 
reached max consumers limit", name);
+            throw new ConsumerBusyException("Subscription reached max 
consumers limit");
+        }
+
         consumerList.add(consumer);
         consumerList.sort((c1, c2) -> c1.getPriorityLevel() - 
c2.getPriorityLevel());
         consumerSet.add(consumer);
     }
 
+    private boolean isConsumersExceededOnTopic() {
+        final int maxConsumersPerTopic = 
serviceConfig.getMaxConsumersPerTopic();
+        if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= 
topic.getNumberOfConsumers()) {
+            return true;
+        }
+        return false;
+    }
+
+    private boolean isConsumersExceededOnSubscription() {
+        final int maxConsumersPerSubscription = 
serviceConfig.getMaxConsumersPerSubscription();
+        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= 
consumerList.size()) {
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public synchronized void removeConsumer(Consumer consumer) throws 
BrokerServiceException {
         // decrement unack-message count for removed consumer
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4e3fd00..716e332 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -107,6 +107,22 @@ public final class 
PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), 
TimeUnit.MILLISECONDS);
     }
 
+    protected boolean isConsumersExceededOnTopic() {
+        final int maxConsumersPerTopic = 
serviceConfig.getMaxConsumersPerTopic();
+        if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= 
topic.getNumberOfConsumers()) {
+            return true;
+        }
+        return false;
+    }
+
+    protected boolean isConsumersExceededOnSubscription() {
+        final int maxConsumersPerSubscription = 
serviceConfig.getMaxConsumersPerSubscription();
+        if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= 
consumers.size()) {
+            return true;
+        }
+        return false;
+    }
+
     protected void cancelPendingRead() {
         if (havePendingRead && cursor.cancelPendingReadRequest()) {
             havePendingRead = false;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a17e2de..5c82ef3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -59,6 +59,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyExcep
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
@@ -315,6 +316,11 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
                 throw new TopicTerminatedException("Topic was already 
terminated");
             }
 
+            if (isProducersExceeded()) {
+                log.warn("[{}] Attempting to add producer to topic which 
reached max producers limit", topic);
+                throw new ProducerBusyException("Topic reached max producers 
limit");
+            }
+
             if (log.isDebugEnabled()) {
                 log.debug("[{}] {} Got request to create producer ", topic, 
producer.getProducerName());
             }
@@ -338,6 +344,14 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
         }
     }
 
+    private boolean isProducersExceeded() {
+        final int maxProducers = 
brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+        if (maxProducers > 0 && maxProducers <= producers.size()) {
+            return true;
+        }
+        return false;
+    }
+
     private boolean hasLocalProducers() {
         AtomicBoolean foundLocal = new AtomicBoolean(false);
         producers.forEach(producer -> {
@@ -942,6 +956,14 @@ public class PersistentTopic implements Topic, 
AddEntryCallback {
         return producers;
     }
 
+    public int getNumberOfConsumers() {
+        int count = 0;
+        for (PersistentSubscription subscription : subscriptions.values()) {
+            count += subscription.getConsumers().size();
+        }
+        return count;
+    }
+
     @Override
     public ConcurrentOpenHashMap<String, PersistentSubscription> 
getSubscriptions() {
         return subscriptions;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index f76bd90..55019a0 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -39,7 +39,7 @@ import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import com.google.common.collect.ImmutableMap;
-
+import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.net.URL;
@@ -363,6 +363,36 @@ public class PersistentTopicTest {
     }
 
     @Test
+    public void testMaxProducers() throws Exception {
+        // set max clients
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(2).when(svcConfig).getMaxProducersPerTopic();
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
+
+        String role = "appid1";
+        // 1. add producer1
+        Producer producer = new Producer(topic, serverCnx, 1 /* producer id 
*/, "prod-name1", role, false, null);
+        topic.addProducer(producer);
+        assertEquals(topic.getProducers().size(), 1);
+
+        // 2. add producer2
+        Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id 
*/, "prod-name2", role, false, null);
+        topic.addProducer(producer2);
+        assertEquals(topic.getProducers().size(), 2);
+
+        // 3. add producer3 but reached maxProducersPerTopic
+        try {
+            Producer producer3 = new Producer(topic, serverCnx, 3 /* producer 
id */, "prod-name3", role, false, null);
+            topic.addProducer(producer3);
+            fail("should have failed");
+        } catch (BrokerServiceException e) {
+            assertTrue(e instanceof 
BrokerServiceException.ProducerBusyException);
+        }
+    }
+
+    @Test
     public void testSubscribeFail() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, 
ledgerMock, brokerService);
 
@@ -445,6 +475,146 @@ public class PersistentTopicTest {
     }
 
     @Test
+    public void testMaxConsumersShared() throws Exception {
+        // set max clients
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
+        doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
+
+        // for count consumers on topic
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        subscriptions.put("sub-1", sub);
+        subscriptions.put("sub-2", sub2);
+        Field field = topic.getClass().getDeclaredField("subscriptions");
+        field.setAccessible(true);
+        field.set(topic, subscriptions);
+
+        // 1. add consumer1
+        Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(), 1 /* consumer id */, 0,
+                "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
+                false /* read compacted */);
+        sub.addConsumer(consumer);
+        assertEquals(sub.getConsumers().size(), 1);
+
+        // 2. add consumer2
+        Consumer consumer2 = new Consumer(sub, SubType.Shared, topic.getName(), 2 /* consumer id */, 0,
+                "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
+                false /* read compacted */);
+        sub.addConsumer(consumer2);
+        assertEquals(sub.getConsumers().size(), 2);
+
+        // 3. add consumer3 but reach maxConsumersPerSubscription
+        try {
+            Consumer consumer3 = new Consumer(sub, SubType.Shared, topic.getName(), 3 /* consumer id */, 0,
+                    "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
+                    false /* read compacted */);
+            sub.addConsumer(consumer3);
+            fail("should have failed");
+        } catch (BrokerServiceException e) {
+            assertTrue(e instanceof BrokerServiceException.ConsumerBusyException);
+        }
+
+        // check number of consumers on topic
+        assertEquals(topic.getNumberOfConsumers(), 2);
+
+        // 4. add consumer4 to sub2
+        Consumer consumer4 = new Consumer(sub2, SubType.Shared, topic.getName(), 4 /* consumer id */, 0,
+                "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
+                false /* read compacted */);
+        sub2.addConsumer(consumer4);
+        assertEquals(sub2.getConsumers().size(), 1);
+
+        // check number of consumers on topic
+        assertEquals(topic.getNumberOfConsumers(), 3);
+
+        // 5. add consumer5 to sub2 but reach maxConsumersPerTopic
+        try {
+            Consumer consumer5 = new Consumer(sub2, SubType.Shared, topic.getName(), 5 /* consumer id */, 0,
+                    "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
+                    false /* read compacted */);
+            sub2.addConsumer(consumer5);
+            fail("should have failed");
+        } catch (BrokerServiceException e) {
+            assertTrue(e instanceof BrokerServiceException.ConsumerBusyException);
+        }
+    }
+
+    @Test
+    public void testMaxConsumersFailover() throws Exception {
+        // set max clients
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        doReturn(2).when(svcConfig).getMaxConsumersPerSubscription();
+        doReturn(3).when(svcConfig).getMaxConsumersPerTopic();
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
+
+        // for count consumers on topic
+        ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
+        subscriptions.put("sub-1", sub);
+        subscriptions.put("sub-2", sub2);
+        Field field = topic.getClass().getDeclaredField("subscriptions");
+        field.setAccessible(true);
+        field.set(topic, subscriptions);
+
+        // 1. add consumer1
+        Consumer consumer = new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 0,
+                "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
+                false /* read compacted */);
+        sub.addConsumer(consumer);
+        assertEquals(sub.getConsumers().size(), 1);
+
+        // 2. add consumer2
+        Consumer consumer2 = new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 0,
+                "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
+                false /* read compacted */);
+        sub.addConsumer(consumer2);
+        assertEquals(sub.getConsumers().size(), 2);
+
+        // 3. add consumer3 but reach maxConsumersPerSubscription
+        try {
+            Consumer consumer3 = new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0,
+                    "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
+                    false /* read compacted */);
+            sub.addConsumer(consumer3);
+            fail("should have failed");
+        } catch (BrokerServiceException e) {
+            assertTrue(e instanceof BrokerServiceException.ConsumerBusyException);
+        }
+
+        // check number of consumers on topic
+        assertEquals(topic.getNumberOfConsumers(), 2);
+
+        // 4. add consumer4 to sub2
+        Consumer consumer4 = new Consumer(sub2, SubType.Failover, topic.getName(), 4 /* consumer id */, 0,
+                "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
+                false /* read compacted */);
+        sub2.addConsumer(consumer4);
+        assertEquals(sub2.getConsumers().size(), 1);
+
+        // check number of consumers on topic
+        assertEquals(topic.getNumberOfConsumers(), 3);
+
+        // 5. add consumer5 to sub2 but reach maxConsumersPerTopic
+        try {
+            Consumer consumer5 = new Consumer(sub2, SubType.Failover, topic.getName(), 5 /* consumer id */, 0,
+                    "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(),
+                    false /* read compacted */);
+            sub2.addConsumer(consumer5);
+            fail("should have failed");
+        } catch (BrokerServiceException e) {
+            assertTrue(e instanceof BrokerServiceException.ConsumerBusyException);
+        }
+    }
+
+    @Test
     public void testUbsubscribeRaceConditions() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to