This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 66688d1  support message publish rate on topic level (#7948)
66688d1 is described below

commit 66688d15801119b7c62efa504e8267564acbe403
Author: hangc0276 <hangc0...@163.com>
AuthorDate: Wed Sep 2 21:11:51 2020 +0800

    support message publish rate on topic level (#7948)
    
    Modifications
    Support set publish rate on topic level.
    Support get publish rate on topic level.
    Support remove publish rate on topic level.
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  65 +++++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  88 +++++++++
 .../pulsar/broker/service/AbstractTopic.java       |  67 ++++++-
 .../broker/service/PrecisPublishLimiter.java       | 101 +++++++++++
 .../pulsar/broker/service/PublishRateLimiter.java  | 198 ---------------------
 .../broker/service/PublishRateLimiterDisable.java  |  65 +++++++
 .../broker/service/PublishRateLimiterImpl.java     | 104 +++++++++++
 .../broker/service/persistent/PersistentTopic.java |  52 ++++--
 .../broker/admin/TopicPoliciesDisableTest.java     |  21 +++
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  39 ++++
 .../org/apache/pulsar/client/admin/Topics.java     |  62 ++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   |  79 +++++++-
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  49 +++++
 .../pulsar/common/policies/data/TopicPolicies.java |   5 +
 14 files changed, 757 insertions(+), 238 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a78ead7..a25bf37 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -97,9 +97,8 @@ import 
org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
-import org.apache.pulsar.common.policies.data.PolicyName;
-import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.protocol.Commands;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
@@ -3128,18 +3127,60 @@ public class PersistentTopicsBase extends AdminResource 
{
     }
 
     protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
-      validateAdminAccessForTenant(namespaceName.getTenant());
-      validatePoliciesReadOnlyAccess();
-      if (topicName.isGlobal()) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
           validateGlobalNamespaceOwnership(namespaceName);
-      }
-      checkTopicLevelPolicyEnable();
-      Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
-      if (!topicPolicies.isPresent()) {
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+        if (!topicPolicies.isPresent()) {
           return CompletableFuture.completedFuture(null);
-      }
-      topicPolicies.get().setCompactionThreshold(null);
-      return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
+        }
+        topicPolicies.get().setCompactionThreshold(null);
+        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
+    }
+
+    protected Optional<PublishRate> internalGetPublishRate() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate);
+
+    }
+
+    protected CompletableFuture<Void> internalSetPublishRate(PublishRate 
publishRate) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        if (publishRate == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        TopicPolicies topicPolicies = getTopicPolicies(topicName)
+            .orElseGet(TopicPolicies::new);
+        topicPolicies.setPublishRate(publishRate);
+        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies);
+    }
+
+    protected CompletableFuture<Void> internalRemovePublishRate() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
+        if (!topicPolicies.isPresent()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        topicPolicies.get().setPublishRate(null);
+        return 
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, 
topicPolicies.get());
     }
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index c98867a..639a79e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -61,6 +61,7 @@ import 
org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
@@ -1871,5 +1872,92 @@ public class PersistentTopics extends 
PersistentTopicsBase {
         });
     }
 
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/publishRate")
+    @ApiOperation(value = "Get publish rate configuration for specified 
topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is 
disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getPublishRate(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        try {
+            Optional<PublishRate> publishRate = internalGetPublishRate();
+            if (!publishRate.isPresent()) {
+                asyncResponse.resume(Response.noContent().build());
+            } else {
+                asyncResponse.resume(publishRate.get());
+            }
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/publishRate")
+    @ApiOperation(value = "Set message publish rate configuration for 
specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not 
exist"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 405, message = "Topic level policy is 
disabled, please enable the topic level policy and retry"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void setPublishRate(@Suspended final AsyncResponse asyncResponse,
+                                @PathParam("tenant") String tenant,
+                                @PathParam("namespace") String namespace,
+                                @PathParam("topic") @Encoded String 
encodedTopic,
+                                @ApiParam(value = "Dispatch rate for the 
specified topic") PublishRate publishRate) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalSetPublishRate(publishRate).whenComplete((r, ex) -> {
+            if (ex instanceof RestException) {
+                log.error("Failed to set topic dispatch rate", ex);
+                asyncResponse.resume(ex);
+            } else if (ex != null) {
+                log.error("Failed to set topic dispatch rate");
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                try {
+                    log.info("[{}] Successfully set topic publish rate: 
tenant={}, namespace={}, topic={}, publishRate={}",
+                        clientAppId(),
+                        tenant,
+                        namespace,
+                        topicName.getLocalName(),
+                        jsonMapper().writeValueAsString(publishRate));
+                } catch (JsonProcessingException ignore) {}
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{topic}/publishRate")
+    @ApiOperation(value = "Remove message publish rate configuration for 
specified topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not 
exist"),
+        @ApiResponse(code = 404, message = "Topic does not exist"),
+        @ApiResponse(code = 405, message = "Topic level policy is disabled, 
please enable the topic level policy and retry"),
+        @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void removePublishRate(@Suspended final AsyncResponse asyncResponse,
+                                   @PathParam("tenant") String tenant,
+                                   @PathParam("namespace") String namespace,
+                                   @PathParam("topic") @Encoded String 
encodedTopic) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalRemovePublishRate().whenComplete((r, ex) -> {
+            if (ex != null) {
+                log.error("Failed to remove topic publish rate", ex);
+                asyncResponse.resume(new RestException(ex));
+            } else {
+                log.info("[{}] Successfully remove topic publish rate: 
tenant={}, namespace={}, topic={}",
+                    clientAppId(),
+                    tenant,
+                    namespace,
+                    topicName.getLocalName());
+                asyncResponse.resume(Response.noContent().build());
+            }
+        });
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopics.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 0d56ca0..bd4ad24 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -24,6 +24,7 @@ import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import com.google.common.base.MoreObjects;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
@@ -44,6 +46,7 @@ import 
org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -94,6 +97,8 @@ public abstract class AbstractTopic implements Topic {
 
     protected boolean preciseTopicPublishRateLimitingEnable;
 
+    protected volatile PublishRate topicPolicyPublishRate = null;
+
     private LongAdder bytesInCounter = new LongAdder();
     private LongAdder msgInCounter = new LongAdder();
 
@@ -110,17 +115,28 @@ public abstract class AbstractTopic implements Topic {
         
this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
         
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
         this.lastActive = System.nanoTime();
-        Policies policies = null;
-        try {
-            policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
+        ServiceConfiguration brokerConfig = 
brokerService.pulsar().getConfiguration();
+        if (brokerConfig.isSystemTopicEnabled() && 
brokerConfig.isSystemTopicEnabled()) {
+            topicPolicyPublishRate = 
Optional.ofNullable(getTopicPolicies(TopicName.get(topic)))
+                    .map(TopicPolicies::getPublishRate)
+                    .orElse(null);
+        }
+        if (topicPolicyPublishRate != null) {
+            // update topic level publish dispatcher
+            updateTopicPublishDispatcher();
+        } else {
+            Policies policies = null;
+            try {
+                policies = 
brokerService.pulsar().getConfigurationCache().policiesCache()
                     .get(AdminResource.path(POLICIES, 
TopicName.get(topic).getNamespace()))
                     .orElseGet(() -> new Policies());
-        } catch (Exception e) {
-            log.warn("[{}] Error getting policies {} and publish throttling 
will be disabled", topic, e.getMessage());
-        }
-        this.preciseTopicPublishRateLimitingEnable =
+            } catch (Exception e) {
+                log.warn("[{}] Error getting policies {} and publish 
throttling will be disabled", topic, e.getMessage());
+            }
+            this.preciseTopicPublishRateLimitingEnable =
                 
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
-        updatePublishDispatcher(policies);
+            updatePublishDispatcher(policies);
+        }
     }
 
     protected boolean isProducersExceeded() {
@@ -466,6 +482,12 @@ public abstract class AbstractTopic implements Topic {
     }
 
     private void updatePublishDispatcher(Policies policies) {
+        // if topic level publish rate policy is set, skip update publish rate 
on namespace level
+        if (topicPolicyPublishRate != null) {
+            log.info("Using topic policy publish rate instead of namespace 
level topic publish rate on topic {}", this.topic);
+            return;
+        }
+
         final String clusterName = 
brokerService.pulsar().getConfiguration().getClusterName();
         final PublishRate publishRate = policies != null && 
policies.publishMaxMessageRate != null
                 ? policies.publishMaxMessageRate.get(clusterName)
@@ -532,4 +554,33 @@ public abstract class AbstractTopic implements Topic {
         
inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds);
         inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
     }
+
+    /**
+     * Get {@link TopicPolicies} for this topic.
+     * @param topicName
+     * @return TopicPolicies is exist else return null.
+     */
+    public TopicPolicies getTopicPolicies(TopicName topicName) {
+        TopicName cloneTopicName = topicName;
+        if (topicName.isPartitioned()) {
+            cloneTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
+        }
+        try {
+            return 
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.warn("Topic {} policies cache have not init.", 
topicName.getPartitionedTopicName());
+            return null;
+        } catch (NullPointerException e) {
+            log.warn("Topic level policies are not enabled. " +
+                    "Please refer to systemTopicEnabled and 
topicLevelPoliciesEnabled on broker.conf");
+            return null;
+        }
+    }
+
+    /**
+     * update topic publish dispatcher for this topic.
+     */
+    protected void updateTopicPublishDispatcher() {
+        // noop
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
new file mode 100644
index 0000000..a555104
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PublishRate;
+import org.apache.pulsar.common.util.RateLimitFunction;
+import org.apache.pulsar.common.util.RateLimiter;
+
+import java.util.concurrent.TimeUnit;
+
+public class PrecisPublishLimiter implements PublishRateLimiter {
+    protected volatile int publishMaxMessageRate = 0;
+    protected volatile long publishMaxByteRate = 0;
+    protected volatile boolean publishThrottlingEnabled = false;
+    // precise mode for publish rate limiter
+    private RateLimiter topicPublishRateLimiterOnMessage;
+    private RateLimiter topicPublishRateLimiterOnByte;
+    private final RateLimitFunction rateLimitFunction;
+
+    public PrecisPublishLimiter(Policies policies, String clusterName, 
RateLimitFunction rateLimitFunction) {
+        this.rateLimitFunction = rateLimitFunction;
+        update(policies, clusterName);
+    }
+
+    public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction 
rateLimitFunction) {
+        this.rateLimitFunction = rateLimitFunction;
+        update(publishRate);
+    }
+
+    @Override
+    public void checkPublishRate() {
+       // No-op
+    }
+
+    @Override
+    public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
+       // No-op
+    }
+
+    @Override
+    public boolean resetPublishCount() {
+        return true;
+    }
+
+    @Override
+    public boolean isPublishRateExceeded() {
+        return false;
+    }
+
+
+    @Override
+    public void update(Policies policies, String clusterName) {
+        final PublishRate maxPublishRate = policies.publishMaxMessageRate != 
null
+                ? policies.publishMaxMessageRate.get(clusterName)
+                : null;
+        this.update(maxPublishRate);
+    }
+    public void update(PublishRate maxPublishRate) {
+        if (maxPublishRate != null
+                && (maxPublishRate.publishThrottlingRateInMsg > 0 || 
maxPublishRate.publishThrottlingRateInByte > 0)) {
+            this.publishThrottlingEnabled = true;
+            this.publishMaxMessageRate = 
Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
+            this.publishMaxByteRate = 
Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
+            if (this.publishMaxMessageRate > 0) {
+                topicPublishRateLimiterOnMessage = new 
RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
+            }
+            if (this.publishMaxByteRate > 0) {
+                topicPublishRateLimiterOnByte = new 
RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
+            }
+        } else {
+            this.publishMaxMessageRate = 0;
+            this.publishMaxByteRate = 0;
+            this.publishThrottlingEnabled = false;
+            topicPublishRateLimiterOnMessage = null;
+            topicPublishRateLimiterOnByte = null;
+        }
+    }
+
+    @Override
+    public boolean tryAcquire(int numbers, long bytes) {
+        return (topicPublishRateLimiterOnMessage == null || 
topicPublishRateLimiterOnMessage.tryAcquire(numbers)) &&
+                (topicPublishRateLimiterOnByte == null || 
topicPublishRateLimiterOnByte.tryAcquire(bytes));
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
index 08d7e5b..67e6c77 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java
@@ -18,13 +18,8 @@
  */
 package org.apache.pulsar.broker.service;
 
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.LongAdder;
-
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
-import org.apache.pulsar.common.util.RateLimitFunction;
-import org.apache.pulsar.common.util.RateLimiter;
 
 public interface PublishRateLimiter {
 
@@ -76,196 +71,3 @@ public interface PublishRateLimiter {
      * */
     boolean tryAcquire(int numbers, long bytes);
 }
-
-class PrecisPublishLimiter implements PublishRateLimiter {
-    protected volatile int publishMaxMessageRate = 0;
-    protected volatile long publishMaxByteRate = 0;
-    protected volatile boolean publishThrottlingEnabled = false;
-    // precise mode for publish rate limiter
-    private RateLimiter topicPublishRateLimiterOnMessage;
-    private RateLimiter topicPublishRateLimiterOnByte;
-    private final RateLimitFunction rateLimitFunction;
-
-    public PrecisPublishLimiter(Policies policies, String clusterName, 
RateLimitFunction rateLimitFunction) {
-        this.rateLimitFunction = rateLimitFunction;
-        update(policies, clusterName);
-    }
-
-    @Override
-    public void checkPublishRate() {
-       // No-op
-    }
-
-    @Override
-    public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
-       // No-op
-    }
-
-    @Override
-    public boolean resetPublishCount() {
-        return true;
-    }
-
-    @Override
-    public boolean isPublishRateExceeded() {
-        return false;
-    }
-
-
-    @Override
-    public void update(Policies policies, String clusterName) {
-        final PublishRate maxPublishRate = policies.publishMaxMessageRate != 
null
-                ? policies.publishMaxMessageRate.get(clusterName)
-                : null;
-        this.update(maxPublishRate);
-    }
-    public void update(PublishRate maxPublishRate) {
-        if (maxPublishRate != null
-                && (maxPublishRate.publishThrottlingRateInMsg > 0 || 
maxPublishRate.publishThrottlingRateInByte > 0)) {
-            this.publishThrottlingEnabled = true;
-            this.publishMaxMessageRate = 
Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
-            this.publishMaxByteRate = 
Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
-            if (this.publishMaxMessageRate > 0) {
-                topicPublishRateLimiterOnMessage = new 
RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
-            }
-            if (this.publishMaxByteRate > 0) {
-                topicPublishRateLimiterOnByte = new 
RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
-            }
-        } else {
-            this.publishMaxMessageRate = 0;
-            this.publishMaxByteRate = 0;
-            this.publishThrottlingEnabled = false;
-            topicPublishRateLimiterOnMessage = null;
-            topicPublishRateLimiterOnByte = null;
-        }
-    }
-
-    @Override
-    public boolean tryAcquire(int numbers, long bytes) {
-        return (topicPublishRateLimiterOnMessage == null || 
topicPublishRateLimiterOnMessage.tryAcquire(numbers)) &&
-                (topicPublishRateLimiterOnByte == null || 
topicPublishRateLimiterOnByte.tryAcquire(bytes));
-    }
-}
-
-class PublishRateLimiterImpl implements PublishRateLimiter {
-    protected volatile int publishMaxMessageRate = 0;
-    protected volatile long publishMaxByteRate = 0;
-    protected volatile boolean publishThrottlingEnabled = false;
-    protected volatile boolean publishRateExceeded = false;
-    protected volatile LongAdder currentPublishMsgCount = new LongAdder();
-    protected volatile LongAdder currentPublishByteCount = new LongAdder();
-
-    public PublishRateLimiterImpl(Policies policies, String clusterName) {
-        update(policies, clusterName);
-    }
-
-    public PublishRateLimiterImpl(PublishRate maxPublishRate) {
-        update(maxPublishRate);
-    }
-
-    @Override
-    public void checkPublishRate() {
-        if (this.publishThrottlingEnabled && !publishRateExceeded) {
-            long currentPublishMsgRate = this.currentPublishMsgCount.sum();
-            long currentPublishByteRate = this.currentPublishByteCount.sum();
-            if ((this.publishMaxMessageRate > 0 && currentPublishMsgRate > 
this.publishMaxMessageRate)
-                    || (this.publishMaxByteRate > 0 && currentPublishByteRate 
> this.publishMaxByteRate)) {
-                publishRateExceeded = true;
-            }
-        }
-    }
-
-    @Override
-    public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
-        if (this.publishThrottlingEnabled) {
-            this.currentPublishMsgCount.add(numOfMessages);
-            this.currentPublishByteCount.add(msgSizeInBytes);
-        }
-    }
-
-    @Override
-    public boolean resetPublishCount() {
-        if (this.publishThrottlingEnabled) {
-            this.currentPublishMsgCount.reset();
-            this.currentPublishByteCount.reset();
-            this.publishRateExceeded = false;
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public boolean isPublishRateExceeded() {
-        return publishRateExceeded;
-    }
-
-    @Override
-    public void update(Policies policies, String clusterName) {
-        final PublishRate maxPublishRate = policies.publishMaxMessageRate != 
null
-                ? policies.publishMaxMessageRate.get(clusterName)
-                : null;
-        update(maxPublishRate);
-    }
-
-    public void update(PublishRate maxPublishRate) {
-        if (maxPublishRate != null
-            && (maxPublishRate.publishThrottlingRateInMsg > 0 || 
maxPublishRate.publishThrottlingRateInByte > 0)) {
-            this.publishThrottlingEnabled = true;
-            this.publishMaxMessageRate = 
Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
-            this.publishMaxByteRate = 
Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
-        } else {
-            this.publishMaxMessageRate = 0;
-            this.publishMaxByteRate = 0;
-            this.publishThrottlingEnabled = false;
-        }
-        resetPublishCount();
-    }
-
-    @Override
-    public boolean tryAcquire(int numbers, long bytes) {
-        return false;
-    }
-}
-
-class PublishRateLimiterDisable implements PublishRateLimiter {
-
-    public static final PublishRateLimiterDisable DISABLED_RATE_LIMITER = new 
PublishRateLimiterDisable();
-
-    @Override
-    public void checkPublishRate() {
-        // No-op
-    }
-
-    @Override
-    public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
-        // No-op
-    }
-
-    @Override
-    public boolean resetPublishCount() {
-        // No-op
-        return false;
-    }
-
-    @Override
-    public boolean isPublishRateExceeded() {
-        return false;
-    }
-
-    @Override
-    public void update(Policies policies, String clusterName) {
-        // No-op
-    }
-
-    @Override
-    public void update(PublishRate maxPublishRate) {
-        // No-op
-    }
-
-    @Override
-    public boolean tryAcquire(int numbers, long bytes) {
-        // No-op
-        return false;
-    }
-
-}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
new file mode 100644
index 0000000..0ff3866
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PublishRate;
+
+public class PublishRateLimiterDisable implements PublishRateLimiter {
+
+    public static final PublishRateLimiterDisable DISABLED_RATE_LIMITER = new 
PublishRateLimiterDisable();
+
+    @Override
+    public void checkPublishRate() {
+        // No-op
+    }
+
+    @Override
+    public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
+        // No-op
+    }
+
+    @Override
+    public boolean resetPublishCount() {
+        // No-op
+        return false;
+    }
+
+    @Override
+    public boolean isPublishRateExceeded() {
+        return false;
+    }
+
+    @Override
+    public void update(Policies policies, String clusterName) {
+        // No-op
+    }
+
+    @Override
+    public void update(PublishRate maxPublishRate) {
+        // No-op
+    }
+
+    @Override
+    public boolean tryAcquire(int numbers, long bytes) {
+        // No-op
+        return false;
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
new file mode 100644
index 0000000..7e481ab
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PublishRate;
+
+import java.util.concurrent.atomic.LongAdder;
+
+public class PublishRateLimiterImpl implements PublishRateLimiter {
+    protected volatile int publishMaxMessageRate = 0;
+    protected volatile long publishMaxByteRate = 0;
+    protected volatile boolean publishThrottlingEnabled = false;
+    protected volatile boolean publishRateExceeded = false;
+    protected volatile LongAdder currentPublishMsgCount = new LongAdder();
+    protected volatile LongAdder currentPublishByteCount = new LongAdder();
+
+    public PublishRateLimiterImpl(Policies policies, String clusterName) {
+        update(policies, clusterName);
+    }
+
+    public PublishRateLimiterImpl(PublishRate maxPublishRate) {
+        update(maxPublishRate);
+    }
+
+    @Override
+    public void checkPublishRate() {
+        if (this.publishThrottlingEnabled && !publishRateExceeded) {
+            long currentPublishMsgRate = this.currentPublishMsgCount.sum();
+            long currentPublishByteRate = this.currentPublishByteCount.sum();
+            if ((this.publishMaxMessageRate > 0 && currentPublishMsgRate > 
this.publishMaxMessageRate)
+                    || (this.publishMaxByteRate > 0 && currentPublishByteRate 
> this.publishMaxByteRate)) {
+                publishRateExceeded = true;
+            }
+        }
+    }
+
+    @Override
+    public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
+        if (this.publishThrottlingEnabled) {
+            this.currentPublishMsgCount.add(numOfMessages);
+            this.currentPublishByteCount.add(msgSizeInBytes);
+        }
+    }
+
+    @Override
+    public boolean resetPublishCount() {
+        if (this.publishThrottlingEnabled) {
+            this.currentPublishMsgCount.reset();
+            this.currentPublishByteCount.reset();
+            this.publishRateExceeded = false;
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean isPublishRateExceeded() {
+        return publishRateExceeded;
+    }
+
+    @Override
+    public void update(Policies policies, String clusterName) {
+        final PublishRate maxPublishRate = policies.publishMaxMessageRate != 
null
+                ? policies.publishMaxMessageRate.get(clusterName)
+                : null;
+        update(maxPublishRate);
+    }
+
+    public void update(PublishRate maxPublishRate) {
+        if (maxPublishRate != null
+            && (maxPublishRate.publishThrottlingRateInMsg > 0 || 
maxPublishRate.publishThrottlingRateInByte > 0)) {
+            this.publishThrottlingEnabled = true;
+            this.publishMaxMessageRate = 
Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
+            this.publishMaxByteRate = 
Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
+        } else {
+            this.publishMaxMessageRate = 0;
+            this.publishMaxByteRate = 0;
+            this.publishThrottlingEnabled = false;
+        }
+        resetPublishCount();
+    }
+
+    @Override
+    public boolean tryAcquire(int numbers, long bytes) {
+        return false;
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1bb7866..6f9860b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -82,7 +82,10 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedEx
 import 
org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.PrecisPublishLimiter;
 import org.apache.pulsar.broker.service.Producer;
+import org.apache.pulsar.broker.service.PublishRateLimiter;
+import org.apache.pulsar.broker.service.PublishRateLimiterImpl;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.ServerCnx;
 import org.apache.pulsar.broker.service.StreamingStats;
@@ -2135,24 +2138,6 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
     }
 
     /**
-     * Get {@link TopicPolicies} for this topic.
-     * @param topicName
-     * @return TopicPolicies is exist else return null.
-     */
-    public TopicPolicies getTopicPolicies(TopicName topicName) {
-        TopicName cloneTopicName = topicName;
-        if (topicName.isPartitioned()) {
-            cloneTopicName = 
TopicName.get(topicName.getPartitionedTopicName());
-        }
-        try {
-            return 
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
-        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
-            log.warn("Topic {} policies cache have not init.", 
topicName.getPartitionedTopicName());
-            return null;
-        }
-    }
-
-    /**
      * Get message TTL for this topic.
      * @param topicPolicies TopicPolicies
      * @param policies      NameSpace policy
@@ -2366,6 +2351,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             dispatchRateLimiter.ifPresent(dispatchRateLimiter ->
                 
dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate()));
         }
+
+        if (policies != null && policies.getPublishRate() != null) {
+            topicPolicyPublishRate = policies.getPublishRate();
+            updateTopicPublishDispatcher();
+        }
     }
 
     private void 
initializeTopicDispatchRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
@@ -2381,6 +2371,32 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
         return this;
     }
 
+    @Override
+    protected void updateTopicPublishDispatcher() {
+        if (topicPolicyPublishRate != null && 
(topicPolicyPublishRate.publishThrottlingRateInByte > 0
+            || topicPolicyPublishRate.publishThrottlingRateInMsg > 0)) {
+            log.info("Enabling topic policy publish rate limiting {} on topic 
{}", topicPolicyPublishRate, this.topic);
+            if (!preciseTopicPublishRateLimitingEnable) {
+                this.brokerService.setupBrokerPublishRateLimiterMonitor();
+            }
+
+            if (this.topicPublishRateLimiter == null
+                || this.topicPublishRateLimiter == 
PublishRateLimiter.DISABLED_RATE_LIMITER) {
+                // create new rateLimiter if rate-limiter is disabled
+                if (preciseTopicPublishRateLimitingEnable) {
+                    this.topicPublishRateLimiter = new 
PrecisPublishLimiter(topicPolicyPublishRate, ()-> this.enableCnxAutoRead());
+                } else {
+                    this.topicPublishRateLimiter = new 
PublishRateLimiterImpl(topicPolicyPublishRate);
+                }
+            } else {
+                this.topicPublishRateLimiter.update(topicPolicyPublishRate);
+            }
+        } else {
+            log.info("Disabling publish throttling for {}", this.topic);
+            this.topicPublishRateLimiter = 
PublishRateLimiter.DISABLED_RATE_LIMITER;
+            enableProducerReadForPublishRateLimiting();
+        }
+    }
 
     @VisibleForTesting
     public MessageDeduplication getMessageDeduplication() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
index fa073fa..459a8e3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
@@ -170,4 +171,24 @@ public class TopicPoliciesDisableTest extends 
MockedPulsarServiceBaseTest {
             Assert.assertEquals(e.getStatusCode(), 405);
         }
     }
+
+    @Test
+    public void testPublishRateDisabled() throws Exception {
+        PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
+        log.info("Publish Rate: {} will set to the topic: {}", publishRate, 
testTopic);
+
+        try {
+            admin.topics().setPublishRate(testTopic, publishRate);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+
+        try {
+            admin.topics().getPublishRate(testTopic);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 405);
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index e216661..4eb656b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.testng.Assert;
@@ -415,4 +416,42 @@ public class TopicPoliciesTest extends 
MockedPulsarServiceBaseTest {
 
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
+
+    @Test
+    public void testGetSetPublishRate() throws Exception {
+        PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
+        log.info("Publish Rate: {} will set to the topic: {}", publishRate, 
testTopic);
+
+        admin.topics().setPublishRate(testTopic, publishRate);
+        log.info("Publish Rate set success on topic: {}", testTopic);
+
+        Thread.sleep(3000);
+        PublishRate getPublishRate = admin.topics().getPublishRate(testTopic);
+        log.info("Publish Rate: {} get on topic: {}", getPublishRate, 
testTopic);
+        Assert.assertEquals(getPublishRate, publishRate);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    @Test
+    public void testRemovePublishRate() throws Exception {
+        PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5);
+        log.info("Publish Rate: {} will set to the topic: {}", publishRate, 
testTopic);
+
+        admin.topics().setPublishRate(testTopic, publishRate);
+        log.info("Publish Rate set success on topic: {}", testTopic);
+
+        Thread.sleep(3000);
+        PublishRate getPublishRate = admin.topics().getPublishRate(testTopic);
+        log.info("Publish Rate: {} get on topic: {}", getPublishRate, 
testTopic);
+        Assert.assertEquals(getPublishRate, publishRate);
+
+        admin.topics().removePublishRate(testTopic);
+        Thread.sleep(3000);
+        getPublishRate = admin.topics().getPublishRate(testTopic);
+        log.info("Publish Rate get on topic: {} after remove", getPublishRate, 
testTopic);
+        Assert.assertNull(getPublishRate);
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 6ca2390..74d134f 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -39,9 +39,9 @@ import 
org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
-
 /**
  * Admin interface for Topics management.
  */
@@ -2007,4 +2007,64 @@ public interface Topics {
      */
     CompletableFuture<Void> removeCompactionThresholdAsync(String topic);
 
+    /**
+     * Set message-publish-rate (topics can publish this many messages per 
second).
+     *
+     * @param topic
+     * @param publishMsgRate
+     *            number of messages per second
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setPublishRate(String topic, PublishRate publishMsgRate) throws 
PulsarAdminException;
+
+    /**
+     * Set message-publish-rate (topics can publish this many messages per 
second) asynchronously.
+     *
+     * @param topic
+     * @param publishMsgRate
+     *            number of messages per second
+     */
+    CompletableFuture<Void> setPublishRateAsync(String topic, PublishRate 
publishMsgRate);
+
+    /**
+     * Get message-publish-rate (topics can publish this many messages per 
second).
+     *
+     * @param topic
+     * @return number of messages per second
+     * @throws PulsarAdminException Unexpected error
+     */
+    PublishRate getPublishRate(String topic) throws PulsarAdminException;
+
+    /**
+     * Get message-publish-rate (topics can publish this many messages per 
second) asynchronously.
+     *
+     * @param topic
+     * @return number of messages per second
+     */
+    CompletableFuture<PublishRate> getPublishRateAsync(String topic);
+
+    /**
+     * Remove message-publish-rate.
+     * <p/>
+     * Remove topic message publish rate
+     *
+     * @param topic
+     * @throws PulsarAdminException
+     *              unexpected error
+     */
+    void removePublishRate(String topic) throws PulsarAdminException;
+
+    /**
+     * Remove message-publish-rate asynchronously.
+     * <p/>
+     * Remove topic message publish rate
+     *
+     * @param topic
+     * @throws PulsarAdminException
+     *              unexpected error
+     */
+    CompletableFuture<Void> removePublishRateAsync(String topic) throws 
PulsarAdminException;
+
+
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index ce4ce78..2711b2f 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -75,6 +75,7 @@ import 
org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
@@ -2079,5 +2080,81 @@ public class TopicsImpl extends BaseResource implements 
Topics {
         return asyncDeleteRequest(path);
     }
 
-  private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
+    @Override
+    public PublishRate getPublishRate(String topic) throws 
PulsarAdminException {
+        try {
+            return getPublishRateAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<PublishRate> getPublishRateAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "publishRate");
+        final CompletableFuture<PublishRate> future = new 
CompletableFuture<>();
+        asyncGetRequest(path,
+            new InvocationCallback<PublishRate>() {
+                @Override
+                public void completed(PublishRate publishRate) {
+                    future.complete(publishRate);
+                }
+
+                @Override
+                public void failed(Throwable throwable) {
+                    
future.completeExceptionally(getApiException(throwable.getCause()));
+                }
+            });
+        return future;
+    }
+
+    @Override
+    public void setPublishRate(String topic, PublishRate publishRate) throws 
PulsarAdminException {
+        try {
+            setPublishRateAsync(topic, publishRate).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setPublishRateAsync(String topic, 
PublishRate publishRate) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "publishRate");
+        return asyncPostRequest(path, Entity.entity(publishRate, 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removePublishRate(String topic) throws PulsarAdminException {
+        try {
+            removePublishRateAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removePublishRateAsync(String topic) {
+        TopicName topicName = validateTopic(topic);
+        WebTarget path = topicPath(topicName, "publishRate");
+        return asyncDeleteRequest(path);
+    }
+
+    private static final Logger log = 
LoggerFactory.getLogger(TopicsImpl.class);
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 5b19576..df0d342 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -52,6 +52,7 @@ import 
org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 
@@ -126,6 +127,9 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-compaction-threshold", new 
GetCompactionThreshold());
         jcommander.addCommand("set-compaction-threshold", new 
SetCompactionThreshold());
         jcommander.addCommand("remove-compaction-threshold", new 
RemoveCompactionThreshold());
+        jcommander.addCommand("get-publish-rate", new GetPublishRate());
+        jcommander.addCommand("set-publish-rate", new SetPublishRate());
+        jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
     }
 
     @Parameters(commandDescription = "Get the list of topics under a 
namespace.")
@@ -1207,4 +1211,49 @@ public class CmdTopics extends CmdBase {
             admin.topics().removeCompactionThreshold(persistentTopic);
         }
     }
+
+    @Parameters(commandDescription = "Get publish rate for a topic")
+    private class GetPublishRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getPublishRate(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set publish rate for a topic")
+    private class SetPublishRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--msg-publish-rate",
+            "-m" }, description = "message-publish-rate (default -1 will be 
overwrite if not passed)\n", required = false)
+        private int msgPublishRate = -1;
+
+         @Parameter(names = { "--byte-publish-rate",
+            "-b" }, description = "byte-publish-rate (default -1 will be 
overwrite if not passed)\n", required = false)
+        private long bytePublishRate = -1;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setPublishRate(persistentTopic,
+                new PublishRate(msgPublishRate, bytePublishRate));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove publish rate for a topic")
+    private class RemovePublishRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removePublishRate(persistentTopic);
+        }
+    }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
index d56e283..33fbe80 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java
@@ -51,6 +51,7 @@ public class TopicPolicies {
     private Boolean delayedDeliveryEnabled = null;
     private DispatchRate dispatchRate = null;
     private Long compactionThreshold = null;
+    private PublishRate publishRate = null;
 
     public boolean isMaxUnackedMessagesOnConsumerSet() {
         return maxUnackedMessagesOnConsumer != null;
@@ -107,4 +108,8 @@ public class TopicPolicies {
     public boolean isCompactionThresholdSet() {
         return compactionThreshold != null;
     }
+
+    public boolean isPublishRateSet() {
+        return publishRate != null;
+    }
 }

Reply via email to