This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a67bbf3c586 branch-2.1: [Fix](job)Fix CAS competition failure leading to message publishing failure. #45018 (#45030)
a67bbf3c586 is described below

commit a67bbf3c5866868adeecf3d3f4a3f3a64fad69a3
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 6 09:32:15 2024 +0800

    branch-2.1: [Fix](job)Fix CAS competition failure leading to message publishing failure. #45018 (#45030)

    Cherry-picked from #45018

    Co-authored-by: Calvin Kirs <guoqi...@selectdb.com>
---
 .../java/org/apache/doris/job/disruptor/TaskDisruptor.java | 11 ++++++++++-
 .../java/org/apache/doris/job/scheduler/JobScheduler.java | 4 +++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
index 6ca2924c593..2b2e3df0418 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
@@ -74,7 +74,16 @@ public class TaskDisruptor<T> {
     public boolean publishEvent(Object... args) {
         try {
             RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
-            return ringBuffer.tryPublishEvent(eventTranslator, args);
+            // Check if the RingBuffer has enough capacity to reserve 10 slots for tasks
+            // If there is insufficient capacity (less than 10 slots available)
+            // log a warning and drop the current task
+            if (!ringBuffer.hasAvailableCapacity(10)) {
+                LOG.warn("ring buffer has no available capacity,task will be dropped,"
+                        + "please check the task queue size.");
+                return false;
+            }
+            ringBuffer.publishEvent(eventTranslator, args);
+            return true;
         } catch (Exception e) {
             LOG.warn("Failed to publish event", e);
             // Handle the exception, e.g., retry or alert
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index ea0c263a5ee..7f8b39f1e66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -168,7 +168,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
             for (AbstractTask task : tasks) {
                 if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
                         job.getJobConfig())) {
-                    throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId()));
+                    String errorMsg = job.formatMsgWhenExecuteQueueFull(task.getTaskId());
+                    task.onFail(errorMsg);
+                    throw new JobException(errorMsg);
                 }
                 log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org