This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 263a90b [PIP-82] [pulsar-broker] Add publish ratelimiter to resource group (#11184) 263a90b is described below commit 263a90badbad3425599fe57bc2586fe37be6c9e6 Author: Bharani Chadalavada <bharanic....@gmail.com> AuthorDate: Thu Jul 15 10:40:41 2021 -0700 [PIP-82] [pulsar-broker] Add publish ratelimiter to resource group (#11184) * Add ratelimiter to resource group and attach namespace to resource group rate limiter. Co-authored-by: Bharani Chadalavada <bchadalav...@splunk.com> --- .../pulsar/broker/resourcegroup/ResourceGroup.java | 10 ++ .../resourcegroup/ResourceGroupPublishLimiter.java | 151 ++++++++++++++++++++ .../broker/resourcegroup/ResourceGroupService.java | 6 + .../pulsar/broker/service/AbstractTopic.java | 54 ++++++- .../apache/pulsar/broker/service/ServerCnx.java | 8 ++ .../org/apache/pulsar/broker/service/Topic.java | 4 + .../service/nonpersistent/NonPersistentTopic.java | 3 + .../broker/service/persistent/PersistentTopic.java | 3 + .../ResourceGroupConfigListenerTest.java | 10 +- .../ResourceGroupRateLimiterTest.java | 155 +++++++++++++++++++++ 10 files changed, 394 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index 5560216..0a48bee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import lombok.Getter; import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupOpStatus; import org.apache.pulsar.broker.service.resource.usage.NetworkUsage; import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; @@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory; * publish, another one for dispatch, etc. */ public class ResourceGroup { + /** * Convenience class for bytes and messages counts, which are used together in a lot of the following code. */ @@ -78,6 +80,9 @@ public class ResourceGroup { this.setResourceGroupMonitoringClassFields(); this.setResourceGroupConfigParameters(rgConfig); this.setDefaultResourceUsageTransportHandlers(); + this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor()); + log.info("attaching publish rate limiter {} to {} get {}", this.resourceGroupPublishLimiter.toString(), name, + this.getResourceGroupPublishLimiter()); } // ctor for overriding the transport-manager fill/set buffer. @@ -90,6 +95,7 @@ public class ResourceGroup { this.resourceGroupName = rgName; this.setResourceGroupMonitoringClassFields(); this.setResourceGroupConfigParameters(rgConfig); + this.resourceGroupPublishLimiter = new ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor()); this.ruPublisher = rgPublisher; this.ruConsumer = rgConsumer; } @@ -99,6 +105,7 @@ public class ResourceGroup { public ResourceGroup(ResourceGroup other) { this.resourceGroupName = other.resourceGroupName; this.rgs = other.rgs; + this.resourceGroupPublishLimiter = other.resourceGroupPublishLimiter; this.setResourceGroupMonitoringClassFields(); // ToDo: copy the monitoring class fields, and ruPublisher/ruConsumer from other, if required. @@ -534,6 +541,9 @@ public class ResourceGroup { .help("Number of times local usage was reported (vs. suppressed due to negligible change)") .labelNames(resourceGroupMontoringclassLabels) .register(); + // Publish rate limiter for the resource group + @Getter + protected ResourceGroupPublishLimiter resourceGroupPublishLimiter; protected static class PerMonitoringClassFields { // This lock covers all the "local" counts (i.e., except for the per-broker usage stats). diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java new file mode 100644 index 0000000..e0df150 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resourcegroup; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.service.PublishRateLimiter; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.policies.data.ResourceGroup; +import org.apache.pulsar.common.util.RateLimitFunction; +import org.apache.pulsar.common.util.RateLimiter; + +public class ResourceGroupPublishLimiter implements PublishRateLimiter, RateLimitFunction, AutoCloseable { + protected volatile int publishMaxMessageRate = 0; + protected volatile long publishMaxByteRate = 0; + protected volatile boolean publishThrottlingEnabled = false; + private volatile RateLimiter publishRateLimiterOnMessage; + private volatile RateLimiter publishRateLimiterOnByte; + private final ScheduledExecutorService scheduledExecutorService; + + ConcurrentHashMap<String, RateLimitFunction> rateLimitFunctionMap = new ConcurrentHashMap<>(); + + public ResourceGroupPublishLimiter(ResourceGroup resourceGroup, ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = scheduledExecutorService; + update(resourceGroup); + } + + @Override + public void checkPublishRate() { + // No-op + } + + @Override + public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { + // No-op + } + + @Override + public boolean resetPublishCount() { + return true; + } + + @Override + public boolean isPublishRateExceeded() { + return false; + } + + @Override + public void update(Policies policies, String clusterName) { + // No-op + } + + @Override + public void update(PublishRate maxPublishRate) { + // No-op + } + + public void update(ResourceGroup resourceGroup) { + replaceLimiters(() -> { + if (resourceGroup != null + && (resourceGroup.getPublishRateInMsgs() > 0 || resourceGroup.getPublishRateInBytes() > 0)) { + this.publishThrottlingEnabled = true; + this.publishMaxMessageRate = Math.max(resourceGroup.getPublishRateInMsgs(), 0); + this.publishMaxByteRate = Math.max(resourceGroup.getPublishRateInBytes(), 0); + if (this.publishMaxMessageRate > 0) { + // TODO: pass the executor + publishRateLimiterOnMessage = + new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, this::apply); + } + if (this.publishMaxByteRate > 0) { + // TODO: pass the executor + publishRateLimiterOnByte = + new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, this::apply); + } + } else { + this.publishMaxMessageRate = 0; + this.publishMaxByteRate = 0; + this.publishThrottlingEnabled = false; + publishRateLimiterOnMessage = null; + publishRateLimiterOnByte = null; + } + }); + } + + public boolean tryAcquire(int numbers, long bytes) { + return (publishRateLimiterOnMessage == null || publishRateLimiterOnMessage.tryAcquire(numbers)) + && (publishRateLimiterOnByte == null || publishRateLimiterOnByte.tryAcquire(bytes)); + } + + public void registerRateLimitFunction(String name, RateLimitFunction func) { + rateLimitFunctionMap.put(name, func); + } + + public void unregisterRateLimitFunction(String name) { + rateLimitFunctionMap.remove(name); + } + + private void replaceLimiters(Runnable updater) { + RateLimiter previousPublishRateLimiterOnMessage = publishRateLimiterOnMessage; + publishRateLimiterOnMessage = null; + RateLimiter previousPublishRateLimiterOnByte = publishRateLimiterOnByte; + publishRateLimiterOnByte = null; + try { + if (updater != null) { + updater.run(); + } + } finally { + // Close previous limiters to prevent resource leakages. + // Delay closing of previous limiters after new ones are in place so that updating the limiter + // doesn't cause unavailability. + if (previousPublishRateLimiterOnMessage != null) { + previousPublishRateLimiterOnMessage.close(); + } + if (previousPublishRateLimiterOnByte != null) { + previousPublishRateLimiterOnByte.close(); + } + } + } + + @Override + public void close() { + this.apply(); + replaceLimiters(null); + } + + @Override + public void apply() { + for (Map.Entry<String, RateLimitFunction> entry: rateLimitFunctionMap.entrySet()) { + entry.getValue().apply(); + } + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index 058f16a..aaee8b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import lombok.Getter; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; @@ -163,6 +164,8 @@ public class ResourceGroupService { throw new PulsarAdminException(errMesg); } + rg.resourceGroupPublishLimiter.close(); + rg.resourceGroupPublishLimiter = null; resourceGroupsMap.remove(name); } @@ -651,7 +654,10 @@ public class ResourceGroupService { } private static final Logger log = LoggerFactory.getLogger(ResourceGroupService.class); + + @Getter private final PulsarService pulsar; + protected final ResourceQuotaCalculator quotaCalculator; private ResourceUsageTransportManager resourceUsageTransportManagerMgr; private final ResourceGroupConfigListener rgConfigListener; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index f98b57b..65f70ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -35,10 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; +import lombok.Getter; import org.apache.bookkeeper.mledger.util.StatsBuckets; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup; +import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException; @@ -103,8 +106,13 @@ public abstract class AbstractTopic implements Topic { protected volatile PublishRateLimiter topicPublishRateLimiter; + protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter; + protected boolean preciseTopicPublishRateLimitingEnable; + @Getter + protected boolean resourceGroupRateLimitingEnabled; + private LongAdder bytesInCounter = new LongAdder(); private LongAdder msgInCounter = new LongAdder(); @@ -746,6 +754,17 @@ public abstract class AbstractTopic implements Topic { } @Override + public boolean isResourceGroupPublishRateExceeded(int numMessages, int bytes) { + return this.resourceGroupRateLimitingEnabled + && !this.resourceGroupPublishLimiter.tryAcquire(numMessages, bytes); + } + + @Override + public boolean isResourceGroupRateLimitingEnabled() { + return this.resourceGroupRateLimitingEnabled; + } + + @Override public boolean isTopicPublishRateExceeded(int numberMessages, int bytes) { // whether topic publish rate exceed if precise rate limit is enable return preciseTopicPublishRateLimitingEnable && !this.topicPublishRateLimiter.tryAcquire(numberMessages, bytes); @@ -787,14 +806,39 @@ public abstract class AbstractTopic implements Topic { //both namespace-level and topic-level policy are not set, try to use broker-level policy ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration(); - if (publishRate == null) { + if (publishRate != null) { + //publishRate is not null , use namespace-level policy + updatePublishDispatcher(publishRate); + } else { PublishRate brokerPublishRate = new PublishRate(serviceConfiguration.getMaxPublishRatePerTopicInMessages() - , serviceConfiguration.getMaxPublishRatePerTopicInBytes()); + , serviceConfiguration.getMaxPublishRatePerTopicInBytes()); updatePublishDispatcher(brokerPublishRate); - return; } - //publishRate is not null , use namespace-level policy - updatePublishDispatcher(publishRate); + + // attach the resource-group level rate limiters, if set + String rgName = policies != null && policies.resource_group_name != null + ? policies.resource_group_name + : null; + if (rgName != null) { + final ResourceGroup resourceGroup = + brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); + if (resourceGroup != null) { + this.resourceGroupRateLimitingEnabled = true; + this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter(); + this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), + () -> this.enableCnxAutoRead()); + log.info("Using resource group {} rate limiter for topic {}", rgName, topic); + return; + } + } else { + if (this.resourceGroupRateLimitingEnabled) { + this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName()); + this.resourceGroupPublishLimiter = null; + this.resourceGroupRateLimitingEnabled = false; + } + /* Namespace detached from resource group. Enable the producer read */ + enableProducerReadForPublishRateLimiting(); + } } public long getMsgInCounter() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index fe5f087..9b51158 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2211,6 +2211,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { } isPublishRateExceeded = producer.getTopic().isBrokerPublishRateExceeded(); } else { + if (producer.getTopic().isResourceGroupRateLimitingEnabled()) { + final boolean resourceGroupPublishRateExceeded = + producer.getTopic().isResourceGroupPublishRateExceeded(numMessages, msgSize); + if (resourceGroupPublishRateExceeded) { + producer.getTopic().disableCnxAutoRead(); + return; + } + } isPublishRateExceeded = producer.getTopic().isPublishRateExceeded(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index e0cc873..6c4d694 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -170,6 +170,10 @@ public interface Topic { boolean isTopicPublishRateExceeded(int msgSize, int numMessages); + boolean isResourceGroupRateLimitingEnabled(); + + boolean isResourceGroupPublishRateExceeded(int msgSize, int numMessages); + boolean isBrokerPublishRateExceeded(); void disableCnxAutoRead(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 2751d1e..8f9ba86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -445,6 +445,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); + if (this.resourceGroupPublishLimiter != null) { + this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName()); + } CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 0f0d967..1a84258 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1150,6 +1150,9 @@ public class PersistentTopic extends AbstractTopic replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); + if (this.resourceGroupPublishLimiter != null) { + this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName()); + } CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java index 3071edb..1a85aee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java @@ -115,18 +115,18 @@ public class ResourceGroupConfigListenerTest extends MockedPulsarServiceBaseTest @Test public void testResourceGroupAttachToNamespace() throws Exception { createResourceGroup(rgName, testAddRg); - admin.tenants().createTenant(tenantName, new TenantInfoImpl(Sets.newHashSet("fake-admin-role"), Sets.newHashSet(clusterName))); admin.namespaces().createNamespace(namespaceName); - admin.namespaces().setNamespaceResourceGroup(namespaceName, rgName); + admin.namespaces().setNamespaceResourceGroup(namespaceName, rgName); Awaitility.await().untilAsserted(() -> - assertNotNull(pulsar - .getResourceGroupServiceManager() - .getNamespaceResourceGroup(namespaceName))); + assertNotNull(pulsar.getResourceGroupServiceManager().getNamespaceResourceGroup(namespaceName))); admin.namespaces().removeNamespaceResourceGroup(namespaceName); + Awaitility.await().untilAsserted(() -> + assertNull(pulsar.getResourceGroupServiceManager().getNamespaceResourceGroup(namespaceName))); + admin.namespaces().deleteNamespace(namespaceName); deleteResourceGroup(rgName); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java new file mode 100644 index 0000000..b51ebf6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resourcegroup; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class ResourceGroupRateLimiterTest extends BrokerTestBase { + + final String rgName = "testRG"; + org.apache.pulsar.common.policies.data.ResourceGroup testAddRg = + new org.apache.pulsar.common.policies.data.ResourceGroup(); + final String namespaceName = "prop/ns-abc"; + final String persistentTopicString = "persistent://prop/ns-abc/test-topic"; + final String nonPersistentTopicString = "non-persistent://prop/ns-abc/test-topic"; + final int MESSAGE_SIZE = 10; + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setMaxPendingPublishRequestsPerConnection(0); + super.baseSetup(); + prepareData(); + + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + public void createResourceGroup(String rgName, org.apache.pulsar.common.policies.data.ResourceGroup rg) throws PulsarAdminException { + admin.resourcegroups().createResourceGroup(rgName, rg); + + Awaitility.await().untilAsserted(() -> { + final org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = pulsar + .getResourceGroupServiceManager().resourceGroupGet(rgName); + assertNotNull(resourceGroup); + assertEquals(rgName, resourceGroup.resourceGroupName); + }); + + } + + public void deleteResourceGroup(String rgName) throws PulsarAdminException { + admin.resourcegroups().deleteResourceGroup(rgName); + Awaitility.await().atMost(1, TimeUnit.SECONDS) + .untilAsserted(() -> assertNull(pulsar.getResourceGroupServiceManager().resourceGroupGet(rgName))); + } + + public void testRateLimit(String topicString) throws PulsarAdminException, PulsarClientException, + InterruptedException, ExecutionException, TimeoutException { + createResourceGroup(rgName, testAddRg); + admin.namespaces().setNamespaceResourceGroup(namespaceName, rgName); + + Awaitility.await().untilAsserted(() -> + assertNotNull(pulsar.getResourceGroupServiceManager() + .getNamespaceResourceGroup(namespaceName))); + + Awaitility.await().untilAsserted(() -> + assertNotNull(pulsar.getResourceGroupServiceManager() + .resourceGroupGet(rgName).getResourceGroupPublishLimiter())); + + Producer<byte[]> producer = null; + try { + producer = pulsarClient.newProducer() + .topic(persistentTopicString) + .create(); + } catch (PulsarClientException p) { + final String errMesg = String.format("Got exception while building producer: ex=%s", p.getMessage()); + Assert.fail(errMesg); + } + + MessageId messageId = null; + try { + // first will be success + messageId = producer.sendAsync(new byte[MESSAGE_SIZE]).get(100, TimeUnit.MILLISECONDS); + Assert.assertNotNull(messageId); + } catch (TimeoutException e) { + Assert.fail("should not fail"); + } + + // Second message should fail with timeout. + Producer<byte[]> finalProducer = producer; + Assert.assertThrows(TimeoutException.class, () -> { + finalProducer.sendAsync(new byte[MESSAGE_SIZE]).get(500, TimeUnit.MILLISECONDS);}); + + // In the next interval, the above message will be accepted. Wait for one more second (total 2s), + // to publish the next message. + Thread.sleep(2000); + + try { + // third one should succeed + messageId = producer.sendAsync(new byte[MESSAGE_SIZE]).get(100, TimeUnit.MILLISECONDS); + Assert.assertNotNull(messageId); + } catch (TimeoutException e) { + Assert.fail("should not fail"); + } + + // Now detach the namespace + admin.namespaces().removeNamespaceResourceGroup(namespaceName); + deleteResourceGroup(rgName); + + // No rate limits should be applied. + for (int i = 0; i < 5; i++) { + messageId = producer.sendAsync(new byte[MESSAGE_SIZE]).get(100, TimeUnit.MILLISECONDS); + Assert.assertNotNull(messageId); + } + producer.close(); + } + + @Test + public void testResourceGroupPublishRateLimit() throws Exception { + testRateLimit(persistentTopicString); + testRateLimit(nonPersistentTopicString); + } + + private void prepareData() { + testAddRg.setPublishRateInBytes(MESSAGE_SIZE); + testAddRg.setPublishRateInMsgs(1); + testAddRg.setDispatchRateInMsgs(-1); + testAddRg.setDispatchRateInBytes(-1); + } +}