This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ef2fc44e5c [Improve](Job) Allow modifying the configuration of the Job 
queue and the number of consumer threads (#23547)
ef2fc44e5c is described below

commit ef2fc44e5c07312011e33820c71679fc13643235
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Aug 28 12:01:49 2023 +0800

    [Improve](Job) Allow modifying the configuration of the Job queue and the 
number of consumer threads (#23547)
---
 .../src/main/java/org/apache/doris/common/Config.java    | 14 ++++++++++++++
 .../apache/doris/scheduler/disruptor/TaskDisruptor.java  | 16 ++++++----------
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 9da22b20da..44b7a27007 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1556,6 +1556,20 @@ public class Config extends ConfigBase {
     @ConfField
     public static int scheduler_job_task_max_num = 10;
 
+    /**
+     * The number of async tasks that can be queued. @See TaskDisruptor
+     * If the consumer is slow, the queue will fill up and the producer will be 
blocked.
+     */
+    @ConfField
+    public static int async_task_queen_size = 1024;
+
+    /**
+     * The number of threads used to consume async tasks. @See TaskDisruptor
+     * If we have a lot of async tasks, we need more threads to consume them. 
Of course, the right value also depends on the number of CPU cores.
+     */
+    @ConfField
+    public static int async_task_consumer_thread_num = 10;
+
     // enable_workload_group should be immutable and temporarily set to 
mutable during the development test phase
     @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
     public static boolean enable_workload_group = false;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
index 889355f2cd..3b59a5187e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.scheduler.disruptor;
 
+import org.apache.doris.common.Config;
 import org.apache.doris.scheduler.constants.TaskType;
 import org.apache.doris.scheduler.manager.TimerJobManager;
 import org.apache.doris.scheduler.manager.TransientTaskManager;
@@ -48,20 +49,15 @@ import java.util.concurrent.TimeUnit;
 public class TaskDisruptor implements Closeable {
 
     private final Disruptor<TaskEvent> disruptor;
-    private static final int DEFAULT_RING_BUFFER_SIZE = 1024;
+    private static final int DEFAULT_RING_BUFFER_SIZE = 
Config.async_task_queen_size;
+
+    private static int consumerThreadCount = 
Config.async_task_consumer_thread_num;
 
     /**
      * The default timeout for {@link #close()} in seconds.
      */
     private static final int DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5;
 
-    /**
-     * The default number of consumers to create for each {@link Disruptor} 
instance.
-     */
-    private static final int DEFAULT_CONSUMER_COUNT = 
System.getProperty("event.task.disruptor.consumer.count")
-            == null ? Runtime.getRuntime().availableProcessors()
-            : 
Integer.parseInt(System.getProperty("event.task.disruptor.consumer.count"));
-
     /**
      * Whether this disruptor has been closed.
      * if true, then we can't publish any more events.
@@ -82,8 +78,8 @@ public class TaskDisruptor implements Closeable {
         ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE;
         disruptor = new Disruptor<>(TaskEvent.FACTORY, 
DEFAULT_RING_BUFFER_SIZE, producerThreadFactory,
                 ProducerType.SINGLE, new BlockingWaitStrategy());
-        WorkHandler<TaskEvent>[] workers = new 
TaskHandler[DEFAULT_CONSUMER_COUNT];
-        for (int i = 0; i < DEFAULT_CONSUMER_COUNT; i++) {
+        WorkHandler<TaskEvent>[] workers = new 
TaskHandler[consumerThreadCount];
+        for (int i = 0; i < consumerThreadCount; i++) {
             workers[i] = new TaskHandler(timerJobManager, 
transientTaskManager);
         }
         disruptor.handleEventsWithWorkerPool(workers);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to