michaeljmarshall commented on code in PR #15457:
URL: https://github.com/apache/pulsar/pull/15457#discussion_r866495906
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -446,13 +442,21 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont
}
public void updateSubscribeRateLimiter() {
Review Comment:
Does this method need to be synchronized?
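For context on why I'm asking: the method checks whether the `Optional` holds a limiter and then assigns the field, which is a check-then-act sequence. Below is a minimal sketch of a guarded version. It uses hypothetical stand-in classes (`TopicSketch`, `LimiterStub`) rather than the real Pulsar types, and it assumes the method can be reached from more than one thread, which I have not verified.

```java
import java.util.Optional;

// Hypothetical stand-in for SubscribeRateLimiter; it only records its rate.
final class LimiterStub {
    final int rate;

    LimiterStub(int rate) {
        this.rate = rate;
    }
}

// Hypothetical stand-in for the topic that owns the limiter.
final class TopicSketch {
    private volatile Optional<LimiterStub> limiter = Optional.empty();
    private int created; // number of limiter instances constructed so far

    // synchronized makes the isPresent() check and the assignment one atomic step,
    // so two concurrent callers cannot both observe "absent" and both construct.
    synchronized void updateLimiter(int rate) {
        if (rate <= 0) {
            // Treat a non-positive rate as "rate limiting disabled" (assumption).
            limiter = Optional.empty();
            return;
        }
        if (!limiter.isPresent()) {
            limiter = Optional.of(new LimiterStub(rate));
            created++;
        }
    }

    synchronized int created() {
        return created;
    }

    Optional<LimiterStub> limiter() {
        return limiter;
    }
}
```

If all callers already run on a single thread, the lock is unnecessary and this can be ignored.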
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -446,13 +442,21 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont
}
public void updateSubscribeRateLimiter() {
- SubscribeRate subscribeRate = this.getSubscribeRate();
+ SubscribeRate subscribeRate = getSubscribeRate();
if (isSubscribeRateEnabled(subscribeRate)) {
- subscribeRateLimiter = Optional.of(subscribeRateLimiter.orElse(new SubscribeRateLimiter(this)));
+ if (subscribeRateLimiter.isPresent()) {
+ if (!subscribeRateLimiter.get().getSubscribeRate().equals(subscribeRate)) {
Review Comment:
It might make sense to move this optimization into the
`onSubscribeRateUpdate` method itself. The method could return early if the
rate hasn't changed.
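Roughly what I have in mind, as a sketch only: `RateSketch` and `LimiterSketch` are hypothetical stand-ins for `SubscribeRate` and `SubscribeRateLimiter`, and the `resets` counter stands in for whatever work the real method does when the rate changes.

```java
import java.util.Objects;

// Hypothetical stand-in for SubscribeRate with value-based equality.
final class RateSketch {
    final int ratePerConsumer;
    final int periodSeconds;

    RateSketch(int ratePerConsumer, int periodSeconds) {
        this.ratePerConsumer = ratePerConsumer;
        this.periodSeconds = periodSeconds;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof RateSketch)) {
            return false;
        }
        RateSketch other = (RateSketch) o;
        return ratePerConsumer == other.ratePerConsumer
                && periodSeconds == other.periodSeconds;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ratePerConsumer, periodSeconds);
    }
}

// Hypothetical stand-in for SubscribeRateLimiter.
final class LimiterSketch {
    private RateSketch current;
    private int resets; // counts how often a new rate was actually applied

    LimiterSketch(RateSketch initial) {
        this.current = initial;
    }

    // Returns early when the rate is unchanged, so callers need no equality check.
    synchronized void onSubscribeRateUpdate(RateSketch updated) {
        if (Objects.equals(current, updated)) {
            return;
        }
        current = updated;
        resets++; // placeholder for the real update work
    }

    synchronized int resets() {
        return resets;
    }
}
```

With the check inside the method, every caller gets the same no-op behavior for an unchanged rate, and `updateSubscribeRateLimiter` can call it unconditionally.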
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3090,6 +3094,7 @@ private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies>
this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
Review Comment:
Similarly, is there a reason that we're not using
`subscribeRateLimiter.onSubscribeRateUpdate(subscribeRate)` here?