This is an automated email from the ASF dual-hosted git repository.

mayankshriv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new dc1b3bd3b7b Consolidate query killing integration tests into single class (#18300)
dc1b3bd3b7b is described below

commit dc1b3bd3b7bd02c87c384940dc96748939fb7b8a
Author: Mayank Shrivastava <[email protected]>
AuthorDate: Thu Apr 23 18:15:57 2026 -0700

    Consolidate query killing integration tests into single class (#18300)
    
    - Merge CPU broker, CPU server, and OOM server killing tests into one shared cluster
    - Use forward-only config progression (no full reset between tests)
    - Add testDefaultValues to verify accountant defaults after startup
    - Add testDynamicallyToggleQueryKill to verify dynamic config toggling
    - Use dependsOnMethods for explicit test ordering and skip-on-failure
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../tests/BaseQueryKillingIntegrationTest.java     | 209 --------
 .../CpuBasedBrokerQueryKillingIntegrationTest.java | 122 -----
 .../CpuBasedServerQueryKillingIntegrationTest.java | 119 -----
 ...moryBasedServerQueryKillingIntegrationTest.java | 118 -----
 .../tests/QueryKillingIntegrationTest.java         | 530 +++++++++++++++++++++
 5 files changed, 530 insertions(+), 568 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseQueryKillingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseQueryKillingIntegrationTest.java
deleted file mode 100644
index e6045ad1adb..00000000000
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseQueryKillingIntegrationTest.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants.Broker;
-import org.apache.pinot.spi.utils.CommonConstants.Server;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-
-/// Base class for query killing integration tests.
-public abstract class BaseQueryKillingIntegrationTest extends 
BaseClusterIntegrationTest {
-  protected static final String STRING_DIM_SV1 = "stringDimSV1";
-  protected static final String STRING_DIM_SV2 = "stringDimSV2";
-  protected static final String INT_DIM_SV1 = "intDimSV1";
-  protected static final String LONG_DIM_SV1 = "longDimSV1";
-  protected static final String DOUBLE_DIM_SV1 = "doubleDimSV1";
-  protected static final String BOOLEAN_DIM_SV1 = "booleanDimSV1";
-  protected static final int NUM_SEGMENTS = 3;
-  protected static final int NUM_DOCS_PER_SEGMENT = 3_000_000;
-
-  protected static final String LARGE_SELECT_STAR_QUERY = "SELECT * FROM 
mytable LIMIT 9000000";
-
-  protected static final String LARGE_DISTINCT_QUERY =
-      "SELECT DISTINCT stringDimSV2 FROM mytable ORDER BY stringDimSV2 LIMIT 
3000000";
-
-  protected static final String LARGE_GROUP_BY_QUERY =
-      "SELECT DISTINCT_COUNT_HLL(intDimSV1, 14), stringDimSV2 FROM mytable 
GROUP BY 2 ORDER BY 1 LIMIT 3000000";
-
-  /// SafeTrim sort-aggregation case, pair-wise combine
-  protected static final String LARGE_GROUP_BY_PAIRWISE_COMBINE_QUERY =
-      "SET sortAggregateSingleThreadedNumSegmentsThreshold=1; SET 
sortAggregateLimitThreshold=3000001; "
-          + "SELECT DISTINCT_COUNT_HLL(intDimSV1, 14), stringDimSV2 FROM 
mytable GROUP BY 2 ORDER BY 2 LIMIT 3000000";
-
-  /// SafeTrim sort-aggregation case, sequential combine
-  protected static final String LARGE_GROUP_BY_SEQUENTIAL_COMBINE_QUERY =
-      "SET sortAggregateSingleThreadedNumSegmentsThreshold=10000; SET 
sortAggregateLimitThreshold=3000001; "
-          + "SELECT DISTINCT_COUNT_HLL(intDimSV1, 14), stringDimSV2 FROM 
mytable GROUP BY 2 ORDER BY 2 LIMIT 3000000";
-
-  protected static final String AGGREGATE_QUERY = "SELECT MIN(intDimSV1) FROM 
mytable";
-  protected static final String SELECT_STAR_QUERY = "SELECT * FROM mytable 
LIMIT 5";
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
-    // Start the Pinot cluster
-    startZk();
-    startController();
-    startBroker();
-    startServer();
-
-    // Create and upload the schema and table config
-    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(DEFAULT_TABLE_NAME)
-        .addSingleValueDimension(STRING_DIM_SV1, FieldSpec.DataType.STRING)
-        .addSingleValueDimension(STRING_DIM_SV2, FieldSpec.DataType.STRING)
-        .addSingleValueDimension(INT_DIM_SV1, FieldSpec.DataType.INT)
-        .addSingleValueDimension(LONG_DIM_SV1, FieldSpec.DataType.LONG)
-        .addSingleValueDimension(DOUBLE_DIM_SV1, FieldSpec.DataType.DOUBLE)
-        .addSingleValueDimension(BOOLEAN_DIM_SV1, FieldSpec.DataType.BOOLEAN)
-        .build();
-    addSchema(schema);
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME).build();
-    addTableConfig(tableConfig);
-
-    List<File> avroFiles = createAvroFiles();
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
-    uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
-
-    // Wait for all documents loaded
-    waitForAllDocsLoaded(600_000L);
-  }
-
-  @AfterClass
-  public void tearDown()
-      throws IOException {
-    stopServer();
-    stopBroker();
-    stopController();
-    stopZk();
-    FileUtils.deleteDirectory(_tempDir);
-  }
-
-  @Override
-  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
-    
brokerConf.setProperty(Broker.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, 
true);
-    
brokerConf.setProperty(Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
 true);
-  }
-
-  @Override
-  protected void overrideServerConf(PinotConfiguration serverConf) {
-    
serverConf.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, 
true);
-    
serverConf.setProperty(Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT,
 true);
