This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a67bbf3c586 branch-2.1: [Fix](job)Fix CAS competition failure leading
to message publishing failure. #45018 (#45030)
a67bbf3c586 is described below
commit a67bbf3c5866868adeecf3d3f4a3f3a64fad69a3
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 6 09:32:15 2024 +0800
branch-2.1: [Fix](job)Fix CAS competition failure leading to message
publishing failure. #45018 (#45030)
Cherry-picked from #45018
Co-authored-by: Calvin Kirs <[email protected]>
---
.../java/org/apache/doris/job/disruptor/TaskDisruptor.java | 11 ++++++++++-
.../java/org/apache/doris/job/scheduler/JobScheduler.java | 4 +++-
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
index 6ca2924c593..2b2e3df0418 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/disruptor/TaskDisruptor.java
@@ -74,7 +74,16 @@ public class TaskDisruptor<T> {
public boolean publishEvent(Object... args) {
try {
RingBuffer<T> ringBuffer = disruptor.getRingBuffer();
- return ringBuffer.tryPublishEvent(eventTranslator, args);
+ // Check if the RingBuffer has enough capacity to reserve 10 slots for tasks
+ // If there is insufficient capacity (less than 10 slots available)
+ // log a warning and drop the current task
+ if (!ringBuffer.hasAvailableCapacity(10)) {
+ LOG.warn("ring buffer has no available capacity,task will be dropped,"
+ + "please check the task queue size.");
+ return false;
+ }
+ ringBuffer.publishEvent(eventTranslator, args);
+ return true;
} catch (Exception e) {
LOG.warn("Failed to publish event", e);
// Handle the exception, e.g., retry or alert
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index ea0c263a5ee..7f8b39f1e66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -168,7 +168,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
for (AbstractTask task : tasks) {
if (!taskDisruptorGroupManager.dispatchInstantTask(task,
job.getJobType(),
job.getJobConfig())) {
- throw new JobException(job.formatMsgWhenExecuteQueueFull(task.getTaskId()));
+ String errorMsg = job.formatMsgWhenExecuteQueueFull(task.getTaskId());
+ task.onFail(errorMsg);
+ throw new JobException(errorMsg);
}
log.info("dispatch instant job, job id is {}, job name is {}, task
id is {}", job.getJobId(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]