Copilot commented on code in PR #16409:
URL: https://github.com/apache/pinot/pull/16409#discussion_r2225103604


##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,432 @@
  */
 package org.apache.pinot.spi.executor;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
  * Heap Usage level is obtained from {@link 
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout
+ * - Background monitoring of heap usage to process queued tasks
  */
 public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
-  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+  private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private final BlockingQueue<QueuedTask> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _queueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
 
   public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
       ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+        DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+  }
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant,
+      int maxQueueSize, long queueTimeoutMs, long monitorIntervalMs) {
     super(executorService);
     _threadResourceUsageAccountant = threadResourceUsageAccountant;
+    _maxQueueSize = maxQueueSize;
+    _queueTimeoutMs = queueTimeoutMs;
+    _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, 
timeout: {}ms, monitor interval: {}ms",
+        maxQueueSize, queueTimeoutMs, monitorIntervalMs);
   }
 
   protected void checkTaskAllowed() {
-    if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
-      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks 
throttled due to high heap usage.");
+  }
+
+  /**
+   * Check if a task should be queued due to critical heap usage
+   * @return true if the task should be queued, false if it can be executed 
immediately
+   */
+  protected boolean shouldQueueTask() {
+    return _threadResourceUsageAccountant.throttleQuerySubmission();
+  }
+
+  /**
+   * Process queued tasks when heap usage is below critical level
+   */
+  private void processQueuedTasks() {
+    if (_isShutdown.get()) {
+      return;
+    }
+
+    try {
+      // Process tasks while heap usage is not critical and queue is not empty
+      while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+        QueuedTask queuedTask = _taskQueue.poll();
+        if (queuedTask != null) {
+          long queueTime = System.currentTimeMillis() - 
queuedTask.getQueueTime();
+
+          if (queueTime > _queueTimeoutMs) {
+            // Task has timed out in queue
+            queuedTask.timeout();
+            _timedOutTaskCount.incrementAndGet();
+            LOGGER.warn("Task timed out after {}ms in queue", queueTime);
+          } else {
+            // Submit the task for execution
+            try {
+              queuedTask.execute();
+              _processedTaskCount.incrementAndGet();
+              LOGGER.debug("Processed queued task after {}ms in queue", 
queueTime);
+            } catch (Exception e) {
+              LOGGER.error("Error executing queued task", e);
+              queuedTask.fail(e);
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error in processQueuedTasks", e);
     }
   }
 
   @Override
   protected <T> Callable<T> decorate(Callable<T> task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      return task.call();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        return queueCallableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        return task.call();
+      }
     };
   }
 
   @Override
   protected Runnable decorate(Runnable task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      task.run();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        queueRunnableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        task.run();
+      }
     };
   }
