This is an automated email from the ASF dual-hosted git repository.
klion26 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 02a2c1fb2 [AMORO-3638] Add support for task retries when task
execution times out
02a2c1fb2 is described below
commit 02a2c1fb2c940a84c310e4e336bab0e25d0b20fa
Author: Jzjsnow <[email protected]>
AuthorDate: Tue Jul 29 15:43:35 2025 +0800
[AMORO-3638] Add support for task retries when task execution times out
This commit adds a timeout for tasks in the `ACKED` state, which is helpful
when AMS, for whatever reason, never receives the completion notification
from the optimizer, so that such tasks can be retried.
---
.../apache/amoro/server/AmoroManagementConf.java | 6 +++++
.../amoro/server/DefaultOptimizingService.java | 26 +++++++++++++++----
.../amoro/server/optimizing/OptimizingQueue.java | 4 +++
.../apache/amoro/server/AMSServiceTestBase.java | 2 ++
.../amoro/server/TestDefaultOptimizingService.java | 30 ++++++++++++++++++++++
dist/src/main/amoro-bin/conf/config.yaml | 1 +
6 files changed, 64 insertions(+), 5 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 5bb9d1cdd..a92a5e4d3 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -380,6 +380,12 @@ public class AmoroManagementConf {
.defaultValue(Duration.ofSeconds(30))
.withDescription("Timeout duration for task acknowledgment.");
+ public static final ConfigOption<Duration> OPTIMIZER_TASK_EXECUTE_TIMEOUT =
+ ConfigOptions.key("optimizer.task-execute-timeout")
+ .durationType()
+ .defaultValue(Duration.ofHours(1))
+ .withDescription("Timeout duration for task execution, default to 1
hour.");
+
public static final ConfigOption<Integer> OPTIMIZER_MAX_PLANNING_PARALLELISM
=
ConfigOptions.key("optimizer.max-planning-parallelism")
.intType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index ffb680781..c69e9194e 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -59,6 +59,7 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -91,6 +92,7 @@ public class DefaultOptimizingService extends
StatedPersistentBase
private final long optimizerTouchTimeout;
private final long taskAckTimeout;
+ private final long taskExecuteTimeout;
private final int maxPlanningParallelism;
private final long pollingTimeout;
private final long refreshGroupInterval;
@@ -114,6 +116,8 @@ public class DefaultOptimizingService extends
StatedPersistentBase
serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis();
this.taskAckTimeout =
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis();
+ this.taskExecuteTimeout =
+
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT).toMillis();
this.refreshGroupInterval =
serviceConfig.get(AmoroManagementConf.OPTIMIZING_REFRESH_GROUP_INTERVAL).toMillis();
this.maxPlanningParallelism =
@@ -513,10 +517,20 @@ public class DefaultOptimizingService extends
StatedPersistentBase
}
private void retryTask(TaskRuntime<?> task, OptimizingQueue queue) {
- LOG.info(
- "Task {} is suspending, since it's optimizer is expired, put it to
retry queue, optimizer {}",
- task.getTaskId(),
- task.getResourceDesc());
+ if (task.getStatus() == TaskRuntime.Status.ACKED
+ && task.getStartTime() + taskExecuteTimeout <
System.currentTimeMillis()) {
+ LOG.warn(
+ "Task {} has been suspended in ACK state for {} (start time: {}),
put it to retry queue, optimizer {}. (Note: The task may have finished
executing, but ams did not receive the COMPLETE message from the optimizer.)",
+ task.getTaskId(),
+ Duration.ofMillis(taskExecuteTimeout),
+ task.getStartTime(),
+ task.getResourceDesc());
+ } else {
+ LOG.info(
+ "Task {} is suspending, since it's optimizer is expired, put it to
retry queue, optimizer {}",
+ task.getTaskId(),
+ task.getResourceDesc());
+ }
// optimizing task of suspending optimizer would not be counted for
retrying
try {
queue.retryTask(task);
@@ -534,7 +548,9 @@ public class DefaultOptimizingService extends
StatedPersistentBase
&& !activeTokens.contains(task.getToken())
&& task.getStatus() != TaskRuntime.Status.SUCCESS
|| task.getStatus() == TaskRuntime.Status.SCHEDULED
- && task.getStartTime() + taskAckTimeout <
System.currentTimeMillis();
+ && task.getStartTime() + taskAckTimeout <
System.currentTimeMillis()
+ || task.getStatus() == TaskRuntime.Status.ACKED
+ && task.getStartTime() + taskExecuteTimeout <
System.currentTimeMillis();
}
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index d6e1a2a73..a3f3e2ccd 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -508,6 +508,10 @@ public class OptimizingQueue extends PersistentBase {
taskRuntime.getFailReason());
retryTask(taskRuntime);
} else {
+ LOG.info(
+ "Task {} has reached the max execute retry count. Process {}
failed.",
+ taskRuntime.getTaskId(),
+ processId);
this.failedReason = taskRuntime.getFailReason();
this.status = ProcessStatus.FAILED;
this.endTime = taskRuntime.getEndTime();
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index 6cf8c6bda..175772cb2 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -38,6 +38,8 @@ public abstract class AMSServiceTestBase extends
AMSManagerTestBase {
try {
Configurations configurations = new Configurations();
configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT,
Duration.ofMillis(800L));
+ configurations.set(
+ AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT,
Duration.ofMillis(30000L));
TABLE_SERVICE = new DefaultTableService(new Configurations(),
CATALOG_MANAGER);
OPTIMIZING_SERVICE =
new DefaultOptimizingService(
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 10ed01f8e..9c6051d44 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -311,6 +311,36 @@ public class TestDefaultOptimizingService extends
AMSTableTestBase {
assertTaskCompleted(taskRuntime);
}
+ @Test
+ public void testExecuteTaskTimeOutAndRetry() throws InterruptedException {
+ OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+ Assertions.assertNotNull(task);
+
+ optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+
+ TaskRuntime taskRuntime =
+ optimizingService().listTasks(defaultResourceGroup().getName()).get(0);
+ assertTaskStatus(TaskRuntime.Status.ACKED);
+
+ // In this test, OPTIMIZER_TASK_EXECUTE_TIMEOUT is set to 30 seconds, so
after waiting 45
+ // seconds the task will be considered suspended and retried
+ Thread.sleep(45000);
+
+ assertTaskStatus(TaskRuntime.Status.PLANNED);
+ OptimizingTask task2 = optimizingService().pollTask(token, THREAD_ID);
+ Assertions.assertNotNull(task2);
+ Assertions.assertEquals(task2.getTaskId(), task.getTaskId());
+ TableOptimizing.OptimizingInput input =
+ SerializationUtil.simpleDeserialize(task.getTaskInput());
+ TableOptimizing.OptimizingInput input2 =
+ SerializationUtil.simpleDeserialize(task2.getTaskInput());
+ Assertions.assertEquals(input2.toString(), input.toString());
+
+ optimizingService().ackTask(token, THREAD_ID, task2.getTaskId());
+ optimizingService().completeTask(token,
buildOptimizingTaskResult(task2.getTaskId()));
+ assertTaskCompleted(taskRuntime);
+ }
+
@Test
public void testReloadScheduledTask() {
// 1.poll task
diff --git a/dist/src/main/amoro-bin/conf/config.yaml
b/dist/src/main/amoro-bin/conf/config.yaml
index d66f2a347..7503317d0 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -55,6 +55,7 @@ ams:
optimizer:
heart-beat-timeout: 1min # 60000
task-ack-timeout: 30s # 30000
+ task-execute-timeout: 1h # 3600000
polling-timeout: 3s # 3000
max-planning-parallelism: 1 # default 1