This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ded806f [Broker] Fix set-publish-rate when using preciseTopicPublishRateLimiterEnable=true (#10384) ded806f is described below commit ded806fd52f6e2f182fa02052cbd82c2a6755098 Author: Lari Hotari <lhot...@users.noreply.github.com> AuthorDate: Tue Aug 3 19:14:26 2021 +0300 [Broker] Fix set-publish-rate when using preciseTopicPublishRateLimiterEnable=true (#10384) ### Motivation When using the `preciseTopicPublishRateLimiterEnable=true` setting (introduced by #7078) for rate limiting, there are various issues: - updating the limits doesn't reset either boundary (message rate or byte rate) when it is changed from a bounded limit to unbounded - each topic will create a scheduler thread for each limiter instance - each topic will never release the scheduler thread when the topic gets unloaded / closed - updating the limits doesn't close the scheduler thread related to the replaced limiter instance ### Modifications - Fix updating of the limits by cleaning up the previous limiter instances before creating new limiter instances - Use `brokerService.pulsar().getExecutor()` as the scheduler for the rate limiter instances - Add resource cleanup hooks for topic closing (unload) ### Open issue The existing code has a difference in passing the `rateLimitFunction`: https://github.com/apache/pulsar/blob/69a173a82c89893f54dbe5b6f422249f66ea5418/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java#L80-L86 It's passed to `topicPublishRateLimiterOnMessage`, but not to `topicPublishRateLimiterOnByte`. It is unclear whether this is intentional. The `rateLimitFunction` is `() -> this.enableCnxAutoRead()` https://github.com/apache/pulsar/blob/69a173a82c89893f54dbe5b6f422249f66ea5418/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java#L913 (This also raises the question of whether rate limiting works consistently when multiple topics share the same connection.) 
--- .../pulsar/broker/service/AbstractTopic.java | 2 +- .../broker/service/PrecisPublishLimiter.java | 113 +++++++++++++++------ .../pulsar/broker/service/PublishRateLimiter.java | 2 +- .../broker/service/PublishRateLimiterDisable.java | 4 + .../broker/service/PublishRateLimiterImpl.java | 5 + .../service/nonpersistent/NonPersistentTopic.java | 7 ++ .../broker/service/persistent/PersistentTopic.java | 7 ++ .../service/persistent/SubscribeRateLimiter.java | 2 +- .../broker/service/PrecisPublishLimiterTest.java | 57 +++++++++++ .../org/apache/pulsar/common/util/RateLimiter.java | 29 +++++- .../instance/stats/FunctionStatsManager.java | 14 +-- .../functions/instance/stats/SinkStatsManager.java | 4 +- .../instance/stats/SourceStatsManager.java | 4 +- 13 files changed, 201 insertions(+), 49 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index bb66308..d388d51 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -929,7 +929,7 @@ public abstract class AbstractTopic implements Topic { // create new rateLimiter if rate-limiter is disabled if (preciseTopicPublishRateLimitingEnable) { this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate, - () -> this.enableCnxAutoRead()); + () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor()); } else { this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index 60fbcf0..e61597e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.broker.service; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledExecutorService; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.util.RateLimitFunction; @@ -27,30 +27,37 @@ import org.apache.pulsar.common.util.RateLimiter; public class PrecisPublishLimiter implements PublishRateLimiter { protected volatile int publishMaxMessageRate = 0; protected volatile long publishMaxByteRate = 0; - protected volatile boolean publishThrottlingEnabled = false; // precise mode for publish rate limiter - private RateLimiter topicPublishRateLimiterOnMessage; - private RateLimiter topicPublishRateLimiterOnByte; + private volatile RateLimiter topicPublishRateLimiterOnMessage; + private volatile RateLimiter topicPublishRateLimiterOnByte; private final RateLimitFunction rateLimitFunction; + private final ScheduledExecutorService scheduledExecutorService; public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) { this.rateLimitFunction = rateLimitFunction; update(policies, clusterName); + this.scheduledExecutorService = null; } public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) { + this(publishRate, rateLimitFunction, null); + } + + public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction, + ScheduledExecutorService scheduledExecutorService) { this.rateLimitFunction = rateLimitFunction; update(publishRate); + this.scheduledExecutorService = scheduledExecutorService; } @Override public void checkPublishRate() { - // No-op + // No-op } @Override public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { - // No-op + // No-op } @Override @@ -62,10 +69,15 @@ public class PrecisPublishLimiter 
implements PublishRateLimiter { public boolean isPublishRateExceeded() { return false; } + // If all rate limiters are not exceeded, re-enable auto read from socket. private void tryReleaseConnectionThrottle() { - if ((topicPublishRateLimiterOnMessage != null && topicPublishRateLimiterOnMessage.getAvailablePermits() <= 0) - || (topicPublishRateLimiterOnByte != null && topicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) { + RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage; + RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte; + if ((currentTopicPublishRateLimiterOnMessage != null + && currentTopicPublishRateLimiterOnMessage.getAvailablePermits() <= 0) + || (currentTopicPublishRateLimiterOnByte != null + && currentTopicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) { return; } this.rateLimitFunction.apply(); @@ -78,34 +90,73 @@ public class PrecisPublishLimiter implements PublishRateLimiter { : null; this.update(maxPublishRate); } + public void update(PublishRate maxPublishRate) { - if (maxPublishRate != null - && (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) { - this.publishThrottlingEnabled = true; - this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); - this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); - if (this.publishMaxMessageRate > 0) { - topicPublishRateLimiterOnMessage = - new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, - this::tryReleaseConnectionThrottle, true); + replaceLimiters(() -> { + if (maxPublishRate != null + && (maxPublishRate.publishThrottlingRateInMsg > 0 + || maxPublishRate.publishThrottlingRateInByte > 0)) { + this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); + this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); + if (this.publishMaxMessageRate > 0) { + 
topicPublishRateLimiterOnMessage = + RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(publishMaxMessageRate) + .rateLimitFunction(this::tryReleaseConnectionThrottle) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); + } + if (this.publishMaxByteRate > 0) { + topicPublishRateLimiterOnByte = RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(publishMaxByteRate) + .rateLimitFunction(this::tryReleaseConnectionThrottle) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); + } + } else { + this.publishMaxMessageRate = 0; + this.publishMaxByteRate = 0; } - if (this.publishMaxByteRate > 0) { - topicPublishRateLimiterOnByte = - new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, - this::tryReleaseConnectionThrottle, true); - } - } else { - this.publishMaxMessageRate = 0; - this.publishMaxByteRate = 0; - this.publishThrottlingEnabled = false; - topicPublishRateLimiterOnMessage = null; - topicPublishRateLimiterOnByte = null; - } + }); } @Override public boolean tryAcquire(int numbers, long bytes) { - return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers)) - && (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes)); + RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage; + RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte; + return (currentTopicPublishRateLimiterOnMessage == null + || currentTopicPublishRateLimiterOnMessage.tryAcquire(numbers)) + && (currentTopicPublishRateLimiterOnByte == null + || currentTopicPublishRateLimiterOnByte.tryAcquire(bytes)); + } + + @Override + public void close() throws Exception { + rateLimitFunction.apply(); + replaceLimiters(null); + } + + private void replaceLimiters(Runnable updater) { + RateLimiter previousTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage; + 
topicPublishRateLimiterOnMessage = null; + RateLimiter previousTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte; + topicPublishRateLimiterOnByte = null; + try { + if (updater != null) { + updater.run(); + } + } finally { + // Close previous limiters to prevent resource leakages. + // Delay closing of previous limiters after new ones are in place so that updating the limiter + // doesn't cause unavailability. + if (previousTopicPublishRateLimiterOnMessage != null) { + previousTopicPublishRateLimiterOnMessage.close(); + } + if (previousTopicPublishRateLimiterOnByte != null) { + previousTopicPublishRateLimiterOnByte.close(); + } + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java index ec23b26..3978879 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java @@ -21,7 +21,7 @@ package org.apache.pulsar.broker.service; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; -public interface PublishRateLimiter { +public interface PublishRateLimiter extends AutoCloseable { PublishRateLimiter DISABLED_RATE_LIMITER = PublishRateLimiterDisable.DISABLED_RATE_LIMITER; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java index 6dc1954..81c4b82 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java @@ -62,4 +62,8 @@ public class PublishRateLimiterDisable implements PublishRateLimiter { return true; } + @Override + public void close() throws Exception 
{ + // No-op + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java index c1458cf..0e1200e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java @@ -108,4 +108,9 @@ public class PublishRateLimiterImpl implements PublishRateLimiter { public boolean tryAcquire(int numbers, long bytes) { return false; } + + @Override + public void close() throws Exception { + // no-op + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index b364daf..7c13cda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -445,6 +445,13 @@ public class NonPersistentTopic extends AbstractTopic implements Topic { replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); + if (topicPublishRateLimiter != null) { + try { + topicPublishRateLimiter.close(); + } catch (Exception e) { + log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); + } + } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); if (this.resourceGroupPublishLimiter != null) { this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 54aafd6..74d57d1 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1149,6 +1149,13 @@ public class PersistentTopic extends AbstractTopic futures.add(transactionBuffer.closeAsync()); replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); + if (topicPublishRateLimiter != null) { + try { + topicPublishRateLimiter.close(); + } catch (Exception e) { + log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); + } + } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); if (this.resourceGroupPublishLimiter != null) { this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index f014717..a13328c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -140,7 +140,7 @@ public class SubscribeRateLimiter { if (this.subscribeRateLimiter.get(consumerIdentifier) == null) { this.subscribeRateLimiter.put(consumerIdentifier, new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer, - ratePeriod, TimeUnit.SECONDS, null)); + ratePeriod, TimeUnit.SECONDS)); } else { this.subscribeRateLimiter.get(consumerIdentifier) .setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java new file mode 100644 index 0000000..61804e7 --- /dev/null +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.testng.annotations.Test; + +public class PrecisPublishLimiterTest { + + @Test + void shouldResetMsgLimitAfterUpdate() { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> { + }); + precisPublishLimiter.update(new PublishRate(1, 1)); + assertFalse(precisPublishLimiter.tryAcquire(99, 99)); + precisPublishLimiter.update(new PublishRate(-1, 100)); + assertTrue(precisPublishLimiter.tryAcquire(99, 99)); + } + + @Test + void shouldResetBytesLimitAfterUpdate() { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> { + }); + precisPublishLimiter.update(new PublishRate(1, 1)); + assertFalse(precisPublishLimiter.tryAcquire(99, 99)); + precisPublishLimiter.update(new PublishRate(100, -1)); + assertTrue(precisPublishLimiter.tryAcquire(99, 99)); + } + + @Test + void 
shouldCloseResources() throws Exception { + for (int i = 0; i < 20000; i++) { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(100, 100), () -> { + }); + precisPublishLimiter.tryAcquire(99, 99); + precisPublishLimiter.close(); + } + } +} \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index cb88a95..1bb2fcd 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import lombok.Builder; /** * A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a @@ -48,7 +49,6 @@ import java.util.function.Supplier; * </ul> */ public class RateLimiter implements AutoCloseable{ - private final ScheduledExecutorService executorService; private long rateTime; private TimeUnit timeUnit; @@ -63,7 +63,7 @@ public class RateLimiter implements AutoCloseable{ private boolean isDispatchOrPrecisePublishRateLimiter; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { - this(null, permits, rateTime, timeUnit, null); + this(null, permits, rateTime, timeUnit); } public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) { @@ -83,12 +83,25 @@ public class RateLimiter implements AutoCloseable{ } public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, + final TimeUnit timeUnit) { + this(service, permits, rateTime, timeUnit, (Supplier<Long>) null); + } + + public RateLimiter(final ScheduledExecutorService 
service, final long permits, final long rateTime, final TimeUnit timeUnit, Supplier<Long> permitUpdater) { this(service, permits, rateTime, timeUnit, permitUpdater, false); } public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) { + this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter, + null); + } + + @Builder + RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime, + final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter, + RateLimitFunction rateLimitFunction) { checkArgument(permits > 0, "rate must be > 0"); checkArgument(rateTime > 0, "Renew permit time must be > 0"); @@ -98,8 +111,8 @@ public class RateLimiter implements AutoCloseable{ this.permitUpdater = permitUpdater; this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter; - if (service != null) { - this.executorService = service; + if (scheduledExecutorService != null) { + this.executorService = scheduledExecutorService; this.externalExecutor = true; } else { final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); @@ -109,6 +122,14 @@ public class RateLimiter implements AutoCloseable{ this.externalExecutor = false; } + this.rateLimitFunction = rateLimitFunction; + + } + + // default values for Lombok generated builder class + public static class RateLimiterBuilder { + private long rateTime = 1; + private TimeUnit timeUnit = TimeUnit.SECONDS; } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index 08ea9ea..b008371 100644 --- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -68,13 +68,13 @@ public class FunctionStatsManager extends ComponentStatsManager{ final Counter statTotalSysExceptions; final Counter statTotalUserExceptions; - + final Summary statProcessLatency; final Gauge statlastInvocation; final Counter statTotalRecordsReceived; - + // windowed metrics final Counter statTotalProcessedSuccessfully1min; @@ -82,7 +82,7 @@ public class FunctionStatsManager extends ComponentStatsManager{ final Counter statTotalSysExceptions1min; final Counter statTotalUserExceptions1min; - + final Summary statProcessLatency1min; final Counter statTotalRecordsReceived1min; @@ -262,8 +262,8 @@ public class FunctionStatsManager extends ComponentStatsManager{ .help("Exception from sink.") .create()); - userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } public void addUserException(Throwable ex) { @@ -371,7 +371,7 @@ public class FunctionStatsManager extends ComponentStatsManager{ public double getTotalUserExceptions() { return _statTotalUserExceptions.get(); } - + @Override public double getLastInvocation() { return _statlastInvocation.get(); @@ -417,7 +417,7 @@ public class FunctionStatsManager extends ComponentStatsManager{ public double getTotalUserExceptions1min() { return _statTotalUserExceptions1min.get(); } - + @Override public double getAvgProcessLatency1min() { return _statProcessLatency1min.get().count <= 0.0 diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index 255ad62..536f55a 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -196,8 +196,8 @@ public class SinkStatsManager extends ComponentStatsManager { .help("Exception from sink.") .create()); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index f4e1da0..451a8ad 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -195,8 +195,8 @@ public class SourceStatsManager extends ComponentStatsManager { .help("Exception from source.") .create()); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, 
TimeUnit.MINUTES); } @Override