This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5901e94f29e branch-2.1: [Job](Fix)Improve Event Publishing with Timeout #45103 (#45300)
5901e94f29e is described below
commit 5901e94f29e30a3f0b8b384df63ad0d13dec5df0
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Dec 12 09:52:00 2024 +0800
branch-2.1: [Job](Fix)Improve Event Publishing with Timeout #45103 (#45300)
Cherry-picked from #45103
Co-authored-by: Calvin Kirs <[email protected]>
---
.../doris/job/disruptor/ExecuteTaskEvent.java | 5 ++
.../apache/doris/job/disruptor/TaskDisruptor.java | 48 ++++++++----
.../job/executor/DefaultTaskExecutorHandler.java | 40 ++++------
.../doris/job/executor/DispatchTaskHandler.java | 7 +-
.../apache/doris/job/executor/TaskProcessor.java | 87 ++++++++++++++++++++++
.../job/manager/TaskDisruptorGroupManager.java | 48 ++++--------
6 files changed, 158 insertions(+), 77 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java
index 3d8f9ed1534..ac67715d2ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/ExecuteTaskEvent.java
@@ -34,4 +34,9 @@ public class ExecuteTaskEvent<T extends AbstractTask> {
         return ExecuteTaskEvent::new;
     }
 
+    public void clear() {
+        this.task = null;
+        this.jobConfig = null;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
index 2b2e3df0418..9fb9d94e8df 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
@@ -19,7 +19,6 @@ package org.apache.doris.job.disruptor;
 
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventTranslatorVararg;
-import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.WorkHandler;
 import com.lmax.disruptor.dsl.Disruptor;
@@ -28,6 +27,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Utility class for creating and managing a Disruptor instance.
@@ -73,20 +73,40 @@ public class TaskDisruptor<T> {
      */
     public boolean publishEvent(Object... args) {
         try {
-            RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
-            // Check if the RingBuffer has enough capacity to reserve 10 slots for tasks
-            // If there is insufficient capacity (less than 10 slots available)
-            // log a warning and drop the current task
-            if (!ringBuffer.hasAvailableCapacity(10)) {
-                LOG.warn("ring buffer has no available capacity,task will be dropped,"
-                        + "please check the task queue size.");
-                return false;
+            // Give the consumers up to 1 second to free a slot before the event is dropped.
+            long timeoutInNanos = TimeUnit.SECONDS.toNanos(1);
+            long startTime = System.nanoTime();
+
+            // tryPublishEvent claims a slot and publishes in one step and returns false instead of
+            // blocking when the ring buffer is full, so concurrent producers cannot race between a
+            // separate capacity check and the publish.
+            while (System.nanoTime() - startTime < timeoutInNanos) {
+                if (disruptor.getRingBuffer().tryPublishEvent(eventTranslator, args)) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("publishEvent success,the remaining buffer size is {}",
+                                disruptor.getRingBuffer().remainingCapacity());
+                    }
+                    return true;
+                }
+
+                // Ring buffer is full: back off briefly before retrying.
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                    // Restore the interrupt status and give up on this event.
+                    Thread.currentThread().interrupt();
+                    LOG.warn("Thread interrupted while waiting to publish event", e);
+                    return false;
+                }
             }
-            ringBuffer.publishEvent(eventTranslator, args);
-            return true;
+
+            // Timeout reached without publishing the event.
+            LOG.warn("Failed to publish event within the specified timeout (1 second). "
+                    + "Queue may be full. The remaining buffer size is {}",
+                    disruptor.getRingBuffer().remainingCapacity());
         } catch (Exception e) {
-            LOG.warn("Failed to publish event", e);
-            // Handle the exception, e.g., retry or alert
+            // Catching general exceptions to handle unexpected errors
+            LOG.warn("Failed to publish event due to an unexpected error", e);
         }
         return false;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
index befa8cc35fc..cdfe7c0fe08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DefaultTaskExecutorHandler.java
@@ -36,35 +36,23 @@ public class DefaultTaskExecutorHandler<T extends AbstractTask> implements WorkH
 
     @Override
     public void onEvent(ExecuteTaskEvent<T> executeTaskEvent) {
-        T task = executeTaskEvent.getTask();
-        if (null == task) {
-            log.warn("task is null, ignore,maybe task has been canceled");
-            return;
-        }
-        if (task.isCancelled()) {
-            log.info("task is canceled, ignore. task id is {}", task.getTaskId());
-            return;
-        }
-        log.info("start to execute task, task id is {}", task.getTaskId());
-        try {
-            task.runTask();
-        } catch (Exception e) {
-            //if task.onFail() throw exception, we will catch it here
-            log.warn("task before error, task id is {}", task.getTaskId(), e);
-        }
-        //todo we need discuss whether we need to use semaphore to control the concurrent task num
-        /* Semaphore semaphore = null;
-        // get token
         try {
-            int maxConcurrentTaskNum = executeTaskEvent.getJobConfig().getMaxConcurrentTaskNum();
-            semaphore = TaskTokenManager.tryAcquire(task.getJobId(), maxConcurrentTaskNum);
+            T task = executeTaskEvent.getTask();
+            if (null == task) {
+                log.warn("task is null, ignore,maybe task has been canceled");
+                return;
+            }
+            if (task.isCancelled()) {
+                log.info("task is canceled, ignore. task id is {}", task.getTaskId());
+                return;
+            }
+            log.info("start to execute task, task id is {}", task.getTaskId());
             task.runTask();
         } catch (Exception e) {
-            task.onFail();
-            log.error("execute task error, task id is {}", task.getTaskId(), e);
+            log.error("execute task error, task id is {}", executeTaskEvent.getTask().getTaskId(), e);
         } finally {
-            if (null != semaphore) {
-                semaphore.release();
-            }*/
+            executeTaskEvent.clear();
+        }
+
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index d93393aa0ef..b8f726c4a0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -21,7 +21,6 @@ import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.common.JobType;
 import org.apache.doris.job.common.TaskType;
-import org.apache.doris.job.disruptor.TaskDisruptor;
 import org.apache.doris.job.disruptor.TimerJobEvent;
 import org.apache.doris.job.task.AbstractTask;
 
@@ -40,9 +39,9 @@ import java.util.Map;
 @Log4j2
 public class DispatchTaskHandler<T extends AbstractJob> implements WorkHandler<TimerJobEvent<T>> {
 
-    private final Map<JobType, TaskDisruptor<T>> disruptorMap;
+    private final Map<JobType, TaskProcessor> disruptorMap;
 
-    public DispatchTaskHandler(Map<JobType, TaskDisruptor<T>> disruptorMap) {
+    public DispatchTaskHandler(Map<JobType, TaskProcessor> disruptorMap) {
         this.disruptorMap = disruptorMap;
     }
 
@@ -66,7 +65,7 @@ public class DispatchTaskHandler<T extends AbstractJob> implements WorkHandler<T
                 }
                 JobType jobType = event.getJob().getJobType();
                 for (AbstractTask task : tasks) {
-                    if (!disruptorMap.get(jobType).publishEvent(task, event.getJob().getJobConfig())) {
+                    if (!disruptorMap.get(jobType).addTask(task)) {
                         task.cancel();
                         continue;
                     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TaskProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TaskProcessor.java
new file mode 100644
index 00000000000..d9d3f25dcd8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TaskProcessor.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.executor;
+
+import org.apache.doris.job.task.AbstractTask;
+
+import lombok.extern.log4j.Log4j2;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Runs job tasks on a fixed-size thread pool with a bounded queue of pending tasks.
+ * When the queue is full, {@link #addTask(AbstractTask)} rejects the task instead of blocking the caller.
+ */
+@Log4j2
+public class TaskProcessor {
+    private final ExecutorService executor;
+
+    /**
+     * Creates a processor backed by exactly {@code numberOfThreads} worker threads.
+     *
+     * @param numberOfThreads core and maximum pool size
+     * @param queueSize capacity of the queue holding tasks that wait for a free worker
+     * @param threadFactory factory used to create the worker threads
+     */
+    public TaskProcessor(int numberOfThreads, int queueSize, ThreadFactory threadFactory) {
+        this.executor = new ThreadPoolExecutor(
+                numberOfThreads,
+                numberOfThreads,
+                0L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<>(queueSize),
+                threadFactory,
+                new ThreadPoolExecutor.AbortPolicy()
+        );
+    }
+
+    /**
+     * Submits a task for asynchronous execution without blocking the caller.
+     *
+     * @param task the task to run
+     * @return true if the task was accepted; false if it is null, the queue is full,
+     *         or the executor has been shut down
+     */
+    public boolean addTask(AbstractTask task) {
+        if (task == null) {
+            // Reject before submitting: both log statements below dereference the task.
+            log.warn("Task is null, ignore. Maybe it has been canceled.");
+            return false;
+        }
+        try {
+            executor.execute(() -> runTask(task));
+            log.info("Add task to executor, task id: {}", task.getTaskId());
+            return true;
+        } catch (RejectedExecutionException e) {
+            log.warn("Failed to add task to executor, task id: {}", task.getTaskId(), e);
+            return false;
+        }
+    }
+
+    /**
+     * Stops accepting new tasks and waits up to 60 seconds for queued and running tasks to finish,
+     * then calls {@code shutdownNow()} if they have not.
+     */
+    public void shutdown() {
+        log.info("Shutting down executor service...");
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+        log.info("Executor service shut down successfully.");
+    }
+
+    /**
+     * Worker-side body: skips cancelled tasks and logs (rather than propagates) task failures.
+     */
+    private void runTask(AbstractTask task) {
+        try {
+            if (task.isCancelled()) {
+                log.info("Task is canceled, ignore. Task id: {}", task.getTaskId());
+                return;
+            }
+            log.info("Start to execute task, task id: {}", task.getTaskId());
+            task.runTask();
+        } catch (Exception e) {
+            log.warn("Execute task error, task id: {}", task.getTaskId(), e);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
index cc82b59a36a..e77dfbadcb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/TaskDisruptorGroupManager.java
@@ -22,13 +22,10 @@ import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecutionConfiguration;
 import org.apache.doris.job.common.JobType;
-import org.apache.doris.job.disruptor.ExecuteTaskEvent;
 import org.apache.doris.job.disruptor.TaskDisruptor;
 import org.apache.doris.job.disruptor.TimerJobEvent;
-import org.apache.doris.job.executor.DefaultTaskExecutorHandler;
 import org.apache.doris.job.executor.DispatchTaskHandler;
-import org.apache.doris.job.extensions.insert.InsertTask;
-import org.apache.doris.job.extensions.mtmv.MTMVTask;
+import org.apache.doris.job.executor.TaskProcessor;
 import org.apache.doris.job.task.AbstractTask;
 
 import com.lmax.disruptor.EventFactory;
@@ -44,7 +41,7 @@ import java.util.concurrent.TimeUnit;
 
 public class TaskDisruptorGroupManager<T extends AbstractTask> {
 
-    private final Map<JobType, TaskDisruptor<T>> disruptorMap = new EnumMap<>(JobType.class);
+    private final Map<JobType, TaskProcessor> disruptorMap = new EnumMap<>(JobType.class);
 
     @Getter
     private TaskDisruptor<TimerJobEvent<AbstractJob>> dispatchDisruptor;
@@ -92,42 +89,20 @@ public class TaskDisruptorGroupManager<T extends AbstractTask> {
     }
 
     private void registerInsertDisruptor() {
-        EventFactory<ExecuteTaskEvent<InsertTask>> insertEventFactory = ExecuteTaskEvent.factory();
         ThreadFactory insertTaskThreadFactory = new CustomThreadFactory("insert-task-execute");
-        WorkHandler[] insertTaskExecutorHandlers = new WorkHandler[DISPATCH_INSERT_THREAD_NUM];
-        for (int i = 0; i < DISPATCH_INSERT_THREAD_NUM; i++) {
-            insertTaskExecutorHandlers[i] = new DefaultTaskExecutorHandler<InsertTask>();
-        }
-        EventTranslatorVararg<ExecuteTaskEvent<InsertTask>> eventTranslator =
-                (event, sequence, args) -> {
-                    event.setTask((InsertTask) args[0]);
-                    event.setJobConfig((JobExecutionConfiguration) args[1]);
-                };
-        TaskDisruptor insertDisruptor = new TaskDisruptor<>(insertEventFactory, DISPATCH_INSERT_TASK_QUEUE_SIZE,
-                insertTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS),
-                insertTaskExecutorHandlers, eventTranslator);
-        disruptorMap.put(JobType.INSERT, insertDisruptor);
+        TaskProcessor insertTaskProcessor = new TaskProcessor(DISPATCH_INSERT_THREAD_NUM,
+                DISPATCH_INSERT_TASK_QUEUE_SIZE, insertTaskThreadFactory);
+        disruptorMap.put(JobType.INSERT, insertTaskProcessor);
     }
 
     private void registerMTMVDisruptor() {
-        EventFactory<ExecuteTaskEvent<MTMVTask>> mtmvEventFactory = ExecuteTaskEvent.factory();
         ThreadFactory mtmvTaskThreadFactory = new CustomThreadFactory("mtmv-task-execute");
-        WorkHandler[] insertTaskExecutorHandlers = new WorkHandler[DISPATCH_MTMV_THREAD_NUM];
-        for (int i = 0; i < DISPATCH_MTMV_THREAD_NUM; i++) {
-            insertTaskExecutorHandlers[i] = new DefaultTaskExecutorHandler<MTMVTask>();
-        }
-        EventTranslatorVararg<ExecuteTaskEvent<MTMVTask>> eventTranslator =
-                (event, sequence, args) -> {
-                    event.setTask((MTMVTask) args[0]);
-                    event.setJobConfig((JobExecutionConfiguration) args[1]);
-                };
-        TaskDisruptor mtmvDisruptor = new TaskDisruptor<>(mtmvEventFactory, DISPATCH_MTMV_TASK_QUEUE_SIZE,
-                mtmvTaskThreadFactory, new LiteTimeoutBlockingWaitStrategy(10, TimeUnit.MILLISECONDS),
-                insertTaskExecutorHandlers, eventTranslator);
-        disruptorMap.put(JobType.MV, mtmvDisruptor);
+        TaskProcessor mtmvTaskProcessor = new TaskProcessor(DISPATCH_MTMV_THREAD_NUM,
+                DISPATCH_MTMV_TASK_QUEUE_SIZE, mtmvTaskThreadFactory);
+        disruptorMap.put(JobType.MV, mtmvTaskProcessor);
     }
 
     public boolean dispatchInstantTask(AbstractTask task, JobType jobType,
                                        JobExecutionConfiguration jobExecutionConfiguration) {
-        return disruptorMap.get(jobType).publishEvent(task, jobExecutionConfiguration);
+        return disruptorMap.get(jobType).addTask(task);
     }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]