Jackie-Jiang commented on code in PR #16409:
URL: https://github.com/apache/pinot/pull/16409#discussion_r2226885350


##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,494 @@
  */
 package org.apache.pinot.spi.executor;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
  * Heap Usage level is obtained from {@link 
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout
+ * - Background monitoring of heap usage to process queued tasks
  */
 public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
-  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+  private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+  private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue 
status every 30 seconds
+
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private final BlockingQueue<QueuedTask> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _queueTimeoutMs;

Review Comment:
   Ideally, each task should be able to specify its own timeout, rather than all queued tasks sharing a single executor-wide queue timeout.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,494 @@
  */
 package org.apache.pinot.spi.executor;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
  * Heap Usage level is obtained from {@link 
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout
+ * - Background monitoring of heap usage to process queued tasks
  */
 public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
-  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+  private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+  private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue 
status every 30 seconds
+
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private final BlockingQueue<QueuedTask> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _queueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
+  private volatile long _lastQueueStatusLogTime = 0;
 
   public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
       ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+        DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+  }
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant,
+      int maxQueueSize, long queueTimeoutMs, long monitorIntervalMs) {
     super(executorService);
     _threadResourceUsageAccountant = threadResourceUsageAccountant;
+    _maxQueueSize = maxQueueSize;
+    _queueTimeoutMs = queueTimeoutMs;
+    _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, 
timeout: {}ms, monitor interval: {}ms",
+        maxQueueSize, queueTimeoutMs, monitorIntervalMs);
   }
 
   protected void checkTaskAllowed() {
-    if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
-      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks 
throttled due to high heap usage.");
+  }
+
+  /**
+   * Check if a task should be queued due to critical heap usage
+   * @return true if the task should be queued, false if it can be executed 
immediately
+   */
+  protected boolean shouldQueueTask() {
+    return _threadResourceUsageAccountant.throttleQuerySubmission();
+  }
+
+  /**
+   * Process queued tasks when heap usage is below critical level
+   */
+  private void processQueuedTasks() {
+    if (_isShutdown.get()) {
+      return;
+    }
+
+    try {
+      int initialQueueSize = _taskQueue.size();
+      long currentTime = System.currentTimeMillis();
+
+      // Log queue size for monitoring if there are queued tasks (throttled to 
prevent log flooding)
+      if (initialQueueSize > 0 && (currentTime - _lastQueueStatusLogTime) > 
LOG_THROTTLE_INTERVAL_MS) {
+        LOGGER.info("Processing queued tasks. Current queue size: {}, Queued: 
{}, Processed: {}, Timed out: {}",
+            initialQueueSize, _queuedTaskCount.get(), 
_processedTaskCount.get(), _timedOutTaskCount.get());
+        _lastQueueStatusLogTime = currentTime;
+      }
+
+      // Process tasks while heap usage is not critical and queue is not empty
+      while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+        QueuedTask queuedTask = _taskQueue.poll();
+        if (queuedTask != null) {
+          long queueTime = System.currentTimeMillis() - 
queuedTask.getQueueTime();
+
+          if (queueTime > _queueTimeoutMs) {
+            // Task has timed out in queue
+            queuedTask.timeout();
+            _timedOutTaskCount.incrementAndGet();
+            LOGGER.warn("Task timed out after {}ms in queue", queueTime);
+          } else {
+            // Submit the task for execution
+            try {
+              queuedTask.execute();
+              _processedTaskCount.incrementAndGet();
+              LOGGER.debug("Processed queued task after {}ms in queue", 
queueTime);
+            } catch (Exception e) {
+              LOGGER.error("Error executing queued task", e);
+              queuedTask.fail(e);
+            }
+          }
+        }
+      }
+
+      // Log completion only for significant processing (5+ tasks) to avoid 
log spam
+      int finalQueueSize = _taskQueue.size();
+      if (initialQueueSize > 0 && initialQueueSize != finalQueueSize) {
+        int processedThisCycle = initialQueueSize - finalQueueSize;
+        if (processedThisCycle >= 5) {
+          LOGGER.info("Completed processing cycle. Processed {} tasks, 
remaining queue size: {}",
+              processedThisCycle, finalQueueSize);
+        } else {
+          LOGGER.debug("Completed processing cycle. Processed {} tasks, 
remaining queue size: {}",
+              processedThisCycle, finalQueueSize);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error in processQueuedTasks", e);
     }
   }
 
   @Override
   protected <T> Callable<T> decorate(Callable<T> task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      return task.call();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        return queueCallableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        return task.call();
+      }
     };
   }
 
   @Override
   protected Runnable decorate(Runnable task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      task.run();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        queueRunnableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        task.run();
+      }
     };
   }
