This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 60a5698 [PIP-82] [pulsar-broker] Misc fixes: (#11821) 60a5698 is described below commit 60a5698232932df153c1f39045b3ca6c53298d76 Author: kaushik-develop <80294764+kaushik-deve...@users.noreply.github.com> AuthorDate: Tue Aug 31 15:03:16 2021 -0700 [PIP-82] [pulsar-broker] Misc fixes: (#11821) - fix updateLocalQuota to not attempt Dispatch changes - add checks for totalUsed of 0, and unconfigured RG in ResourceQuotaCalculatorImpl - UT for above Co-authored-by: Kaushik Ghosh <kaush...@splunk.com> --- .../pulsar/broker/resourcegroup/ResourceGroup.java | 47 +++++++++++++++------ .../broker/resourcegroup/ResourceGroupService.java | 48 ++++++++++++++++------ .../resourcegroup/ResourceQuotaCalculatorImpl.java | 18 ++++++-- .../ResourceQuotaCalculatorImplTest.java | 12 +++++- 4 files changed, 96 insertions(+), 29 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index 363594f..0435bd9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -179,7 +179,9 @@ public class ResourceGroup { // If this is the first ref, register with the transport manager. if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 1) { - log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName); + if (log.isDebugEnabled()) { + log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName); + } transportManager.registerResourceUsagePublisher(this.ruPublisher); transportManager.registerResourceUsageConsumer(this.ruConsumer); } @@ -192,7 +194,9 @@ public class ResourceGroup { // If this was the last ref, unregister from the transport manager. if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) { - log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName); + if (log.isDebugEnabled()) { + log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName); + } transportManager.unregisterResourceUsageConsumer(this.ruConsumer); transportManager.unregisterResourceUsagePublisher(this.ruPublisher); } @@ -321,6 +325,14 @@ public class ResourceGroup { protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass, BytesAndMessagesCount newQuota) throws PulsarAdminException { + // Only the Publish side is functional now; add the Dispatch side code when the consume side is ready. + if (!ResourceGroupMonitoringClass.Publish.equals(monClass)) { + if (log.isDebugEnabled()) { + log.debug("Doing nothing for monClass={}; only Publish is functional", monClass); + } + return null; + } + this.checkMonitoringClass(monClass); BytesAndMessagesCount oldBMCount; @@ -333,8 +345,10 @@ public class ResourceGroup { } finally { monEntity.localUsageStatsLock.unlock(); } - log.debug("updateLocalQuota for RG={}: set local {} quota to bytes={}, messages={}", - this.resourceGroupName, monClass, newQuota.bytes, newQuota.messages); + if (log.isDebugEnabled()) { + log.debug("updateLocalQuota for RG={}: set local {} quota to bytes={}, messages={}", + this.resourceGroupName, monClass, newQuota.bytes, newQuota.messages); + } return oldBMCount; } @@ -434,11 +448,16 @@ public class ResourceGroup { double sentCount = sendReport ? 1 : 0; rgLocalUsageReportCount.labels(rgName, monClass.name()).inc(sentCount); if (sendReport) { - log.debug("fillResourceUsage for RG={}: filled a {} update; bytes={}, messages={}", - rgName, monClass, bytesUsed, messagesUsed); + if (log.isDebugEnabled()) { + log.debug("fillResourceUsage for RG={}: filled a {} update; bytes={}, messages={}", + rgName, monClass, bytesUsed, messagesUsed); + } } else { - log.debug("fillResourceUsage for RG={}: report for {} suppressed (suppressions={} since last sent report)", - rgName, monClass, numSuppressions); + if (log.isDebugEnabled()) { + log.debug("fillResourceUsage for RG={}: report for {} suppressed " + + "(suppressions={} since last sent report)", + rgName, monClass, numSuppressions); + } } return sendReport; @@ -479,11 +498,13 @@ public class ResourceGroup { oldMessageCount = oldUsageStats.usedValues.messages; } - log.debug("resourceUsageListener for RG={}: updated {} stats for broker={} " - + "with bytes={} (old ={}), messages={} (old={})", - this.resourceGroupName, monClass, broker, - newByteCount, oldByteCount, - newMessageCount, oldMessageCount); + if (log.isDebugEnabled()) { + log.debug("resourceUsageListener for RG={}: updated {} stats for broker={} " + + "with bytes={} (old ={}), messages={} (old={})", + this.resourceGroupName, monClass, broker, + newByteCount, oldByteCount, + newMessageCount, oldMessageCount); + } } private void setResourceGroupMonitoringClassFields() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index 86a11d0..4a40235 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -445,10 +445,12 @@ public class ResourceGroupService { try { boolean statsUpdated = this.incrementUsage(tenantString, nsString, monClass, bmDiff); - log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; " - + "by {} bytes, {} mesgs", - topicName, monClass, statsUpdated, tenantString, nsString, - bmDiff.bytes, bmDiff.messages); + if (log.isDebugEnabled()) { + log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; " + + "by {} bytes, {} mesgs", + topicName, monClass, statsUpdated, tenantString, nsString, + bmDiff.bytes, bmDiff.messages); + } hm.put(topicName, bmNewCount); } catch (Throwable t) { log.error("updateStatsWithDiff: got ex={} while aggregating for {} side", @@ -553,7 +555,9 @@ public class ResourceGroupService { ResourceGroupMonitoringClass.Dispatch); } double diffTimeSeconds = aggrUsageTimer.observeDuration(); - log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000); + if (log.isDebugEnabled()) { + log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000); + } // Check any re-scheduling requirements for next time. // Use the same period as getResourceUsagePublishIntervalInSecs; @@ -610,12 +614,30 @@ public class ResourceGroupService { globUsageMessagesArray); BytesAndMessagesCount oldBMCount = resourceGroup.updateLocalQuota(monClass, updatedQuota); - rgCalculatedQuotaMessages.labels(rgName, monClass.name()).inc(updatedQuota.messages); - rgCalculatedQuotaBytes.labels(rgName, monClass.name()).inc(updatedQuota.bytes); - long messagesIncrement = updatedQuota.messages - oldBMCount.messages; - long bytesIncrement = updatedQuota.bytes - oldBMCount.bytes; - log.debug("calculateQuota for RG {} [class {}]: bytes incremented by {}, messages by {}", - rgName, monClass, messagesIncrement, bytesIncrement); + // Guard against unconfigured quota settings, for which computeLocalQuota will return negative. + if (updatedQuota.messages >= 0) { + rgCalculatedQuotaMessages.labels(rgName, monClass.name()).inc(updatedQuota.messages); + } + if (updatedQuota.bytes >= 0) { + rgCalculatedQuotaBytes.labels(rgName, monClass.name()).inc(updatedQuota.bytes); + } + if (oldBMCount != null) { + long messagesIncrement = updatedQuota.messages - oldBMCount.messages; + long bytesIncrement = updatedQuota.bytes - oldBMCount.bytes; + if (log.isDebugEnabled()) { + log.debug("calculateQuota for RG={} [class {}]: " + + "updatedlocalBytes={}, updatedlocalMesgs={}; " + + "old bytes={}, old mesgs={}; incremented bytes by {}, messages by {}", + rgName, monClass, updatedQuota.bytes, updatedQuota.messages, + oldBMCount.bytes, oldBMCount.messages, + bytesIncrement, messagesIncrement); + } + } else { + if (log.isDebugEnabled()) { + log.debug("calculateQuota for RG={} [class {}]: got back null from updateLocalQuota", + rgName, monClass); + } + } } catch (Throwable t) { log.error("Got exception={} while calculating new quota for monitoring-class={} of RG={}", t.getMessage(), monClass, rgName); @@ -623,7 +645,9 @@ public class ResourceGroupService { } }); double diffTimeSeconds = quotaCalcTimer.observeDuration(); - log.debug("calculateQuotaForAllResourceGroups took {} milliseconds", diffTimeSeconds * 1000); + if (log.isDebugEnabled()) { + log.debug("calculateQuotaForAllResourceGroups took {} milliseconds", diffTimeSeconds * 1000); + } // Check any re-scheduling requirements for next time. // Use the same period as getResourceUsagePublishIntervalInSecs; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java index 773be27..27a3085 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java @@ -39,9 +39,12 @@ public class ResourceQuotaCalculatorImpl implements ResourceQuotaCalculator { if (confUsage < 0) { // This can happen if the RG is not configured with this particular limit (message or byte count) yet. - // It is safe to return a high value (so we don't limit) for the quota. - log.debug("Configured usage {} is not set; returning a high calculated quota", confUsage); - return Long.MAX_VALUE; + val retVal = -1; + if (log.isDebugEnabled()) { + log.debug("Configured usage ({}) is not set; returning a special value ({}) for calculated quota", + confUsage, retVal); + } + return retVal; } if (myUsage < 0 || totalUsage < 0) { @@ -51,6 +54,15 @@ public class ResourceQuotaCalculatorImpl implements ResourceQuotaCalculator { throw new PulsarAdminException(errMesg); } + // If the total usage is zero (which may happen during initial transients), just return the configured value. + // The caller is expected to check the value returned, or not call here with a zero global usage. + // [This avoids a division by zero when calculating the local share.] + if (totalUsage == 0) { + log.warn("computeLocalQuota: totalUsage is zero; returning the configured usage ({}) as new local quota", + confUsage); + return confUsage; + } + if (myUsage > totalUsage) { String errMesg = String.format("Local usage (%d) is greater than total usage (%d)", myUsage, totalUsage); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java index 7d3653a..1a98838 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java @@ -46,7 +46,8 @@ public class ResourceQuotaCalculatorImplTest extends MockedPulsarServiceBaseTest public void testRQCalcNegativeConfTest() throws PulsarAdminException { final long[] allUsage = { 0 }; long calculatedQuota = this.rqCalc.computeLocalQuota(-1, 0, allUsage); - Assert.assertEquals(calculatedQuota, Long.MAX_VALUE); + long expectedQuota = -1; + Assert.assertEquals(calculatedQuota, expectedQuota); } @Test @@ -102,5 +103,14 @@ public class ResourceQuotaCalculatorImplTest extends MockedPulsarServiceBaseTest Assert.assertEquals(initialUsageRatio, proposedUsageRatio); } + @Test + public void testRQCalcGlobUsedZeroTest() throws PulsarAdminException { + final long config = 10; // don't care + final long localUsed = 0; // don't care + final long[] allUsage = { 0 }; + final long newQuota = this.rqCalc.computeLocalQuota(config, localUsed, allUsage); + Assert.assertTrue(newQuota == config); + } + private ResourceQuotaCalculatorImpl rqCalc; } \ No newline at end of file