This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 263a90b  [PIP-82] [pulsar-broker] Add publish ratelimiter to resource group (#11184)
263a90b is described below

commit 263a90badbad3425599fe57bc2586fe37be6c9e6
Author: Bharani Chadalavada <bharanic....@gmail.com>
AuthorDate: Thu Jul 15 10:40:41 2021 -0700

    [PIP-82] [pulsar-broker] Add publish ratelimiter to resource group (#11184)
    
    * Add ratelimiter to resource group and attach namespace to resource group rate limiter.
    Co-authored-by: Bharani Chadalavada <bchadalav...@splunk.com>
---
 .../pulsar/broker/resourcegroup/ResourceGroup.java |  10 ++
 .../resourcegroup/ResourceGroupPublishLimiter.java | 151 ++++++++++++++++++++
 .../broker/resourcegroup/ResourceGroupService.java |   6 +
 .../pulsar/broker/service/AbstractTopic.java       |  54 ++++++-
 .../apache/pulsar/broker/service/ServerCnx.java    |   8 ++
 .../org/apache/pulsar/broker/service/Topic.java    |   4 +
 .../service/nonpersistent/NonPersistentTopic.java  |   3 +
 .../broker/service/persistent/PersistentTopic.java |   3 +
 .../ResourceGroupConfigListenerTest.java           |  10 +-
 .../ResourceGroupRateLimiterTest.java              | 155 +++++++++++++++++++++
 10 files changed, 394 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
index 5560216..0a48bee 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import lombok.Getter;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupOpStatus;
 import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
 import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
  * publish, another one for dispatch, etc.
  */
 public class ResourceGroup {
+
     /**
      * Convenience class for bytes and messages counts, which are used 
together in a lot of the following code.
      */
@@ -78,6 +80,9 @@ public class ResourceGroup {
         this.setResourceGroupMonitoringClassFields();
         this.setResourceGroupConfigParameters(rgConfig);
         this.setDefaultResourceUsageTransportHandlers();
+        this.resourceGroupPublishLimiter = new 
ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor());
+        log.info("attaching publish rate limiter {} to {} get {}", 
this.resourceGroupPublishLimiter.toString(), name,
+          this.getResourceGroupPublishLimiter());
     }
 
     // ctor for overriding the transport-manager fill/set buffer.
@@ -90,6 +95,7 @@ public class ResourceGroup {
         this.resourceGroupName = rgName;
         this.setResourceGroupMonitoringClassFields();
         this.setResourceGroupConfigParameters(rgConfig);
+        this.resourceGroupPublishLimiter = new 
ResourceGroupPublishLimiter(rgConfig, rgs.getPulsar().getExecutor());
         this.ruPublisher = rgPublisher;
         this.ruConsumer = rgConsumer;
     }
@@ -99,6 +105,7 @@ public class ResourceGroup {
     public ResourceGroup(ResourceGroup other) {
         this.resourceGroupName = other.resourceGroupName;
         this.rgs = other.rgs;
+        this.resourceGroupPublishLimiter = other.resourceGroupPublishLimiter;
         this.setResourceGroupMonitoringClassFields();
 
         // ToDo: copy the monitoring class fields, and ruPublisher/ruConsumer 
from other, if required.
@@ -534,6 +541,9 @@ public class ResourceGroup {
             .help("Number of times local usage was reported (vs. suppressed 
due to negligible change)")
             .labelNames(resourceGroupMontoringclassLabels)
             .register();
+    // Publish rate limiter for the resource group
+    @Getter
+    protected ResourceGroupPublishLimiter resourceGroupPublishLimiter;
 
     protected static class PerMonitoringClassFields {
         // This lock covers all the "local" counts (i.e., except for the 
per-broker usage stats).
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
new file mode 100644
index 0000000..e0df150
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.service.PublishRateLimiter;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.common.util.RateLimitFunction;
+import org.apache.pulsar.common.util.RateLimiter;
+
+public class ResourceGroupPublishLimiter implements PublishRateLimiter, 
RateLimitFunction, AutoCloseable  {
+    protected volatile int publishMaxMessageRate = 0;
+    protected volatile long publishMaxByteRate = 0;
+    protected volatile boolean publishThrottlingEnabled = false;
+    private volatile RateLimiter publishRateLimiterOnMessage;
+    private volatile RateLimiter publishRateLimiterOnByte;
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    ConcurrentHashMap<String, RateLimitFunction> rateLimitFunctionMap = new 
ConcurrentHashMap<>();
+
+    public ResourceGroupPublishLimiter(ResourceGroup resourceGroup, 
ScheduledExecutorService scheduledExecutorService) {
+        this.scheduledExecutorService = scheduledExecutorService;
+        update(resourceGroup);
+    }
+
+    @Override
+    public void checkPublishRate() {
+        // No-op
+    }
+
+    @Override
+    public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
+        // No-op
+    }
+
+    @Override
+    public boolean resetPublishCount() {
+        return true;
+    }
+
+    @Override
+    public boolean isPublishRateExceeded() {
+        return false;
+    }
+
+    @Override
+    public void update(Policies policies, String clusterName) {
+      // No-op
+    }
+
+    @Override
+    public void update(PublishRate maxPublishRate) {
+      // No-op
+    }
+
+    public void update(ResourceGroup resourceGroup) {
+        replaceLimiters(() -> {
+            if (resourceGroup != null
+                && (resourceGroup.getPublishRateInMsgs() > 0 || 
resourceGroup.getPublishRateInBytes() > 0)) {
+                this.publishThrottlingEnabled = true;
+                this.publishMaxMessageRate = 
Math.max(resourceGroup.getPublishRateInMsgs(), 0);
+                this.publishMaxByteRate = 
Math.max(resourceGroup.getPublishRateInBytes(), 0);
+                if (this.publishMaxMessageRate > 0) {
+                    // TODO: pass the executor
+                    publishRateLimiterOnMessage =
+                        new RateLimiter(publishMaxMessageRate, 1, 
TimeUnit.SECONDS, this::apply);
+                }
+                if (this.publishMaxByteRate > 0) {
+                    // TODO: pass the executor
+                    publishRateLimiterOnByte =
+                        new RateLimiter(publishMaxByteRate, 1, 
TimeUnit.SECONDS, this::apply);
+                }
+            } else {
+                this.publishMaxMessageRate = 0;
+                this.publishMaxByteRate = 0;
+                this.publishThrottlingEnabled = false;
+                publishRateLimiterOnMessage = null;
+                publishRateLimiterOnByte = null;
+            }
+        });
+    }
+
+    public boolean tryAcquire(int numbers, long bytes) {
+        return (publishRateLimiterOnMessage == null || 
publishRateLimiterOnMessage.tryAcquire(numbers))
+            && (publishRateLimiterOnByte == null || 
publishRateLimiterOnByte.tryAcquire(bytes));
+    }
+
+    public void registerRateLimitFunction(String name, RateLimitFunction func) 
{
+        rateLimitFunctionMap.put(name, func);
+    }
+
+    public void unregisterRateLimitFunction(String name) {
+        rateLimitFunctionMap.remove(name);
+    }
+
+    private void replaceLimiters(Runnable updater) {
+        RateLimiter previousPublishRateLimiterOnMessage = 
publishRateLimiterOnMessage;
+        publishRateLimiterOnMessage = null;
+        RateLimiter previousPublishRateLimiterOnByte = 
publishRateLimiterOnByte;
+        publishRateLimiterOnByte = null;
+        try {
+            if (updater != null) {
+                updater.run();
+            }
+        } finally {
+            // Close previous limiters to prevent resource leakages.
+            // Delay closing of previous limiters after new ones are in place 
so that updating the limiter
+            // doesn't cause unavailability.
+            if (previousPublishRateLimiterOnMessage != null) {
+                previousPublishRateLimiterOnMessage.close();
+            }
+            if (previousPublishRateLimiterOnByte != null) {
+                previousPublishRateLimiterOnByte.close();
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+      this.apply();
+      replaceLimiters(null);
+    }
+
+    @Override
+    public void apply() {
+        for (Map.Entry<String, RateLimitFunction> entry: 
rateLimitFunctionMap.entrySet()) {
+            entry.getValue().apply();
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index 058f16a..aaee8b8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import lombok.Getter;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import 
org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
@@ -163,6 +164,8 @@ public class ResourceGroupService {
             throw new PulsarAdminException(errMesg);
         }
 
+        rg.resourceGroupPublishLimiter.close();
+        rg.resourceGroupPublishLimiter = null;
         resourceGroupsMap.remove(name);
     }
 
@@ -651,7 +654,10 @@ public class ResourceGroupService {
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(ResourceGroupService.class);
+
+    @Getter
     private final PulsarService pulsar;
+
     protected final ResourceQuotaCalculator quotaCalculator;
     private ResourceUsageTransportManager resourceUsageTransportManagerMgr;
     private final ResourceGroupConfigListener rgConfigListener;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index f98b57b..65f70ff 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -35,10 +35,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import lombok.Getter;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
+import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
@@ -103,8 +106,13 @@ public abstract class AbstractTopic implements Topic {
 
     protected volatile PublishRateLimiter topicPublishRateLimiter;
 
+    protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
+
     protected boolean preciseTopicPublishRateLimitingEnable;
 
+    @Getter
+    protected boolean resourceGroupRateLimitingEnabled;
+
     private LongAdder bytesInCounter = new LongAdder();
     private LongAdder msgInCounter = new LongAdder();
 
@@ -746,6 +754,17 @@ public abstract class AbstractTopic implements Topic {
     }
 
     @Override
+    public boolean isResourceGroupPublishRateExceeded(int numMessages, int 
bytes) {
+        return this.resourceGroupRateLimitingEnabled
+            && !this.resourceGroupPublishLimiter.tryAcquire(numMessages, 
bytes);
+    }
+
+    @Override
+    public boolean isResourceGroupRateLimitingEnabled() {
+        return this.resourceGroupRateLimitingEnabled;
+    }
+
+    @Override
     public boolean isTopicPublishRateExceeded(int numberMessages, int bytes) {
         // whether topic publish rate exceed if precise rate limit is enable
         return preciseTopicPublishRateLimitingEnable && 
!this.topicPublishRateLimiter.tryAcquire(numberMessages, bytes);
@@ -787,14 +806,39 @@ public abstract class AbstractTopic implements Topic {
 
         //both namespace-level and topic-level policy are not set, try to use 
broker-level policy
         ServiceConfiguration serviceConfiguration = 
brokerService.pulsar().getConfiguration();
-        if (publishRate == null) {
+        if (publishRate != null) {
+            //publishRate is not null , use namespace-level policy
+            updatePublishDispatcher(publishRate);
+        } else {
             PublishRate brokerPublishRate = new 
PublishRate(serviceConfiguration.getMaxPublishRatePerTopicInMessages()
-                    , serviceConfiguration.getMaxPublishRatePerTopicInBytes());
+              , serviceConfiguration.getMaxPublishRatePerTopicInBytes());
             updatePublishDispatcher(brokerPublishRate);
-            return;
         }
-        //publishRate is not null , use namespace-level policy
-        updatePublishDispatcher(publishRate);
+
+        // attach the resource-group level rate limiters, if set
+        String rgName = policies != null && policies.resource_group_name != 
null
+          ? policies.resource_group_name
+          : null;
+        if (rgName != null) {
+            final ResourceGroup resourceGroup =
+              
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
+            if (resourceGroup != null) {
+                this.resourceGroupRateLimitingEnabled = true;
+                this.resourceGroupPublishLimiter = 
resourceGroup.getResourceGroupPublishLimiter();
+                
this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(),
+                  () -> this.enableCnxAutoRead());
+                log.info("Using resource group {} rate limiter for topic {}", 
rgName, topic);
+                return;
+            }
+        } else {
+            if (this.resourceGroupRateLimitingEnabled) {
+                
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
+                this.resourceGroupPublishLimiter = null;
+                this.resourceGroupRateLimitingEnabled = false;
+            }
+            /* Namespace detached from resource group. Enable the producer 
read */
+            enableProducerReadForPublishRateLimiting();
+        }
     }
 
     public long getMsgInCounter() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index fe5f087..9b51158 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2211,6 +2211,14 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             }
             isPublishRateExceeded = 
producer.getTopic().isBrokerPublishRateExceeded();
         } else {
+            if (producer.getTopic().isResourceGroupRateLimitingEnabled()) {
+                final boolean resourceGroupPublishRateExceeded =
+                  
producer.getTopic().isResourceGroupPublishRateExceeded(numMessages, msgSize);
+                if (resourceGroupPublishRateExceeded) {
+                    producer.getTopic().disableCnxAutoRead();
+                    return;
+                }
+            }
             isPublishRateExceeded = 
producer.getTopic().isPublishRateExceeded();
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index e0cc873..6c4d694 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -170,6 +170,10 @@ public interface Topic {
 
     boolean isTopicPublishRateExceeded(int msgSize, int numMessages);
 
+    boolean isResourceGroupRateLimitingEnabled();
+
+    boolean isResourceGroupPublishRateExceeded(int msgSize, int numMessages);
+
     boolean isBrokerPublishRateExceeded();
 
     void disableCnxAutoRead();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 2751d1e..8f9ba86 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -445,6 +445,9 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic {
         replicators.forEach((cluster, replicator) -> 
futures.add(replicator.disconnect()));
         producers.values().forEach(producer -> 
futures.add(producer.disconnect()));
         subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
+        if (this.resourceGroupPublishLimiter != null) {
+            
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
+        }
 
         CompletableFuture<Void> clientCloseFuture =
                 closeWithoutWaitingClientDisconnect ? 
CompletableFuture.completedFuture(null)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0f0d967..1a84258 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1150,6 +1150,9 @@ public class PersistentTopic extends AbstractTopic
         replicators.forEach((cluster, replicator) -> 
futures.add(replicator.disconnect()));
         producers.values().forEach(producer -> 
futures.add(producer.disconnect()));
         subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
+        if (this.resourceGroupPublishLimiter != null) {
+            
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
+        }
 
         CompletableFuture<Void> clientCloseFuture = 
closeWithoutWaitingClientDisconnect
                 ? CompletableFuture.completedFuture(null)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java
index 3071edb..1a85aee 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupConfigListenerTest.java
@@ -115,18 +115,18 @@ public class ResourceGroupConfigListenerTest extends 
MockedPulsarServiceBaseTest
     @Test
     public void testResourceGroupAttachToNamespace() throws Exception {
         createResourceGroup(rgName, testAddRg);
-
         admin.tenants().createTenant(tenantName,
                 new TenantInfoImpl(Sets.newHashSet("fake-admin-role"), 
Sets.newHashSet(clusterName)));
         admin.namespaces().createNamespace(namespaceName);
-        admin.namespaces().setNamespaceResourceGroup(namespaceName, rgName);
 
+        admin.namespaces().setNamespaceResourceGroup(namespaceName, rgName);
         Awaitility.await().untilAsserted(() ->
-                        assertNotNull(pulsar
-                                .getResourceGroupServiceManager()
-                                .getNamespaceResourceGroup(namespaceName)));
+            
assertNotNull(pulsar.getResourceGroupServiceManager().getNamespaceResourceGroup(namespaceName)));
 
         admin.namespaces().removeNamespaceResourceGroup(namespaceName);
+        Awaitility.await().untilAsserted(() ->
+            
assertNull(pulsar.getResourceGroupServiceManager().getNamespaceResourceGroup(namespaceName)));
+
         admin.namespaces().deleteNamespace(namespaceName);
         deleteResourceGroup(rgName);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
new file mode 100644
index 0000000..b51ebf6
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupRateLimiterTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class ResourceGroupRateLimiterTest extends BrokerTestBase {
+
+    final String rgName = "testRG";
+    org.apache.pulsar.common.policies.data.ResourceGroup testAddRg =
+    new org.apache.pulsar.common.policies.data.ResourceGroup();
+    final String namespaceName = "prop/ns-abc";
+    final String persistentTopicString = "persistent://prop/ns-abc/test-topic";
+    final String nonPersistentTopicString = 
"non-persistent://prop/ns-abc/test-topic";
+    final int MESSAGE_SIZE = 10;
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        conf.setMaxPendingPublishRequestsPerConnection(0);
+        super.baseSetup();
+        prepareData();
+
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    public void createResourceGroup(String rgName, 
org.apache.pulsar.common.policies.data.ResourceGroup rg) throws 
PulsarAdminException {
+        admin.resourcegroups().createResourceGroup(rgName, rg);
+
+        Awaitility.await().untilAsserted(() -> {
+            final org.apache.pulsar.broker.resourcegroup.ResourceGroup 
resourceGroup = pulsar
+                .getResourceGroupServiceManager().resourceGroupGet(rgName);
+            assertNotNull(resourceGroup);
+            assertEquals(rgName, resourceGroup.resourceGroupName);
+        });
+
+    }
+
+    public void deleteResourceGroup(String rgName) throws PulsarAdminException 
{
+        admin.resourcegroups().deleteResourceGroup(rgName);
+        Awaitility.await().atMost(1, TimeUnit.SECONDS)
+            .untilAsserted(() -> 
assertNull(pulsar.getResourceGroupServiceManager().resourceGroupGet(rgName)));
+    }
+
+    public void testRateLimit(String topicString) throws PulsarAdminException, 
PulsarClientException,
+      InterruptedException, ExecutionException, TimeoutException {
+        createResourceGroup(rgName, testAddRg);
+        admin.namespaces().setNamespaceResourceGroup(namespaceName, rgName);
+
+        Awaitility.await().untilAsserted(() ->
+          assertNotNull(pulsar.getResourceGroupServiceManager()
+            .getNamespaceResourceGroup(namespaceName)));
+
+        Awaitility.await().untilAsserted(() ->
+          assertNotNull(pulsar.getResourceGroupServiceManager()
+            .resourceGroupGet(rgName).getResourceGroupPublishLimiter()));
+
+        Producer<byte[]> producer = null;
+        try {
+            producer = pulsarClient.newProducer()
+              .topic(persistentTopicString)
+              .create();
+        } catch (PulsarClientException p) {
+            final String errMesg = String.format("Got exception while building 
producer: ex=%s", p.getMessage());
+            Assert.fail(errMesg);
+        }
+
+        MessageId messageId = null;
+        try {
+            // first will be success
+            messageId = producer.sendAsync(new byte[MESSAGE_SIZE]).get(100, 
TimeUnit.MILLISECONDS);
+            Assert.assertNotNull(messageId);
+        } catch (TimeoutException e) {
+            Assert.fail("should not fail");
+        }
+
+        // Second message should fail with timeout.
+        Producer<byte[]> finalProducer = producer;
+        Assert.assertThrows(TimeoutException.class, () -> {
+            finalProducer.sendAsync(new byte[MESSAGE_SIZE]).get(500, 
TimeUnit.MILLISECONDS);});
+
+        // In the next interval, the above message will be accepted. Wait for 
one more second (total 2s),
+        // to publish the next message.
+        Thread.sleep(2000);
+
+        try {
+            // third one should succeed
+            messageId = producer.sendAsync(new byte[MESSAGE_SIZE]).get(100, 
TimeUnit.MILLISECONDS);
+            Assert.assertNotNull(messageId);
+        } catch (TimeoutException e) {
+            Assert.fail("should not fail");
+        }
+
+        // Now detach the namespace
+        admin.namespaces().removeNamespaceResourceGroup(namespaceName);
+        deleteResourceGroup(rgName);
+
+        // No rate limits should be applied.
+        for (int i = 0; i < 5; i++) {
+            messageId = producer.sendAsync(new byte[MESSAGE_SIZE]).get(100, 
TimeUnit.MILLISECONDS);
+            Assert.assertNotNull(messageId);
+        }
+        producer.close();
+    }
+
+    @Test
+    public void testResourceGroupPublishRateLimit() throws Exception {
+        testRateLimit(persistentTopicString);
+        testRateLimit(nonPersistentTopicString);
+    }
+
+    private void prepareData() {
+        testAddRg.setPublishRateInBytes(MESSAGE_SIZE);
+        testAddRg.setPublishRateInMsgs(1);
+        testAddRg.setDispatchRateInMsgs(-1);
+        testAddRg.setDispatchRateInBytes(-1);
+    }
+}

Reply via email to