This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 66688d1 support message publish rate on topic level (#7948) 66688d1 is described below commit 66688d15801119b7c62efa504e8267564acbe403 Author: hangc0276 <hangc0...@163.com> AuthorDate: Wed Sep 2 21:11:51 2020 +0800 support message publish rate on topic level (#7948) Modifications: Support setting the publish rate at the topic level. Support getting the publish rate at the topic level. Support removing the publish rate at the topic level. --- .../broker/admin/impl/PersistentTopicsBase.java | 65 +++++-- .../pulsar/broker/admin/v2/PersistentTopics.java | 88 +++++++++ .../pulsar/broker/service/AbstractTopic.java | 67 ++++++- .../broker/service/PrecisPublishLimiter.java | 101 +++++++++++ .../pulsar/broker/service/PublishRateLimiter.java | 198 --------------------- .../broker/service/PublishRateLimiterDisable.java | 65 +++++++ .../broker/service/PublishRateLimiterImpl.java | 104 +++++++++++ .../broker/service/persistent/PersistentTopic.java | 52 ++++-- .../broker/admin/TopicPoliciesDisableTest.java | 21 +++ .../pulsar/broker/admin/TopicPoliciesTest.java | 39 ++++ .../org/apache/pulsar/client/admin/Topics.java | 62 ++++++- .../pulsar/client/admin/internal/TopicsImpl.java | 79 +++++++- .../org/apache/pulsar/admin/cli/CmdTopics.java | 49 +++++ .../pulsar/common/policies/data/TopicPolicies.java | 5 + 14 files changed, 757 insertions(+), 238 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a78ead7..a25bf37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -97,9 +97,8 @@ import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo; import
org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.PersistencePolicies; -import org.apache.pulsar.common.policies.data.PolicyName; -import org.apache.pulsar.common.policies.data.PolicyOperation; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; @@ -3128,18 +3127,60 @@ public class PersistentTopicsBase extends AdminResource { } protected CompletableFuture<Void> internalRemoveCompactionThreshold() { - validateAdminAccessForTenant(namespaceName.getTenant()); - validatePoliciesReadOnlyAccess(); - if (topicName.isGlobal()) { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); - } - checkTopicLevelPolicyEnable(); - Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); - if (!topicPolicies.isPresent()) { + } + checkTopicLevelPolicyEnable(); + Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); + if (!topicPolicies.isPresent()) { return CompletableFuture.completedFuture(null); - } - topicPolicies.get().setCompactionThreshold(null); - return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + } + topicPolicies.get().setCompactionThreshold(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); + } + + protected Optional<PublishRate> internalGetPublishRate() { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + checkTopicLevelPolicyEnable(); + return 
getTopicPolicies(topicName).map(TopicPolicies::getPublishRate); + + } + + protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate) { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + checkTopicLevelPolicyEnable(); + if (publishRate == null) { + return CompletableFuture.completedFuture(null); + } + TopicPolicies topicPolicies = getTopicPolicies(topicName) + .orElseGet(TopicPolicies::new); + topicPolicies.setPublishRate(publishRate); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); + } + + protected CompletableFuture<Void> internalRemovePublishRate() { + validateAdminAccessForTenant(namespaceName.getTenant()); + validatePoliciesReadOnlyAccess(); + if (topicName.isGlobal()) { + validateGlobalNamespaceOwnership(namespaceName); + } + checkTopicLevelPolicyEnable(); + Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName); + if (!topicPolicies.isPresent()) { + return CompletableFuture.completedFuture(null); + } + topicPolicies.get().setPublishRate(null); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index c98867a..639a79e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -61,6 +61,7 @@ import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import 
org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.policies.data.TopicStats; @@ -1871,5 +1872,92 @@ public class PersistentTopics extends PersistentTopicsBase { }); } + @GET + @Path("/{tenant}/{namespace}/{topic}/publishRate") + @ApiOperation(value = "Get publish rate configuration for specified topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void getPublishRate(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + try { + Optional<PublishRate> publishRate = internalGetPublishRate(); + if (!publishRate.isPresent()) { + asyncResponse.resume(Response.noContent().build()); + } else { + asyncResponse.resume(publishRate.get()); + } + } catch (RestException e) { + asyncResponse.resume(e); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + } + } + + @POST + @Path("/{tenant}/{namespace}/{topic}/publishRate") + @ApiOperation(value = "Set message publish rate configuration for specified topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void setPublishRate(@Suspended final AsyncResponse asyncResponse, + 
@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) { + validateTopicName(tenant, namespace, encodedTopic); + internalSetPublishRate(publishRate).whenComplete((r, ex) -> { + if (ex instanceof RestException) { + log.error("Failed to set topic dispatch rate", ex); + asyncResponse.resume(ex); + } else if (ex != null) { + log.error("Failed to set topic dispatch rate"); + asyncResponse.resume(new RestException(ex)); + } else { + try { + log.info("[{}] Successfully set topic publish rate: tenant={}, namespace={}, topic={}, publishRate={}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName(), + jsonMapper().writeValueAsString(publishRate)); + } catch (JsonProcessingException ignore) {} + asyncResponse.resume(Response.noContent().build()); + } + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/publishRate") + @ApiOperation(value = "Remove message publish rate configuration for specified topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void removePublishRate(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic) { + validateTopicName(tenant, namespace, encodedTopic); + internalRemovePublishRate().whenComplete((r, ex) -> { + if (ex != null) { + log.error("Failed to remove topic publish rate", ex); + asyncResponse.resume(new RestException(ex)); + } else { + log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}", + clientAppId(), + 
tenant, + namespace, + topicName.getLocalName()); + asyncResponse.resume(Response.noContent().build()); + } + }); + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 0d56ca0..bd4ad24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import com.google.common.base.MoreObjects; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -32,6 +33,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.bookkeeper.mledger.util.StatsBuckets; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.schema.SchemaRegistryService; @@ -44,6 +46,7 @@ import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.FutureUtil; @@ -94,6 +97,8 @@ public abstract class AbstractTopic implements 
Topic { protected boolean preciseTopicPublishRateLimitingEnable; + protected volatile PublishRate topicPolicyPublishRate = null; + private LongAdder bytesInCounter = new LongAdder(); private LongAdder msgInCounter = new LongAdder(); @@ -110,17 +115,28 @@ public abstract class AbstractTopic implements Topic { this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()); this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode()); this.lastActive = System.nanoTime(); - Policies policies = null; - try { - policies = brokerService.pulsar().getConfigurationCache().policiesCache() + ServiceConfiguration brokerConfig = brokerService.pulsar().getConfiguration(); + if (brokerConfig.isSystemTopicEnabled() && brokerConfig.isSystemTopicEnabled()) { + topicPolicyPublishRate = Optional.ofNullable(getTopicPolicies(TopicName.get(topic))) + .map(TopicPolicies::getPublishRate) + .orElse(null); + } + if (topicPolicyPublishRate != null) { + // update topic level publish dispatcher + updateTopicPublishDispatcher(); + } else { + Policies policies = null; + try { + policies = brokerService.pulsar().getConfigurationCache().policiesCache() .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace())) .orElseGet(() -> new Policies()); - } catch (Exception e) { - log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); - } - this.preciseTopicPublishRateLimitingEnable = + } catch (Exception e) { + log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); + } + this.preciseTopicPublishRateLimitingEnable = brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable(); - updatePublishDispatcher(policies); + updatePublishDispatcher(policies); + } } protected boolean isProducersExceeded() { @@ -466,6 
+482,12 @@ public abstract class AbstractTopic implements Topic { } private void updatePublishDispatcher(Policies policies) { + // if topic level publish rate policy is set, skip update publish rate on namespace level + if (topicPolicyPublishRate != null) { + log.info("Using topic policy publish rate instead of namespace level topic publish rate on topic {}", this.topic); + return; + } + final String clusterName = brokerService.pulsar().getConfiguration().getClusterName(); final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null ? policies.publishMaxMessageRate.get(clusterName) @@ -532,4 +554,33 @@ public abstract class AbstractTopic implements Topic { inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds); inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive); } + + /** + * Get {@link TopicPolicies} for this topic. + * @param topicName + * @return TopicPolicies is exist else return null. + */ + public TopicPolicies getTopicPolicies(TopicName topicName) { + TopicName cloneTopicName = topicName; + if (topicName.isPartitioned()) { + cloneTopicName = TopicName.get(topicName.getPartitionedTopicName()); + } + try { + return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName); + } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { + log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName()); + return null; + } catch (NullPointerException e) { + log.warn("Topic level policies are not enabled. " + + "Please refer to systemTopicEnabled and topicLevelPoliciesEnabled on broker.conf"); + return null; + } + } + + /** + * update topic publish dispatcher for this topic. 
+ */ + protected void updateTopicPublishDispatcher() { + // noop + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java new file mode 100644 index 0000000..a555104 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.util.RateLimitFunction; +import org.apache.pulsar.common.util.RateLimiter; + +import java.util.concurrent.TimeUnit; + +public class PrecisPublishLimiter implements PublishRateLimiter { + protected volatile int publishMaxMessageRate = 0; + protected volatile long publishMaxByteRate = 0; + protected volatile boolean publishThrottlingEnabled = false; + // precise mode for publish rate limiter + private RateLimiter topicPublishRateLimiterOnMessage; + private RateLimiter topicPublishRateLimiterOnByte; + private final RateLimitFunction rateLimitFunction; + + public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) { + this.rateLimitFunction = rateLimitFunction; + update(policies, clusterName); + } + + public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) { + this.rateLimitFunction = rateLimitFunction; + update(publishRate); + } + + @Override + public void checkPublishRate() { + // No-op + } + + @Override + public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { + // No-op + } + + @Override + public boolean resetPublishCount() { + return true; + } + + @Override + public boolean isPublishRateExceeded() { + return false; + } + + + @Override + public void update(Policies policies, String clusterName) { + final PublishRate maxPublishRate = policies.publishMaxMessageRate != null + ? 
policies.publishMaxMessageRate.get(clusterName) + : null; + this.update(maxPublishRate); + } + public void update(PublishRate maxPublishRate) { + if (maxPublishRate != null + && (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) { + this.publishThrottlingEnabled = true; + this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); + this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); + if (this.publishMaxMessageRate > 0) { + topicPublishRateLimiterOnMessage = new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction); + } + if (this.publishMaxByteRate > 0) { + topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS); + } + } else { + this.publishMaxMessageRate = 0; + this.publishMaxByteRate = 0; + this.publishThrottlingEnabled = false; + topicPublishRateLimiterOnMessage = null; + topicPublishRateLimiterOnByte = null; + } + } + + @Override + public boolean tryAcquire(int numbers, long bytes) { + return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers)) && + (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes)); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java index 08d7e5b..67e6c77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java @@ -18,13 +18,8 @@ */ package org.apache.pulsar.broker.service; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.LongAdder; - import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; -import org.apache.pulsar.common.util.RateLimitFunction; 
-import org.apache.pulsar.common.util.RateLimiter; public interface PublishRateLimiter { @@ -76,196 +71,3 @@ public interface PublishRateLimiter { * */ boolean tryAcquire(int numbers, long bytes); } - -class PrecisPublishLimiter implements PublishRateLimiter { - protected volatile int publishMaxMessageRate = 0; - protected volatile long publishMaxByteRate = 0; - protected volatile boolean publishThrottlingEnabled = false; - // precise mode for publish rate limiter - private RateLimiter topicPublishRateLimiterOnMessage; - private RateLimiter topicPublishRateLimiterOnByte; - private final RateLimitFunction rateLimitFunction; - - public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) { - this.rateLimitFunction = rateLimitFunction; - update(policies, clusterName); - } - - @Override - public void checkPublishRate() { - // No-op - } - - @Override - public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { - // No-op - } - - @Override - public boolean resetPublishCount() { - return true; - } - - @Override - public boolean isPublishRateExceeded() { - return false; - } - - - @Override - public void update(Policies policies, String clusterName) { - final PublishRate maxPublishRate = policies.publishMaxMessageRate != null - ? 
policies.publishMaxMessageRate.get(clusterName) - : null; - this.update(maxPublishRate); - } - public void update(PublishRate maxPublishRate) { - if (maxPublishRate != null - && (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) { - this.publishThrottlingEnabled = true; - this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); - this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); - if (this.publishMaxMessageRate > 0) { - topicPublishRateLimiterOnMessage = new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction); - } - if (this.publishMaxByteRate > 0) { - topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS); - } - } else { - this.publishMaxMessageRate = 0; - this.publishMaxByteRate = 0; - this.publishThrottlingEnabled = false; - topicPublishRateLimiterOnMessage = null; - topicPublishRateLimiterOnByte = null; - } - } - - @Override - public boolean tryAcquire(int numbers, long bytes) { - return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers)) && - (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes)); - } -} - -class PublishRateLimiterImpl implements PublishRateLimiter { - protected volatile int publishMaxMessageRate = 0; - protected volatile long publishMaxByteRate = 0; - protected volatile boolean publishThrottlingEnabled = false; - protected volatile boolean publishRateExceeded = false; - protected volatile LongAdder currentPublishMsgCount = new LongAdder(); - protected volatile LongAdder currentPublishByteCount = new LongAdder(); - - public PublishRateLimiterImpl(Policies policies, String clusterName) { - update(policies, clusterName); - } - - public PublishRateLimiterImpl(PublishRate maxPublishRate) { - update(maxPublishRate); - } - - @Override - public void checkPublishRate() { - if (this.publishThrottlingEnabled 
&& !publishRateExceeded) { - long currentPublishMsgRate = this.currentPublishMsgCount.sum(); - long currentPublishByteRate = this.currentPublishByteCount.sum(); - if ((this.publishMaxMessageRate > 0 && currentPublishMsgRate > this.publishMaxMessageRate) - || (this.publishMaxByteRate > 0 && currentPublishByteRate > this.publishMaxByteRate)) { - publishRateExceeded = true; - } - } - } - - @Override - public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { - if (this.publishThrottlingEnabled) { - this.currentPublishMsgCount.add(numOfMessages); - this.currentPublishByteCount.add(msgSizeInBytes); - } - } - - @Override - public boolean resetPublishCount() { - if (this.publishThrottlingEnabled) { - this.currentPublishMsgCount.reset(); - this.currentPublishByteCount.reset(); - this.publishRateExceeded = false; - return true; - } - return false; - } - - @Override - public boolean isPublishRateExceeded() { - return publishRateExceeded; - } - - @Override - public void update(Policies policies, String clusterName) { - final PublishRate maxPublishRate = policies.publishMaxMessageRate != null - ? 
policies.publishMaxMessageRate.get(clusterName) - : null; - update(maxPublishRate); - } - - public void update(PublishRate maxPublishRate) { - if (maxPublishRate != null - && (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) { - this.publishThrottlingEnabled = true; - this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); - this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); - } else { - this.publishMaxMessageRate = 0; - this.publishMaxByteRate = 0; - this.publishThrottlingEnabled = false; - } - resetPublishCount(); - } - - @Override - public boolean tryAcquire(int numbers, long bytes) { - return false; - } -} - -class PublishRateLimiterDisable implements PublishRateLimiter { - - public static final PublishRateLimiterDisable DISABLED_RATE_LIMITER = new PublishRateLimiterDisable(); - - @Override - public void checkPublishRate() { - // No-op - } - - @Override - public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { - // No-op - } - - @Override - public boolean resetPublishCount() { - // No-op - return false; - } - - @Override - public boolean isPublishRateExceeded() { - return false; - } - - @Override - public void update(Policies policies, String clusterName) { - // No-op - } - - @Override - public void update(PublishRate maxPublishRate) { - // No-op - } - - @Override - public boolean tryAcquire(int numbers, long bytes) { - // No-op - return false; - } - -} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java new file mode 100644 index 0000000..0ff3866 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PublishRate; + +public class PublishRateLimiterDisable implements PublishRateLimiter { + + public static final PublishRateLimiterDisable DISABLED_RATE_LIMITER = new PublishRateLimiterDisable(); + + @Override + public void checkPublishRate() { + // No-op + } + + @Override + public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { + // No-op + } + + @Override + public boolean resetPublishCount() { + // No-op + return false; + } + + @Override + public boolean isPublishRateExceeded() { + return false; + } + + @Override + public void update(Policies policies, String clusterName) { + // No-op + } + + @Override + public void update(PublishRate maxPublishRate) { + // No-op + } + + @Override + public boolean tryAcquire(int numbers, long bytes) { + // No-op + return false; + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java new file mode 100644 index 0000000..7e481ab --- /dev/null +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PublishRate; + +import java.util.concurrent.atomic.LongAdder; + +public class PublishRateLimiterImpl implements PublishRateLimiter { + protected volatile int publishMaxMessageRate = 0; + protected volatile long publishMaxByteRate = 0; + protected volatile boolean publishThrottlingEnabled = false; + protected volatile boolean publishRateExceeded = false; + protected volatile LongAdder currentPublishMsgCount = new LongAdder(); + protected volatile LongAdder currentPublishByteCount = new LongAdder(); + + public PublishRateLimiterImpl(Policies policies, String clusterName) { + update(policies, clusterName); + } + + public PublishRateLimiterImpl(PublishRate maxPublishRate) { + update(maxPublishRate); + } + + @Override + public void checkPublishRate() { + if (this.publishThrottlingEnabled && !publishRateExceeded) { + long currentPublishMsgRate = this.currentPublishMsgCount.sum(); + long currentPublishByteRate = 
this.currentPublishByteCount.sum(); + if ((this.publishMaxMessageRate > 0 && currentPublishMsgRate > this.publishMaxMessageRate) + || (this.publishMaxByteRate > 0 && currentPublishByteRate > this.publishMaxByteRate)) { + publishRateExceeded = true; + } + } + } + + @Override + public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { + if (this.publishThrottlingEnabled) { + this.currentPublishMsgCount.add(numOfMessages); + this.currentPublishByteCount.add(msgSizeInBytes); + } + } + + @Override + public boolean resetPublishCount() { + if (this.publishThrottlingEnabled) { + this.currentPublishMsgCount.reset(); + this.currentPublishByteCount.reset(); + this.publishRateExceeded = false; + return true; + } + return false; + } + + @Override + public boolean isPublishRateExceeded() { + return publishRateExceeded; + } + + @Override + public void update(Policies policies, String clusterName) { + final PublishRate maxPublishRate = policies.publishMaxMessageRate != null + ? policies.publishMaxMessageRate.get(clusterName) + : null; + update(maxPublishRate); + } + + public void update(PublishRate maxPublishRate) { + if (maxPublishRate != null + && (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) { + this.publishThrottlingEnabled = true; + this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); + this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); + } else { + this.publishMaxMessageRate = 0; + this.publishMaxByteRate = 0; + this.publishThrottlingEnabled = false; + } + resetPublishCount(); + } + + @Override + public boolean tryAcquire(int numbers, long bytes) { + return false; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1bb7866..6f9860b 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -82,7 +82,10 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedEx import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.PrecisPublishLimiter; import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.broker.service.PublishRateLimiter; +import org.apache.pulsar.broker.service.PublishRateLimiterImpl; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.broker.service.StreamingStats; @@ -2135,24 +2138,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal } /** - * Get {@link TopicPolicies} for this topic. - * @param topicName - * @return TopicPolicies is exist else return null. - */ - public TopicPolicies getTopicPolicies(TopicName topicName) { - TopicName cloneTopicName = topicName; - if (topicName.isPartitioned()) { - cloneTopicName = TopicName.get(topicName.getPartitionedTopicName()); - } - try { - return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName); - } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { - log.warn("Topic {} policies cache have not init.", topicName.getPartitionedTopicName()); - return null; - } - } - - /** * Get message TTL for this topic. 
* @param topicPolicies TopicPolicies * @param policies NameSpace policy @@ -2366,6 +2351,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal dispatchRateLimiter.ifPresent(dispatchRateLimiter -> dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate())); } + + if (policies != null && policies.getPublishRate() != null) { + topicPolicyPublishRate = policies.getPublishRate(); + updateTopicPublishDispatcher(); + } } private void initializeTopicDispatchRateLimiterIfNeeded(Optional<TopicPolicies> policies) { @@ -2381,6 +2371,32 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return this; } + @Override + protected void updateTopicPublishDispatcher() { + if (topicPolicyPublishRate != null && (topicPolicyPublishRate.publishThrottlingRateInByte > 0 + || topicPolicyPublishRate.publishThrottlingRateInMsg > 0)) { + log.info("Enabling topic policy publish rate limiting {} on topic {}", topicPolicyPublishRate, this.topic); + if (!preciseTopicPublishRateLimitingEnable) { + this.brokerService.setupBrokerPublishRateLimiterMonitor(); + } + + if (this.topicPublishRateLimiter == null + || this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) { + // create new rateLimiter if rate-limiter is disabled + if (preciseTopicPublishRateLimitingEnable) { + this.topicPublishRateLimiter = new PrecisPublishLimiter(topicPolicyPublishRate, ()-> this.enableCnxAutoRead()); + } else { + this.topicPublishRateLimiter = new PublishRateLimiterImpl(topicPolicyPublishRate); + } + } else { + this.topicPublishRateLimiter.update(topicPolicyPublishRate); + } + } else { + log.info("Disabling publish throttling for {}", this.topic); + this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER; + enableProducerReadForPublishRateLimiting(); + } + } @VisibleForTesting public MessageDeduplication getMessageDeduplication() { diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java index fa073fa..459a8e3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java @@ -26,6 +26,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.PersistencePolicies; +import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.Assert; @@ -170,4 +171,24 @@ public class TopicPoliciesDisableTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(e.getStatusCode(), 405); } } + + @Test + public void testPublishRateDisabled() throws Exception { + PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5); + log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic); + + try { + admin.topics().setPublishRate(testTopic, publishRate); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 405); + } + + try { + admin.topics().getPublishRate(testTopic); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 405); + } + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index e216661..4eb656b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -34,6 +34,7 @@ import 
org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.PersistencePolicies; +import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.testng.Assert; @@ -415,4 +416,42 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest { admin.topics().deletePartitionedTopic(testTopic, true); } + + @Test + public void testGetSetPublishRate() throws Exception { + PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5); + log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic); + + admin.topics().setPublishRate(testTopic, publishRate); + log.info("Publish Rate set success on topic: {}", testTopic); + + Thread.sleep(3000); + PublishRate getPublishRate = admin.topics().getPublishRate(testTopic); + log.info("Publish Rate: {} get on topic: {}", getPublishRate, testTopic); + Assert.assertEquals(getPublishRate, publishRate); + + admin.topics().deletePartitionedTopic(testTopic, true); + } + + @Test + public void testRemovePublishRate() throws Exception { + PublishRate publishRate = new PublishRate(10000, 1024 * 1024 * 5); + log.info("Publish Rate: {} will set to the topic: {}", publishRate, testTopic); + + admin.topics().setPublishRate(testTopic, publishRate); + log.info("Publish Rate set success on topic: {}", testTopic); + + Thread.sleep(3000); + PublishRate getPublishRate = admin.topics().getPublishRate(testTopic); + log.info("Publish Rate: {} get on topic: {}", getPublishRate, testTopic); + Assert.assertEquals(getPublishRate, publishRate); + + admin.topics().removePublishRate(testTopic); + Thread.sleep(3000); + getPublishRate = admin.topics().getPublishRate(testTopic); + log.info("Publish Rate get on topic: {} after remove", getPublishRate, 
testTopic); + Assert.assertNull(getPublishRate); + + admin.topics().deletePartitionedTopic(testTopic, true); + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 6ca2390..74d134f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -39,9 +39,9 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicStats; - /** * Admin interface for Topics management. */ @@ -2007,4 +2007,64 @@ public interface Topics { */ CompletableFuture<Void> removeCompactionThresholdAsync(String topic); + /** + * Set message-publish-rate (topics can publish this many messages per second). + * + * @param topic + * @param publishMsgRate + * number of messages per second + * @throws PulsarAdminException + * Unexpected error + */ + void setPublishRate(String topic, PublishRate publishMsgRate) throws PulsarAdminException; + + /** + * Set message-publish-rate (topics can publish this many messages per second) asynchronously. + * + * @param topic + * @param publishMsgRate + * number of messages per second + */ + CompletableFuture<Void> setPublishRateAsync(String topic, PublishRate publishMsgRate); + + /** + * Get message-publish-rate (topics can publish this many messages per second). 
+ * + * @param topic + * @return number of messages per second + * @throws PulsarAdminException Unexpected error + */ + PublishRate getPublishRate(String topic) throws PulsarAdminException; + + /** + * Get message-publish-rate (topics can publish this many messages per second) asynchronously. + * + * @param topic + * @return number of messages per second + */ + CompletableFuture<PublishRate> getPublishRateAsync(String topic); + + /** + * Remove message-publish-rate. + * <p/> + * Remove topic message publish rate + * + * @param topic + * @throws PulsarAdminException + * unexpected error + */ + void removePublishRate(String topic) throws PulsarAdminException; + + /** + * Remove message-publish-rate asynchronously. + * <p/> + * Remove topic message publish rate + * + * @param topic + * @throws PulsarAdminException + * unexpected error + */ + CompletableFuture<Void> removePublishRateAsync(String topic) throws PulsarAdminException; + + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index ce4ce78..2711b2f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -75,6 +75,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.protocol.Commands; @@ -2079,5 +2080,81 @@ public class TopicsImpl extends BaseResource implements Topics { return 
// File: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java (class TopicsImpl)

    /**
     * Blocking variant of {@link #getPublishRateAsync(String)}.
     *
     * @param topic topic name
     * @return the publish rate returned by the broker
     * @throws PulsarAdminException if the request fails, times out, or the thread is interrupted
     */
    @Override
    public PublishRate getPublishRate(String topic) throws PulsarAdminException {
        return awaitPublishRateCall(getPublishRateAsync(topic));
    }

    /**
     * Issues a GET on the topic's {@code publishRate} resource.
     *
     * @param topic topic name
     * @return future completed with the publish rate, or exceptionally with the API exception
     */
    @Override
    public CompletableFuture<PublishRate> getPublishRateAsync(String topic) {
        TopicName topicName = validateTopic(topic);
        WebTarget path = topicPath(topicName, "publishRate");
        final CompletableFuture<PublishRate> future = new CompletableFuture<>();
        asyncGetRequest(path,
            new InvocationCallback<PublishRate>() {
                @Override
                public void completed(PublishRate publishRate) {
                    future.complete(publishRate);
                }

                @Override
                public void failed(Throwable throwable) {
                    future.completeExceptionally(getApiException(throwable.getCause()));
                }
            });
        return future;
    }

    /**
     * Blocking variant of {@link #setPublishRateAsync(String, PublishRate)}.
     *
     * @param topic       topic name
     * @param publishRate publish rate to apply
     * @throws PulsarAdminException if the request fails, times out, or the thread is interrupted
     */
    @Override
    public void setPublishRate(String topic, PublishRate publishRate) throws PulsarAdminException {
        awaitPublishRateCall(setPublishRateAsync(topic, publishRate));
    }

    /**
     * Issues a POST of {@code publishRate} (as JSON) to the topic's {@code publishRate} resource.
     *
     * @param topic       topic name
     * @param publishRate publish rate to apply
     * @return future completed when the request finishes
     */
    @Override
    public CompletableFuture<Void> setPublishRateAsync(String topic, PublishRate publishRate) {
        TopicName topicName = validateTopic(topic);
        WebTarget path = topicPath(topicName, "publishRate");
        return asyncPostRequest(path, Entity.entity(publishRate, MediaType.APPLICATION_JSON));
    }

    /**
     * Blocking variant of {@link #removePublishRateAsync(String)}.
     *
     * @param topic topic name
     * @throws PulsarAdminException if the request fails, times out, or the thread is interrupted
     */
    @Override
    public void removePublishRate(String topic) throws PulsarAdminException {
        awaitPublishRateCall(removePublishRateAsync(topic));
    }

    /**
     * Issues a DELETE on the topic's {@code publishRate} resource.
     *
     * @param topic topic name
     * @return future completed when the request finishes
     */
    @Override
    public CompletableFuture<Void> removePublishRateAsync(String topic) {
        TopicName topicName = validateTopic(topic);
        WebTarget path = topicPath(topicName, "publishRate");
        return asyncDeleteRequest(path);
    }

    /**
     * Waits up to {@code readTimeoutMs} for a publish-rate request and maps failures to
     * {@link PulsarAdminException}.
     */
    private <T> T awaitPublishRateCall(CompletableFuture<T> future) throws PulsarAdminException {
        try {
            return future.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // Avoid a ClassCastException masking the real failure when the cause is not
            // already an admin exception.
            if (cause instanceof PulsarAdminException) {
                throw (PulsarAdminException) cause;
            }
            throw new PulsarAdminException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PulsarAdminException(e);
        } catch (TimeoutException e) {
            throw new PulsarAdminException.TimeoutException(e);
        }
    }

    private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);

// File: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
// Import added by the patch:
import org.apache.pulsar.common.policies.data.PublishRate;
// Command registration added in the CmdTopics constructor:
        jcommander.addCommand("get-publish-rate", new GetPublishRate());
jcommander.addCommand("set-publish-rate", new SetPublishRate()); + jcommander.addCommand("remove-publish-rate", new RemovePublishRate()); } @Parameters(commandDescription = "Get the list of topics under a namespace.") @@ -1207,4 +1211,49 @@ public class CmdTopics extends CmdBase { admin.topics().removeCompactionThreshold(persistentTopic); } } + + @Parameters(commandDescription = "Get publish rate for a topic") + private class GetPublishRate extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + print(admin.topics().getPublishRate(persistentTopic)); + } + } + + @Parameters(commandDescription = "Set publish rate for a topic") + private class SetPublishRate extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Parameter(names = { "--msg-publish-rate", + "-m" }, description = "message-publish-rate (default -1 will be overwrite if not passed)\n", required = false) + private int msgPublishRate = -1; + + @Parameter(names = { "--byte-publish-rate", + "-b" }, description = "byte-publish-rate (default -1 will be overwrite if not passed)\n", required = false) + private long bytePublishRate = -1; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = validatePersistentTopic(params); + admin.topics().setPublishRate(persistentTopic, + new PublishRate(msgPublishRate, bytePublishRate)); + } + } + + @Parameters(commandDescription = "Remove publish rate for a topic") + private class RemovePublishRate extends CliCommand { + @Parameter(description = "persistent://tenant/namespace/topic", required = true) + private java.util.List<String> params; + + @Override + void run() throws PulsarAdminException { + String persistentTopic = 
validatePersistentTopic(params); + admin.topics().removePublishRate(persistentTopic); + } + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java index d56e283..33fbe80 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicPolicies.java @@ -51,6 +51,7 @@ public class TopicPolicies { private Boolean delayedDeliveryEnabled = null; private DispatchRate dispatchRate = null; private Long compactionThreshold = null; + private PublishRate publishRate = null; public boolean isMaxUnackedMessagesOnConsumerSet() { return maxUnackedMessagesOnConsumer != null; @@ -107,4 +108,8 @@ public class TopicPolicies { public boolean isCompactionThresholdSet() { return compactionThreshold != null; } + + public boolean isPublishRateSet() { + return publishRate != null; + } }