This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a09d01faae9 Remove unnecessary methods and config for ThreadResourceUsageAccountant (#16490) a09d01faae9 is described below commit a09d01faae93e5156884d9429decdd43aa8f5582 Author: Rajat Venkatesh <1638298+vra...@users.noreply.github.com> AuthorDate: Mon Aug 4 15:57:15 2025 +0530 Remove unnecessary methods and config for ThreadResourceUsageAccountant (#16490) --- .../PerQueryCPUMemAccountantFactory.java | 20 ---------- .../accounting/ResourceUsageAccountantFactory.java | 16 -------- .../query/runtime/operator/MultiStageOperator.java | 2 +- .../runtime/operator/MultiStageAccountingTest.java | 2 +- .../MultistageResourceUsageAccountingTest.java | 2 +- .../queries/PerQueryCPUMemAccountantTest.java | 43 ---------------------- .../accounting/ThreadResourceUsageAccountant.java | 5 --- .../java/org/apache/pinot/spi/trace/Tracing.java | 13 ------- .../apache/pinot/spi/utils/CommonConstants.java | 3 -- .../ThrottleOnCriticalHeapUsageExecutorTest.java | 4 -- 10 files changed, 3 insertions(+), 107 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java index b88f06f2917..293edff6ff6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java @@ -127,9 +127,6 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory // track memory usage protected final boolean _isThreadMemorySamplingEnabled; - // is sampling allowed for MSE queries - protected final boolean _isThreadSamplingEnabledForMSE; - protected final Set<String> _inactiveQuery; protected Set<String> _cancelSentQueries; @@ -148,7 +145,6 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory 
_config = config; _isThreadCPUSamplingEnabled = isThreadCPUSamplingEnabled; _isThreadMemorySamplingEnabled = isThreadMemorySamplingEnabled; - _isThreadSamplingEnabledForMSE = isThreadSamplingEnabledForMSE; _inactiveQuery = inactiveQuery; _instanceId = instanceId; _instanceType = instanceType; @@ -184,11 +180,6 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory LOGGER.info("_isThreadCPUSamplingEnabled: {}, _isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled, _isThreadMemorySamplingEnabled); - _isThreadSamplingEnabledForMSE = - config.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE, - CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE); - LOGGER.info("_isThreadSamplingEnabledForMSE: {}", _isThreadSamplingEnabledForMSE); - _queryCancelCallbacks = CacheBuilder.newBuilder().maximumSize( config.getProperty(CommonConstants.Accounting.CONFIG_OF_CANCEL_CALLBACK_CACHE_MAX_SIZE, CommonConstants.Accounting.DEFAULT_CANCEL_CALLBACK_CACHE_MAX_SIZE)).expireAfterWrite( @@ -274,17 +265,6 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory sampleThreadCPUTime(); } - /** - * Sample Usage for Multi-stage engine queries - */ - @Override - public void sampleUsageMSE() { - if (_isThreadSamplingEnabledForMSE) { - sampleThreadBytesAllocated(); - sampleThreadCPUTime(); - } - } - @Override public boolean throttleQuerySubmission() { return getWatcherTask().getHeapUsageBytes() > getWatcherTask().getQueryMonitorConfig().getAlarmingLevel(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java index e00f74d888d..93c68ca7fe3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java @@ -82,9 
+82,6 @@ public class ResourceUsageAccountantFactory implements ThreadAccountantFactory { // track memory usage private final boolean _isThreadMemorySamplingEnabled; - // is sampling allowed for MSE queries - private final boolean _isThreadSamplingEnabledForMSE; - private final WatcherTask _watcherTask; private final EnumMap<TrackingScope, ResourceAggregator> _resourceAggregators; @@ -109,11 +106,6 @@ public class ResourceUsageAccountantFactory implements ThreadAccountantFactory { LOGGER.info("_isThreadCPUSamplingEnabled: {}, _isThreadMemorySamplingEnabled: {}", _isThreadCPUSamplingEnabled, _isThreadMemorySamplingEnabled); - _isThreadSamplingEnabledForMSE = - config.getProperty(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE, - CommonConstants.Accounting.DEFAULT_ENABLE_THREAD_SAMPLING_MSE); - LOGGER.info("_isThreadSamplingEnabledForMSE: {}", _isThreadSamplingEnabledForMSE); - _watcherTask = new WatcherTask(); _resourceAggregators = new EnumMap<>(TrackingScope.class); @@ -138,14 +130,6 @@ public class ResourceUsageAccountantFactory implements ThreadAccountantFactory { sampleThreadCPUTime(); } - @Override - public void sampleUsageMSE() { - if (_isThreadSamplingEnabledForMSE) { - sampleThreadBytesAllocated(); - sampleThreadCPUTime(); - } - } - @Override public boolean isAnchorThreadInterrupted() { ThreadExecutionContext context = _threadLocalEntry.get().getCurrentThreadTaskStatus(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index 3b179ed4b80..adf02c497f4 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -97,7 +97,7 @@ public abstract class MultiStageOperator earlyTerminate(); throw 
QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on " + getExplainName()); } - Tracing.ThreadAccountantOps.sampleMSE(); + Tracing.ThreadAccountantOps.sample(); if (Tracing.ThreadAccountantOps.isInterrupted()) { earlyTerminate(); throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Resource limit exceeded for operator: " diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java index de34a977d05..57b27b907fd 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java @@ -92,7 +92,7 @@ public class MultiStageAccountingTest implements ITest { configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, true); // init accountant and start watcher task Tracing.unregisterThreadAccountant(); - Tracing.ThreadAccountantOps.initializeThreadAccountant(new PinotConfiguration(configs), "testGroupBy", + Tracing.ThreadAccountantOps.createThreadAccountant(new PinotConfiguration(configs), "testGroupBy", InstanceType.SERVER); Tracing.ThreadAccountantOps.startThreadAccountant(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java index 534f1f413d5..8312bd098ba 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java @@ -92,7 +92,7 @@ public class MultistageResourceUsageAccountingTest implements ITest { // init accountant and start 
watcher task PinotConfiguration pinotCfg = new PinotConfiguration(configs); Tracing.unregisterThreadAccountant(); - Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, "testGroupBy", InstanceType.SERVER); + Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg, "testGroupBy", InstanceType.SERVER); Tracing.ThreadAccountantOps.startThreadAccountant(); // Setup Thread Context diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java index 42915480a9a..1aa4b23af29 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java @@ -27,7 +27,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory; -import org.apache.pinot.core.accounting.ResourceUsageAccountantFactory; import org.apache.pinot.query.planner.physical.DispatchableSubPlan; import org.apache.pinot.spi.accounting.QueryResourceTracker; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; @@ -98,48 +97,6 @@ public class PerQueryCPUMemAccountantTest extends QueryRunnerAccountingTest { } } - @Test - void testDisableSamplingForMSE() { - HashMap<String, Object> configs = getAccountingConfig(); - configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE, false); - - ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); - PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant accountant = - new PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant(new PinotConfiguration(configs), - "testWithPerQueryAccountantFactory", 
InstanceType.SERVER); - - try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { - tracing.when(Tracing::getThreadAccountant).thenReturn(accountant); - ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable(); - Assert.assertEquals(resultTable.getRows().size(), 2); - - Map<String, ? extends QueryResourceTracker> resources = accountant.getQueryResources(); - Assert.assertEquals(resources.size(), 1); - Assert.assertEquals(resources.entrySet().iterator().next().getValue().getAllocatedBytes(), 0); - } - } - - @Test - void testDisableSamplingWithResourceUsageAccountantForMSE() { - HashMap<String, Object> configs = getAccountingConfig(); - configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE, false); - - ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(true); - ResourceUsageAccountantFactory.ResourceUsageAccountant accountant = - new ResourceUsageAccountantFactory.ResourceUsageAccountant(new PinotConfiguration(configs), - "testWithPerQueryAccountantFactory", InstanceType.SERVER); - - try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) { - tracing.when(Tracing::getThreadAccountant).thenReturn(accountant); - ResultTable resultTable = queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable(); - Assert.assertEquals(resultTable.getRows().size(), 2); - - Map<String, ? 
extends QueryResourceTracker> resources = accountant.getQueryResources(); - Assert.assertEquals(resources.size(), 1); - Assert.assertEquals(resources.entrySet().iterator().next().getValue().getAllocatedBytes(), 0); - } - } - public static class InterruptingAccountant extends PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java index db41723d5be..b744778e8a9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java @@ -75,11 +75,6 @@ public interface ThreadResourceUsageAccountant { */ void sampleUsage(); - /** - * Sample Usage for Multi-stage engine queries - */ - void sampleUsageMSE(); - default boolean throttleQuerySubmission() { return false; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java index 6d9993cd4b7..d2466edbe47 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java @@ -206,10 +206,6 @@ public class Tracing { public void sampleUsage() { } - @Override - public void sampleUsageMSE() { - } - @Override public void updateQueryUsageConcurrently(String queryId, long cpuTimeNs, long allocatedBytes, TrackingScope trackingScope) { @@ -295,19 +291,10 @@ public class Tracing { Tracing.getThreadAccountant().sampleUsage(); } - public static void sampleMSE() { - Tracing.getThreadAccountant().sampleUsageMSE(); - } - public static void clear() { Tracing.getThreadAccountant().clear(); } - public static void initializeThreadAccountant(PinotConfiguration config, String instanceId, - InstanceType instanceType) { - 
createThreadAccountant(config, instanceId, instanceType); - } - public static ThreadResourceUsageAccountant createThreadAccountant(PinotConfiguration config, String instanceId, InstanceType instanceType) { _workloadBudgetManager = new WorkloadBudgetManager(config); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index d8152b1812c..e8c46ff8c8e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1538,9 +1538,6 @@ public class CommonConstants { public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED = "accounting.query.killed.metric.enabled"; public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false; - public static final String CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE = "accounting.enable.thread.sampling.mse.debug"; - public static final Boolean DEFAULT_ENABLE_THREAD_SAMPLING_MSE = true; - public static final String CONFIG_OF_CANCEL_CALLBACK_CACHE_MAX_SIZE = "accounting.cancel.callback.cache.max.size"; public static final int DEFAULT_CANCEL_CALLBACK_CACHE_MAX_SIZE = 500; diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java index 141b55129f0..5005c543677 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java @@ -75,10 +75,6 @@ public class ThrottleOnCriticalHeapUsageExecutorTest { public void sampleUsage() { } - @Override - public void sampleUsageMSE() { - } - @Override public boolean throttleQuerySubmission() { return _numCalls.getAndIncrement() > 1; 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org