-    serverConf.setProperty(Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT, 
Integer.MAX_VALUE);
-  }
-
-  @Override
-  protected long getCountStarResult() {
-    return NUM_SEGMENTS * NUM_DOCS_PER_SEGMENT;
-  }
-
-  @Test
-  public void testResourceUsageStats()
-      throws Exception {
-    JsonNode queryResponse = postQuery(SELECT_STAR_QUERY);
-    long offlineThreadMemAllocatedBytes = 
queryResponse.get("offlineThreadMemAllocatedBytes").asLong();
-    long offlineResponseSerMemAllocatedBytes = 
queryResponse.get("offlineResponseSerMemAllocatedBytes").asLong();
-    long offlineTotalMemAllocatedBytes = 
queryResponse.get("offlineTotalMemAllocatedBytes").asLong();
-
-    assertTrue(offlineThreadMemAllocatedBytes > 0);
-    assertTrue(offlineResponseSerMemAllocatedBytes > 0);
-    assertEquals(offlineThreadMemAllocatedBytes + 
offlineResponseSerMemAllocatedBytes, offlineTotalMemAllocatedBytes);
-
-    long offlineThreadCpuTimeNs = 
queryResponse.get("offlineThreadCpuTimeNs").asLong();
-    long offlineSystemActivitiesCpuTimeNs = 
queryResponse.get("offlineSystemActivitiesCpuTimeNs").asLong();
-    long offlineResponseSerializationCpuTimeNs = 
queryResponse.get("offlineResponseSerializationCpuTimeNs").asLong();
-    long offlineTotalCpuTimeNs = 
queryResponse.get("offlineTotalCpuTimeNs").asLong();
-    assertTrue(offlineThreadCpuTimeNs > 0);
-    assertTrue(offlineSystemActivitiesCpuTimeNs > 0);
-    assertTrue(offlineResponseSerializationCpuTimeNs > 0);
-    assertEquals(offlineThreadCpuTimeNs + offlineSystemActivitiesCpuTimeNs + 
offlineResponseSerializationCpuTimeNs,
-        offlineTotalCpuTimeNs);
-  }
-
-  @DataProvider
-  public String[] expensiveQueries() {
-    return new String[]{
-        LARGE_SELECT_STAR_QUERY, LARGE_DISTINCT_QUERY, LARGE_GROUP_BY_QUERY, 
LARGE_GROUP_BY_PAIRWISE_COMBINE_QUERY,
-        LARGE_GROUP_BY_SEQUENTIAL_COMBINE_QUERY
-    };
-  }
-
-  protected List<File> createAvroFiles()
-      throws IOException {
-    // Create Avro schema
-    org.apache.avro.Schema avroSchema = 
org.apache.avro.Schema.createRecord("myRecord", null, null, false);
-    avroSchema.setFields(List.of(new Field(STRING_DIM_SV1, 
org.apache.avro.Schema.create(Type.STRING), null, null),
-        new Field(STRING_DIM_SV2, org.apache.avro.Schema.create(Type.STRING), 
null, null),
-        new Field(INT_DIM_SV1, org.apache.avro.Schema.create(Type.INT), null, 
null),
-        new Field(LONG_DIM_SV1, org.apache.avro.Schema.create(Type.LONG), 
null, null),
-        new Field(DOUBLE_DIM_SV1, org.apache.avro.Schema.create(Type.DOUBLE), 
null, null),
-        new Field(BOOLEAN_DIM_SV1, 
org.apache.avro.Schema.create(Type.BOOLEAN), null, null)));
-
-    // Create Avro files
-    List<File> ret = new ArrayList<>();
-    for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
-      File avroFile = new File(_tempDir, "data_" + segmentId + ".avro");
-      try (DataFileWriter<GenericData.Record> fileWriter = new 
DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
-        fileWriter.create(avroSchema, avroFile);
-
-        int randBound = NUM_DOCS_PER_SEGMENT / 2;
-        Random random = new Random(0);
-        for (int docId = 0; docId < NUM_DOCS_PER_SEGMENT; docId++) {
-          GenericData.Record record = new GenericData.Record(avroSchema);
-          record.put(STRING_DIM_SV1, "test query killing");
-          record.put(STRING_DIM_SV2, "test query killing" + docId);
-          record.put(INT_DIM_SV1, random.nextInt(randBound));
-          record.put(LONG_DIM_SV1, random.nextLong());
-          record.put(DOUBLE_DIM_SV1, random.nextDouble());
-          record.put(BOOLEAN_DIM_SV1, true);
-          fileWriter.append(record);
-        }
-        ret.add(avroFile);
-      }
-    }
-    return ret;
-  }
-}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CpuBasedBrokerQueryKillingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CpuBasedBrokerQueryKillingIntegrationTest.java
deleted file mode 100644
index 4a3faeb957e..00000000000
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CpuBasedBrokerQueryKillingIntegrationTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pinot.core.accounting.ResourceUsageAccountantFactory;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.exception.QueryErrorCode;
-import org.apache.pinot.spi.utils.CommonConstants.Accounting;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-
-public class CpuBasedBrokerQueryKillingIntegrationTest extends 
BaseQueryKillingIntegrationTest {
-
-  @Override
-  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
-    super.overrideBrokerConf(brokerConf);
-
-    String prefix = Accounting.BROKER_PREFIX + ".";
-    brokerConf.setProperty(prefix + Accounting.Keys.FACTORY_NAME, 
ResourceUsageAccountantFactory.class.getName());
-    brokerConf.setProperty(prefix + 
Accounting.Keys.ENABLE_THREAD_CPU_SAMPLING, true);
-    brokerConf.setProperty(prefix + 
Accounting.Keys.CPU_TIME_BASED_KILLING_ENABLED, true);
-    brokerConf.setProperty(prefix + 
Accounting.Keys.CPU_TIME_BASED_KILLING_THRESHOLD_MS, 500);
-    brokerConf.setProperty(prefix + 
Accounting.Keys.CRITICAL_LEVEL_HEAP_USAGE_RATIO, 1.1f);
-    brokerConf.setProperty(prefix + 
Accounting.Keys.PANIC_LEVEL_HEAP_USAGE_RATIO, 1.1f);
-  }
-
-  @Test
-  public void testCpuTimeKill()
-      throws Exception {
-    // Trigger broker side CPU kill with a large streaming query
-    // Do not run other queries because they can cause OOM on server first
-    setUseMultiStageQueryEngine(true);
-    verifyCpuTimeKill(LARGE_SELECT_STAR_QUERY, 
postQuery(LARGE_SELECT_STAR_QUERY));
-  }
-
-  @Test
-  public void testCpuTimeKillMultipleQueries()
-      throws Exception {
-    // Trigger broker side CPU kill with a large streaming query
-    // Do not run other queries because they can cause OOM on server first
-    setUseMultiStageQueryEngine(true);
-    AtomicReference<JsonNode> queryResponse1 = new AtomicReference<>();
-    AtomicReference<JsonNode> queryResponse2 = new AtomicReference<>();
-    AtomicReference<JsonNode> queryResponse3 = new AtomicReference<>();
-    CountDownLatch countDownLatch = new CountDownLatch(3);
-    ExecutorService executor = Executors.newFixedThreadPool(3);
-    executor.submit(() -> {
-      queryResponse1.set(postQuery(LARGE_SELECT_STAR_QUERY));
-      countDownLatch.countDown();
-      return null;
-    });
-    executor.submit(() -> {
-      queryResponse2.set(postQuery(AGGREGATE_QUERY));
-      countDownLatch.countDown();
-      return null;
-    });
-    executor.submit(() -> {
-      queryResponse3.set(postQuery(SELECT_STAR_QUERY));
-      countDownLatch.countDown();
-      return null;
-    });
-    executor.shutdown();
-    countDownLatch.await();
-
-    JsonNode response = queryResponse1.get();
-    verifyCpuTimeKill(LARGE_SELECT_STAR_QUERY, response);
-
-    response = queryResponse2.get();
-    JsonNode exceptionsNode = response.get("exceptions");
-    assertNotNull(exceptionsNode, "Missing exceptions for query: " + 
AGGREGATE_QUERY);
-    assertTrue(exceptionsNode.isEmpty(),
-        "Expected no exceptions for query: " + AGGREGATE_QUERY + ", but got: " 
+ exceptionsNode);
-
-    response = queryResponse3.get();
-    exceptionsNode = response.get("exceptions");
-    assertNotNull(exceptionsNode, "Missing exceptions for query: " + 
SELECT_STAR_QUERY);
-    assertTrue(exceptionsNode.isEmpty(),
-        "Expected no exceptions for query: " + SELECT_STAR_QUERY + ", but got: 
" + exceptionsNode);
-  }
-
-  private void verifyCpuTimeKill(String query, JsonNode response) {
-    JsonNode exceptionsNode = response.get("exceptions");
-    assertNotNull(exceptionsNode, "Missing exceptions for query: " + query);
-    assertEquals(exceptionsNode.size(), 1, "Expected 1 exception for query: " 
+ query + ", but got: " + exceptionsNode);
-    JsonNode exceptionNode = exceptionsNode.get(0);
-    JsonNode errorCodeNode = exceptionNode.get("errorCode");
-    assertNotNull(errorCodeNode, "Missing errorCode from exception: " + 
exceptionNode);
-    int errorCode = errorCodeNode.asInt();
-    assertEquals(errorCode, 
QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.getId(),
-        "Unexpected error code: " + errorCode + " from exception: " + 
exceptionNode);
-    JsonNode messageNode = exceptionNode.get("message");
-    assertNotNull(messageNode, "Missing message from exception: " + 
exceptionNode);
-    String message = messageNode.asText();
-    assertTrue(message.contains("CPU time based killed on BROKER"),
-        "Unexpected exception message: " + message + " from exception: " + 
exceptionNode);
-  }
-}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CpuBasedServerQueryKillingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CpuBasedServerQueryKillingIntegrationTest.java
deleted file mode 100644
index 7e5ade42554..00000000000
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CpuBasedServerQueryKillingIntegrationTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pinot.core.accounting.ResourceUsageAccountantFactory;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.exception.QueryErrorCode;
-import org.apache.pinot.spi.utils.CommonConstants.Accounting;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-
-public class CpuBasedServerQueryKillingIntegrationTest extends 
BaseQueryKillingIntegrationTest {
-
-  @Override
-  protected void overrideServerConf(PinotConfiguration serverConf) {
-    super.overrideServerConf(serverConf);
-
-    String prefix = Accounting.SERVER_PREFIX + ".";
-    serverConf.setProperty(prefix + Accounting.Keys.FACTORY_NAME, 
ResourceUsageAccountantFactory.class.getName());
-    serverConf.setProperty(prefix + 
Accounting.Keys.ENABLE_THREAD_CPU_SAMPLING, true);
-    serverConf.setProperty(prefix + 
Accounting.Keys.CPU_TIME_BASED_KILLING_ENABLED, true);
-    serverConf.setProperty(prefix + 
Accounting.Keys.CPU_TIME_BASED_KILLING_THRESHOLD_MS, 500);
-    serverConf.setProperty(prefix + 
Accounting.Keys.CRITICAL_LEVEL_HEAP_USAGE_RATIO, 1.1f);
-    serverConf.setProperty(prefix + 
Accounting.Keys.PANIC_LEVEL_HEAP_USAGE_RATIO, 1.1f);
-  }
-
-  @Test(dataProvider = "expensiveQueries")
-  public void testCpuTimeKill(String query)
-      throws Exception {
-    verifyCpuTimeKill(query, postQuery(query));
-    setUseMultiStageQueryEngine(true);
-    verifyCpuTimeKill(query, postQuery(query));
-  }
-
-  @Test(dataProvider = "useBothQueryEngines")
-  public void testCpuTimeKillMultipleQueries(boolean useMultiStageQueryEngine)
-      throws Exception {
-    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
-    AtomicReference<JsonNode> queryResponse1 = new AtomicReference<>();
-    AtomicReference<JsonNode> queryResponse2 = new AtomicReference<>();
-    AtomicReference<JsonNode> queryResponse3 = new AtomicReference<>();
-    CountDownLatch countDownLatch = new CountDownLatch(3);
-    ExecutorService executor = Executors.newFixedThreadPool(3);
-    executor.submit(() -> {
-      queryResponse1.set(postQuery(LARGE_DISTINCT_QUERY));
-      countDownLatch.countDown();
-      return null;
-    });
-    executor.submit(() -> {
-      queryResponse2.set(postQuery(AGGREGATE_QUERY));
-      countDownLatch.countDown();
-      return null;
-    });
-    executor.submit(() -> {
-      queryResponse3.set(postQuery(SELECT_STAR_QUERY));
-      countDownLatch.countDown();
-      return null;
-    });
-    executor.shutdown();
-    countDownLatch.await();
-
-    JsonNode response = queryResponse1.get();
-    verifyCpuTimeKill(LARGE_DISTINCT_QUERY, response);
-
-    response = queryResponse2.get();
-    JsonNode exceptionsNode = response.get("exceptions");
-    assertNotNull(exceptionsNode, "Missing exceptions for query: " + 
AGGREGATE_QUERY);
-    assertTrue(exceptionsNode.isEmpty(),
-        "Expected no exceptions for query: " + AGGREGATE_QUERY + ", but got: " 
+ exceptionsNode);
-
-    response = queryResponse3.get();
-    exceptionsNode = response.get("exceptions");
-    assertNotNull(exceptionsNode, "Missing exceptions for query: " + 
SELECT_STAR_QUERY);
-    assertTrue(exceptionsNode.isEmpty(),
-        "Expected no exceptions for query: " + SELECT_STAR_QUERY + ", but got: 
" + exceptionsNode);
-  }
-
-  private void verifyCpuTimeKill(String query, JsonNode response) {
-    JsonNode exceptionsNode = response.get("exceptions");
-    assertNotNull(exceptionsNode, "Missing exceptions for query: " + query);
-    assertEquals(exceptionsNode.size(), 1, "Expected 1 exception for query: " 
+ query + ", but got: " + exceptionsNode);
-    JsonNode exceptionNode = exceptionsNode.get(0);
-    JsonNode errorCodeNode = exceptionNode.get("errorCode");
-    assertNotNull(errorCodeNode, "Missing errorCode from exception: " + 
exceptionNode);
-    int errorCode = errorCodeNode.asInt();
-    assertEquals(errorCode, 
QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.getId(),
-        "Unexpected error code: " + errorCode + " from exception: " + 
exceptionNode);
-    JsonNode messageNode = exceptionNode.get("message");
-    assertNotNull(messageNode, "Missing message from exception: " + 
exceptionNode);
-    String message = messageNode.asText();
-    assertTrue(message.contains("CPU time based killed on SERVER"),
-        "Unexpected exception message: " + message + " from exception: " + 
exceptionNode);
-  }
-}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MemoryBasedServerQueryKillingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MemoryBasedServerQueryKillingIntegrationTest.java
deleted file mode 100644
index fd6b2b9ed05..00000000000
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MemoryBasedServerQueryKillingIntegrationTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pinot.core.accounting.ResourceUsageAccountantFactory;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.exception.QueryErrorCode;
-import org.apache.pinot.spi.utils.CommonConstants.Accounting;
-import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-
-
-public class MemoryBasedServerQueryKillingIntegrationTest extends 
BaseQueryKillingIntegrationTest {
-
-  @Override
-  protected void overrideServerConf(PinotConfiguration serverConf) {
-    super.overrideServerConf(serverConf);
-
-    String prefix = Accounting.SERVER_PREFIX + ".";
-    serverConf.setProperty(prefix + Accounting.Keys.FACTORY_NAME, 
ResourceUsageAccountantFactory.class.getName());
-    serverConf.setProperty(prefix + 
Accounting.Keys.ENABLE_THREAD_MEMORY_SAMPLING, true);
-    serverConf.setProperty(prefix + 
Accounting.Keys.OOM_PROTECTION_KILLING_QUERY, true);
-    serverConf.setProperty(prefix + 
Accounting.Keys.ALARMING_LEVEL_HEAP_USAGE_RATIO, 0f);
-    serverConf.setProperty(prefix + 
Accounting.Keys.CRITICAL_LEVEL_HEAP_USAGE_RATIO, 0.15f);
-  }
-
-  @Test(dataProvider = "expensiveQueries")
-  public void testOomKill(String query)
-      throws Exception {
-    verifyOomKill(query, postQuery(query));
-    setUseMultiStageQueryEngine(true);
-    verifyOomKill(query, postQuery(query));
-  }
-
-  @Test(dataProvider = "useBothQueryEngines")
-  public void testOomKillMultipleQueries(boolean useMultiStageQueryEngine)
-      throws Exception {
-    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
-    AtomicReference<JsonNode> queryResponse1 = new AtomicReference<>();
-    AtomicReference<JsonNode> queryResponse2 = new AtomicReference<>();
-    AtomicReference<JsonNode> queryResponse3 = new AtomicReference<>();
-    CountDownLatch countDownLatch = new CountDownLatch(3);
-    ExecutorService executor = Executors.newFixedThreadPool(3);
-    executor.submit(() -> {
-      queryResponse1.set(postQuery(LARGE_DISTINCT_QUERY));
-      countDownLatch.countDown();
-      return null;
-    });
-    executor.submit(() -> {
-      queryResponse2.set(postQuery(AGGREGATE_QUERY));
-      countDownLatch.countDown();
-      return null;
-    });
-    executor.submit(() -> {
-      queryResponse3.set(postQuery(SELECT_STAR_QUERY));
-      countDownLatch.countDown();
-      return null;
-    });
-    executor.shutdown();
-    countDownLatch.await();
-
-    JsonNode response = queryResponse1.get();
-    verifyOomKill(LARGE_DISTINCT_QUERY, response);
-
-    response = queryResponse2.get();
-    JsonNode exceptionsNode = response.get("exceptions");
-    assertNotNull(exceptionsNode, "Missing exceptions for query: " + 
AGGREGATE_QUERY);
-    assertTrue(exceptionsNode.isEmpty(),
-        "Expected no exceptions for query: " + AGGREGATE_QUERY + ", but got: " 
+ exceptionsNode);
-
-    response = queryResponse3.get();
-    exceptionsNode = response.get("exceptions");
-    assertNotNull(exceptionsNode, "Missing exceptions for query: " + 
SELECT_STAR_QUERY);
-    assertTrue(exceptionsNode.isEmpty(),
-        "Expected no exceptions for query: " + SELECT_STAR_QUERY + ", but got: 
" + exceptionsNode);
-  }
-
-  private void verifyOomKill(String query, JsonNode response) {
-    JsonNode exceptionsNode = response.get("exceptions");
-    assertNotNull(exceptionsNode, "Missing exceptions for query: " + query);
-    assertEquals(exceptionsNode.size(), 1, "Expected 1 exception for query: " 
+ query + ", but got: " + exceptionsNode);
-    JsonNode exceptionNode = exceptionsNode.get(0);
-    JsonNode errorCodeNode = exceptionNode.get("errorCode");
-    assertNotNull(errorCodeNode, "Missing errorCode from exception: " + 
exceptionNode);
-    int errorCode = errorCodeNode.asInt();
-    assertEquals(errorCode, 
QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.getId(),
-        "Unexpected error code: " + errorCode + " from exception: " + 
exceptionNode);
-    JsonNode messageNode = exceptionNode.get("message");
-    assertNotNull(messageNode, "Missing message from exception: " + 
exceptionNode);
-    String message = messageNode.asText();
-    assertTrue(message.contains("OOM killed on SERVER"),
-        "Unexpected exception message: " + message + " from exception: " + 
exceptionNode);
-  }
-}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryKillingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryKillingIntegrationTest.java
new file mode 100644
index 00000000000..45a1c4577f1
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/QueryKillingIntegrationTest.java
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+import org.apache.pinot.core.accounting.ResourceUsageAccountantFactory;
+import 
org.apache.pinot.core.accounting.ResourceUsageAccountantFactory.ResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting;
+import org.apache.pinot.spi.utils.CommonConstants.Broker;
+import org.apache.pinot.spi.utils.CommonConstants.Server;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/// Single integration test class that tests all query killing modes (CPU broker, CPU server, memory server)
+/// using one shared cluster. Test ordering is enforced with `dependsOnMethods` (not priorities): each killing test
+/// enables its own kill mode via cluster config updates and disables the previous test's mode, so the cluster config
+/// only moves forward and is never fully reset between tests. When a test fails, TestNG skips the tests that depend
+/// on it.
+public class QueryKillingIntegrationTest extends BaseClusterIntegrationTest {
+  private static final String STRING_DIM_SV1 = "stringDimSV1";
+  private static final String STRING_DIM_SV2 = "stringDimSV2";
+  private static final String INT_DIM_SV1 = "intDimSV1";
+  private static final String LONG_DIM_SV1 = "longDimSV1";
+  private static final String DOUBLE_DIM_SV1 = "doubleDimSV1";
+  private static final String BOOLEAN_DIM_SV1 = "booleanDimSV1";
+  private static final int NUM_SEGMENTS = 3;
+  private static final int NUM_DOCS_PER_SEGMENT = 3_000_000;
+
+  /// Max time to wait for a cluster config update to be reflected in the broker/server accountants.
+  private static final long CONFIG_UPDATE_TIMEOUT_MS = 10_000L;
+
+  /// Selects every document in the table (NUM_SEGMENTS * NUM_DOCS_PER_SEGMENT = 9M rows).
+  private static final String LARGE_SELECT_STAR_QUERY = "SELECT * FROM mytable LIMIT 9000000";
+
+  private static final String LARGE_DISTINCT_QUERY =
+      "SELECT DISTINCT stringDimSV2 FROM mytable ORDER BY stringDimSV2 LIMIT 3000000";
+
+  private static final String LARGE_GROUP_BY_QUERY =
+      "SELECT DISTINCT_COUNT_HLL(intDimSV1, 14), stringDimSV2 FROM mytable GROUP BY 2 ORDER BY 1 LIMIT 3000000";
+
+  /// SafeTrim sort-aggregation case, pair-wise combine
+  private static final String LARGE_GROUP_BY_PAIRWISE_COMBINE_QUERY =
+      "SET sortAggregateSingleThreadedNumSegmentsThreshold=1; SET sortAggregateLimitThreshold=3000001; "
+          + "SELECT DISTINCT_COUNT_HLL(intDimSV1, 14), stringDimSV2 FROM mytable GROUP BY 2 ORDER BY 2 LIMIT 3000000";
+
+  /// SafeTrim sort-aggregation case, sequential combine
+  private static final String LARGE_GROUP_BY_SEQUENTIAL_COMBINE_QUERY =
+      "SET sortAggregateSingleThreadedNumSegmentsThreshold=10000; SET sortAggregateLimitThreshold=3000001; "
+          + "SELECT DISTINCT_COUNT_HLL(intDimSV1, 14), stringDimSV2 FROM mytable GROUP BY 2 ORDER BY 2 LIMIT 3000000";
+
+  /// Cheap queries that are expected to succeed while an expensive query is being killed.
+  private static final String AGGREGATE_QUERY = "SELECT MIN(intDimSV1) FROM mytable";
+  private static final String SELECT_STAR_QUERY = "SELECT * FROM mytable LIMIT 5";
+
+  /// Queries expected to be killed once a kill mode is enabled with the thresholds configured by each test.
+  private static final String[] EXPENSIVE_QUERIES = {
+      LARGE_SELECT_STAR_QUERY, LARGE_DISTINCT_QUERY, LARGE_GROUP_BY_QUERY, LARGE_GROUP_BY_PAIRWISE_COMBINE_QUERY,
+      LARGE_GROUP_BY_SEQUENTIAL_COMBINE_QUERY
+  };
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    // Create and upload the schema and table config
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_TABLE_NAME)
+        .addSingleValueDimension(STRING_DIM_SV1, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_DIM_SV2, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(INT_DIM_SV1, FieldSpec.DataType.INT)
+        .addSingleValueDimension(LONG_DIM_SV1, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(DOUBLE_DIM_SV1, FieldSpec.DataType.DOUBLE)
+        .addSingleValueDimension(BOOLEAN_DIM_SV1, FieldSpec.DataType.BOOLEAN)
+        .build();
+    addSchema(schema);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME).build();
+    addTableConfig(tableConfig);
+
+    List<File> avroFiles = createAvroFiles();
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+    uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    stopServer();
+    stopBroker();
+    stopController();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    brokerConf.setProperty(Broker.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    brokerConf.setProperty(Broker.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
+
+    // Enable sampling and factory at startup. Kill modes are toggled dynamically per test.
+    brokerConf.setProperty(brokerKey(Accounting.Keys.FACTORY_NAME), ResourceUsageAccountantFactory.class.getName());
+    brokerConf.setProperty(brokerKey(Accounting.Keys.ENABLE_THREAD_CPU_SAMPLING), true);
+    brokerConf.setProperty(brokerKey(Accounting.Keys.ENABLE_THREAD_MEMORY_SAMPLING), true);
+    brokerConf.setProperty(brokerKey(Accounting.Keys.QUERY_KILLED_METRIC_ENABLED), true);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, true);
+    serverConf.setProperty(Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, true);
+    serverConf.setProperty(Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT, Integer.MAX_VALUE);
+
+    // Unlike the broker, the server starts with OOM killing enabled (checked by testDefaultValues)
+    serverConf.setProperty(serverKey(Accounting.Keys.FACTORY_NAME), ResourceUsageAccountantFactory.class.getName());
+    serverConf.setProperty(serverKey(Accounting.Keys.ENABLE_THREAD_CPU_SAMPLING), true);
+    serverConf.setProperty(serverKey(Accounting.Keys.ENABLE_THREAD_MEMORY_SAMPLING), true);
+    serverConf.setProperty(serverKey(Accounting.Keys.OOM_PROTECTION_KILLING_QUERY), true);
+    serverConf.setProperty(serverKey(Accounting.Keys.QUERY_KILLED_METRIC_ENABLED), true);
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return (long) NUM_SEGMENTS * NUM_DOCS_PER_SEGMENT;
+  }
+
+  // ---------------------------------------------------------------------------
+  // Test: default accountant values after startup
+  // ---------------------------------------------------------------------------
+
+  @Test
+  public void testDefaultValues() {
+    assertTrue(ThreadResourceUsageProvider.isThreadMemoryMeasurementEnabled());
+
+    ThreadAccountant brokerAccountant = _brokerStarters.get(0).getThreadAccountant();
+    assertTrue(brokerAccountant instanceof ResourceUsageAccountant);
+    QueryMonitorConfig brokerConfig = ((ResourceUsageAccountant) brokerAccountant).getQueryMonitorConfig();
+    assertFalse(brokerConfig.isOomKillQueryEnabled());
+    assertTrue(brokerConfig.isQueryKilledMetricEnabled());
+
+    ThreadAccountant serverAccountant = _serverStarters.get(0).getServerInstance().getThreadAccountant();
+    assertTrue(serverAccountant instanceof ResourceUsageAccountant);
+    QueryMonitorConfig serverConfig = ((ResourceUsageAccountant) serverAccountant).getQueryMonitorConfig();
+    assertTrue(serverConfig.isOomKillQueryEnabled());
+    assertTrue(serverConfig.isQueryKilledMetricEnabled());
+  }
+
+  // ---------------------------------------------------------------------------
+  // Test: resource usage stats (no kill mode needed, just verifies sampling works)
+  // ---------------------------------------------------------------------------
+
+  @Test
+  public void testResourceUsageStats()
+      throws Exception {
+    verifyResourceUsageStats(postQuery(SELECT_STAR_QUERY), "before killing tests");
+  }
+
+  // ---------------------------------------------------------------------------
+  // Test: CPU-based broker query killing
+  // ---------------------------------------------------------------------------
+
+  @Test(dependsOnMethods = {"testDefaultValues", "testResourceUsageStats"})
+  public void testCpuBasedBrokerQueryKilling()
+      throws Exception {
+    // Enable broker CPU killing with a 500ms threshold. Heap usage ratios are raised to 1.1 on both broker and server
+    // so that heap-based (OOM) killing does not kill the query before the broker CPU check does.
+    updateClusterConfig(Map.of(
+        brokerKey(Accounting.Keys.CPU_TIME_BASED_KILLING_ENABLED), "true",
+        brokerKey(Accounting.Keys.CPU_TIME_BASED_KILLING_THRESHOLD_MS), "500",
+        brokerKey(Accounting.Keys.CRITICAL_LEVEL_HEAP_USAGE_RATIO), "1.1",
+        brokerKey(Accounting.Keys.PANIC_LEVEL_HEAP_USAGE_RATIO), "1.1",
+        serverKey(Accounting.Keys.CRITICAL_LEVEL_HEAP_USAGE_RATIO), "1.1",
+        serverKey(Accounting.Keys.PANIC_LEVEL_HEAP_USAGE_RATIO), "1.1"
+    ));
+    TestUtils.waitForCondition(
+        aVoid -> getBrokerResourceUsageAccountant().getQueryMonitorConfig().isCpuTimeBasedKillingEnabled(),
+        CONFIG_UPDATE_TIMEOUT_MS, "Failed to enable CPU killing on broker");
+
+    // Single query kill — use MSE only (other engines can cause OOM on server first)
+    setUseMultiStageQueryEngine(true);
+    verifyCpuTimeKill(LARGE_SELECT_STAR_QUERY, postQuery(LARGE_SELECT_STAR_QUERY), "BROKER");
+
+    // Multiple concurrent queries kill
+    JsonNode[] responses = runConcurrentQueries(LARGE_SELECT_STAR_QUERY, AGGREGATE_QUERY, SELECT_STAR_QUERY);
+    verifyCpuTimeKill(LARGE_SELECT_STAR_QUERY, responses[0], "BROKER");
+    verifyNoExceptions(AGGREGATE_QUERY, responses[1]);
+    verifyNoExceptions(SELECT_STAR_QUERY, responses[2]);
+  }
+
+  // ---------------------------------------------------------------------------
+  // Test: CPU-based server query killing
+  // ---------------------------------------------------------------------------
+
+  @Test(dependsOnMethods = "testCpuBasedBrokerQueryKilling")
+  public void testCpuBasedServerQueryKilling()
+      throws Exception {
+    // Disable broker CPU killing from previous test, enable server CPU killing
+    setUseMultiStageQueryEngine(false);
+    updateClusterConfig(Map.of(
+        brokerKey(Accounting.Keys.CPU_TIME_BASED_KILLING_ENABLED), "false",
+        serverKey(Accounting.Keys.CPU_TIME_BASED_KILLING_ENABLED), "true",
+        serverKey(Accounting.Keys.CPU_TIME_BASED_KILLING_THRESHOLD_MS), "500",
+        serverKey(Accounting.Keys.CRITICAL_LEVEL_HEAP_USAGE_RATIO), "1.1",
+        serverKey(Accounting.Keys.PANIC_LEVEL_HEAP_USAGE_RATIO), "1.1"
+    ));
+    TestUtils.waitForCondition(aVoid -> {
+      QueryMonitorConfig brokerConfig = getBrokerResourceUsageAccountant().getQueryMonitorConfig();
+      QueryMonitorConfig serverConfig = getServerResourceUsageAccountant().getQueryMonitorConfig();
+      return !brokerConfig.isCpuTimeBasedKillingEnabled() && serverConfig.isCpuTimeBasedKillingEnabled();
+    }, CONFIG_UPDATE_TIMEOUT_MS, "Failed to switch from broker to server CPU killing");
+
+    // Single query kill with each expensive query, both SSE and MSE
+    for (String query : EXPENSIVE_QUERIES) {
+      verifyCpuTimeKill(query, postQuery(query), "SERVER");
+      setUseMultiStageQueryEngine(true);
+      verifyCpuTimeKill(query, postQuery(query), "SERVER");
+      setUseMultiStageQueryEngine(false);
+    }
+
+    // Multiple concurrent queries kill, both SSE and MSE
+    for (boolean useMSE : new boolean[]{false, true}) {
+      setUseMultiStageQueryEngine(useMSE);
+      JsonNode[] responses = runConcurrentQueries(LARGE_DISTINCT_QUERY, AGGREGATE_QUERY, SELECT_STAR_QUERY);
+      verifyCpuTimeKill(LARGE_DISTINCT_QUERY, responses[0], "SERVER");
+      verifyNoExceptions(AGGREGATE_QUERY, responses[1]);
+      verifyNoExceptions(SELECT_STAR_QUERY, responses[2]);
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Test: memory-based (OOM) server query killing
+  // ---------------------------------------------------------------------------
+
+  @Test(dependsOnMethods = "testCpuBasedServerQueryKilling")
+  public void testMemoryBasedServerQueryKilling()
+      throws Exception {
+    // Disable server CPU killing from previous test and enable server OOM killing. The very low heap usage ratios
+    // (alarming 0, critical 0.15) are intended to put the server above the critical level while the expensive
+    // queries run, so OOM protection kills them.
+    setUseMultiStageQueryEngine(false);
+    updateClusterConfig(Map.of(
+        serverKey(Accounting.Keys.CPU_TIME_BASED_KILLING_ENABLED), "false",
+        serverKey(Accounting.Keys.OOM_PROTECTION_KILLING_QUERY), "true",
+        serverKey(Accounting.Keys.ALARMING_LEVEL_HEAP_USAGE_RATIO), "0",
+        serverKey(Accounting.Keys.CRITICAL_LEVEL_HEAP_USAGE_RATIO), "0.15"
+    ));
+    TestUtils.waitForCondition(aVoid -> {
+      QueryMonitorConfig serverConfig = getServerResourceUsageAccountant().getQueryMonitorConfig();
+      return !serverConfig.isCpuTimeBasedKillingEnabled() && serverConfig.isOomKillQueryEnabled();
+    }, CONFIG_UPDATE_TIMEOUT_MS, "Failed to switch from CPU to OOM killing on server");
+
+    // Single query OOM kill with each expensive query, both SSE and MSE
+    for (String query : EXPENSIVE_QUERIES) {
+      verifyOomKill(query, postQuery(query));
+      setUseMultiStageQueryEngine(true);
+      verifyOomKill(query, postQuery(query));
+      setUseMultiStageQueryEngine(false);
+    }
+
+    // Multiple concurrent queries OOM kill, both SSE and MSE
+    for (boolean useMSE : new boolean[]{false, true}) {
+      setUseMultiStageQueryEngine(useMSE);
+      JsonNode[] responses = runConcurrentQueries(LARGE_DISTINCT_QUERY, AGGREGATE_QUERY, SELECT_STAR_QUERY);
+      verifyOomKill(LARGE_DISTINCT_QUERY, responses[0]);
+      verifyNoExceptions(AGGREGATE_QUERY, responses[1]);
+      verifyNoExceptions(SELECT_STAR_QUERY, responses[2]);
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Test: resource usage stats after all killing tests (verifies accountant is not degraded)
+  // ---------------------------------------------------------------------------
+
+  @Test(dependsOnMethods = "testMemoryBasedServerQueryKilling")
+  public void testResourceUsageStatsAfterKilling()
+      throws Exception {
+    // Disable OOM killing from previous test and raise the heap usage ratios back (0.75 / 0.96) so that ordinary
+    // queries are not killed
+    setUseMultiStageQueryEngine(false);
+    updateClusterConfig(Map.of(
+        serverKey(Accounting.Keys.OOM_PROTECTION_KILLING_QUERY), "false",
+        serverKey(Accounting.Keys.ALARMING_LEVEL_HEAP_USAGE_RATIO), "0.75",
+        serverKey(Accounting.Keys.CRITICAL_LEVEL_HEAP_USAGE_RATIO), "0.96"
+    ));
+    TestUtils.waitForCondition(
+        aVoid -> !getServerResourceUsageAccountant().getQueryMonitorConfig().isOomKillQueryEnabled(),
+        CONFIG_UPDATE_TIMEOUT_MS, "Failed to disable OOM killing on server");
+
+    verifyResourceUsageStats(postQuery(SELECT_STAR_QUERY), "after killing tests");
+  }
+
+  // ---------------------------------------------------------------------------
+  // Test: dynamic toggling of query kill via cluster config
+  // ---------------------------------------------------------------------------
+
+  @Test(dependsOnMethods = "testResourceUsageStatsAfterKilling")
+  public void testDynamicallyToggleQueryKill()
+      throws Exception {
+    ResourceUsageAccountant brokerAccountant = getBrokerResourceUsageAccountant();
+    ResourceUsageAccountant serverAccountant = getServerResourceUsageAccountant();
+
+    // Ensure initial state: broker OOM disabled, server OOM enabled
+    updateClusterConfig(Map.of(
+        brokerKey(Accounting.Keys.OOM_PROTECTION_KILLING_QUERY), "false",
+        serverKey(Accounting.Keys.OOM_PROTECTION_KILLING_QUERY), "true"
+    ));
+    TestUtils.waitForCondition(aVoid -> !brokerAccountant.getQueryMonitorConfig().isOomKillQueryEnabled()
+            && serverAccountant.getQueryMonitorConfig().isOomKillQueryEnabled(), CONFIG_UPDATE_TIMEOUT_MS,
+        "Failed to set initial OOM state");
+
+    // Role-specific prefix should be dynamically applied
+    updateClusterConfig(Map.of(brokerKey(Accounting.Keys.OOM_PROTECTION_KILLING_QUERY), "true"));
+    TestUtils.waitForCondition(aVoid -> brokerAccountant.getQueryMonitorConfig().isOomKillQueryEnabled(),
+        CONFIG_UPDATE_TIMEOUT_MS, "Failed to enable broker side query kill dynamically");
+    updateClusterConfig(Map.of(serverKey(Accounting.Keys.OOM_PROTECTION_KILLING_QUERY), "false"));
+    TestUtils.waitForCondition(aVoid -> !serverAccountant.getQueryMonitorConfig().isOomKillQueryEnabled(),
+        CONFIG_UPDATE_TIMEOUT_MS, "Failed to disable server side query kill dynamically");
+
+    // Common prefix should not override role-specific prefix.
+    // NOTE(review): these assertions run right after the update is written, without waiting for it to be applied,
+    // so they can pass before the common-prefix value has been processed at all. A reliable check needs a signal
+    // that the update has been consumed by both accountants.
+    updateClusterConfig(Map.of(commonKey(Accounting.Keys.OOM_PROTECTION_KILLING_QUERY), "false"));
+    assertTrue(brokerAccountant.getQueryMonitorConfig().isOomKillQueryEnabled());
+    assertFalse(serverAccountant.getQueryMonitorConfig().isOomKillQueryEnabled());
+  }
+
+  // ---------------------------------------------------------------------------
+  // Helpers
+  // ---------------------------------------------------------------------------
+
+  /// Returns the broker-scoped accounting config key for `key`.
+  private static String brokerKey(String key) {
+    return Accounting.BROKER_PREFIX + "." + key;
+  }
+
+  /// Returns the server-scoped accounting config key for `key`.
+  private static String serverKey(String key) {
+    return Accounting.SERVER_PREFIX + "." + key;
+  }
+
+  /// Returns the common (role-independent) accounting config key for `key`.
+  private static String commonKey(String key) {
+    return Accounting.COMMON_PREFIX + "." + key;
+  }
+
+  private ResourceUsageAccountant getBrokerResourceUsageAccountant() {
+    return (ResourceUsageAccountant) _brokerStarters.get(0).getThreadAccountant();
+  }
+
+  private ResourceUsageAccountant getServerResourceUsageAccountant() {
+    return (ResourceUsageAccountant) _serverStarters.get(0).getServerInstance().getThreadAccountant();
+  }
+
+  /// Runs the three queries concurrently and returns their responses in argument order.
+  ///
+  /// Blocks until every query has finished. If any query fails to execute, the first failure is rethrown after all
+  /// queries have completed.
+  private JsonNode[] runConcurrentQueries(String expensiveQuery, String cheapQuery1, String cheapQuery2)
+      throws Exception {
+    String[] queries = {expensiveQuery, cheapQuery1, cheapQuery2};
+    int numQueries = queries.length;
+    // Plain array is safe: each countDown() happens-before latch.await() returns in this thread
+    JsonNode[] responses = new JsonNode[numQueries];
+    AtomicReference<Exception> firstFailure = new AtomicReference<>();
+    CountDownLatch latch = new CountDownLatch(numQueries);
+    ExecutorService executor = Executors.newFixedThreadPool(numQueries);
+    try {
+      for (int i = 0; i < numQueries; i++) {
+        int queryIndex = i;
+        executor.execute(() -> {
+          try {
+            responses[queryIndex] = postQuery(queries[queryIndex]);
+          } catch (Exception e) {
+            firstFailure.compareAndSet(null, e);
+          } finally {
+            // Always count down so that a failed query cannot leave the caller blocked forever
+            latch.countDown();
+          }
+        });
+      }
+      latch.await();
+    } finally {
+      executor.shutdownNow();
+    }
+    Exception failure = firstFailure.get();
+    if (failure != null) {
+      throw failure;
+    }
+    return responses;
+  }
+
+  /// Asserts the query was killed for exceeding its CPU time budget on `killedOn` ("BROKER" or "SERVER").
+  private void verifyCpuTimeKill(String query, JsonNode response, String killedOn) {
+    verifyKilled(query, response, "CPU time based killed on " + killedOn);
+  }
+
+  /// Asserts the query was killed by the server's OOM protection.
+  private void verifyOomKill(String query, JsonNode response) {
+    verifyKilled(query, response, "OOM killed on SERVER");
+  }
+
+  /// Asserts that `response` carries exactly one exception whose error code is SERVER_RESOURCE_LIMIT_EXCEEDED and
+  /// whose message contains `expectedMessageFragment`.
+  private void verifyKilled(String query, JsonNode response, String expectedMessageFragment) {
+    assertNotNull(response, "Missing response for query: " + query);
+    JsonNode exceptionsNode = response.get("exceptions");
+    assertNotNull(exceptionsNode, "Missing exceptions for query: " + query);
+    assertEquals(exceptionsNode.size(), 1, "Expected 1 exception for query: " + query + ", but got: " + exceptionsNode);
+    JsonNode exceptionNode = exceptionsNode.get(0);
+    JsonNode errorCodeNode = exceptionNode.get("errorCode");
+    assertNotNull(errorCodeNode, "Missing errorCode from exception: " + exceptionNode);
+    int errorCode = errorCodeNode.asInt();
+    assertEquals(errorCode, QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.getId(),
+        "Unexpected error code: " + errorCode + " from exception: " + exceptionNode);
+    JsonNode messageNode = exceptionNode.get("message");
+    assertNotNull(messageNode, "Missing message from exception: " + exceptionNode);
+    String message = messageNode.asText();
+    assertTrue(message.contains(expectedMessageFragment),
+        "Unexpected exception message: " + message + " from exception: " + exceptionNode);
+  }
+
+  /// Asserts that `response` reports an empty exceptions list.
+  private void verifyNoExceptions(String query, JsonNode response) {
+    assertNotNull(response, "Missing response for query: " + query);
+    JsonNode exceptionsNode = response.get("exceptions");
+    assertNotNull(exceptionsNode, "Missing exceptions for query: " + query);
+    assertTrue(exceptionsNode.isEmpty(),
+        "Expected no exceptions for query: " + query + ", but got: " + exceptionsNode);
+  }
+
+  /// Asserts that the offline memory and CPU stats in `queryResponse` are positive and that each reported total
+  /// equals the sum of its components. `context` is appended to assertion messages.
+  private void verifyResourceUsageStats(JsonNode queryResponse, String context) {
+    long offlineThreadMemAllocatedBytes = getStat(queryResponse, "offlineThreadMemAllocatedBytes");
+    long offlineResponseSerMemAllocatedBytes = getStat(queryResponse, "offlineResponseSerMemAllocatedBytes");
+    long offlineTotalMemAllocatedBytes = getStat(queryResponse, "offlineTotalMemAllocatedBytes");
+    assertTrue(offlineThreadMemAllocatedBytes > 0, "offlineThreadMemAllocatedBytes should be > 0 " + context);
+    assertTrue(offlineResponseSerMemAllocatedBytes > 0,
+        "offlineResponseSerMemAllocatedBytes should be > 0 " + context);
+    assertEquals(offlineThreadMemAllocatedBytes + offlineResponseSerMemAllocatedBytes, offlineTotalMemAllocatedBytes);
+
+    long offlineThreadCpuTimeNs = getStat(queryResponse, "offlineThreadCpuTimeNs");
+    long offlineSystemActivitiesCpuTimeNs = getStat(queryResponse, "offlineSystemActivitiesCpuTimeNs");
+    long offlineResponseSerializationCpuTimeNs = getStat(queryResponse, "offlineResponseSerializationCpuTimeNs");
+    long offlineTotalCpuTimeNs = getStat(queryResponse, "offlineTotalCpuTimeNs");
+    assertTrue(offlineThreadCpuTimeNs > 0, "offlineThreadCpuTimeNs should be > 0 " + context);
+    assertTrue(offlineSystemActivitiesCpuTimeNs > 0, "offlineSystemActivitiesCpuTimeNs should be > 0 " + context);
+    assertTrue(offlineResponseSerializationCpuTimeNs > 0,
+        "offlineResponseSerializationCpuTimeNs should be > 0 " + context);
+    assertEquals(offlineThreadCpuTimeNs + offlineSystemActivitiesCpuTimeNs + offlineResponseSerializationCpuTimeNs,
+        offlineTotalCpuTimeNs);
+  }
+
+  /// Reads a numeric stat from a broker response, failing with a clear message when it is absent.
+  private static long getStat(JsonNode queryResponse, String statName) {
+    JsonNode statNode = queryResponse.get(statName);
+    assertNotNull(statNode, "Missing " + statName + " in response: " + queryResponse);
+    return statNode.asLong();
+  }
+
+  /// Writes NUM_SEGMENTS Avro files of NUM_DOCS_PER_SEGMENT records each into `_tempDir` and returns them.
+  ///
+  /// `stringDimSV2` is unique per doc within a segment. The random columns use a fresh `Random(0)` per segment, so
+  /// every segment receives the same sequence of random values.
+  private List<File> createAvroFiles()
+      throws IOException {
+    // Create Avro schema
+    org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord("myRecord", null, null, false);
+    avroSchema.setFields(List.of(new Field(STRING_DIM_SV1, org.apache.avro.Schema.create(Type.STRING), null, null),
+        new Field(STRING_DIM_SV2, org.apache.avro.Schema.create(Type.STRING), null, null),
+        new Field(INT_DIM_SV1, org.apache.avro.Schema.create(Type.INT), null, null),
+        new Field(LONG_DIM_SV1, org.apache.avro.Schema.create(Type.LONG), null, null),
+        new Field(DOUBLE_DIM_SV1, org.apache.avro.Schema.create(Type.DOUBLE), null, null),
+        new Field(BOOLEAN_DIM_SV1, org.apache.avro.Schema.create(Type.BOOLEAN), null, null)));
+
+    // intDimSV1 values are drawn from [0, NUM_DOCS_PER_SEGMENT / 2)
+    int randBound = NUM_DOCS_PER_SEGMENT / 2;
+
+    // Create Avro files
+    List<File> avroFiles = new ArrayList<>(NUM_SEGMENTS);
+    for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) {
+      File avroFile = new File(_tempDir, "data_" + segmentId + ".avro");
+      try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+        fileWriter.create(avroSchema, avroFile);
+
+        Random random = new Random(0);
+        for (int docId = 0; docId < NUM_DOCS_PER_SEGMENT; docId++) {
+          GenericData.Record record = new GenericData.Record(avroSchema);
+          record.put(STRING_DIM_SV1, "test query killing");
+          record.put(STRING_DIM_SV2, "test query killing" + docId);
+          record.put(INT_DIM_SV1, random.nextInt(randBound));
+          record.put(LONG_DIM_SV1, random.nextLong());
+          record.put(DOUBLE_DIM_SV1, random.nextDouble());
+          record.put(BOOLEAN_DIM_SV1, true);
+          fileWriter.append(record);
+        }
+      }
+      avroFiles.add(avroFile);
+    }
+    return avroFiles;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to