This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 99501835783540daabe31b4600d801fbf5067fa4 Author: Jiwei Guo <[email protected]> AuthorDate: Wed Sep 28 21:55:26 2022 +0800 Fix NPE when ResourceGroupService execute scheduled task. (#17840) --- .../org/apache/pulsar/broker/PulsarService.java | 8 +++++ .../broker/resourcegroup/ResourceGroupService.java | 37 ++++++++++++++++++---- .../resourcegroup/ResourceGroupServiceTest.java | 8 +++++ 3 files changed, 47 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index b28318de89c..abc70480a8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -419,6 +419,14 @@ public class PulsarService implements AutoCloseable, ShutdownService { } this.resourceUsageTransportManager = null; } + if (this.resourceGroupServiceManager != null) { + try { + this.resourceGroupServiceManager.close(); + } catch (Exception e) { + LOG.warn("ResourceGroupServiceManager closing failed {}", e.getMessage()); + } + this.resourceGroupServiceManager = null; + } if (this.webService != null) { try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index 4bb1bc8ab24..c74681fdb73 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory; * * @see PulsarService */ -public class ResourceGroupService { +public class ResourceGroupService implements AutoCloseable{ /** * Default constructor. */ @@ -303,6 +303,21 @@ public class ResourceGroupService { return this.namespaceToRGsMap.get(namespaceName); } + @Override + public void close() throws Exception { + if (aggregateLocalUsagePeriodicTask != null) { + aggregateLocalUsagePeriodicTask.cancel(true); + } + if (calculateQuotaPeriodicTask != null) { + calculateQuotaPeriodicTask.cancel(true); + } + resourceGroupsMap.clear(); + tenantToRGsMap.clear(); + namespaceToRGsMap.clear(); + topicProduceStats.clear(); + topicConsumeStats.clear(); + } + /** * Increments usage stats for the resource groups associated with the given namespace and tenant. * Expected to be called when a message is produced or consumed on a topic, or when we calculate @@ -565,17 +580,17 @@ public class ResourceGroupService { ServiceConfiguration config = pulsar.getConfiguration(); long newPeriodInSeconds = config.getResourceUsageTransportPublishIntervalInSecs(); if (newPeriodInSeconds != this.aggregateLocalUsagePeriodInSeconds) { - if (this.aggreagteLocalUsagePeriodicTask == null) { + if (this.aggregateLocalUsagePeriodicTask == null) { log.error("aggregateResourceGroupLocalUsages: Unable to find running task to cancel when " + "publish period changed from {} to {} {}", this.aggregateLocalUsagePeriodInSeconds, newPeriodInSeconds, timeUnitScale); } else { - boolean cancelStatus = this.aggreagteLocalUsagePeriodicTask.cancel(true); + boolean cancelStatus = this.aggregateLocalUsagePeriodicTask.cancel(true); log.info("aggregateResourceGroupLocalUsages: Got status={} in cancel of periodic " + "when publish period changed from {} to {} {}", cancelStatus, this.aggregateLocalUsagePeriodInSeconds, newPeriodInSeconds, timeUnitScale); } - this.aggreagteLocalUsagePeriodicTask = pulsar.getExecutor().scheduleAtFixedRate( + this.aggregateLocalUsagePeriodicTask = pulsar.getExecutor().scheduleAtFixedRate( catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages), newPeriodInSeconds, newPeriodInSeconds, @@ -680,7 +695,7 @@ public class ResourceGroupService { ServiceConfiguration config = this.pulsar.getConfiguration(); long periodInSecs = config.getResourceUsageTransportPublishIntervalInSecs(); this.aggregateLocalUsagePeriodInSeconds = this.resourceUsagePublishPeriodInSeconds = periodInSecs; - this.aggreagteLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate( + this.aggregateLocalUsagePeriodicTask = this.pulsar.getExecutor().scheduleAtFixedRate( catchingAndLoggingThrowables(this::aggregateResourceGroupLocalUsages), periodInSecs, periodInSecs, @@ -737,7 +752,7 @@ public class ResourceGroupService { // The task that periodically re-calculates the quota budget for local usage. - private ScheduledFuture<?> aggreagteLocalUsagePeriodicTask; + private ScheduledFuture<?> aggregateLocalUsagePeriodicTask; private long aggregateLocalUsagePeriodInSeconds; // The task that periodically re-calculates the quota budget for local usage. @@ -840,4 +855,14 @@ public class ResourceGroupService { ConcurrentHashMap getTopicProduceStats() { return this.topicProduceStats; } + + @VisibleForTesting + ScheduledFuture<?> getAggregateLocalUsagePeriodicTask() { + return this.aggregateLocalUsagePeriodicTask; + } + + @VisibleForTesting + ScheduledFuture<?> getCalculateQuotaPeriodicTask() { + return this.calculateQuotaPeriodicTask; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index e0e3ec9c16a..86dff398f97 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -257,6 +257,14 @@ public class ResourceGroupServiceTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(rgs.getNumResourceGroups(), 0); } + @Test + public void testClose() throws Exception { + ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, null, null); + service.close(); + Assert.assertTrue(service.getAggregateLocalUsagePeriodicTask().isCancelled()); + Assert.assertTrue(service.getCalculateQuotaPeriodicTask().isCancelled()); + } + private ResourceGroupService rgs; int numAnonymousQuotaCalculations;
