This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ef2fc44e5c [Improve](Job)Allows modify the configuration of the Job
queue and the number of consumer threads (#23547)
ef2fc44e5c is described below
commit ef2fc44e5c07312011e33820c71679fc13643235
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Aug 28 12:01:49 2023 +0800
[Improve](Job)Allows modify the configuration of the Job queue and the
number of consumer threads (#23547)
---
.../src/main/java/org/apache/doris/common/Config.java | 14 ++++++++++++++
.../apache/doris/scheduler/disruptor/TaskDisruptor.java | 16 ++++++----------
2 files changed, 20 insertions(+), 10 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 9da22b20da..44b7a27007 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1556,6 +1556,20 @@ public class Config extends ConfigBase {
@ConfField
public static int scheduler_job_task_max_num = 10;
+ /**
+ * The number of async tasks that can be queued. See TaskDisruptor.
+ * If the consumer is slow, the queue will fill up and the producer will be
blocked.
+ */
+ @ConfField
+ public static int async_task_queen_size = 1024;
+
+ /**
+ * The number of threads used to consume async tasks. See TaskDisruptor.
+ * If there are a lot of async tasks, more threads are needed to consume them.
The appropriate value depends on the number of CPU cores.
+ */
+ @ConfField
+ public static int async_task_consumer_thread_num = 10;
+
// enable_workload_group should be immutable and temporarily set to
mutable during the development test phase
@ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
public static boolean enable_workload_group = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
index 889355f2cd..3b59a5187e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskDisruptor.java
@@ -17,6 +17,7 @@
package org.apache.doris.scheduler.disruptor;
+import org.apache.doris.common.Config;
import org.apache.doris.scheduler.constants.TaskType;
import org.apache.doris.scheduler.manager.TimerJobManager;
import org.apache.doris.scheduler.manager.TransientTaskManager;
@@ -48,20 +49,15 @@ import java.util.concurrent.TimeUnit;
public class TaskDisruptor implements Closeable {
private final Disruptor<TaskEvent> disruptor;
- private static final int DEFAULT_RING_BUFFER_SIZE = 1024;
+ private static final int DEFAULT_RING_BUFFER_SIZE =
Config.async_task_queen_size;
+
+ private static int consumerThreadCount =
Config.async_task_consumer_thread_num;
/**
* The default timeout for {@link #close()} in seconds.
*/
private static final int DEFAULT_CLOSE_WAIT_TIME_SECONDS = 5;
- /**
- * The default number of consumers to create for each {@link Disruptor}
instance.
- */
- private static final int DEFAULT_CONSUMER_COUNT =
System.getProperty("event.task.disruptor.consumer.count")
- == null ? Runtime.getRuntime().availableProcessors()
- :
Integer.parseInt(System.getProperty("event.task.disruptor.consumer.count"));
-
/**
* Whether this disruptor has been closed.
* if true, then we can't publish any more events.
@@ -82,8 +78,8 @@ public class TaskDisruptor implements Closeable {
ThreadFactory producerThreadFactory = DaemonThreadFactory.INSTANCE;
disruptor = new Disruptor<>(TaskEvent.FACTORY,
DEFAULT_RING_BUFFER_SIZE, producerThreadFactory,
ProducerType.SINGLE, new BlockingWaitStrategy());
- WorkHandler<TaskEvent>[] workers = new
TaskHandler[DEFAULT_CONSUMER_COUNT];
- for (int i = 0; i < DEFAULT_CONSUMER_COUNT; i++) {
+ WorkHandler<TaskEvent>[] workers = new
TaskHandler[consumerThreadCount];
+ for (int i = 0; i < consumerThreadCount; i++) {
workers[i] = new TaskHandler(timerJobManager,
transientTaskManager);
}
disruptor.handleEventsWithWorkerPool(workers);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]