Copilot commented on code in PR #16409:
URL: https://github.com/apache/pinot/pull/16409#discussion_r2227651066


##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,552 @@
  */
 package org.apache.pinot.spi.executor;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
  * Heap Usage level is obtained from {@link 
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
  */
 public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
-  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+  private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+  private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue 
status every 30 seconds
+
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private final BlockingQueue<QueuedTask> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _defaultQueueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
+  private volatile long _lastQueueStatusLogTime = 0;
 
   public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
       ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+        DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+  }
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant,
+      int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
     super(executorService);
     _threadResourceUsageAccountant = threadResourceUsageAccountant;
+    _maxQueueSize = maxQueueSize;
+    _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+    _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, 
default timeout: {}ms, "
+            + "monitor interval: {}ms", maxQueueSize, defaultQueueTimeoutMs, 
monitorIntervalMs);
   }
 
   protected void checkTaskAllowed() {
-    if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
-      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks 
throttled due to high heap usage.");
+  }
+
+  /**
+   * Check if a task should be queued due to critical heap usage
+   * @return true if the task should be queued, false if it can be executed 
immediately
+   */
+  protected boolean shouldQueueTask() {
+    return _threadResourceUsageAccountant.throttleQuerySubmission();
+  }
+
+  /**
+   * Process queued tasks when heap usage is below critical level
+   */
+  private void processQueuedTasks() {
+    if (_isShutdown.get()) {
+      return;
+    }
+
+    try {
+      int initialQueueSize = _taskQueue.size();
+      long currentTime = System.currentTimeMillis();
+
+      // Log queue size for monitoring if there are queued tasks (throttled to 
prevent log flooding)
+      if (initialQueueSize > 0 && (currentTime - _lastQueueStatusLogTime) > 
LOG_THROTTLE_INTERVAL_MS) {
+        LOGGER.info("Processing queued tasks. Current queue size: {}, Queued: 
{}, Processed: {}, Timed out: {}",
+            initialQueueSize, _queuedTaskCount.get(), 
_processedTaskCount.get(), _timedOutTaskCount.get());
+        _lastQueueStatusLogTime = currentTime;
+      }
+
+      // Process tasks while heap usage is not critical and queue is not empty
+              while (!shouldQueueTask() && !_taskQueue.isEmpty()) {

Review Comment:
   The indentation is inconsistent: this `while` line is indented 14 spaces, 
whereas the surrounding statements in the same `try` block are indented 6.
   ```suggestion
         while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,552 @@
  */
 package org.apache.pinot.spi.executor;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
  * Heap Usage level is obtained from {@link 
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
  */
 public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
-  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+  private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+  private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue 
status every 30 seconds
+
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private final BlockingQueue<QueuedTask> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _defaultQueueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
+  private volatile long _lastQueueStatusLogTime = 0;
 
   public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
       ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+        DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+  }
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant,
+      int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
     super(executorService);
     _threadResourceUsageAccountant = threadResourceUsageAccountant;
+    _maxQueueSize = maxQueueSize;
+    _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+    _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, 
default timeout: {}ms, "
+            + "monitor interval: {}ms", maxQueueSize, defaultQueueTimeoutMs, 
monitorIntervalMs);
   }
 
   protected void checkTaskAllowed() {
-    if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
-      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks 
throttled due to high heap usage.");
+  }
+
+  /**
+   * Check if a task should be queued due to critical heap usage
+   * @return true if the task should be queued, false if it can be executed 
immediately
+   */
+  protected boolean shouldQueueTask() {
+    return _threadResourceUsageAccountant.throttleQuerySubmission();
+  }
+
+  /**
+   * Process queued tasks when heap usage is below critical level
+   */
+  private void processQueuedTasks() {
+    if (_isShutdown.get()) {
+      return;
+    }
+
+    try {
+      int initialQueueSize = _taskQueue.size();
+      long currentTime = System.currentTimeMillis();
+
+      // Log queue size for monitoring if there are queued tasks (throttled to 
prevent log flooding)
+      if (initialQueueSize > 0 && (currentTime - _lastQueueStatusLogTime) > 
LOG_THROTTLE_INTERVAL_MS) {
+        LOGGER.info("Processing queued tasks. Current queue size: {}, Queued: 
{}, Processed: {}, Timed out: {}",
+            initialQueueSize, _queuedTaskCount.get(), 
_processedTaskCount.get(), _timedOutTaskCount.get());
+        _lastQueueStatusLogTime = currentTime;
+      }
+
+      // Process tasks while heap usage is not critical and queue is not empty
+              while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+        QueuedTask queuedTask = _taskQueue.poll();
+        if (queuedTask != null) {
+          long queueTime = System.currentTimeMillis() - 
queuedTask.getQueueTime();
+
+          if (queueTime > queuedTask.getTimeoutMs()) {
+            // Task has timed out in queue
+            queuedTask.timeout();
+            _timedOutTaskCount.incrementAndGet();
+            LOGGER.warn("Task timed out after {}ms in queue (timeout: {}ms)", 
queueTime, queuedTask.getTimeoutMs());
+          } else {
+            // Submit the task for execution
+            try {
+              queuedTask.execute();
+              _processedTaskCount.incrementAndGet();
+              LOGGER.debug("Processed queued task after {}ms in queue", 
queueTime);
+            } catch (Exception e) {
+              LOGGER.error("Error executing queued task", e);
+              queuedTask.fail(e);
+            }
+          }
+        }
+      }
+
+      // Log completion only for significant processing (5+ tasks) to avoid 
log spam
+      int finalQueueSize = _taskQueue.size();
+      if (initialQueueSize > 0 && initialQueueSize != finalQueueSize) {
+        int processedThisCycle = initialQueueSize - finalQueueSize;
+        if (processedThisCycle >= 5) {
+          LOGGER.info("Completed processing cycle. Processed {} tasks, 
remaining queue size: {}",
+              processedThisCycle, finalQueueSize);
+        } else {
+          LOGGER.debug("Completed processing cycle. Processed {} tasks, 
remaining queue size: {}",
+              processedThisCycle, finalQueueSize);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error in processQueuedTasks", e);
     }
   }
 
   @Override
   protected <T> Callable<T> decorate(Callable<T> task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      return task.call();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        return queueCallableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        return task.call();
+      }
     };
   }
 
   @Override
   protected Runnable decorate(Runnable task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      task.run();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        queueRunnableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        task.run();
+      }
     };
   }
+
+  /**
+   * Execute a callable task with custom timeout if queuing is needed.
+   * This allows per-task timeout specification instead of using the global 
default.
+   *
+   * @param task the callable task to execute
+   * @param timeoutMs the timeout in milliseconds for this specific task if it 
gets queued
+   * @return the result of the callable task
+   * @throws Exception if execution fails
+   */
+  public <T> T executeWithTimeout(Callable<T> task, long timeoutMs)
+      throws Exception {
+    if (shouldQueueTask()) {
+      // Queue the task with custom timeout if heap usage is critical
+      return queueCallableTask(task, timeoutMs);
+    } else {
+      // Execute immediately if heap usage is normal
+      return task.call();
+    }
+  }
+
+  /**
+   * Execute a runnable task with custom timeout if queuing is needed.
+   * This allows per-task timeout specification instead of using the global 
default.
+   *
+   * @param task the runnable task to execute
+   * @param timeoutMs the timeout in milliseconds for this specific task if it 
gets queued
+   */
+  public void executeWithTimeout(Runnable task, long timeoutMs) {
+    if (shouldQueueTask()) {
+      // Queue the task with custom timeout if heap usage is critical
+      queueRunnableTask(task, timeoutMs);
+    } else {
+      // Execute immediately if heap usage is normal
+      task.run();
+    }
+  }
+
+  /**
+   * Queue a callable task and wait for its execution
+   */
+  private <T> T queueCallableTask(Callable<T> task)
+      throws Exception {
+    return queueCallableTask(task, _defaultQueueTimeoutMs);
+  }
+
+  /**
+   * Queue a callable task with custom timeout and wait for its execution
+   */
+  private <T> T queueCallableTask(Callable<T> task, long timeoutMs)
+      throws Exception {
+    QueuedCallableTask<T> queuedTask = new QueuedCallableTask<>(task, 
timeoutMs);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued callable task, queue size: {}", _taskQueue.size());
+
+    // Wait for the task to complete or timeout
+    return queuedTask.get(timeoutMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Queue a runnable task and wait for its execution
+   */
+  private void queueRunnableTask(Runnable task) {
+    queueRunnableTask(task, _defaultQueueTimeoutMs);
+  }
+
+  /**
+   * Queue a runnable task with custom timeout and wait for its execution
+   */
+  private void queueRunnableTask(Runnable task, long timeoutMs) {
+    QueuedRunnableTask queuedTask = new QueuedRunnableTask(task, timeoutMs);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued runnable task, queue size: {}", _taskQueue.size());
+
+    try {
+      // Wait for the task to complete or timeout
+      queuedTask.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      } else {
+        throw new RuntimeException("Error executing queued task", e);
+      }
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    _isShutdown.set(true);
+    _monitorExecutor.shutdownNow();
+
+    // Allow the monitor thread to complete current processing
+    try {
+      if (!_monitorExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOGGER.warn("Monitor executor did not terminate within the timeout 
period.");
+      }
+    } catch (InterruptedException e) {
+      LOGGER.warn("Interrupted while waiting for monitor executor to 
terminate.", e);
+      Thread.currentThread().interrupt();
+    }
+
+    // Ignore remaining tasks in the queue - fail them with shutdown exception
+    int remainingTasks = 0;
+    while (!_taskQueue.isEmpty()) {
+      QueuedTask task = _taskQueue.poll();
+      if (task != null) {
+        remainingTasks++;
+        task.fail(new IllegalStateException("Executor is shutting down"));
+        _timedOutTaskCount.incrementAndGet();
+      }
+    }
+
+    super.shutdown();
+
+    LOGGER.info("ThrottleOnCriticalHeapUsageExecutor shutdown. Stats - Queued: 
{}, Processed: {}, "
+            + "Timed out: {}, Ignored: {}", _queuedTaskCount.get(), 
_processedTaskCount.get(),
+        _timedOutTaskCount.get(), remainingTasks);

Review Comment:
   [nitpick] The log message is split across multiple lines unnecessarily. 
Consider consolidating into a single string for better readability.
   ```suggestion
       LOGGER.info("ThrottleOnCriticalHeapUsageExecutor shutdown. Stats - 
Queued: {}, Processed: {}, Timed out: {}, Ignored: {}", _queuedTaskCount.get(), 
_processedTaskCount.get(), _timedOutTaskCount.get(), remainingTasks);
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,552 @@
  */
 package org.apache.pinot.spi.executor;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
  * Heap Usage level is obtained from {@link 
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout (global default or per-task)
+ * - Background monitoring of heap usage to process queued tasks
  */
 public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
-  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+  private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+  private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue 
status every 30 seconds
+
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private final BlockingQueue<QueuedTask> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _defaultQueueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
+  private volatile long _lastQueueStatusLogTime = 0;
 
   public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
       ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+        DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+  }
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant,
+      int maxQueueSize, long defaultQueueTimeoutMs, long monitorIntervalMs) {
     super(executorService);
     _threadResourceUsageAccountant = threadResourceUsageAccountant;
+    _maxQueueSize = maxQueueSize;
+    _defaultQueueTimeoutMs = defaultQueueTimeoutMs;
+    _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, 
default timeout: {}ms, "
+            + "monitor interval: {}ms", maxQueueSize, defaultQueueTimeoutMs, 
monitorIntervalMs);
   }
 
   protected void checkTaskAllowed() {
-    if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
-      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks 
throttled due to high heap usage.");
+  }
+
+  /**
+   * Check if a task should be queued due to critical heap usage
+   * @return true if the task should be queued, false if it can be executed 
immediately
+   */
+  protected boolean shouldQueueTask() {
+    return _threadResourceUsageAccountant.throttleQuerySubmission();
+  }
+
+  /**
+   * Process queued tasks when heap usage is below critical level
+   */
+  private void processQueuedTasks() {
+    if (_isShutdown.get()) {
+      return;
+    }
+
+    try {
+      int initialQueueSize = _taskQueue.size();
+      long currentTime = System.currentTimeMillis();
+
+      // Log queue size for monitoring if there are queued tasks (throttled to 
prevent log flooding)
+      if (initialQueueSize > 0 && (currentTime - _lastQueueStatusLogTime) > 
LOG_THROTTLE_INTERVAL_MS) {
+        LOGGER.info("Processing queued tasks. Current queue size: {}, Queued: 
{}, Processed: {}, Timed out: {}",
+            initialQueueSize, _queuedTaskCount.get(), 
_processedTaskCount.get(), _timedOutTaskCount.get());
+        _lastQueueStatusLogTime = currentTime;
+      }
+
+      // Process tasks while heap usage is not critical and queue is not empty
+              while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+        QueuedTask queuedTask = _taskQueue.poll();
+        if (queuedTask != null) {
+          long queueTime = System.currentTimeMillis() - 
queuedTask.getQueueTime();
+
+          if (queueTime > queuedTask.getTimeoutMs()) {
+            // Task has timed out in queue
+            queuedTask.timeout();
+            _timedOutTaskCount.incrementAndGet();
+            LOGGER.warn("Task timed out after {}ms in queue (timeout: {}ms)", 
queueTime, queuedTask.getTimeoutMs());
+          } else {
+            // Submit the task for execution
+            try {
+              queuedTask.execute();
+              _processedTaskCount.incrementAndGet();
+              LOGGER.debug("Processed queued task after {}ms in queue", 
queueTime);
+            } catch (Exception e) {
+              LOGGER.error("Error executing queued task", e);
+              queuedTask.fail(e);
+            }
+          }
+        }
+      }
+
+      // Log completion only for significant processing (5+ tasks) to avoid 
log spam
+      int finalQueueSize = _taskQueue.size();
+      if (initialQueueSize > 0 && initialQueueSize != finalQueueSize) {
+        int processedThisCycle = initialQueueSize - finalQueueSize;
+        if (processedThisCycle >= 5) {
+          LOGGER.info("Completed processing cycle. Processed {} tasks, 
remaining queue size: {}",
+              processedThisCycle, finalQueueSize);
+        } else {
+          LOGGER.debug("Completed processing cycle. Processed {} tasks, 
remaining queue size: {}",
+              processedThisCycle, finalQueueSize);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error in processQueuedTasks", e);
     }
   }
 
   @Override
   protected <T> Callable<T> decorate(Callable<T> task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      return task.call();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        return queueCallableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        return task.call();
+      }
     };
   }
 
   @Override
   protected Runnable decorate(Runnable task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      task.run();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        queueRunnableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        task.run();
+      }
     };
   }
+
+  /**
+   * Execute a callable task with custom timeout if queuing is needed.
+   * This allows per-task timeout specification instead of using the global 
default.
+   *
+   * @param task the callable task to execute
+   * @param timeoutMs the timeout in milliseconds for this specific task if it 
gets queued
+   * @return the result of the callable task
+   * @throws Exception if execution fails
+   */
+  public <T> T executeWithTimeout(Callable<T> task, long timeoutMs)
+      throws Exception {
+    if (shouldQueueTask()) {
+      // Queue the task with custom timeout if heap usage is critical
+      return queueCallableTask(task, timeoutMs);
+    } else {
+      // Execute immediately if heap usage is normal
+      return task.call();
+    }
+  }
+
+  /**
+   * Execute a runnable task with custom timeout if queuing is needed.
+   * This allows per-task timeout specification instead of using the global 
default.
+   *
+   * @param task the runnable task to execute
+   * @param timeoutMs the timeout in milliseconds for this specific task if it 
gets queued
+   */
+  public void executeWithTimeout(Runnable task, long timeoutMs) {
+    if (shouldQueueTask()) {
+      // Queue the task with custom timeout if heap usage is critical
+      queueRunnableTask(task, timeoutMs);
+    } else {
+      // Execute immediately if heap usage is normal
+      task.run();
+    }
+  }
+
+  /**
+   * Queue a callable task and wait for its execution
+   */
+  private <T> T queueCallableTask(Callable<T> task)
+      throws Exception {
+    return queueCallableTask(task, _defaultQueueTimeoutMs);
+  }
+
+  /**
+   * Queue a callable task with custom timeout and wait for its execution
+   */
+  private <T> T queueCallableTask(Callable<T> task, long timeoutMs)
+      throws Exception {
+    QueuedCallableTask<T> queuedTask = new QueuedCallableTask<>(task, 
timeoutMs);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued callable task, queue size: {}", _taskQueue.size());
+
+    // Wait for the task to complete or timeout
+    return queuedTask.get(timeoutMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Queue a runnable task and wait for its execution
+   */
+  private void queueRunnableTask(Runnable task) {
+    queueRunnableTask(task, _defaultQueueTimeoutMs);
+  }
+
+  /**
+   * Queue a runnable task with custom timeout and wait for its execution
+   */
+  private void queueRunnableTask(Runnable task, long timeoutMs) {
+    QueuedRunnableTask queuedTask = new QueuedRunnableTask(task, timeoutMs);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued runnable task, queue size: {}", _taskQueue.size());
+
+    try {
+      // Wait for the task to complete or timeout
+      queuedTask.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      } else {
+        throw new RuntimeException("Error executing queued task", e);
+      }
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    _isShutdown.set(true);
+    _monitorExecutor.shutdownNow();
+
+    // Allow the monitor thread to complete current processing
+    try {
+      if (!_monitorExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOGGER.warn("Monitor executor did not terminate within the timeout 
period.");
+      }
+    } catch (InterruptedException e) {
+      LOGGER.warn("Interrupted while waiting for monitor executor to 
terminate.", e);
+      Thread.currentThread().interrupt();
+    }
+
+    // Ignore remaining tasks in the queue - fail them with shutdown exception
+    int remainingTasks = 0;
+    while (!_taskQueue.isEmpty()) {
+      QueuedTask task = _taskQueue.poll();
+      if (task != null) {
+        remainingTasks++;
+        task.fail(new IllegalStateException("Executor is shutting down"));

Review Comment:
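   Tasks drained from the queue during shutdown are failed with an 
`IllegalStateException`, yet the following line increments 
`_timedOutTaskCount`, so shutdown cancellations are reported as timeouts. 
Consider adding a separate counter for shutdown-canceled tasks or using a more 
specific exception type.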



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to