vrajat commented on code in PR #16409:
URL: https://github.com/apache/pinot/pull/16409#discussion_r2226250082
##########
pinot-spi/src/main/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutor.java:
##########
@@ -18,46 +18,469 @@
*/
package org.apache.pinot.spi.executor;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * An Executor that throttles task submission when the heap usage is critical.
+ * An Executor that queues tasks when the heap usage is critical instead of
rejecting them.
* Heap Usage level is obtained from {@link
ThreadResourceUsageAccountant#throttleQuerySubmission()}.
+ *
+ * Features:
+ * - Tasks are queued when heap usage is critical
+ * - Queued tasks are processed when heap usage drops below critical level
+ * - Configurable queue size and timeout
+ * - Background monitoring of heap usage to process queued tasks
*/
public class ThrottleOnCriticalHeapUsageExecutor extends
DecoratorExecutorService {
- ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ThrottleOnCriticalHeapUsageExecutor.class);
+
+ // Default configuration values
+ private static final int DEFAULT_QUEUE_SIZE = 1000;
+ private static final long DEFAULT_QUEUE_TIMEOUT_MS = 30000; // 30 seconds
+ private static final long DEFAULT_MONITOR_INTERVAL_MS = 1000; // 1 second
+
+ private final ThreadResourceUsageAccountant _threadResourceUsageAccountant;
+ private final BlockingQueue<QueuedTask> _taskQueue;
+ private final int _maxQueueSize;
+ private final long _queueTimeoutMs;
+ private final ScheduledExecutorService _monitorExecutor;
+ private final AtomicBoolean _isShutdown = new AtomicBoolean(false);
+ private final AtomicInteger _queuedTaskCount = new AtomicInteger(0);
+ private final AtomicInteger _processedTaskCount = new AtomicInteger(0);
+ private final AtomicInteger _timedOutTaskCount = new AtomicInteger(0);
public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
ThreadResourceUsageAccountant threadResourceUsageAccountant) {
+ this(executorService, threadResourceUsageAccountant, DEFAULT_QUEUE_SIZE,
+ DEFAULT_QUEUE_TIMEOUT_MS, DEFAULT_MONITOR_INTERVAL_MS);
+ }
+
+ public ThrottleOnCriticalHeapUsageExecutor(ExecutorService executorService,
+ ThreadResourceUsageAccountant threadResourceUsageAccountant,
+ int maxQueueSize, long queueTimeoutMs, long monitorIntervalMs) {
super(executorService);
_threadResourceUsageAccountant = threadResourceUsageAccountant;
+ _maxQueueSize = maxQueueSize;
+ _queueTimeoutMs = queueTimeoutMs;
+ _taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+
+ // Create a single-threaded scheduler for monitoring heap usage
+ _monitorExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "throttle-heap-monitor");
+ t.setDaemon(true);
+ return t;
+ });
+
+ // Start the monitoring task
+ _monitorExecutor.scheduleWithFixedDelay(this::processQueuedTasks,
+ monitorIntervalMs, monitorIntervalMs, TimeUnit.MILLISECONDS);
+
+ LOGGER.info(
+ "ThrottleOnCriticalHeapUsageExecutor initialized with queue size: {},
timeout: {}ms, monitor interval: {}ms",
+ maxQueueSize, queueTimeoutMs, monitorIntervalMs);
}
protected void checkTaskAllowed() {
- if (_threadResourceUsageAccountant.throttleQuerySubmission()) {
- throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Tasks
throttled due to high heap usage.");
+ }
+
+ /**
+ * Check if a task should be queued due to critical heap usage
+ * @return true if the task should be queued, false if it can be executed
immediately
+ */
+ protected boolean shouldQueueTask() {
+ return _threadResourceUsageAccountant.throttleQuerySubmission();
+ }
+
+ /**
+ * Process queued tasks when heap usage is below critical level
+ */
+ private void processQueuedTasks() {
+ if (_isShutdown.get()) {
+ return;
+ }
+
Review Comment:
nit: Can you add a log line on how many tasks are in the queue ? That will
help to track the queue size over time if requried. A metric is another option.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]