Copilot commented on code in PR #16409:
URL: https://github.com/apache/pinot/pull/16409#discussion_r2230255604


##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,690 @@
  */
 package org.apache.pinot.spi.executor;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of rejecting them.
  * Heap Usage level is obtained from {@link ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
+ * - Comprehensive metrics publishing for monitoring and alerting:
+ *   * Current queue size (real-time)
+ *   * Total tasks queued (cumulative counter)
+ *   * Total tasks processed from queue (cumulative counter)
+ *   * Total tasks timed out in queue (cumulative counter)
+ *   * Total tasks canceled during shutdown (cumulative counter)
+ *
+ * Metrics Integration:
+ * - Automatically registers gauges with ServerMetrics when available
+ * - Uses reflection to avoid hard dependencies on metrics framework
+ * - Gracefully degrades if ServerMetrics is not available
+ * - Provides manual registration method for delayed initialization
+ * - Cleans up metrics during shutdown
  */
 public class ThrottleOnCriticalHeapUsageExecutor extends DecoratorExecutorService {
-  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private static final Logger LOGGER = LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+  private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+  private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue status every 30 seconds
+
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private final BlockingQueue<QueuedTask> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _defaultQueueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _shutdownCanceledTaskCount = new AtomicInteger(0);
+  private volatile long _lastQueueStatusLogTime = 0;
 
   public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
       ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+        DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+  }
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant,
+      int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
     super(executorService);
     _threadResourceUsageAccountant = threadResourceUsageAccountant;
+    _maxQueueSize = maxQueueSize;
+    _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+    _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    // Register metrics for tracking queue statistics
+    registerMetrics();
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, default timeout: {}ms, "
+            + "monitor interval: {}ms", maxQueueSize, defaultQueueTimeoutMs, monitorIntervalMs);
+  }
+
+  /**
+   * Register metrics to track queue statistics if ServerMetrics is available
+   */
+  private void registerMetrics() {
+    try {
+      // Try to get ServerMetrics instance if it's available
+      Class<?> serverMetricsClass = Class.forName("org.apache.pinot.common.metrics.ServerMetrics");
+      Object serverMetrics = serverMetricsClass.getMethod("get").invoke(null);
+
+      // Check if it's not the NOOP instance by checking if it's registered
+      if (serverMetrics != null) {
+        // Get the ServerGauge enum class
+        Class<?> serverGaugeClass = Class.forName("org.apache.pinot.common.metrics.ServerGauge");
+
+        // Register gauges for each metric
+        registerGauge(serverMetrics, serverGaugeClass, "THROTTLE_EXECUTOR_QUEUE_SIZE",
+            () -> (long) _taskQueue.size());
+        registerGauge(serverMetrics, serverGaugeClass, "THROTTLE_EXECUTOR_QUEUED_TASKS_TOTAL",
+            () -> (long) _queuedTaskCount.get());
+        registerGauge(serverMetrics, serverGaugeClass, "THROTTLE_EXECUTOR_PROCESSED_TASKS_TOTAL",
+            () -> (long) _processedTaskCount.get());
+        registerGauge(serverMetrics, serverGaugeClass, "THROTTLE_EXECUTOR_TIMED_OUT_TASKS_TOTAL",
+            () -> (long) _timedOutTaskCount.get());
+        registerGauge(serverMetrics, serverGaugeClass, "THROTTLE_EXECUTOR_SHUTDOWN_CANCELED_TASKS_TOTAL",
+            () -> (long) _shutdownCanceledTaskCount.get());
+
+        LOGGER.info("Successfully registered ThrottleOnCriticalHeapUsageExecutor metrics");
+      }
+    } catch (Exception e) {
+      // ServerMetrics may not be available or registered yet, which is fine
+      LOGGER.debug("ServerMetrics not available for ThrottleOnCriticalHeapUsageExecutor metrics registration: {}",
+          e.getMessage());
+    }
+  }
+
+  /**
+   * Helper method to register a gauge metric using reflection
+   */
+  private void registerGauge(Object serverMetrics, Class<?> serverGaugeClass, String gaugeName,
+      java.util.function.Supplier<Long> valueSupplier) {
+    try {
+      // Get the gauge enum constant
+      Object gauge = Enum.valueOf((Class<Enum>) serverGaugeClass, gaugeName);
+
+      // Call setOrUpdateGlobalGauge(gauge, valueSupplier)
+      serverMetrics.getClass()
+          .getMethod("setOrUpdateGlobalGauge", serverGaugeClass, java.util.function.Supplier.class)
+          .invoke(serverMetrics, gauge, valueSupplier);
+
+      LOGGER.debug("Registered gauge: {}", gaugeName);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to register gauge {}: {}", gaugeName, e.getMessage());
+    }
   }
 
   protected void checkTaskAllowed() {
-    if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
-      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks throttled due to high heap usage.");
+  }

Review Comment:
   The method `checkTaskAllowed()` is now empty but still present. This method 
should be removed since it's no longer used in the new queue-based 
implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to