tarun11Mavani commented on code in PR #18102:
URL: https://github.com/apache/pinot/pull/18102#discussion_r3108405204


##########
pinot-core/src/main/java/org/apache/pinot/core/query/killing/QueryKillingManager.java:
##########
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.killing;
+
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.accounting.QueryMonitorConfig;
+import 
org.apache.pinot.core.query.killing.strategy.ScanEntriesThresholdStrategy;
+import org.apache.pinot.spi.config.table.QueryConfig;
+import org.apache.pinot.spi.query.QueryExecutionContext;
+import org.apache.pinot.spi.query.QueryScanCostContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Central manager for scan-based query killing. Owns the guard rails and 
delegates the
+ * actual kill decision to a {@link QueryKillingStrategy}.
+ *
+ * The default factory is {@link ScanEntriesThresholdStrategy.Factory}, which 
reads
+ * scan thresholds from {@link QueryMonitorConfig}. Custom factories can be 
configured
+ * via {@code accounting.scan.based.killing.strategy.factory.class.name}.
+ *
+ */
+public class QueryKillingManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryKillingManager.class);
+
+  private static volatile QueryKillingManager _instance;
+
+  private final AtomicReference<QueryMonitorConfig> _configRef;
+  private final ServerMetrics _serverMetrics;
+
+  /**
+   * Null if: killing is disabled, config is insufficient, or factory failed 
to load.
+   * Rebuilt when config changes via {@link #rebuildStrategy()}.
+   */
+  @Nullable
+  private volatile QueryKillingStrategy _strategy;
+
+  public QueryKillingManager(AtomicReference<QueryMonitorConfig> configRef, 
ServerMetrics serverMetrics) {
+    _configRef = configRef;
+    _serverMetrics = serverMetrics;
+  }
+
+  /**
+   * Initializes the singleton instance and builds the strategy from config.
+   * Called once during server startup.
+   */
+  public static void init(AtomicReference<QueryMonitorConfig> configRef, 
ServerMetrics serverMetrics) {
+    QueryKillingManager manager = new QueryKillingManager(configRef, 
serverMetrics);
+    manager.rebuildStrategy();
+    _instance = manager;
+  }
+
+  @Nullable
+  public static QueryKillingManager getInstance() {
+    return _instance;
+  }
+
+  /**
+   * Rebuilds the strategy from the current config. Called at init and when
+   * cluster config changes (via the same onChange path that rebuilds 
QueryMonitorConfig).
+   */
+  public void rebuildStrategy() {
+    QueryMonitorConfig config = _configRef.get();
+    if (config == null || !config.isScanBasedKillingEnabled()) {
+      _strategy = null;
+      return;
+    }
+
+    try {
+      QueryKillingStrategyFactory factory = loadFactory(config);
+      _strategy = factory.create(config);
+      if (_strategy == null) {
+        LOGGER.warn("Scan-based killing is enabled but strategy factory '{}' 
returned null — "
+            + "required configuration may be missing. Scan-based killing will 
be effectively disabled.",
+            factory.getName());
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to initialize scan-based killing strategy. "
+          + "Scan-based killing will be disabled.", e);
+      _strategy = null;
+    }
+  }
+
+  /**
+   * Loads the strategy factory from config. If a custom factory class name is 
configured,
+   * loads it by reflection (following the same pattern as {@code 
ThreadAccountantUtils.createAccountant()}).
+   * Otherwise, returns the default {@link 
ScanEntriesThresholdStrategy.Factory}.
+   */
+  private QueryKillingStrategyFactory loadFactory(QueryMonitorConfig config) {
+    String factoryClassName = 
config.getScanBasedKillingStrategyFactoryClassName();
+    if (factoryClassName != null && !factoryClassName.isEmpty()) {
+      LOGGER.info("Loading custom query killing strategy factory: {}", 
factoryClassName);
+      try {
+        return (QueryKillingStrategyFactory) Class.forName(factoryClassName)
+            .getDeclaredConstructor().newInstance();
+      } catch (Exception e) {
+        LOGGER.error("Failed to load custom strategy factory '{}', falling 
back to default",
+            factoryClassName, e);
+      }
+    }
+    return new ScanEntriesThresholdStrategy.Factory();
+  }
+
+  /**
+   * Returns the active strategy. Visible for testing.
+   */
+  @Nullable
+  public QueryKillingStrategy getActiveStrategy() {
+    return _strategy;
+  }
+
+  /**
+   * Evaluates whether the query should be killed based on the active strategy.
+   *
+   * <p>Calls {@link QueryKillingStrategy#forQuery(QueryConfig, 
QueryMonitorConfig)}
+   * to resolve table-level overrides before evaluating.</p>
+   */
+  public void checkAndKillIfNeeded(QueryExecutionContext executionContext,
+      QueryScanCostContext scanCostContext, String queryId, String tableName,
+      @Nullable QueryConfig queryConfig) {
+    // no strategy means killing is disabled or unconfigured
+    QueryKillingStrategy strategy = _strategy;
+    if (strategy == null) {
+      return;
+    }
+
+    QueryMonitorConfig config = _configRef.get();
+    if (config == null || !config.isScanBasedKillingEnabled()) {
+      return;
+    }
+
+    // Prevent duplicate kills
+    if (executionContext.getTerminateException() != null) {
+      return;
+    }
+
+    try {
+      // Resolve per-query table overrides (returns same instance if no 
overrides)
+      QueryKillingStrategy queryStrategy = strategy.forQuery(queryConfig, 
config);
+
+      String configSource = (queryStrategy != strategy) ? "table:" + tableName 
: "cluster";
+
+      // Delegate to strategy
+      if (!queryStrategy.shouldTerminate(scanCostContext)) {
+        return;
+      }
+
+      QueryKillReport report = queryStrategy.buildKillReport(
+          scanCostContext, queryId, tableName, configSource);
+
+      if (config.isScanBasedKillingLogOnly()) {
+        LOGGER.info("Query killed in LogOnly mode: {}", 
report.toInternalLogMessage());
+        
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN_DRY_RUN, 
1);
+        return;
+      }
+
+      LOGGER.warn("Query Killed in enforce mode: {}", 
report.toInternalLogMessage());
+      executionContext.terminate(queryStrategy.getErrorCode(), 
report.toCustomerMessage());
+      _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES_KILLED_SCAN, 1);
+    } catch (Exception e) {
+      LOGGER.error("Error in scan-based killing evaluation for query {}", 
queryId, e);

Review Comment:
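   nit: We could add a metric here, so that failures in scan-based killing evaluation are visible in monitoring rather than only in the logs.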



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java:
##########


Review Comment:
   Side question: will this flag be honored when emitting the metrics in future PRs? Worth a look.



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java:
##########
@@ -20,11 +20,21 @@
 
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class QueryMonitorConfig {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryMonitorConfig.class);
+
+  private static final Set<String> VALID_SCAN_KILLING_MODES = Set.of(

Review Comment:
   Can we replace this and `_scanBasedKillingMode` with an enum, e.g. `public enum ScanKillingMode { DISABLED, LOG_ONLY, ENFORCE }`?



##########
pinot-core/src/main/java/org/apache/pinot/core/accounting/QueryMonitorConfig.java:
##########
@@ -280,6 +316,72 @@ public QueryMonitorConfig(PinotConfiguration config, long 
maxHeapSize) {
     } else {
       _workloadCostEnforcementEnabled = 
oldConfig._workloadCostEnforcementEnabled;
     }
+
+    if 
(changedConfigs.contains(CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE))
 {
+      _scanBasedKillingMode = 
validateScanKillingMode(clusterConfigs.getOrDefault(
+          CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MODE,
+          CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MODE));
+    } else {
+      _scanBasedKillingMode = oldConfig.getScanBasedKillingMode();
+    }
+
+    if (changedConfigs.contains(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER))
 {
+      _scanBasedKillingMaxEntriesScannedInFilter = Long.parseLong(
+          clusterConfigs.getOrDefault(
+              
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER,
+              String.valueOf(
+                  
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_IN_FILTER)));
+    } else {
+      _scanBasedKillingMaxEntriesScannedInFilter =
+          oldConfig.getScanBasedKillingMaxEntriesScannedInFilter();
+    }
+
+    if (changedConfigs.contains(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED)) {
+      _scanBasedKillingMaxDocsScanned = Long.parseLong(
+          clusterConfigs.getOrDefault(
+              
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_DOCS_SCANNED,
+              
String.valueOf(CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_DOCS_SCANNED)));
+    } else {
+      _scanBasedKillingMaxDocsScanned = 
oldConfig.getScanBasedKillingMaxDocsScanned();
+    }
+
+    if (changedConfigs.contains(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER))
 {
+      _scanBasedKillingMaxEntriesScannedPostFilter = Long.parseLong(
+          clusterConfigs.getOrDefault(
+              
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER,
+              String.valueOf(
+                  
CommonConstants.Accounting.DEFAULT_SCAN_BASED_KILLING_MAX_ENTRIES_SCANNED_POST_FILTER)));
+    } else {
+      _scanBasedKillingMaxEntriesScannedPostFilter =
+          oldConfig.getScanBasedKillingMaxEntriesScannedPostFilter();
+    }
+
+    if (changedConfigs.contains(
+        
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME))
 {
+      _scanBasedKillingStrategyFactoryClassName = clusterConfigs.getOrDefault(
+          
CommonConstants.Accounting.CONFIG_OF_SCAN_BASED_KILLING_STRATEGY_FACTORY_CLASS_NAME,
 null);
+    } else {
+      _scanBasedKillingStrategyFactoryClassName = 
oldConfig.getScanBasedKillingStrategyFactoryClassName();
+    }
+  }
+
+  /**
+   * Validates the scan-based killing mode. If the value is not one of the 
recognized modes
+   * (disabled, logOnly, enforce), logs an error and falls back to "disabled" 
so the server
+   * continues to start normally.
+   */
+  private static String validateScanKillingMode(String mode) {

Review Comment:
   This can be removed if we use an enum.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to