This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5613c2b Make max clients per topic/subscription configurable (#1234) 5613c2b is described below commit 5613c2b92f6b7989d6a4c65e259713953e933e8f Author: hrsakai <hsa...@yahoo-corp.jp> AuthorDate: Thu Feb 15 08:00:23 2018 +0900 Make max clients per topic/subscription configurable (#1234) --- conf/broker.conf | 15 ++ conf/standalone.conf | 15 ++ .../apache/pulsar/broker/ServiceConfiguration.java | 40 ++++- .../AbstractDispatcherSingleActiveConsumer.java | 27 ++-- .../broker/service/BrokerServiceException.java | 8 +- .../NonPersistentDispatcherMultipleConsumers.java | 39 ++++- ...onPersistentDispatcherSingleActiveConsumer.java | 22 ++- .../service/nonpersistent/NonPersistentTopic.java | 23 ++- .../PersistentDispatcherMultipleConsumers.java | 29 +++- .../PersistentDispatcherSingleActiveConsumer.java | 16 ++ .../broker/service/persistent/PersistentTopic.java | 22 +++ .../pulsar/broker/service/PersistentTopicTest.java | 172 ++++++++++++++++++++- 12 files changed, 406 insertions(+), 22 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index d18ebac..dbea41d 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -168,6 +168,21 @@ enableRunBookieTogether=false # Enable to run bookie autorecovery along with broker enableRunBookieAutoRecoveryTogether=false +// Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers +// until the number of connected producers decrease. +// Using a value of 0, is disabling maxProducersPerTopic-limit check. +maxProducersPerTopic=0 + +// Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers +// until the number of connected consumers decrease. +// Using a value of 0, is disabling maxConsumersPerTopic-limit check. +maxConsumersPerTopic=0 + +// Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers +// until the number of connected consumers decrease. +// Using a value of 0, is disabling maxConsumersPerSubscription-limit check. +maxConsumersPerSubscription=0 + ### --- Authentication --- ### # Role names that are treated as "proxy roles". If the broker sees a request with #role as proxyRoles - it will demand to see a valid original principal. diff --git a/conf/standalone.conf b/conf/standalone.conf index 1201dce..a6c915e 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -151,6 +151,21 @@ enablePersistentTopics=true # Enable broker to load non-persistent topics enableNonPersistentTopics=true +// Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers +// until the number of connected producers decrease. +// Using a value of 0, is disabling maxProducersPerTopic-limit check. +maxProducersPerTopic=0 + +// Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers +// until the number of connected consumers decrease. +// Using a value of 0, is disabling maxConsumersPerTopic-limit check. +maxConsumersPerTopic=0 + +// Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers +// until the number of connected consumers decrease. +// Using a value of 0, is disabling maxConsumersPerSubscription-limit check. +maxConsumersPerSubscription=0 + ### --- Authentication --- ### # Role names that are treated as "proxy roles". If the broker sees a request with #role as proxyRoles - it will demand to see a valid original principal. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index fcd9a92..4be2195 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -169,6 +169,21 @@ public class ServiceConfiguration implements PulsarConfiguration { // Enable to run bookie autorecovery along with broker private boolean enableRunBookieAutoRecoveryTogether = false; + // Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers + // until the number of connected producers decrease. + // Using a value of 0, is disabling maxProducersPerTopic-limit check. + private int maxProducersPerTopic = 0; + + // Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers + // until the number of connected consumers decrease. + // Using a value of 0, is disabling maxConsumersPerTopic-limit check. + private int maxConsumersPerTopic = 0; + + // Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers + // until the number of connected consumers decrease. + // Using a value of 0, is disabling maxConsumersPerSubscription-limit check. + private int maxConsumersPerSubscription = 0; + /***** --- TLS --- ****/ // Enable TLS private boolean tlsEnabled = false; @@ -408,7 +423,6 @@ public class ServiceConfiguration implements PulsarConfiguration { // If true, export topic level metrics otherwise namespace level private boolean exposeTopicLevelMetricsInPrometheus = true; - public String getZookeeperServers() { return zookeeperServers; } @@ -744,6 +758,30 @@ public class ServiceConfiguration implements PulsarConfiguration { this.enableRunBookieAutoRecoveryTogether = enableRunBookieAutoRecoveryTogether; } + public int getMaxProducersPerTopic() { + return maxProducersPerTopic; + } + + public void setMaxProducersPerTopic(int maxProducersPerTopic) { + this.maxProducersPerTopic = maxProducersPerTopic; + } + + public int getMaxConsumersPerTopic() { + return maxConsumersPerTopic; + } + + public void setMaxConsumersPerTopic(int maxConsumersPerTopic) { + this.maxConsumersPerTopic = maxConsumersPerTopic; + } + + public int getMaxConsumersPerSubscription() { + return maxConsumersPerSubscription; + } + + public void setMaxConsumersPerSubscription(int maxConsumersPerSubscription) { + this.maxConsumersPerSubscription = maxConsumersPerSubscription; + } + public boolean isTlsEnabled() { return tlsEnabled; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index d869fc5..5181df1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -19,29 +19,16 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; -import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException; -import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Consumer; -import org.apache.pulsar.broker.service.Dispatcher; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; -import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.utils.CopyOnWriteArrayList; import org.slf4j.Logger; @@ -81,6 +68,10 @@ public abstract class AbstractDispatcherSingleActiveConsumer { protected abstract void cancelPendingRead(); + protected abstract boolean isConsumersExceededOnTopic(); + + protected abstract boolean isConsumersExceededOnSubscription(); + protected void pickAndScheduleActiveConsumer() { checkArgument(!consumers.isEmpty()); @@ -106,6 +97,16 @@ public abstract class AbstractDispatcherSingleActiveConsumer { throw new ConsumerBusyException("Exclusive consumer is already connected"); } + if (isConsumersExceededOnTopic()) { + log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", this.topicName); + throw new ConsumerBusyException("Topic reached max consumers limit"); + } + + if (subscriptionType == SubType.Failover && isConsumersExceededOnSubscription()) { + log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", this.topicName); + throw new ConsumerBusyException("Subscription reached max consumers limit"); + } + consumers.add(consumer); // Pick an active consumer and start it diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index a8b4f35..1e72bf6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -41,6 +41,12 @@ public class BrokerServiceException extends Exception { } } + public static class ProducerBusyException extends BrokerServiceException { + public ProducerBusyException(String msg) { + super(msg); + } + } + public static class ServiceUnitNotReadyException extends BrokerServiceException { public ServiceUnitNotReadyException(String msg) { super(msg); @@ -155,4 +161,4 @@ public class BrokerServiceException extends Exception { return PulsarApi.ServerError.UnknownError; } } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index 5fc7676..f40564b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -26,8 +26,10 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.util.Rate; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; @@ -40,6 +42,9 @@ import org.slf4j.LoggerFactory; public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements NonPersistentDispatcher { + private final NonPersistentTopic topic; + private final Subscription subscription; + private CompletableFuture<Void> closeFuture = null; private final String name; private final Rate msgDrop; @@ -48,26 +53,54 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher @SuppressWarnings("unused") private volatile int totalAvailablePermits = 0; - private final Subscription subscription; + private final ServiceConfiguration serviceConfig; public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) { + this.topic = topic; + this.subscription = subscription; this.name = topic.getName() + " / " + subscription.getName(); this.msgDrop = new Rate(); - this.subscription = subscription; + this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); } @Override - public synchronized void addConsumer(Consumer consumer) { + public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { if (IS_CLOSED_UPDATER.get(this) == TRUE) { log.warn("[{}] Dispatcher is already closed. Closing consumer ", name, consumer); consumer.disconnect(); return; } + if (isConsumersExceededOnTopic()) { + log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", name); + throw new ConsumerBusyException("Topic reached max consumers limit"); + } + + if (isConsumersExceededOnSubscription()) { + log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name); + throw new ConsumerBusyException("Subscription reached max consumers limit"); + } + consumerList.add(consumer); consumerSet.add(consumer); } + private boolean isConsumersExceededOnTopic() { + final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic(); + if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) { + return true; + } + return false; + } + + private boolean isConsumersExceededOnSubscription() { + final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription(); + if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) { + return true; + } + return false; + } + @Override public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { if (consumerSet.removeAll(consumer) == 1) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java index 107c6d6..5b68c67 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.util.Rate; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Subscription; @@ -31,14 +32,18 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher { + private final NonPersistentTopic topic; private final Rate msgDrop; private final Subscription subscription; + private final ServiceConfiguration serviceConfig; public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, NonPersistentTopic topic, Subscription subscription) { super(subscriptionType, partitionIndex, topic.getName()); + this.topic = topic; this.subscription = subscription; this.msgDrop = new Rate(); + this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); } @Override @@ -57,6 +62,22 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD } } + protected boolean isConsumersExceededOnTopic() { + final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic(); + if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) { + return true; + } + return false; + } + + protected boolean isConsumersExceededOnSubscription() { + final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription(); + if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) { + return true; + } + return false; + } + @Override public Rate getMesssageDropRate() { return msgDrop; @@ -86,5 +107,4 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD protected void cancelPendingRead() { // No-op } - } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 588c494..fdbadc6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -43,6 +43,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; +import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; @@ -235,6 +236,11 @@ public class NonPersistentTopic implements Topic { throw new TopicFencedException("Topic is temporarily unavailable"); } + if (isProducersExceeded()) { + log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); + throw new ProducerBusyException("Topic reached max producers limit"); + } + if (log.isDebugEnabled()) { log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName()); } @@ -255,6 +261,14 @@ public class NonPersistentTopic implements Topic { } } + private boolean isProducersExceeded() { + final int maxProducers = brokerService.pulsar().getConfiguration().getMaxProducersPerTopic(); + if (maxProducers > 0 && maxProducers <= producers.size()) { + return true; + } + return false; + } + @Override public void checkMessageDeduplicationInfo() { // No-op @@ -592,6 +606,14 @@ public class NonPersistentTopic implements Topic { return producers; } + public int getNumberOfConsumers() { + int count = 0; + for (NonPersistentSubscription subscription : subscriptions.values()) { + count += subscription.getConsumers().size(); + } + return count; + } + @Override public ConcurrentOpenHashMap<String, NonPersistentSubscription> getSubscriptions() { return subscriptions; @@ -920,5 +942,4 @@ public class NonPersistentTopic implements Topic { } private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class); - } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 359466b..5189465 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -41,6 +41,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Consumer.SendMessageInfo; import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.util.Codec; @@ -98,7 +99,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu } @Override - public synchronized void addConsumer(Consumer consumer) { + public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException { if (IS_CLOSED_UPDATER.get(this) == TRUE) { log.warn("[{}] Dispatcher is already closed. Closing consumer ", name, consumer); consumer.disconnect(); @@ -115,11 +116,37 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu messagesToReplay.clear(); } + if (isConsumersExceededOnTopic()) { + log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", name); + throw new ConsumerBusyException("Topic reached max consumers limit"); + } + + if (isConsumersExceededOnSubscription()) { + log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name); + throw new ConsumerBusyException("Subscription reached max consumers limit"); + } + consumerList.add(consumer); consumerList.sort((c1, c2) -> c1.getPriorityLevel() - c2.getPriorityLevel()); consumerSet.add(consumer); } + private boolean isConsumersExceededOnTopic() { + final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic(); + if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) { + return true; + } + return false; + } + + private boolean isConsumersExceededOnSubscription() { + final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription(); + if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) { + return true; + } + return false; + } + @Override public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { // decrement unack-message count for removed consumer diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 4e3fd00..716e332 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -107,6 +107,22 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp }, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS); } + protected boolean isConsumersExceededOnTopic() { + final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic(); + if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) { + return true; + } + return false; + } + + protected boolean isConsumersExceededOnSubscription() { + final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription(); + if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) { + return true; + } + return false; + } + protected void cancelPendingRead() { if (havePendingRead && cursor.cancelPendingReadRequest()) { havePendingRead = false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a17e2de..5c82ef3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -59,6 +59,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyExcep import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; +import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; @@ -315,6 +316,11 @@ public class PersistentTopic implements Topic, AddEntryCallback { throw new TopicTerminatedException("Topic was already terminated"); } + if (isProducersExceeded()) { + log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); + throw new ProducerBusyException("Topic reached max producers limit"); + } + if (log.isDebugEnabled()) { log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName()); } @@ -338,6 +344,14 @@ public class PersistentTopic implements Topic, AddEntryCallback { } } + private boolean isProducersExceeded() { + final int maxProducers = brokerService.pulsar().getConfiguration().getMaxProducersPerTopic(); + if (maxProducers > 0 && maxProducers <= producers.size()) { + return true; + } + return false; + } + private boolean hasLocalProducers() { AtomicBoolean foundLocal = new AtomicBoolean(false); producers.forEach(producer -> { @@ -942,6 +956,14 @@ public class PersistentTopic implements Topic, AddEntryCallback { return producers; } + public int getNumberOfConsumers() { + int count = 0; + for (PersistentSubscription subscription : subscriptions.values()) { + count += subscription.getConsumers().size(); + } + return count; + } + @Override public ConcurrentOpenHashMap<String, PersistentSubscription> getSubscriptions() { return subscriptions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index f76bd90..55019a0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -39,7 +39,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.ImmutableMap; - +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.URL; @@ -363,6 +363,36 @@ public class PersistentTopicTest { } @Test + public void testMaxProducers() throws Exception { + // set max clients + ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + doReturn(2).when(svcConfig).getMaxProducersPerTopic(); + doReturn(svcConfig).when(pulsar).getConfiguration(); + + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + + String role = "appid1"; + // 1. add producer1 + Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, false, null); + topic.addProducer(producer); + assertEquals(topic.getProducers().size(), 1); + + // 2. add producer2 + Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role, false, null); + topic.addProducer(producer2); + assertEquals(topic.getProducers().size(), 2); + + // 3. add producer3 but reached maxProducersPerTopic + try { + Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role, false, null); + topic.addProducer(producer3); + fail("should have failed"); + } catch (BrokerServiceException e) { + assertTrue(e instanceof BrokerServiceException.ProducerBusyException); + } + } + + @Test public void testSubscribeFail() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); @@ -445,6 +475,146 @@ public class PersistentTopicTest { } @Test + public void testMaxConsumersShared() throws Exception { + // set max clients + ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + doReturn(2).when(svcConfig).getMaxConsumersPerSubscription(); + doReturn(3).when(svcConfig).getMaxConsumersPerTopic(); + doReturn(svcConfig).when(pulsar).getConfiguration(); + + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); + PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock); + + // for count consumers on topic + ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1); + subscriptions.put("sub-1", sub); + subscriptions.put("sub-2", sub2); + Field field = topic.getClass().getDeclaredField("subscriptions"); + field.setAccessible(true); + field.set(topic, subscriptions); + + // 1. add consumer1 + Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(), 1 /* consumer id */, 0, + "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */); + sub.addConsumer(consumer); + assertEquals(sub.getConsumers().size(), 1); + + // 2. add consumer2 + Consumer consumer2 = new Consumer(sub, SubType.Shared, topic.getName(), 2 /* consumer id */, 0, + "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */); + sub.addConsumer(consumer2); + assertEquals(sub.getConsumers().size(), 2); + + // 3. add consumer3 but reach maxConsumersPerSubscription + try { + Consumer consumer3 = new Consumer(sub, SubType.Shared, topic.getName(), 3 /* consumer id */, 0, + "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */); + sub.addConsumer(consumer3); + fail("should have failed"); + } catch (BrokerServiceException e) { + assertTrue(e instanceof BrokerServiceException.ConsumerBusyException); + } + + // check number of consumers on topic + assertEquals(topic.getNumberOfConsumers(), 2); + + // 4. add consumer4 to sub2 + Consumer consumer4 = new Consumer(sub2, SubType.Shared, topic.getName(), 4 /* consumer id */, 0, + "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */); + sub2.addConsumer(consumer4); + assertEquals(sub2.getConsumers().size(), 1); + + // check number of consumers on topic + assertEquals(topic.getNumberOfConsumers(), 3); + + // 5. add consumer5 to sub2 but reach maxConsumersPerTopic + try { + Consumer consumer5 = new Consumer(sub2, SubType.Shared, topic.getName(), 5 /* consumer id */, 0, + "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */); + sub2.addConsumer(consumer5); + fail("should have failed"); + } catch (BrokerServiceException e) { + assertTrue(e instanceof BrokerServiceException.ConsumerBusyException); + } + } + + @Test + public void testMaxConsumersFailover() throws Exception { + // set max clients + ServiceConfiguration svcConfig = spy(new ServiceConfiguration()); + doReturn(2).when(svcConfig).getMaxConsumersPerSubscription(); + doReturn(3).when(svcConfig).getMaxConsumersPerTopic(); + doReturn(svcConfig).when(pulsar).getConfiguration(); + + PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); + PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock); + + // for count consumers on topic + ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1); + subscriptions.put("sub-1", sub); + subscriptions.put("sub-2", sub2); + Field field = topic.getClass().getDeclaredField("subscriptions"); + field.setAccessible(true); + field.set(topic, subscriptions); + + // 1. add consumer1 + Consumer consumer = new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 0, + "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */); + sub.addConsumer(consumer); + assertEquals(sub.getConsumers().size(), 1); + + // 2. add consumer2 + Consumer consumer2 = new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 0, + "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */); + sub.addConsumer(consumer2); + assertEquals(sub.getConsumers().size(), 2); + + // 3. add consumer3 but reach maxConsumersPerSubscription + try { + Consumer consumer3 = new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, + "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */); + sub.addConsumer(consumer3); + fail("should have failed"); + } catch (BrokerServiceException e) { + assertTrue(e instanceof BrokerServiceException.ConsumerBusyException); + } + + // check number of consumers on topic + assertEquals(topic.getNumberOfConsumers(), 2); + + // 4. add consumer4 to sub2 + Consumer consumer4 = new Consumer(sub2, SubType.Failover, topic.getName(), 4 /* consumer id */, 0, + "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */); + sub2.addConsumer(consumer4); + assertEquals(sub2.getConsumers().size(), 1); + + // check number of consumers on topic + assertEquals(topic.getNumberOfConsumers(), 3); + + // 5. add consumer5 to sub2 but reach maxConsumersPerTopic + try { + Consumer consumer5 = new Consumer(sub2, SubType.Failover, topic.getName(), 5 /* consumer id */, 0, + "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), + false /* read compacted */); + sub2.addConsumer(consumer5); + fail("should have failed"); + } catch (BrokerServiceException e) { + assertTrue(e instanceof BrokerServiceException.ConsumerBusyException); + } + } + + @Test public void testUbsubscribeRaceConditions() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); -- To stop receiving notification emails like this one, please contact mme...@apache.org.