This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 90346d5f37 Allow Reset of ThreadResourceUsageAccountant in Tracing.java (#16360)
90346d5f37 is described below

commit 90346d5f379d24687b0117ee7ff8aafa113ac552
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Wed Jul 23 18:36:28 2025 +0530

    Allow Reset of ThreadResourceUsageAccountant in Tracing.java (#16360)
---
 .../broker/broker/helix/BaseBrokerStarter.java     |  5 +-
 .../controller/helix/ControllerRequestClient.java  | 21 ++++++++
 .../pinot/controller/helix/ControllerTest.java     | 10 ++++
 .../CPUMemThreadLevelAccountingObjects.java        |  1 +
 .../PerQueryCPUMemAccountantFactory.java           | 15 ++++--
 .../accounting/ResourceUsageAccountantFactory.java | 11 ++--
 .../accounting/ResourceManagerAccountingTest.java  | 61 +++++++++++++---------
 .../tests/OOMProtectionEnabledIntegrationTest.java | 54 +++++++++++++++++++
 .../tests/WindowResourceAccountingTest.java        |  1 +
 .../runtime/operator/MultiStageAccountingTest.java |  1 +
 .../MultistageResourceUsageAccountingTest.java     |  1 +
 .../server/starter/helix/BaseServerStarter.java    | 30 ++++++-----
 .../accounting/ThreadResourceUsageAccountant.java  |  7 +++
 .../java/org/apache/pinot/spi/trace/Tracing.java   | 51 ++++++++++++++----
 .../utils/builder/ControllerRequestURLBuilder.java |  8 +++
 15 files changed, 223 insertions(+), 54 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index c694864055..3ae7fd846c 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -85,6 +85,7 @@ import org.apache.pinot.core.util.trace.ContinuousJfrStarter;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.segment.local.function.GroovyFunctionEvaluator;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.cursors.ResponseStoreService;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -153,6 +154,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
   protected AbstractResponseStore _responseStore;
   protected BrokerGrpcServer _brokerGrpcServer;
   protected FailureDetector _failureDetector;
+  protected ThreadResourceUsageAccountant _resourceUsageAccountant;
 
   @Override
   public void init(PinotConfiguration brokerConf)
@@ -415,9 +417,10 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
         
_brokerConf.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
             
CommonConstants.Broker.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(
+    _resourceUsageAccountant = 
Tracing.ThreadAccountantOps.createThreadAccountant(
         _brokerConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), 
_instanceId,
         org.apache.pinot.spi.config.instance.InstanceType.BROKER);
+    Preconditions.checkNotNull(_resourceUsageAccountant);
     Tracing.ThreadAccountantOps.startThreadAccountant();
 
     String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 8acf41df87..c9efb0a418 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -473,4 +473,25 @@ public class ControllerRequestClient {
     return new Tenant(TenantRole.SERVER, tenantName, numOfflineServers + 
numRealtimeServers, numOfflineServers,
         numRealtimeServers).toJsonString();
   }
+
+  public void updateClusterConfig(Map<String, String> newConfigs)
+      throws IOException {
+    try {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(
+          new URI(_controllerRequestURLBuilder.forClusterConfigUpdate()),
+          JsonUtils.objectToString(newConfigs), _headers));
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void deleteClusterConfig(String config)
+      throws IOException {
+    try {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendDeleteRequest(
+          new 
URI(_controllerRequestURLBuilder.forClusterConfigDelete(config)), _headers));
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index e3225dea09..027b03e37b 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -937,6 +937,16 @@ public class ControllerTest {
     sendGetRequest(getControllerRequestURLBuilder().forPeriodTaskRun(taskName, 
tableName, tableType));
   }
 
+  public void updateClusterConfig(Map<String, String> clusterConfig)
+      throws IOException {
+    getControllerRequestClient().updateClusterConfig(clusterConfig);
+  }
+
+  public void deleteClusterConfig(String clusterConfig)
+      throws IOException {
+    getControllerRequestClient().deleteClusterConfig(clusterConfig);
+  }
+
   /**
    * Trigger a task on a table and wait for completion
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
index 3744d1729b..b6a0bc661a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -76,6 +76,7 @@ public class CPUMemThreadLevelAccountingObjects {
       _currentThreadCPUTimeSampleMS = 0;
       // clear memory usage
       _currentThreadMemoryAllocationSampleBytes = 0;
+      _errorStatus.set(null);
     }
 
     /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 71944bd610..7402b5138f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -90,7 +90,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
      */
     private static final String ACCOUNTANT_TASK_NAME = 
"CPUMemThreadAccountant";
     private static final int ACCOUNTANT_PRIORITY = 4;
-    private static final ExecutorService EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(1, r -> {
+    private final ExecutorService _executorService = 
Executors.newFixedThreadPool(1, r -> {
       Thread thread = new Thread(r);
       thread.setPriority(ACCOUNTANT_PRIORITY);
       thread.setDaemon(true);
@@ -213,6 +213,10 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
       return new WatcherTask();
     }
 
+    public QueryMonitorConfig getQueryMonitorConfig() {
+      return _watcherTask.getQueryMonitorConfig();
+    }
+
     @Override
     public Collection<? extends ThreadResourceTracker> getThreadResources() {
       return _threadEntriesMap.values();
@@ -431,7 +435,12 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
 
     @Override
     public void startWatcherTask() {
-      EXECUTOR_SERVICE.submit(_watcherTask);
+      _executorService.submit(_watcherTask);
+    }
+
+    @Override
+    public void stopWatcherTask() {
+      _executorService.shutdownNow();
     }
 
     @Override
@@ -756,7 +765,7 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
 
       @Override
       public void run() {
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
           try {
             runOnce();
           } finally {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
index 4c4bcc8d02..3b3d509bf5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
@@ -54,7 +54,7 @@ public class ResourceUsageAccountantFactory implements 
ThreadAccountantFactory {
     private static final String ACCOUNTANT_TASK_NAME = 
"ResourceUsageAccountant";
     private static final int ACCOUNTANT_PRIORITY = 4;
 
-    private static final ExecutorService EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(1, r -> {
+    private final ExecutorService _executorService = 
Executors.newFixedThreadPool(1, r -> {
       Thread thread = new Thread(r);
       thread.setPriority(ACCOUNTANT_PRIORITY);
       thread.setDaemon(true);
@@ -286,7 +286,12 @@ public class ResourceUsageAccountantFactory implements 
ThreadAccountantFactory {
 
     @Override
     public void startWatcherTask() {
-      EXECUTOR_SERVICE.submit(_watcherTask);
+      _executorService.submit(_watcherTask);
+    }
+
+    @Override
+    public void stopWatcherTask() {
+      _executorService.shutdownNow();
     }
 
     @Override
@@ -315,7 +320,7 @@ public class ResourceUsageAccountantFactory implements 
ThreadAccountantFactory {
       @Override
       public void run() {
         LOGGER.debug("Running timed task for {}", this.getClass().getName());
-        while (true) {
+        while (!Thread.currentThread().isInterrupted()) {
           try {
             // Preaggregation.
             runPreAggregation();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
index 5e473e402b..4986e7684d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/ResourceManagerAccountingTest.java
@@ -59,6 +59,7 @@ import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.config.table.JsonIndexConfig;
@@ -99,14 +100,15 @@ public class ResourceManagerAccountingTest {
         "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
     
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, 
false);
     
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, 
true);
-    ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
-    Future[] futures = new Future[2000];
-    AtomicInteger atomicInteger = new AtomicInteger();
     PinotConfiguration pinotCfg = new PinotConfiguration(configs);
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, 
"testCPUtimeProvider",
-        InstanceType.SERVER);
+    ThreadResourceUsageAccountant accountant = 
Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg,
+        "testCPUtimeProvider", InstanceType.SERVER);
     Tracing.ThreadAccountantOps.startThreadAccountant();
 
+    ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, accountant);
+    Future[] futures = new Future[2000];
+    AtomicInteger atomicInteger = new AtomicInteger();
+
     for (int k = 0; k < 30; k++) {
       int finalK = k;
       rm.getQueryRunners().submit(() -> {
@@ -164,12 +166,13 @@ public class ResourceManagerAccountingTest {
         "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
     
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, 
true);
     
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, 
false);
-    ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
     PinotConfiguration pinotCfg = new PinotConfiguration(configs);
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, 
"testCPUtimeProvider",
-        InstanceType.SERVER);
+    ThreadResourceUsageAccountant accountant = 
Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg,
+        "testCPUtimeProvider", InstanceType.SERVER);
     Tracing.ThreadAccountantOps.startThreadAccountant();
 
+    ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, accountant);
+
     for (int k = 0; k < 30; k++) {
       int finalK = k;
       rm.getQueryRunners().submit(() -> {
@@ -241,13 +244,13 @@ public class ResourceManagerAccountingTest {
 
     String workloadName = CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME;
     PinotConfiguration pinotCfg = new PinotConfiguration(configs);
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, 
"testWorkloadMemoryAccounting",
-        InstanceType.SERVER);
+    ThreadResourceUsageAccountant accountant = 
Tracing.ThreadAccountantOps.createThreadAccountant(pinotCfg,
+        "testWorkloadMemoryAccounting", InstanceType.SERVER);
     Tracing.ThreadAccountantOps.startThreadAccountant();
     WorkloadBudgetManager workloadBudgetManager =
         Tracing.ThreadAccountantOps.getWorkloadBudgetManager();
     workloadBudgetManager.addOrUpdateWorkload(workloadName, 88_000_000, 
27_000_000);
-    ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
+    ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, accountant);
 
     for (int k = 0; k < 30; k++) {
       int finalK = k;
@@ -301,7 +304,8 @@ public class ResourceManagerAccountingTest {
   @Test
   public void testWorkerThreadInterruption()
       throws Exception {
-    ResourceManager rm = getResourceManager(2, 5, 1, 3, 
Collections.emptyMap());
+    ResourceManager rm = getResourceManager(2, 5, 1, 3, Collections.emptyMap(),
+        new Tracing.DefaultThreadResourceUsageAccountant());
     AtomicReference<Future>[] futures = new AtomicReference[5];
     for (int i = 0; i < 5; i++) {
       futures[i] = new AtomicReference<>();
@@ -379,10 +383,12 @@ public class ResourceManagerAccountingTest {
     
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, 
false);
     
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, 
true);
     PinotConfiguration config = getConfig(20, 2, configs);
-    ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
     // init accountant and start watcher task
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, 
"testSelect", InstanceType.SERVER);
+    Tracing.unregisterThreadAccountant();
+    ThreadResourceUsageAccountant accountant = 
Tracing.ThreadAccountantOps.createThreadAccountant(config,
+        "testSelect", InstanceType.SERVER);
     Tracing.ThreadAccountantOps.startThreadAccountant();
+    ResourceManager rm = getResourceManager(20, 2, 1, 1, configs, accountant);
 
     CountDownLatch latch = new CountDownLatch(100);
     AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
@@ -404,7 +410,7 @@ public class ResourceManagerAccountingTest {
         }
       });
     }
-    latch.await();
+    latch.await(1, java.util.concurrent.TimeUnit.MINUTES);
     // assert that EarlyTerminationException was thrown in at least one runner 
thread
     Assert.assertTrue(earlyTerminationOccurred.get());
   }
@@ -450,11 +456,14 @@ public class ResourceManagerAccountingTest {
     
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, 
true);
     PinotConfiguration config = getConfig(20, 2, configs);
 
-    ResourceManager rm = getResourceManager(20, 2, 1, 1, configs);
     // init accountant and start watcher task
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, 
"testGroupBy", InstanceType.SERVER);
+    Tracing.unregisterThreadAccountant();
+    ThreadResourceUsageAccountant accountant = 
Tracing.ThreadAccountantOps.createThreadAccountant(config,
+        "testGroupBy", InstanceType.SERVER);
     Tracing.ThreadAccountantOps.startThreadAccountant();
 
+    ResourceManager rm = getResourceManager(20, 2, 1, 1, configs, accountant);
+
     CountDownLatch latch = new CountDownLatch(100);
     AtomicBoolean earlyTerminationOccurred = new AtomicBoolean(false);
 
@@ -475,7 +484,7 @@ public class ResourceManagerAccountingTest {
         }
       });
     }
-    latch.await();
+    latch.await(1, java.util.concurrent.TimeUnit.MINUTES);
     // assert that EarlyTerminationException was thrown in at least one runner 
thread
     Assert.assertTrue(earlyTerminationOccurred.get());
   }
@@ -507,12 +516,14 @@ public class ResourceManagerAccountingTest {
     
configs.put(CommonConstants.Accounting.CONFIG_OF_MIN_MEMORY_FOOTPRINT_TO_KILL_RATIO,
 0.00f);
 
     PinotConfiguration config = getConfig(2, 2, configs);
-    ResourceManager rm = getResourceManager(2, 2, 1, 1, configs);
     // init accountant and start watcher task
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(config, 
"testJsonIndexExtractMapOOM",
-        InstanceType.SERVER);
+    Tracing.unregisterThreadAccountant();
+    ThreadResourceUsageAccountant accountant = 
Tracing.ThreadAccountantOps.createThreadAccountant(config,
+        "testJsonIndexExtractMapOOM", InstanceType.SERVER);
     Tracing.ThreadAccountantOps.startThreadAccountant();
 
+    ResourceManager rm = getResourceManager(2, 2, 1, 1, configs, accountant);
+
     Supplier<String> randomJsonValue = () -> {
       Random random = new Random();
       int length = random.nextInt(1000);
@@ -575,7 +586,7 @@ public class ResourceManagerAccountingTest {
         }
       });
 
-      latch.await();
+      latch.await(1, java.util.concurrent.TimeUnit.MINUTES);
       Assert.assertTrue(mutableEarlyTerminationOccurred.get(),
           "Expected early termination reading the mutable index");
       Assert.assertTrue(immutableEarlyTerminationOccurred.get(),
@@ -602,7 +613,7 @@ public class ResourceManagerAccountingTest {
         "org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory");
     
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_MEMORY_SAMPLING, 
true);
     
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, 
false);
-    ResourceManager rm = getResourceManager(20, 40, 1, 1, configs);
+    ResourceManager rm = getResourceManager(20, 40, 1, 1, configs, new 
Tracing.DefaultThreadResourceUsageAccountant());
     Future[] futures = new Future[30];
 
     for (int k = 0; k < 4; k++) {
@@ -651,9 +662,9 @@ public class ResourceManagerAccountingTest {
   }
 
   private ResourceManager getResourceManager(int runners, int workers, final 
int softLimit, final int hardLimit,
-      Map<String, Object> map) {
+      Map<String, Object> map, ThreadResourceUsageAccountant accountant) {
 
-    return new ResourceManager(getConfig(runners, workers, map), new 
Tracing.DefaultThreadResourceUsageAccountant()) {
+    return new ResourceManager(getConfig(runners, workers, map), accountant) {
 
       @Override
       public QueryExecutorService getExecutorService(ServerQueryRequest query, 
SchedulerGroupAccountant accountant) {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java
index 68c3a8a421..0ad70dac9b 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OOMProtectionEnabledIntegrationTest.java
@@ -19,15 +19,23 @@
 package org.apache.pinot.integration.tests;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
 
 public class OOMProtectionEnabledIntegrationTest extends 
BaseClusterIntegrationTestSet {
   private static final int NUM_OFFLINE_SEGMENTS = 8;
@@ -62,6 +70,7 @@ public class OOMProtectionEnabledIntegrationTest extends 
BaseClusterIntegrationT
     startZk();
     startController();
     startBroker();
+    Tracing.unregisterThreadAccountant();
     startServer();
     startKafka();
 
@@ -100,4 +109,49 @@ public class OOMProtectionEnabledIntegrationTest extends 
BaseClusterIntegrationT
     setUseMultiStageQueryEngine(useMultiStageEngine);
     super.testHardcodedQueries();
   }
+
+  @Test
+  public void testChangeOomKillQueryEnabled()
+      throws IOException {
+    assertTrue(_serverStarters.get(0)
+        .getResourceUsageAccountant() instanceof 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant);
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        
(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant) 
_serverStarters.get(0)
+            .getResourceUsageAccountant();
+
+    QueryMonitorConfig queryMonitorConfig = accountant.getQueryMonitorConfig();
+    assertFalse(queryMonitorConfig.isOomKillQueryEnabled());
+
+    
updateClusterConfig(Map.of("pinot.query.scheduler.accounting.oom.enable.killing.query",
 "true",
+        "pinot.query.scheduler.accounting.query.killed.metric.enabled", 
"true"));
+
+    TestUtils.waitForCondition(aVoid -> {
+      QueryMonitorConfig updatedQueryMonitorConfig = 
accountant.getQueryMonitorConfig();
+      return updatedQueryMonitorConfig.isOomKillQueryEnabled()
+          && updatedQueryMonitorConfig.isQueryKilledMetricEnabled();
+    }, 1000L, "Waiting for OOM protection to be enabled");
+  }
+
+  @Test
+  public void testChangeThresholds()
+      throws IOException {
+    assertTrue(_serverStarters.get(0)
+        .getResourceUsageAccountant() instanceof 
PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant);
+    PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant 
accountant =
+        
(PerQueryCPUMemAccountantFactory.PerQueryCPUMemResourceUsageAccountant) 
_serverStarters.get(0)
+            .getResourceUsageAccountant();
+
+
+    QueryMonitorConfig queryMonitorConfig = accountant.getQueryMonitorConfig();
+    
updateClusterConfig(Map.of("pinot.query.scheduler.accounting.oom.alarming.usage.ratio",
 "0.7f",
+        "pinot.query.scheduler.accounting.oom.critical.heap.usage.ratio", 
"0.75f",
+        "pinot.query.scheduler.accounting.oom.panic.heap.usage.ratio", 
"0.8f"));
+
+    TestUtils.waitForCondition(aVoid -> {
+      QueryMonitorConfig updatedQueryMonitorConfig = 
accountant.getQueryMonitorConfig();
+      return updatedQueryMonitorConfig.getAlarmingLevel() != 
queryMonitorConfig.getAlarmingLevel()
+          && updatedQueryMonitorConfig.getCriticalLevel() != 
queryMonitorConfig.getCriticalLevel()
+          && updatedQueryMonitorConfig.getPanicLevel() != 
queryMonitorConfig.getPanicLevel();
+    }, 1000L, "Waiting for OOM protection to be enabled");
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java
index d511787f8e..b6158211a0 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/WindowResourceAccountingTest.java
@@ -83,6 +83,7 @@ public class WindowResourceAccountingTest extends 
BaseClusterIntegrationTest {
     startZk();
     startController();
     startBroker();
+    Tracing.unregisterThreadAccountant();
     startServer();
 
     if (_controllerRequestURLBuilder == null) {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
index 37a21e314f..de34a977d0 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
@@ -91,6 +91,7 @@ public class MultiStageAccountingTest implements ITest {
     
configs.put(CommonConstants.Accounting.CONFIG_OF_ENABLE_THREAD_CPU_SAMPLING, 
false);
     
configs.put(CommonConstants.Accounting.CONFIG_OF_OOM_PROTECTION_KILLING_QUERY, 
true);
     // init accountant and start watcher task
+    Tracing.unregisterThreadAccountant();
     Tracing.ThreadAccountantOps.initializeThreadAccountant(new 
PinotConfiguration(configs), "testGroupBy",
         InstanceType.SERVER);
     Tracing.ThreadAccountantOps.startThreadAccountant();
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
index 9f19b55171..534f1f413d 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
@@ -91,6 +91,7 @@ public class MultistageResourceUsageAccountingTest implements 
ITest {
     
configs.put(CommonConstants.Accounting.CONFIG_OF_WORKLOAD_ENABLE_COST_COLLECTION,
 true);
     // init accountant and start watcher task
     PinotConfiguration pinotCfg = new PinotConfiguration(configs);
+    Tracing.unregisterThreadAccountant();
     Tracing.ThreadAccountantOps.initializeThreadAccountant(pinotCfg, 
"testGroupBy", InstanceType.SERVER);
     Tracing.ThreadAccountantOps.startThreadAccountant();
 
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 27e6ecc852..c3f6f0e611 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -172,6 +172,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
   protected DefaultClusterConfigChangeHandler _clusterConfigChangeHandler;
   protected volatile boolean _isServerReadyToServeQueries = false;
   private ScheduledExecutorService _helixMessageCountScheduler;
+  protected ThreadResourceUsageAccountant _resourceUsageAccountant;
 
   @Override
   public void init(PinotConfiguration serverConf)
@@ -243,14 +244,6 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     // Initialize the data buffer factory
     PinotDataBuffer.loadDefaultFactory(serverConf);
 
-    // Enable/disable thread CPU time measurement through instance config.
-    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(
-        
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT,
-            Server.DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT));
-    // Enable/disable thread memory allocation tracking through instance config
-    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
-        
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
-            Server.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
     // Set data table version send to broker.
     int dataTableVersion =
         _serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION, 
DataTableBuilderFactory.DEFAULT_VERSION);
@@ -670,16 +663,24 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
               segmentDownloadThrottler, 
segmentMultiColTextIndexPreprocessThrottler);
     }
 
+    // Enable/disable thread CPU time measurement through instance config.
+    ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled(
+        
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT,
+            Server.DEFAULT_ENABLE_THREAD_CPU_TIME_MEASUREMENT));
+    // Enable/disable thread memory allocation tracking through instance config
+    ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled(
+        
_serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
+            Server.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT));
     // Initialize the thread accountant for query killing
-    Tracing.ThreadAccountantOps.initializeThreadAccountant(
+    _resourceUsageAccountant = 
Tracing.ThreadAccountantOps.createThreadAccountant(
         _serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), 
_instanceId,
         org.apache.pinot.spi.config.instance.InstanceType.SERVER);
-    ThreadResourceUsageAccountant threadAccountant = 
Tracing.getThreadAccountant();
+    Preconditions.checkNotNull(_resourceUsageAccountant);
 
     SendStatsPredicate sendStatsPredicate = 
SendStatsPredicate.create(_serverConf, _helixManager);
     ServerConf serverConf = new ServerConf(_serverConf);
     _serverInstance = new ServerInstance(serverConf, _helixManager, 
_accessControlFactory, _segmentOperationsThrottler,
-        sendStatsPredicate, threadAccountant);
+        sendStatsPredicate, _resourceUsageAccountant);
     ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
 
     InstanceDataManager instanceDataManager = 
_serverInstance.getInstanceDataManager();
@@ -784,7 +785,8 @@ public abstract class BaseServerStarter implements ServiceStartable {
 
     // Start the thread accountant
     Tracing.ThreadAccountantOps.startThreadAccountant();
-    PinotClusterConfigChangeListener threadAccountantListener = threadAccountant.getClusterConfigChangeListener();
+    PinotClusterConfigChangeListener threadAccountantListener =
+        _resourceUsageAccountant.getClusterConfigChangeListener();
     if (threadAccountantListener != null) {
       _clusterConfigChangeHandler.registerClusterConfigChangeListener(threadAccountantListener);
     }
@@ -1149,4 +1151,8 @@ public abstract class BaseServerStarter implements ServiceStartable {
       LOGGER.warn("Failed to refresh Helix message count", e);
     }
   }
+
+  public ThreadResourceUsageAccountant getResourceUsageAccountant() {
+    return _resourceUsageAccountant;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index abf823fa6f..4fdb4f1e1b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -132,6 +132,13 @@ public interface ThreadResourceUsageAccountant {
    */
   void startWatcherTask();
 
+  /**
+   * Stop the periodic watcher task.
+   */
+  default void stopWatcherTask() {
+    // Default implementation does nothing. Subclasses can override to stop the watcher task.
+  }
+
   @Nullable
   default PinotClusterConfigChangeListener getClusterConfigChangeListener() {
     return null;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 4cb83ab79c..afc9a283c1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -67,7 +67,7 @@ public class Tracing {
 
   private static final class Holder {
     static final Tracer TRACER = TRACER_REGISTRATION.get() == null ? createDefaultTracer() : TRACER_REGISTRATION.get();
-    static final ThreadResourceUsageAccountant ACCOUNTANT =
+    static ThreadResourceUsageAccountant _accountant =
         ACCOUNTANT_REGISTRATION.get() == null ? createDefaultThreadAccountant() : ACCOUNTANT_REGISTRATION.get();
   }
 
@@ -88,7 +88,12 @@ public class Tracing {
    * @return true if the registration was successful.
    */
   public static boolean register(ThreadResourceUsageAccountant threadResourceUsageAccountant) {
-    return ACCOUNTANT_REGISTRATION.compareAndSet(null, threadResourceUsageAccountant);
+    if (ACCOUNTANT_REGISTRATION.compareAndSet(null, threadResourceUsageAccountant)) {
+      Holder._accountant = threadResourceUsageAccountant;
+      LOGGER.info("Registered thread accountant: {}", threadResourceUsageAccountant.getClass().getName());
+      return true;
+    }
+    return false;
   }
 
   /**
@@ -109,7 +114,7 @@ public class Tracing {
    * @return the registered threadAccountant.
    */
   public static ThreadResourceUsageAccountant getThreadAccountant() {
-    return Holder.ACCOUNTANT;
+    return Holder._accountant;
   }
 
   /**
@@ -138,7 +143,23 @@ public class Tracing {
    */
   private static DefaultThreadResourceUsageAccountant createDefaultThreadAccountant() {
     LOGGER.info("Using default thread accountant");
-    return new DefaultThreadResourceUsageAccountant();
+    DefaultThreadResourceUsageAccountant accountant = new DefaultThreadResourceUsageAccountant();
+    Holder._accountant = accountant;
+    ACCOUNTANT_REGISTRATION.set(accountant);
+    return accountant;
+  }
+
+  /**
+   * Unregisters the thread accountant. This is only used in tests when a custom thread accountant is required.
+   * This will reset the thread accountant to null, so that the next call to initializeThreadAccountant or
+   * createThreadAccountant will register the new thread accountant.
+   */
+  public static void unregisterThreadAccountant() {
+    if (Holder._accountant != null) {
+      Holder._accountant.stopWatcherTask();
+    }
+    Holder._accountant = null;
+    ACCOUNTANT_REGISTRATION.set(null);
   }
 
   /**
@@ -323,25 +344,35 @@ public class Tracing {
 
     public static void initializeThreadAccountant(PinotConfiguration config, String instanceId,
         InstanceType instanceType) {
-      String factoryName = config.getProperty(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME);
+      createThreadAccountant(config, instanceId, instanceType);
+    }
+
+    public static ThreadResourceUsageAccountant createThreadAccountant(PinotConfiguration config, String instanceId,
+        InstanceType instanceType) {
       _workloadBudgetManager = new WorkloadBudgetManager(config);
-      if (factoryName == null) {
-        LOGGER.warn("No thread accountant factory provided, using default implementation");
-      } else {
+      String factoryName = config.getProperty(CommonConstants.Accounting.CONFIG_OF_FACTORY_NAME);
+      ThreadResourceUsageAccountant accountant = null;
+      if (factoryName != null) {
         LOGGER.info("Config-specified accountant factory name {}", factoryName);
         try {
           ThreadAccountantFactory threadAccountantFactory =
               (ThreadAccountantFactory) Class.forName(factoryName).getDeclaredConstructor().newInstance();
-          boolean registered = Tracing.register(threadAccountantFactory.init(config, instanceId, instanceType));
           LOGGER.info("Using accountant provided by {}", factoryName);
+          accountant = threadAccountantFactory.init(config, instanceId, instanceType);
+          boolean registered = register(accountant);
           if (!registered) {
-            LOGGER.warn("ThreadAccountant {} register unsuccessful, as it is already registered.", factoryName);
+            LOGGER.warn("ThreadAccountant register unsuccessful, as it is already registered.");
           }
         } catch (Exception exception) {
           LOGGER.warn("Using default implementation of thread accountant, "
               + "due to invalid thread accountant factory {} provided, exception:", factoryName, exception);
         }
       }
+      // If no factory is specified or the factory creation failed, use the default implementation
+      if (accountant == null) {
+        accountant = createDefaultThreadAccountant();
+      }
+      return accountant;
     }
 
     public static void startThreadAccountant() {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index cb5de2956a..e90e427239 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -694,4 +694,12 @@ public class ControllerRequestURLBuilder {
   public String forTableTimeBoundary(String tableName) {
     return StringUtil.join("/", _baseUrl, "tables", tableName, "timeBoundary");
   }
+
+  public String forClusterConfigUpdate() {
+    return StringUtil.join("/", _baseUrl, "cluster", "configs");
+  }
+
+  public String forClusterConfigDelete(String config) {
+    return StringUtil.join("/", _baseUrl, "cluster", "configs", config);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to