+
+  /**
+   * Queue a callable task and wait for its execution
+   */
+  private <T> T queueCallableTask(Callable<T> task)
+      throws Exception {
+    QueuedCallableTask<T> queuedTask = new QueuedCallableTask<>(task);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued callable task, queue size: {}", _taskQueue.size());
+
+    // Wait for the task to complete or timeout
+    return queuedTask.get(_queueTimeoutMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Queue a runnable task and wait for its execution
+   */
+  private void queueRunnableTask(Runnable task) {
+    QueuedRunnableTask queuedTask = new QueuedRunnableTask(task);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued runnable task, queue size: {}", _taskQueue.size());
+
+    try {
+      // Wait for the task to complete or timeout
+      queuedTask.get(_queueTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      } else {
+        throw new RuntimeException("Error executing queued task", e);
+      }
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    _isShutdown.set(true);
+    _monitorExecutor.shutdown();

Review Comment:
   The shutdown() method of this class should call 
_monitorExecutor.shutdownNow() instead of _monitorExecutor.shutdown() to ensure 
the background monitoring thread is interrupted immediately, preventing 
potential delays during application shutdown.
   ```suggestion
       _monitorExecutor.shutdownNow();
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,432 @@
  */
 package org.apache.pinot.spi.executor;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
  * Heap Usage level is obtained from {@link 
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout
+ * - Background monitoring of heap usage to process queued tasks
  */
 public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
-  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+  private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private final BlockingQueue<QueuedTask> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _queueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
 
   public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
       ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+        DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+  }
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant,
+      int maxQueueSize, long queueTimeoutMs, long monitorIntervalMs) {
     super(executorService);
     _threadResourceUsageAccountant = threadResourceUsageAccountant;
+    _maxQueueSize = maxQueueSize;
+    _queueTimeoutMs = queueTimeoutMs;
+    _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, 
timeout: {}ms, monitor interval: {}ms",
+        maxQueueSize, queueTimeoutMs, monitorIntervalMs);
   }
 
   protected void checkTaskAllowed() {
-    if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
-      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks 
throttled due to high heap usage.");
+  }
+
+  /**
+   * Check if a task should be queued due to critical heap usage
+   * @return true if the task should be queued, false if it can be executed 
immediately
+   */
+  protected boolean shouldQueueTask() {
+    return _threadResourceUsageAccountant.throttleQuerySubmission();
+  }
+
+  /**
+   * Process queued tasks when heap usage is below critical level
+   */
+  private void processQueuedTasks() {
+    if (_isShutdown.get()) {
+      return;
+    }
+
+    try {
+      // Process tasks while heap usage is not critical and queue is not empty
+      while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+        QueuedTask queuedTask = _taskQueue.poll();
+        if (queuedTask != null) {
+          long queueTime = System.currentTimeMillis() - 
queuedTask.getQueueTime();
+
+          if (queueTime > _queueTimeoutMs) {
+            // Task has timed out in queue
+            queuedTask.timeout();
+            _timedOutTaskCount.incrementAndGet();
+            LOGGER.warn("Task timed out after {}ms in queue", queueTime);
+          } else {
+            // Submit the task for execution
+            try {
+              queuedTask.execute();
+              _processedTaskCount.incrementAndGet();
+              LOGGER.debug("Processed queued task after {}ms in queue", 
queueTime);
+            } catch (Exception e) {
+              LOGGER.error("Error executing queued task", e);
+              queuedTask.fail(e);
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error in processQueuedTasks", e);
     }
   }
 
   @Override
   protected <T> Callable<T> decorate(Callable<T> task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      return task.call();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        return queueCallableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        return task.call();
+      }
     };
   }
 
   @Override
   protected Runnable decorate(Runnable task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      task.run();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        queueRunnableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        task.run();
+      }
     };
   }
+
+  /**
+   * Queue a callable task and wait for its execution
+   */
+  private <T> T queueCallableTask(Callable<T> task)
+      throws Exception {
+    QueuedCallableTask<T> queuedTask = new QueuedCallableTask<>(task);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued callable task, queue size: {}", _taskQueue.size());
+
+    // Wait for the task to complete or timeout
+    return queuedTask.get(_queueTimeoutMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Queue a runnable task and wait for its execution
+   */
+  private void queueRunnableTask(Runnable task) {
+    QueuedRunnableTask queuedTask = new QueuedRunnableTask(task);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued runnable task, queue size: {}", _taskQueue.size());
+
+    try {
+      // Wait for the task to complete or timeout
+      queuedTask.get(_queueTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      } else {
+        throw new RuntimeException("Error executing queued task", e);
+      }
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    _isShutdown.set(true);
+    _monitorExecutor.shutdown();
+
+    // Process any remaining tasks in the queue
+    while (!_taskQueue.isEmpty()) {
+      QueuedTask task = _taskQueue.poll();
+      if (task != null) {
+        task.timeout();

Review Comment:
   Processing remaining tasks during shutdown by calling task.timeout() may 
cause unexpected behavior for callers still waiting on these tasks. The 
shutdown flag is already set at the top of this method, so consider a more 
graceful approach: wait for the monitor thread to complete its current 
processing, then attempt to execute the remaining tasks instead of timing them 
out.
   ```suggestion
       // Allow the monitor thread to complete current processing
       try {
         if (!_monitorExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
           LOGGER.warn("Monitor executor did not terminate within the timeout 
period.");
         }
       } catch (InterruptedException e) {
         LOGGER.warn("Interrupted while waiting for monitor executor to 
terminate.", e);
         Thread.currentThread().interrupt();
       }
   
       // Process any remaining tasks in the queue
       while (!_taskQueue.isEmpty()) {
         QueuedTask task = _taskQueue.poll();
         if (task != null) {
           try {
             task.run(); // Attempt to execute the task
             _processedTaskCount.incrementAndGet();
           } catch (Exception e) {
             LOGGER.error("Error executing remaining task during shutdown", e);
             _timedOutTaskCount.incrementAndGet();
           }
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,432 @@
  */
 package org.apache.pinot.spi.executor;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
  * Heap Usage level is obtained from {@link 
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout
+ * - Background monitoring of heap usage to process queued tasks
  */
 public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
-  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+  private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private final BlockingQueue<QueuedTask> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _queueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
 
   public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
       ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+        DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+  }
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant,
+      int maxQueueSize, long queueTimeoutMs, long monitorIntervalMs) {
     super(executorService);
     _threadResourceUsageAccountant = threadResourceUsageAccountant;
+    _maxQueueSize = maxQueueSize;
+    _queueTimeoutMs = queueTimeoutMs;
+    _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, 
timeout: {}ms, monitor interval: {}ms",
+        maxQueueSize, queueTimeoutMs, monitorIntervalMs);
   }
 
   protected void checkTaskAllowed() {
-    if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
-      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks 
throttled due to high heap usage.");
+  }
+
+  /**
+   * Check if a task should be queued due to critical heap usage
+   * @return true if the task should be queued, false if it can be executed 
immediately
+   */
+  protected boolean shouldQueueTask() {
+    return _threadResourceUsageAccountant.throttleQuerySubmission();
+  }
+
+  /**
+   * Process queued tasks when heap usage is below critical level
+   */
+  private void processQueuedTasks() {
+    if (_isShutdown.get()) {
+      return;
+    }
+
+    try {
+      // Process tasks while heap usage is not critical and queue is not empty
+      while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+        QueuedTask queuedTask = _taskQueue.poll();
+        if (queuedTask != null) {
+          long queueTime = System.currentTimeMillis() - 
queuedTask.getQueueTime();
+
+          if (queueTime > _queueTimeoutMs) {
+            // Task has timed out in queue
+            queuedTask.timeout();
+            _timedOutTaskCount.incrementAndGet();
+            LOGGER.warn("Task timed out after {}ms in queue", queueTime);
+          } else {
+            // Submit the task for execution
+            try {
+              queuedTask.execute();
+              _processedTaskCount.incrementAndGet();
+              LOGGER.debug("Processed queued task after {}ms in queue", 
queueTime);
+            } catch (Exception e) {
+              LOGGER.error("Error executing queued task", e);
+              queuedTask.fail(e);
+            }
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error in processQueuedTasks", e);
     }
   }
 
   @Override
   protected <T> Callable<T> decorate(Callable<T> task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      return task.call();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        return queueCallableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        return task.call();
+      }
     };
   }
 
   @Override
   protected Runnable decorate(Runnable task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      task.run();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        queueRunnableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        task.run();
+      }
     };
   }
+
+  /**
+   * Queue a callable task and wait for its execution
+   */
+  private <T> T queueCallableTask(Callable<T> task)
+      throws Exception {
+    QueuedCallableTask<T> queuedTask = new QueuedCallableTask<>(task);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued callable task, queue size: {}", _taskQueue.size());
+
+    // Wait for the task to complete or timeout
+    return queuedTask.get(_queueTimeoutMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Queue a runnable task and wait for its execution
+   */
+  private void queueRunnableTask(Runnable task) {
+    QueuedRunnableTask queuedTask = new QueuedRunnableTask(task);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued runnable task, queue size: {}", _taskQueue.size());
+
+    try {
+      // Wait for the task to complete or timeout
+      queuedTask.get(_queueTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      } else {
+        throw new RuntimeException("Error executing queued task", e);
+      }
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    _isShutdown.set(true);
+    _monitorExecutor.shutdown();
+
+    // Process any remaining tasks in the queue
+    while (!_taskQueue.isEmpty()) {
+      QueuedTask task = _taskQueue.poll();
+      if (task != null) {
+        task.timeout();
+      }
+    }
+
+    super.shutdown();
+
+    LOGGER.info("ThrottleOnCriticalHeapUsageExecutor shutdown. Stats - Queued: 
{}, Processed: {}, Timed out: {}",
+        _queuedTaskCount.get(), _processedTaskCount.get(), 
_timedOutTaskCount.get());
+  }
+
+  /**
+   * Get current queue size
+   */
+  public int getQueueSize() {
+    return _taskQueue.size();
+  }
+
+  /**
+   * Get total number of tasks queued
+   */
+  public int getQueuedTaskCount() {
+    return _queuedTaskCount.get();
+  }
+
+  /**
+   * Get total number of tasks processed from queue
+   */
+  public int getProcessedTaskCount() {
+    return _processedTaskCount.get();
+  }
+
+  /**
+   * Get total number of tasks that timed out in queue
+   */
+  public int getTimedOutTaskCount() {
+    return _timedOutTaskCount.get();
+  }
+
+  /**
+   * Base class for queued tasks

Review Comment:
   The QueuedTask abstract class and its implementations (QueuedCallableTask, 
QueuedRunnableTask) lack class-level documentation explaining their purpose, 
lifecycle, and thread-safety guarantees. Such documentation is important for 
understanding the complex synchronization logic.
   ```suggestion
      * Base class for queued tasks.
      *
      * <p>The {@code QueuedTask} class represents a task that can be queued for execution when the system is under
      * critical heap usage. It provides a common interface for handling task execution, timeouts, and failures.</p>
      *
      * <p>Lifecycle:</p>
      * <ul>
      *   <li>Tasks are created and added to the queue when heap usage is critical.</li>
      *   <li>When heap usage drops below the critical level, tasks are dequeued and executed.</li>
      *   <li>If a task remains in the queue beyond a configured timeout, the {@code timeout()} method is invoked.</li>
      *   <li>If an exception occurs during execution, the {@code fail(Exception e)} method is invoked.</li>
      * </ul>
      *
      * <p>Thread-safety:</p>
      * <ul>
      *   <li>Instances of {@code QueuedTask} are not inherently thread-safe and should be accessed in a thread-safe
      *       manner by the enclosing executor.</li>
      *   <li>The enclosing {@code ThrottleOnCriticalHeapUsageExecutor} ensures proper synchronization when accessing
      *       and modifying the queue.</li>
      * </ul>
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,432 @@
  */
 package org.apache.pinot.spi.executor;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
  * Heap Usage level is obtained from {@link 
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout
+ * - Background monitoring of heap usage to process queued tasks
  */
 public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
-  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+  private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private final BlockingQueue<QueuedTask> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _queueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
 
   public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
       ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+        DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+  }
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant,
+      int maxQueueSize, long queueTimeoutMs, long monitorIntervalMs) {
     super(executorService);
     _threadResourceUsageAccountant = threadResourceUsageAccountant;
+    _maxQueueSize = maxQueueSize;
+    _queueTimeoutMs = queueTimeoutMs;
+    _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, 
timeout: {}ms, monitor interval: {}ms",
+        maxQueueSize, queueTimeoutMs, monitorIntervalMs);
   }
 

Review Comment:
   The method checkTaskAllowed() is now empty but still exists. Since the 
functionality has moved to shouldQueueTask(), this method should either be 
removed or documented as intentionally empty for backward compatibility.
   ```suggestion
   
     /**
      * This method is intentionally left empty for backward compatibility.
      * The functionality has been moved to {@link #shouldQueueTask()}.
      */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to