This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a67bbf3c586 branch-2.1: [Fix](job)Fix CAS competition failure leading to message publishing failure. #45018 (#45030)
a67bbf3c586 is described below

commit a67bbf3c5866868adeecf3d3f4a3f3a64fad69a3
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 6 09:32:15 2024 +0800

    branch-2.1: [Fix](job)Fix CAS competition failure leading to message publishing failure. #45018 (#45030)
    
    Cherry-picked from #45018
    
    Co-authored-by: Calvin Kirs <guoqi...@selectdb.com>
---
 .../java/org/apache/doris/job/disruptor/TaskDisruptor.java    | 11 ++++++++++-
 .../java/org/apache/doris/job/scheduler/JobScheduler.java     |  4 +++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
index 6ca2924c593..2b2e3df0418 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
@@ -74,7 +74,16 @@ public class TaskDisruptor<T> {
     public boolean publishEvent(Object... args) {
         try {
             RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
-            return ringBuffer.tryPublishEvent(eventTranslator, args);
+            // Check if the RingBuffer has enough capacity to reserve 10 slots for tasks
+            // If there is insufficient capacity (less than 10 slots available)
+            // log a warning and drop the current task
+            if (!ringBuffer.hasAvailableCapacity(10)) {
+                LOG.warn("ring buffer has no available capacity,task will be dropped,"
+                        + "please check the task queue size.");
+                return false;
+            }
+            ringBuffer.publishEvent(eventTranslator, args);
+            return true;
         } catch (Exception e) {
             LOG.warn("Failed to publish event", e);
             // Handle the exception, e.g., retry or alert
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index ea0c263a5ee..7f8b39f1e66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -168,7 +168,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
         for (AbstractTask task : tasks) {
             if (!taskDisruptorGroupManager.dispatchInstantTask(task, job.getJobType(),
                     job.getJobConfig())) {
-                throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId()));
+                String errorMsg = job.formatMsgWhenExecuteQueueFull(task.getTaskId());
+                task.onFail(errorMsg);
+                throw new JobException(errorMsg);
 
             }
             log.info("dispatch instant job, job id is {}, job name is {}, task id is {}", job.getJobId(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to