This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 3edfeb78815 branch-4.1: [fix](job) lock routine load task renew on submit failure (#65007)
3edfeb78815 is described below
commit 3edfeb78815c266cfd9635e7c4a1ac025204b915
Author: hui lai <[email protected]>
AuthorDate: Tue Jun 30 19:38:47 2026 +0800
branch-4.1: [fix](job) lock routine load task renew on submit failure (#65007)
pick https://github.com/apache/doris/pull/64731
---
.../load/routineload/RoutineLoadTaskScheduler.java | 42 +++++---
.../routineload/RoutineLoadTaskSchedulerTest.java | 118 +++++++++++++++++++++
2 files changed, 145 insertions(+), 15 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
index 4156410ef86..e1c4ce2b3dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskScheduler.java
@@ -250,22 +250,34 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
routineLoadTaskInfo.getBeId(), errorMsg);
routineLoadTaskInfo.setBeId(-1);
RoutineLoadJob routineLoadJob =
routineLoadManager.getJob(routineLoadTaskInfo.getJobId());
- routineLoadJob.setOtherMsg(errorMsg);
-
- // Check if this is a resource pressure error that should not be
immediately rescheduled
- if (errorMsg.contains("TOO_MANY_TASKS") ||
errorMsg.contains("MEM_LIMIT_EXCEEDED")) {
- // submit task failed (such as TOO_MANY_TASKS/MEM_LIMIT_EXCEEDED
error),
- // but txn has already begun. Here we will still set the
ExecuteStartTime of
- // this task, which means we "assume" that this task has been
successfully submitted.
- // And this task will then be aborted because of a timeout.
- // In this way, we can prevent the entire job from being paused
due to submit errors,
- // and we can also relieve the pressure on BE by waiting for the
timeout period.
-
routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
- return;
- }
+ RoutineLoadTaskInfo newTask;
+
+ routineLoadJob.writeLock();
+ try {
+ routineLoadJob.setOtherMsg(errorMsg);
+
+ // Check if this is a resource pressure error that should not be
immediately rescheduled
+ if (errorMsg.contains("TOO_MANY_TASKS") ||
errorMsg.contains("MEM_LIMIT_EXCEEDED")) {
+ // submit task failed (such as
TOO_MANY_TASKS/MEM_LIMIT_EXCEEDED error),
+ // but txn has already begun. Here we will still set the
ExecuteStartTime of
+ // this task, which means we "assume" that this task has been
successfully submitted.
+ // And this task will then be aborted because of a timeout.
+ // In this way, we can prevent the entire job from being
paused due to submit errors,
+ // and we can also relieve the pressure on BE by waiting for
the timeout period.
+
routineLoadTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis());
+ return;
+ }
- // for other errors (network issues, BE restart, etc.), reschedule
immediately
- RoutineLoadTaskInfo newTask =
routineLoadJob.unprotectRenewTask(routineLoadTaskInfo, false);
+ if (routineLoadJob.getState() != JobState.RUNNING
+ ||
!routineLoadJob.containsTask(routineLoadTaskInfo.getId())) {
+ return;
+ }
+
+ // for other errors (network issues, BE restart, etc.), reschedule
immediately
+ newTask = routineLoadJob.unprotectRenewTask(routineLoadTaskInfo,
false);
+ } finally {
+ routineLoadJob.writeUnlock();
+ }
addTaskInQueue(newTask);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 6e11fc5f71a..4a302b68da9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -31,12 +31,15 @@ import org.apache.doris.thrift.BackendService;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.GlobalTransactionMgr;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
+import org.junit.Assert;
import org.junit.Test;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
@@ -119,4 +122,119 @@ public class RoutineLoadTaskSchedulerTest {
Deencapsulation.setField(routineLoadTaskScheduler,
"needScheduleTasksQueue", routineLoadTaskInfoQueue);
routineLoadTaskScheduler.runAfterCatalogReady();
}
+
+ @Test
+ public void testSubmitTaskFailureRenewsTaskWithJobWriteLock() {
+ ConcurrentMap<Integer, Long> partitionIdToOffset =
Maps.newConcurrentMap();
+ partitionIdToOffset.put(1, 100L);
+ KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1),
1L, 20000,
+ partitionIdToOffset, false, -1, false);
+ routineLoadTaskInfo.setBeId(100L);
+
+ LockCheckingKafkaRoutineLoadJob routineLoadJob = new
LockCheckingKafkaRoutineLoadJob();
+ Deencapsulation.setField(routineLoadJob, "state",
RoutineLoadJob.JobState.RUNNING);
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(partitionIdToOffset));
+ Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList",
+ Lists.newArrayList(routineLoadTaskInfo));
+ new Expectations() {
+ {
+ routineLoadManager.getJob(1L);
+ result = routineLoadJob;
+ }
+ };
+
+ RoutineLoadTaskScheduler routineLoadTaskScheduler = new
RoutineLoadTaskScheduler(routineLoadManager);
+ Deencapsulation.invoke(routineLoadTaskScheduler,
"handleSubmitTaskFailure",
+ routineLoadTaskInfo, "network error");
+
+ Assert.assertTrue(routineLoadJob.isRenewCalledWithWriteLock());
+ List<RoutineLoadTaskInfo> routineLoadTaskInfoList =
+ Deencapsulation.getField(routineLoadJob,
"routineLoadTaskInfoList");
+ Assert.assertEquals(1, routineLoadTaskInfoList.size());
+ Assert.assertNotSame(routineLoadTaskInfo,
routineLoadTaskInfoList.get(0));
+
+ LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+ Deencapsulation.getField(routineLoadTaskScheduler,
"needScheduleTasksQueue");
+ Assert.assertSame(routineLoadTaskInfoList.get(0),
needScheduleTasksQueue.peek());
+ }
+
+ @Test
+ public void testSubmitTaskFailureSkipsRenewWhenTaskRemoved() {
+ ConcurrentMap<Integer, Long> partitionIdToOffset =
Maps.newConcurrentMap();
+ partitionIdToOffset.put(1, 100L);
+ KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1),
1L, 20000,
+ partitionIdToOffset, false, -1, false);
+ routineLoadTaskInfo.setBeId(100L);
+
+ LockCheckingKafkaRoutineLoadJob routineLoadJob = new
LockCheckingKafkaRoutineLoadJob();
+ Deencapsulation.setField(routineLoadJob, "state",
RoutineLoadJob.JobState.RUNNING);
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(partitionIdToOffset));
+ Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList",
Lists.newArrayList());
+ new Expectations() {
+ {
+ routineLoadManager.getJob(1L);
+ result = routineLoadJob;
+ }
+ };
+
+ RoutineLoadTaskScheduler routineLoadTaskScheduler = new
RoutineLoadTaskScheduler(routineLoadManager);
+ Deencapsulation.invoke(routineLoadTaskScheduler,
"handleSubmitTaskFailure",
+ routineLoadTaskInfo, "network error");
+
+ Assert.assertFalse(routineLoadJob.isRenewCalled());
+ LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+ Deencapsulation.getField(routineLoadTaskScheduler,
"needScheduleTasksQueue");
+ Assert.assertTrue(needScheduleTasksQueue.isEmpty());
+ }
+
+ @Test
+ public void testSubmitTaskFailureSkipsRenewWhenJobPaused() {
+ ConcurrentMap<Integer, Long> partitionIdToOffset =
Maps.newConcurrentMap();
+ partitionIdToOffset.put(1, 100L);
+ KafkaTaskInfo routineLoadTaskInfo = new KafkaTaskInfo(new UUID(1, 1),
1L, 20000,
+ partitionIdToOffset, false, -1, false);
+ routineLoadTaskInfo.setBeId(100L);
+
+ LockCheckingKafkaRoutineLoadJob routineLoadJob = new
LockCheckingKafkaRoutineLoadJob();
+ Deencapsulation.setField(routineLoadJob, "state",
RoutineLoadJob.JobState.PAUSED);
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(partitionIdToOffset));
+ Deencapsulation.setField(routineLoadJob, "routineLoadTaskInfoList",
+ Lists.newArrayList(routineLoadTaskInfo));
+ new Expectations() {
+ {
+ routineLoadManager.getJob(1L);
+ result = routineLoadJob;
+ }
+ };
+
+ RoutineLoadTaskScheduler routineLoadTaskScheduler = new
RoutineLoadTaskScheduler(routineLoadManager);
+ Deencapsulation.invoke(routineLoadTaskScheduler,
"handleSubmitTaskFailure",
+ routineLoadTaskInfo, "network error");
+
+ Assert.assertFalse(routineLoadJob.isRenewCalled());
+ LinkedBlockingDeque<RoutineLoadTaskInfo> needScheduleTasksQueue =
+ Deencapsulation.getField(routineLoadTaskScheduler,
"needScheduleTasksQueue");
+ Assert.assertTrue(needScheduleTasksQueue.isEmpty());
+ }
+
+ private static class LockCheckingKafkaRoutineLoadJob extends
KafkaRoutineLoadJob {
+ private boolean renewCalled;
+ private boolean renewCalledWithWriteLock;
+
+ @Override
+ protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo
routineLoadTaskInfo,
+ boolean delaySchedule) {
+ renewCalled = true;
+ renewCalledWithWriteLock = lock.isWriteLockedByCurrentThread();
+ return super.unprotectRenewTask(routineLoadTaskInfo,
delaySchedule);
+ }
+
+ private boolean isRenewCalled() {
+ return renewCalled;
+ }
+
+ private boolean isRenewCalledWithWriteLock() {
+ return renewCalledWithWriteLock;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]