+
+  /**
+   * Queue a callable task and wait for its execution
+   */
+  private <T> T queueCallableTask(Callable<T> task)
+      throws Exception {
+    QueuedCallableTask<T> queuedTask = new QueuedCallableTask<>(task);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued callable task, queue size: {}", _taskQueue.size());
+
+    // Wait for the task to complete or timeout
+    return queuedTask.get(_queueTimeoutMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Queue a runnable task and wait for its execution
+   */
+  private void queueRunnableTask(Runnable task) {
+    QueuedRunnableTask queuedTask = new QueuedRunnableTask(task);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued runnable task, queue size: {}", _taskQueue.size());
+
+    try {
+      // Wait for the task to complete or timeout
+      queuedTask.get(_queueTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      } else {
+        throw new RuntimeException("Error executing queued task", e);
+      }
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    _isShutdown.set(true);
+    _monitorExecutor.shutdownNow();
+
+    // Allow the monitor thread to complete current processing
+    try {
+      if (!_monitorExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOGGER.warn("Monitor executor did not terminate within the timeout 
period.");
+      }
+    } catch (InterruptedException e) {
+      LOGGER.warn("Interrupted while waiting for monitor executor to 
terminate.", e);
+      Thread.currentThread().interrupt();

Review Comment:
   Do we want to re-interrupt the current thread here? The same question applies to the other places that do this.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,494 @@
  */
 package org.apache.pinot.spi.executor;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of 
rejecting them.
  * Heap Usage level is obtained from {@link 
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout
+ * - Background monitoring of heap usage to process queued tasks
  */
 public class ThrottleOnCriticalHeapUsageExecutor extends 
DecoratorExecutorService {
-  ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+  // Default configuration values
+  private static final int DEFAULT_QUEUE_SIZE = 1000;
+  private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+  private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+  private static final long LOG_THROTTLE_INTERVAL_MS = 30000; // Log queue 
status every 30 seconds
+
+  private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+  private final BlockingQueue<QueuedTask> _taskQueue;
+  private final int _maxQueueSize;
+  private final long _queueTimeoutMs;
+  private final ScheduledExecutorService _monitorExecutor;
+  private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+  private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+  private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
+  private volatile long _lastQueueStatusLogTime = 0;
 
   public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
       ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+    this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+        DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+  }
+
+  public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+      ThreadResourceUsageAccountant threadResourceUsageAccountant,
+      int maxQueueSize, long queueTimeoutMs, long monitorIntervalMs) {
     super(executorService);
     _threadResourceUsageAccountant = threadResourceUsageAccountant;
+    _maxQueueSize = maxQueueSize;
+    _queueTimeoutMs = queueTimeoutMs;
+    _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+    // Create a single-threaded scheduler for monitoring heap usage
+    _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "throttle-heap-monitor");
+      t.setDaemon(true);
+      return t;
+    });
+
+    // Start the monitoring task
+    _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+        monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+    LOGGER.info(
+        "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {}, 
timeout: {}ms, monitor interval: {}ms",
+        maxQueueSize, queueTimeoutMs, monitorIntervalMs);
   }
 
   protected void checkTaskAllowed() {
-    if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
-      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks 
throttled due to high heap usage.");
+  }
+
+  /**
+   * Check if a task should be queued due to critical heap usage
+   * @return true if the task should be queued, false if it can be executed 
immediately
+   */
+  protected boolean shouldQueueTask() {
+    return _threadResourceUsageAccountant.throttleQuerySubmission();
+  }
+
+  /**
+   * Process queued tasks when heap usage is below critical level
+   */
+  private void processQueuedTasks() {
+    if (_isShutdown.get()) {
+      return;
+    }
+
+    try {
+      int initialQueueSize = _taskQueue.size();
+      long currentTime = System.currentTimeMillis();
+
+      // Log queue size for monitoring if there are queued tasks (throttled to 
prevent log flooding)
+      if (initialQueueSize > 0 && (currentTime - _lastQueueStatusLogTime) > 
LOG_THROTTLE_INTERVAL_MS) {
+        LOGGER.info("Processing queued tasks. Current queue size: {}, Queued: 
{}, Processed: {}, Timed out: {}",
+            initialQueueSize, _queuedTaskCount.get(), 
_processedTaskCount.get(), _timedOutTaskCount.get());
+        _lastQueueStatusLogTime = currentTime;
+      }
+
+      // Process tasks while heap usage is not critical and queue is not empty
+      while (!shouldQueueTask() && !_taskQueue.isEmpty()) {
+        QueuedTask queuedTask = _taskQueue.poll();
+        if (queuedTask != null) {
+          long queueTime = System.currentTimeMillis() - 
queuedTask.getQueueTime();
+
+          if (queueTime > _queueTimeoutMs) {
+            // Task has timed out in queue
+            queuedTask.timeout();
+            _timedOutTaskCount.incrementAndGet();
+            LOGGER.warn("Task timed out after {}ms in queue", queueTime);
+          } else {
+            // Submit the task for execution
+            try {
+              queuedTask.execute();
+              _processedTaskCount.incrementAndGet();
+              LOGGER.debug("Processed queued task after {}ms in queue", 
queueTime);
+            } catch (Exception e) {
+              LOGGER.error("Error executing queued task", e);
+              queuedTask.fail(e);
+            }
+          }
+        }
+      }
+
+      // Log completion only for significant processing (5+ tasks) to avoid 
log spam
+      int finalQueueSize = _taskQueue.size();
+      if (initialQueueSize > 0 && initialQueueSize != finalQueueSize) {
+        int processedThisCycle = initialQueueSize - finalQueueSize;
+        if (processedThisCycle >= 5) {
+          LOGGER.info("Completed processing cycle. Processed {} tasks, 
remaining queue size: {}",
+              processedThisCycle, finalQueueSize);
+        } else {
+          LOGGER.debug("Completed processing cycle. Processed {} tasks, 
remaining queue size: {}",
+              processedThisCycle, finalQueueSize);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error in processQueuedTasks", e);
     }
   }
 
   @Override
   protected <T> Callable<T> decorate(Callable<T> task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      return task.call();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        return queueCallableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        return task.call();
+      }
     };
   }
 
   @Override
   protected Runnable decorate(Runnable task) {
-    checkTaskAllowed();
     return () -> {
-      checkTaskAllowed();
-      task.run();
+      if (shouldQueueTask()) {
+        // Queue the task if heap usage is critical
+        queueRunnableTask(task);
+      } else {
+        // Execute immediately if heap usage is normal
+        task.run();
+      }
     };
   }
+
+  /**
+   * Queue a callable task and wait for its execution
+   */
+  private <T> T queueCallableTask(Callable<T> task)
+      throws Exception {
+    QueuedCallableTask<T> queuedTask = new QueuedCallableTask<>(task);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued callable task, queue size: {}", _taskQueue.size());
+
+    // Wait for the task to complete or timeout
+    return queuedTask.get(_queueTimeoutMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Queue a runnable task and wait for its execution
+   */
+  private void queueRunnableTask(Runnable task) {
+    QueuedRunnableTask queuedTask = new QueuedRunnableTask(task);
+
+    if (!_taskQueue.offer(queuedTask)) {
+      // Queue is full
+      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+          "Task queue is full (size: " + _maxQueueSize + ") due to high heap 
usage.");
+    }
+
+    _queuedTaskCount.incrementAndGet();
+    LOGGER.debug("Queued runnable task, queue size: {}", _taskQueue.size());
+
+    try {
+      // Wait for the task to complete or timeout
+      queuedTask.get(_queueTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      if (e.getCause() instanceof RuntimeException) {
+        throw (RuntimeException) e.getCause();
+      } else {
+        throw new RuntimeException("Error executing queued task", e);
+      }
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    _isShutdown.set(true);
+    _monitorExecutor.shutdownNow();
+
+    // Allow the monitor thread to complete current processing
+    try {
+      if (!_monitorExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOGGER.warn("Monitor executor did not terminate within the timeout 
period.");
+      }
+    } catch (InterruptedException e) {
+      LOGGER.warn("Interrupted while waiting for monitor executor to 
terminate.", e);
+      Thread.currentThread().interrupt();
+    }
+
+    // Process any remaining tasks in the queue

Review Comment:
   If the executor is already shut down, we should just ignore the pending tasks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to