This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 90346d5f37 Allow Reset of ThreadResourceUsageAccountant in Tracing.java (#16360)
90346d5f37 is described below
commit 90346d5f379d24687b0117ee7ff8aafa113ac552
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Wed Jul 23 18:36:28 2025 +0530
Allow Reset of ThreadResourceUsageAccountant in Tracing.java (#16360)
---
.../broker/broker/helix/BaseBrokerStarter.java | 5 +-
.../controller/helix/ControllerRequestClient.java | 21 ++++++++
.../pinot/controller/helix/ControllerTest.java | 10 ++++
.../CPUMemThreadLevelAccountingObjects.java | 1 +
.../PerQueryCPUMemAccountantFactory.java | 15 ++++--
.../accounting/ResourceUsageAccountantFactory.java | 11 ++--
.../accounting/ResourceManagerAccountingTest.java | 61 +++++++++++++---------
.../tests/OOMProtectionEnabledIntegrationTest.java | 54 +++++++++++++++++++
.../tests/WindowResourceAccountingTest.java | 1 +
.../runtime/operator/MultiStageAccountingTest.java | 1 +
.../MultistageResourceUsageAccountingTest.java | 1 +
.../server/starter/helix/BaseServerStarter.java | 30 ++++++-----
.../accounting/ThreadResourceUsageAccountant.java | 7 +++
.../java/org/apache/pinot/spi/trace/Tracing.java | 51 ++++++++++++++----
.../utils/builder/ControllerRequestURLBuilder.java | 8 +++
15 files changed, 223 insertions(+), 54 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index c694864055..3ae7fd846c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -85,6 +85,7 @@ import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.segment.local.function.GroovyFunctionEvaluator;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.cursors.ResponseStoreService;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -153,6 +154,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
protected AbstractResponseStore _responseStore;
protected BrokerGrpcServer _brokerGrpcServer;
protected FailureDetector _failureDetector;
+ protected ThreadResourceUsageAccountant _resourceUsageAccountant;
@Override
public void init(PinotConfiguration brokerConf)
@@ -415,9 +417,10 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
_brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
- Tracing.ThreadAccountantOps.initializeThreadAccountant(
+ _resourceUsageAccountant =
Tracing.ThreadAccountantOps.createThreadAccountant(
_brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX),
_instanceId,
org.apache.pinot.spi.config.instance.InstanceType.BROKER);
+ Preconditions.checkNotNull(_resourceUsageAccountant);
Tracing.ThreadAccountantOps.startThreadAccountant();
String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 8acf41df87..c9efb0a418 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -473,4 +473,25 @@ public class ControllerRequestClient {
return new Tenant(TenantRole.SERVER, tenantName, numOfflineServers +
numRealtimeServers, numOfflineServers,
numRealtimeServers).toJsonString();
}
+
+ public void updateClusterConfig(Map<String, String> newConfigs)
+ throws IOException {
+ try {
+ HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(
+ new URI(_controllerRequestURLBuilder.forClusterConfigUpdate()),
+ JsonUtils.objectToString(newConfigs), _headers));
+ } catch (HttpErrorStatusException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public void deleteClusterConfig(String config)
+ throws IOException {
+ try {
+ HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(
+ new
URI(_controllerRequestURLBuilder.forClusterConfigDelete(config)), _headers));
+ } catch (HttpErrorStatusException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index e3225dea09..027b03e37b 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -937,6 +937,16 @@ public class ControllerTest {
sendGetRequest(getControllerRequestURLBuilder().forPeriodTaskRun(taskName,
tableName, tableType));
}
+ public void updateClusterConfig(Map<String, String> clusterConfig)
+ throws IOException {
+ getControllerRequestClient().updateClusterConfig(clusterConfig);
+ }
+
+ public void deleteClusterConfig(String clusterConfig)
+ throws IOException {
+ getControllerRequestClient().deleteClusterConfig(clusterConfig);
+ }
+
/**
* Trigger a task on a table and wait for completion
*/
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
index 3744d1729b..b6a0bc661a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -76,6 +76,7 @@ public class CPUMemThreadLevelAccountingObjects {
_currentThreadCPUTimeSampleMS = 0;
// clear memory usage
_currentThreadMemoryAllocationSampleBytes = 0;
+ _errorStatus.set(null);
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 71944bd610..7402b5138f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -90,7 +90,7 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
*/
private static final String ACCOUNTANT_TASK_NAME =
"CPUMemThreadAccountant";
private static final int ACCOUNTANT_PRIORITY = 4;
- private static final ExecutorService EXECUTOR_SERVICE =
Executors.newFixedThreadPool(1, r -> {
+ private final ExecutorService _executorService =
Executors.newFixedThreadPool(1, r -> {
Thread thread = new Thread(r);
thread.setPriority(ACCOUNTANT_PRIORITY);
thread.setDaemon(true);
@@ -213,6 +213,10 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
return new WatcherTask();
}
+ public QueryMonitorConfig getQueryMonitorConfig() {
+ return _watcherTask.getQueryMonitorConfig();
+ }
+
@Override
public Collection<? extends ThreadResourceTracker> getThreadResources() {
return _threadEntriesMap.values();
@@ -431,7 +435,12 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
@Override
public void startWatcherTask() {
- EXECUTOR_SERVICE.submit(_watcherTask);
+ _executorService.submit(_watcherTask);
+ }
+
+ @Override
+ public void stopWatcherTask() {
+ _executorService.shutdownNow();
}
@Override
@@ -756,7 +765,7 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
@Override
public void run() {
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
runOnce();
} finally {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
index 4c4bcc8d02..3b3d509bf5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
@@ -54,7 +54,7 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
private static final String ACCOUNTANT_TASK_NAME =
"ResourceUsageAccountant";
private static final int ACCOUNTANT_PRIORITY = 4;
- private static final ExecutorService EXECUTOR_SERVICE =
Executors.newFixedThreadPool(1, r -> {
+ private final ExecutorService _executorService =
Executors.newFixedThreadPool(1, r -> {
Thread thread = new Thread(r);
thread.setPriority(ACCOUNTANT_PRIORITY);
thread.setDaemon(true);
@@ -286,7 +286,12 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
@Override
public void startWatcherTask() {
- EXECUTOR_SERVICE.submit(_watcherTask);
+ _executorService.submit(_watcherTask);
+ }
+
+ @Override
+ public void stopWatcherTask() {
+ _executorService.shutdownNow();
}
@Override
@@ -315,7 +320,7 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
@Override
public void run() {
LOGGER.debug("Running timed task for {}", this.getClass().getName());
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
// Preaggregation.
runPreAggregation();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index 5e473e402b..4986e7684d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -59,6 +59,7 @@ import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
@@ -99,14 +100,15 @@ public class ResourceManagerAccountingTest {
"org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING,
false);
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING,
true);
- ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
- Future[] futures = new Future[2000];
- AtomicInteger atomicInteger = new AtomicInteger();
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
- Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg,
"testCPUtimeProvider",
- InstanceType.SERVER);
+ ThreadResourceUsageAccountant accountant =
Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg,
+ "testCPUtimeProvider", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
+ ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, accountant);
+ Future[] futures = new Future[2000];
+ AtomicInteger atomicInteger = new AtomicInteger();
+
for (int k = 0; k < 30; k++) {
int finalK = k;
rm.getQueryRunners().submit(() -> {
@@ -164,12 +166,13 @@ public class ResourceManagerAccountingTest {
"org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING,
true);
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING,
false);
- ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
- Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg,
"testCPUtimeProvider",
- InstanceType.SERVER);
+ ThreadResourceUsageAccountant accountant =
Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg,
+ "testCPUtimeProvider", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
+ ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, accountant);
+
for (int k = 0; k < 30; k++) {
int finalK = k;
rm.getQueryRunners().submit(() -> {
@@ -241,13 +244,13 @@ public class ResourceManagerAccountingTest {
String workloadName = CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME;
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
- Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg,
"testWorkloadMemoryAccounting",
- InstanceType.SERVER);
+ ThreadResourceUsageAccountant accountant =
Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg,
+ "testWorkloadMemoryAccounting", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
WorkloadBudgetManager workloadBudgetManager =
Tracing.ThreadAccountantOps.getWorkloadBudgetManager();
workloadBudgetManager.addOrUpdateWorkload(workloadName, 88_000_000,
27_000_000);
- ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
+ ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, accountant);
for (int k = 0; k < 30; k++) {
int finalK = k;
@@ -301,7 +304,8 @@ public class ResourceManagerAccountingTest {
@Test
public void testWorkerThreadInterruption()
throws Exception {
- ResourceManager rm = getResourceManager(2, 5, 1, 3,
Collections.emptyMap());
+ ResourceManager rm = getResourceManager(2, 5, 1, 3, Collections.emptyMap(),
+ new Tracing.DefaultThreadResourceUsageAccountant());
AtomicReference<Future>[] futures = new AtomicReference[5];
for (int i = 0; i < 5; i++) {
futures[i] = new AtomicReference<>();
@@ -379,10 +383,12 @@ public class ResourceManagerAccountingTest {
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING,
false);
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY,
true);
PinotConfiguration config = getConfig(20, 2, configs);
- ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
// init accountant and start watcher task
- Tracing.ThreadAccountantOps.initializeThreadAccountant(config,
"testSelect", InstanceType.SERVER);
+ Tracing.unregisterThreadAccountant();
+ ThreadResourceUsageAccountant accountant =
Tracing.ThreadAccountantOps.createThreadAccountant(config,
+ "testSelect", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
+ ResourceManager rm = getResourceManager(20, 2, 1, 1, configs, accountant);
CountDownLatch latch = new CountDownLatch(100);
AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
@@ -404,7 +410,7 @@ public class ResourceManagerAccountingTest {
}
});
}
- latch.await();
+ latch.await(1, java.util.concurrent.TimeUnit.MINUTES);
// assert that EarlyTerminationException was thrown in at least one runner
thread
Assert.assertTrue(earlyTerminationOccurred.get());
}
@@ -450,11 +456,14 @@ public class ResourceManagerAccountingTest {
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY,
true);
PinotConfiguration config = getConfig(20, 2, configs);
- ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
// init accountant and start watcher task
- Tracing.ThreadAccountantOps.initializeThreadAccountant(config,
"testGroupBy", InstanceType.SERVER);
+ Tracing.unregisterThreadAccountant();
+ ThreadResourceUsageAccountant accountant =
Tracing.ThreadAccountantOps.createThreadAccountant(config,
+ "testGroupBy", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
+ ResourceManager rm = getResourceManager(20, 2, 1, 1, configs, accountant);
+
CountDownLatch latch = new CountDownLatch(100);
AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
@@ -475,7 +484,7 @@ public class ResourceManagerAccountingTest {
}
});
}
- latch.await();
+ latch.await(1, java.util.concurrent.TimeUnit.MINUTES);
// assert that EarlyTerminationException was thrown in at least one runner
thread
Assert.assertTrue(earlyTerminationOccurred.get());
}
@@ -507,12 +516,14 @@ public class ResourceManagerAccountingTest {
configs.put(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO,
0.00f);
PinotConfiguration config = getConfig(2, 2, configs);
- ResourceManager rm = getResourceManager(2, 2, 1, 1, configs);
// init accountant and start watcher task
- Tracing.ThreadAccountantOps.initializeThreadAccountant(config,
"testJsonIndexExtractMapOOM",
- InstanceType.SERVER);
+ Tracing.unregisterThreadAccountant();
+ ThreadResourceUsageAccountant accountant =
Tracing.ThreadAccountantOps.createThreadAccountant(config,
+ "testJsonIndexExtractMapOOM", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
+ ResourceManager rm = getResourceManager(2, 2, 1, 1, configs, accountant);
+
Supplier<String> randomJsonValue = () -> {
Random random = new Random();
int length = random.nextInt(1000);
@@ -575,7 +586,7 @@ public class ResourceManagerAccountingTest {
}
});
- latch.await();
+ latch.await(1, java.util.concurrent.TimeUnit.MINUTES);
Assert.assertTrue(mutableEarlyTerminationOccurred.get(),
"Expected early termination reading the mutable index");
Assert.assertTrue(immutableEarlyTerminationOccurred.get(),
@@ -602,7 +613,7 @@ public class ResourceManagerAccountingTest {
"org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING,
true);
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING,
false);
- ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
+ ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, new
Tracing.DefaultThreadResourceUsageAccountant());
Future[] futures = new Future[30];
for (int k = 0; k < 4; k++) {
@@ -651,9 +662,9 @@ public class ResourceManagerAccountingTest {
}
private ResourceManager getResourceManager(int runners, int workers, final
int softLimit, final int hardLimit,
- Map<String, Object> map) {
+ Map<String, Object> map, ThreadResourceUsageAccountant accountant) {
- return new ResourceManager(getConfig(runners, workers, map), new
Tracing.DefaultThreadResourceUsageAccountant()) {
+ return new ResourceManager(getConfig(runners, workers, map), accountant) {
@Override
public QueryExecutorService getExecutorService(ServerQueryRequest query,
SchedulerGroupAccountant accountant) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java
index 68c3a8a421..0ad70dac9b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java
@@ -19,15 +19,23 @@
package org.apache.pinot.integration.tests;
import java.io.File;
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
public class OOMProtectionEnabledIntegrationTest extends
BaseClusterIntegrationTestSet {
private static final int NUM_OFFLINE_SEGMENTS = 8;
@@ -62,6 +70,7 @@ public class OOMProtectionEnabledIntegrationTest extends
BaseClusterIntegrationT
startZk();
startController();
startBroker();
+ Tracing.unregisterThreadAccountant();
startServer();
startKafka();
@@ -100,4 +109,49 @@ public class OOMProtectionEnabledIntegrationTest extends
BaseClusterIntegrationT
setUseMultiStageQueryEngine(useMultiStageEngine);
super.testHardcodedQueries();
}
+
+ @Test
+ public void testChangeOomKillQueryEnabled()
+ throws IOException {
+ assertTrue(_serverStarters.get(0)
+ .getResourceUsageAccountant() instanceof
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant);
+ PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant
accountant =
+
(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant)
_serverStarters.get(0)
+ .getResourceUsageAccountant();
+
+ QueryMonitorConfig queryMonitorConfig = accountant.getQueryMonitorConfig();
+ assertFalse(queryMonitorConfig.isOomKillQueryEnabled());
+
+
updateClusterConfig(Map.of("pinot.query.scheduler.accounting.oom.enable.killing.query",
"true",
+ "pinot.query.scheduler.accounting.query.killed.metric.enabled",
"true"));
+
+ TestUtils.waitForCondition(aVoid -> {
+ QueryMonitorConfig updatedQueryMonitorConfig =
accountant.getQueryMonitorConfig();
+ return updatedQueryMonitorConfig.isOomKillQueryEnabled()
+ && updatedQueryMonitorConfig.isQueryKilledMetricEnabled();
+ }, 1000L, "Waiting for OOM protection to be enabled");
+ }
+
+ @Test
+ public void testChangeThresholds()
+ throws IOException {
+ assertTrue(_serverStarters.get(0)
+ .getResourceUsageAccountant() instanceof
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant);
+ PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant
accountant =
+
(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant)
_serverStarters.get(0)
+ .getResourceUsageAccountant();
+
+
+ QueryMonitorConfig queryMonitorConfig = accountant.getQueryMonitorConfig();
+
updateClusterConfig(Map.of("pinot.query.scheduler.accounting.oom.alarming.usage.ratio",
"0.7f",
+ "pinot.query.scheduler.accounting.oom.critical.heap.usage.ratio",
"0.75f",
+ "pinot.query.scheduler.accounting.oom.panic.heap.usage.ratio",
"0.8f"));
+
+ TestUtils.waitForCondition(aVoid -> {
+ QueryMonitorConfig updatedQueryMonitorConfig =
accountant.getQueryMonitorConfig();
+ return updatedQueryMonitorConfig.getAlarmingLevel() !=
queryMonitorConfig.getAlarmingLevel()
+ && updatedQueryMonitorConfig.getCriticalLevel() !=
queryMonitorConfig.getCriticalLevel()
+ && updatedQueryMonitorConfig.getPanicLevel() !=
queryMonitorConfig.getPanicLevel();
+ }, 1000L, "Waiting for OOM protection to be enabled");
+ }
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java
index d511787f8e..b6158211a0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java
@@ -83,6 +83,7 @@ public class WindowResourceAccountingTest extends
BaseClusterIntegrationTest {
startZk();
startController();
startBroker();
+ Tracing.unregisterThreadAccountant();
startServer();
if (_controllerRequestURLBuilder == null) {
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
index 37a21e314f..de34a977d0 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
@@ -91,6 +91,7 @@ public class MultiStageAccountingTest implements ITest {
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING,
false);
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY,
true);
// init accountant and start watcher task
+ Tracing.unregisterThreadAccountant();
Tracing.ThreadAccountantOps.initializeThreadAccountant(new
PinotConfiguration(configs), "testGroupBy",
InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
index 9f19b55171..534f1f413d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
@@ -91,6 +91,7 @@ public class MultistageResourceUsageAccountingTest implements
ITest {
configs.put(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION,
true);
// init accountant and start watcher task
PinotConfiguration pinotCfg = new PinotConfiguration(configs);
+ Tracing.unregisterThreadAccountant();
Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg,
"testGroupBy", InstanceType.SERVER);
Tracing.ThreadAccountantOps.startThreadAccountant();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 27e6ecc852..c3f6f0e611 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -172,6 +172,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler;
protected volatile boolean _isServerReadyToServeQueries = false;
private ScheduledExecutorService _helixMessageCountScheduler;
+ protected ThreadResourceUsageAccountant _resourceUsageAccountant;
@Override
public void init(PinotConfiguration serverConf)
@@ -243,14 +244,6 @@ public abstract class BaseServerStarter implements
ServiceStartable {
// Initialize the data buffer factory
PinotDataBuffer.loadDefaultFactory(serverConf);
- // Enable/disable thread CPU time measurement through instance config.
- ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(
-
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT,
- Server.DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT));
- // Enable/disable thread memory allocation tracking through instance config
- ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
-
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
- Server.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
// Set data table version send to broker.
int dataTableVersion =
_serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION,
DataTableBuilderFactory.DEFAULT_VERSION);
@@ -670,16 +663,24 @@ public abstract class BaseServerStarter implements
ServiceStartable {
segmentDownloadThrottler,
segmentMultiColTextIndexPreprocessThrottler);
}
+ // Enable/disable thread CPU time measurement through instance config.
+ ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(
+
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT,
+ Server.DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT));
+ // Enable/disable thread memory allocation tracking through instance config
+ ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
+
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
+ Server.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
// Initialize the thread accountant for query killing
- Tracing.ThreadAccountantOps.initializeThreadAccountant(
+ _resourceUsageAccountant =
Tracing.ThreadAccountantOps.createThreadAccountant(
_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX),
_instanceId,
org.apache.pinot.spi.config.instance.InstanceType.SERVER);
- ThreadResourceUsageAccountant threadAccountant =
Tracing.getThreadAccountant();
+ Preconditions.checkNotNull(_resourceUsageAccountant);
SendStatsPredicate sendStatsPredicate =
SendStatsPredicate.create(_serverConf, _helixManager);
ServerConf serverConf = new ServerConf(_serverConf);
_serverInstance = new ServerInstance(serverConf, _helixManager,
_accessControlFactory, _segmentOperationsThrottler,
- sendStatsPredicate, threadAccountant);
+ sendStatsPredicate, _resourceUsageAccountant);
ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
InstanceDataManager instanceDataManager =
_serverInstance.getInstanceDataManager();
@@ -784,7 +785,8 @@ public abstract class BaseServerStarter implements
ServiceStartable {
// Start the thread accountant
Tracing.ThreadAccountantOps.startThreadAccountant();
- PinotClusterConfigChangeListener threadAccountantListener =
threadAccountant.getClusterConfigChangeListener();
+ PinotClusterConfigChangeListener threadAccountantListener =
+ _resourceUsageAccountant.getClusterConfigChangeListener();
if (threadAccountantListener != null) {
_clusterConfigChangeHandler.registerClusterConfigChangeListener(threadAccountantListener);
}
@@ -1149,4 +1151,8 @@ public abstract class BaseServerStarter implements
ServiceStartable {
LOGGER.warn("Failed to refresh Helix message count", e);
}
}
+
+ public ThreadResourceUsageAccountant getResourceUsageAccountant() {
+ return _resourceUsageAccountant;
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index abf823fa6f..4fdb4f1e1b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -132,6 +132,13 @@ public interface ThreadResourceUsageAccountant {
*/
void startWatcherTask();
+ /**
+ * Stop the periodic watcher task.
+ */
+ default void stopWatcherTask() {
+ // Default implementation does nothing. Subclasses can override to stop
the watcher task.
+ }
+
@Nullable
default PinotClusterConfigChangeListener getClusterConfigChangeListener() {
return null;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 4cb83ab79c..afc9a283c1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -67,7 +67,7 @@ public class Tracing {
private static final class Holder {
static final Tracer TRACER = TRACER_REGISTRATION.get() == null ?
createDefaultTracer() : TRACER_REGISTRATION.get();
- static final ThreadResourceUsageAccountant ACCOUNTANT =
+ static ThreadResourceUsageAccountant _accountant =
ACCOUNTANT_REGISTRATION.get() == null ?
createDefaultThreadAccountant() : ACCOUNTANT_REGISTRATION.get();
}
@@ -88,7 +88,12 @@ public class Tracing {
* @return true if the registration was successful.
*/
public static boolean register(ThreadResourceUsageAccountant
threadResourceUsageAccountant) {
- return ACCOUNTANT_REGISTRATION.compareAndSet(null, threadResourceUsageAccountant);
+ // Reject null up front: compareAndSet(null, null) would "succeed", publish a null accountant to Holder and
+ // only then fail on getClass(), leaving getThreadAccountant() returning null.
+ if (threadResourceUsageAccountant == null) {
+ throw new NullPointerException("threadResourceUsageAccountant must not be null");
+ }
+ if (!ACCOUNTANT_REGISTRATION.compareAndSet(null, threadResourceUsageAccountant)) {
+ // Another accountant (possibly the default one) is already registered; keep it.
+ return false;
+ }
+ Holder._accountant = threadResourceUsageAccountant;
+ LOGGER.info("Registered thread accountant: {}", threadResourceUsageAccountant.getClass().getName());
+ return true;
}
/**
@@ -109,7 +114,7 @@ public class Tracing {
* @return the registered threadAccountant.
*/
public static ThreadResourceUsageAccountant getThreadAccountant() {
- return Holder.ACCOUNTANT;
+ // The first access initializes Holder, which falls back to a default accountant if none is registered yet.
+ // Returns null after unregisterThreadAccountant() until another accountant is registered or created.
+ return Holder._accountant;
}
/**
@@ -138,7 +143,23 @@ public class Tracing {
*/
private static DefaultThreadResourceUsageAccountant
createDefaultThreadAccountant() {
LOGGER.info("Using default thread accountant");
- return new DefaultThreadResourceUsageAccountant();
+ DefaultThreadResourceUsageAccountant accountant = new
DefaultThreadResourceUsageAccountant();
+ // Publish the new default both as the current accountant and as the registration, so that later
+ // register() calls fail (their compareAndSet(null, ...) no longer sees null).
+ // This method is also called from Holder's static initializer; the write to Holder._accountant then happens
+ // during Holder's own class initialization, and the field initializer assigns the same value again.
+ // NOTE(review): set() (rather than compareAndSet) replaces any accountant already registered via register().
+ Holder._accountant = accountant;
+ ACCOUNTANT_REGISTRATION.set(accountant);
+ return accountant;
+ }
+
+ /**
+ * Unregisters the current thread accountant. This is only used in tests when a custom thread accountant is
+ * required.
+ * <p>Stops the current accountant's watcher task, then clears both the current and the registered accountant,
+ * so that the next call to {@link #initializeThreadAccountant}, {@link #createThreadAccountant} or
+ * {@link #register(ThreadResourceUsageAccountant)} installs a new one. Until then {@link #getThreadAccountant()}
+ * returns {@code null}.
+ */
+ public static void unregisterThreadAccountant() {
+ // Read the field once: checking it and then dereferencing it again could NPE if it is cleared concurrently
+ // between the two reads.
+ ThreadResourceUsageAccountant current = Holder._accountant;
+ if (current != null) {
+ current.stopWatcherTask();
+ }
+ Holder._accountant = null;
+ ACCOUNTANT_REGISTRATION.set(null);
}
/**
@@ -323,25 +344,35 @@ public class Tracing {
public static void initializeThreadAccountant(PinotConfiguration config,
String instanceId,
InstanceType instanceType) {
- String factoryName =
config.getProperty(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME);
+ // Delegates to createThreadAccountant(); the returned accountant is discarded here.
+ createThreadAccountant(config, instanceId, instanceType);
+ }
+
+ /**
+ * Creates the thread accountant configured in {@code config} and registers it with {@link Tracing}.
+ * <p>The accountant is built by the {@code ThreadAccountantFactory} named by
+ * {@code CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME}. When no factory is configured, or it cannot be
+ * instantiated or fails, the default accountant is used. If an accountant is already registered, it is kept and
+ * returned rather than replaced, so the caller gets the accountant that {@link #getThreadAccountant()} returns.
+ * Call {@link #unregisterThreadAccountant()} first to replace it.
+ * <p>Also (re)creates the workload budget manager from {@code config}.
+ *
+ * @param config instance configuration
+ * @param instanceId id of this instance, passed to the factory
+ * @param instanceType type of this instance, passed to the factory
+ * @return the accountant that is registered when this method returns
+ */
+ public static ThreadResourceUsageAccountant createThreadAccountant(PinotConfiguration config, String instanceId,
+ InstanceType instanceType) {
_workloadBudgetManager = new WorkloadBudgetManager(config);
- if (factoryName == null) {
- LOGGER.warn("No thread accountant factory provided, using default implementation");
- } else {
+ String factoryName = config.getProperty(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME);
+ ThreadResourceUsageAccountant accountant = null;
+ if (factoryName != null) {
LOGGER.info("Config-specified accountant factory name {}",
factoryName);
try {
ThreadAccountantFactory threadAccountantFactory =
(ThreadAccountantFactory)
Class.forName(factoryName).getDeclaredConstructor().newInstance();
- boolean registered = Tracing.register(threadAccountantFactory.init(config, instanceId, instanceType));
- LOGGER.info("Using accountant provided by {}", factoryName);
- if (!registered) {
- LOGGER.warn("ThreadAccountant {} register unsuccessful, as it is already registered.", factoryName);
- }
+ ThreadResourceUsageAccountant created = threadAccountantFactory.init(config, instanceId, instanceType);
+ if (created == null) {
+ // Handled by the catch below, which falls back to the default accountant.
+ throw new IllegalStateException("Thread accountant factory returned null");
+ }
+ if (register(created)) {
+ LOGGER.info("Using accountant provided by {}", factoryName);
+ accountant = created;
+ } else {
+ // Keep the already registered accountant (possibly an implicitly created default) and return it, so the
+ // caller does not end up holding an accountant that Tracing never hands out.
+ LOGGER.warn("ThreadAccountant {} register unsuccessful, as it is already registered.", factoryName);
+ accountant = getThreadAccountant();
+ }
} catch (Exception exception) {
LOGGER.warn("Using default implementation of thread accountant, "
+ "due to invalid thread accountant factory {} provided,
exception:", factoryName, exception);
}
}
+ // No factory, or the factory failed: keep an already registered accountant if there is one (calling
+ // createDefaultThreadAccountant() would overwrite it); otherwise create and register the default one.
+ if (accountant == null) {
+ ThreadResourceUsageAccountant existing = ACCOUNTANT_REGISTRATION.get();
+ accountant = existing != null ? existing : createDefaultThreadAccountant();
+ }
+ return accountant;
}
public static void startThreadAccountant() {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index cb5de2956a..e90e427239 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -694,4 +694,12 @@ public class ControllerRequestURLBuilder {
+ /**
+ * Builds the table time-boundary URL by joining the base URL, "tables", {@code tableName} and "timeBoundary"
+ * with '/'. {@code tableName} is inserted as-is (not URL-encoded).
+ */
public String forTableTimeBoundary(String tableName) {
return StringUtil.join("/", _baseUrl, "tables", tableName, "timeBoundary");
}
+
+ /**
+ * Builds the cluster-config URL by joining the base URL, "cluster" and "configs" with '/'.
+ * NOTE(review): presumably used as the target of the cluster-config update request; confirm the HTTP method.
+ */
+ public String forClusterConfigUpdate() {
+ return StringUtil.join("/", _baseUrl, "cluster", "configs");
+ }
+
+ /**
+ * Builds the URL for a single cluster config by joining the base URL, "cluster", "configs" and {@code config}
+ * with '/'.
+ * NOTE(review): {@code config} is inserted as-is (not URL-encoded), like the other builders here; names
+ * containing '/' or reserved characters would produce a wrong path.
+ */
+ public String forClusterConfigDelete(String config) {
+ return StringUtil.join("/", _baseUrl, "cluster", "configs", config);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]