This is an automated email from the ASF dual-hosted git repository.

klion26 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 02a2c1fb2 [AMORO-3638] Add support for task retries when task execution times out
02a2c1fb2 is described below

commit 02a2c1fb2c940a84c310e4e336bab0e25d0b20fa
Author: Jzjsnow <[email protected]>
AuthorDate: Tue Jul 29 15:43:35 2025 +0800

    [AMORO-3638] Add support for task retries when task execution times out
    
    This commit adds a timeout for tasks in the acknowledged (`ACKED`) state, so
    that such tasks are retried in cases where AMS never receives the completion
    notification from the optimizer.
---
 .../apache/amoro/server/AmoroManagementConf.java   |  6 +++++
 .../amoro/server/DefaultOptimizingService.java     | 26 +++++++++++++++----
 .../amoro/server/optimizing/OptimizingQueue.java   |  4 +++
 .../apache/amoro/server/AMSServiceTestBase.java    |  2 ++
 .../amoro/server/TestDefaultOptimizingService.java | 30 ++++++++++++++++++++++
 dist/src/main/amoro-bin/conf/config.yaml           |  1 +
 6 files changed, 64 insertions(+), 5 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index 5bb9d1cdd..a92a5e4d3 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -380,6 +380,12 @@ public class AmoroManagementConf {
           .defaultValue(Duration.ofSeconds(30))
           .withDescription("Timeout duration for task acknowledgment.");
 
+  public static final ConfigOption<Duration> OPTIMIZER_TASK_EXECUTE_TIMEOUT =
+      ConfigOptions.key("optimizer.task-execute-timeout")
+          .durationType()
+          .defaultValue(Duration.ofHours(1))
+          .withDescription("Timeout duration for task execution, default to 1 
hour.");
+
   public static final ConfigOption<Integer> OPTIMIZER_MAX_PLANNING_PARALLELISM 
=
       ConfigOptions.key("optimizer.max-planning-parallelism")
           .intType()
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index ffb680781..c69e9194e 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -59,6 +59,7 @@ import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -91,6 +92,7 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
 
   private final long optimizerTouchTimeout;
   private final long taskAckTimeout;
+  private final long taskExecuteTimeout;
   private final int maxPlanningParallelism;
   private final long pollingTimeout;
   private final long refreshGroupInterval;
@@ -114,6 +116,8 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
         serviceConfig.get(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT).toMillis();
     this.taskAckTimeout =
         
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT).toMillis();
+    this.taskExecuteTimeout =
+        
serviceConfig.get(AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT).toMillis();
     this.refreshGroupInterval =
         
serviceConfig.get(AmoroManagementConf.OPTIMIZING_REFRESH_GROUP_INTERVAL).toMillis();
     this.maxPlanningParallelism =
@@ -513,10 +517,20 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
     }
 
     private void retryTask(TaskRuntime<?> task, OptimizingQueue queue) {
-      LOG.info(
-          "Task {} is suspending, since it's optimizer is expired, put it to 
retry queue, optimizer {}",
-          task.getTaskId(),
-          task.getResourceDesc());
+      if (task.getStatus() == TaskRuntime.Status.ACKED
+          && task.getStartTime() + taskExecuteTimeout < 
System.currentTimeMillis()) {
+        LOG.warn(
+            "Task {} has been suspended in ACK state for {} (start time: {}), 
put it to retry queue, optimizer {}. (Note: The task may have finished 
executing, but ams did not receive the COMPLETE message from the optimizer.)",
+            task.getTaskId(),
+            Duration.ofMillis(taskExecuteTimeout),
+            task.getStartTime(),
+            task.getResourceDesc());
+      } else {
+        LOG.info(
+            "Task {} is suspending, since it's optimizer is expired, put it to 
retry queue, optimizer {}",
+            task.getTaskId(),
+            task.getResourceDesc());
+      }
       // optimizing task of suspending optimizer would not be counted for 
retrying
       try {
         queue.retryTask(task);
@@ -534,7 +548,9 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
                   && !activeTokens.contains(task.getToken())
                   && task.getStatus() != TaskRuntime.Status.SUCCESS
               || task.getStatus() == TaskRuntime.Status.SCHEDULED
-                  && task.getStartTime() + taskAckTimeout < 
System.currentTimeMillis();
+                  && task.getStartTime() + taskAckTimeout < 
System.currentTimeMillis()
+              || task.getStatus() == TaskRuntime.Status.ACKED
+                  && task.getStartTime() + taskExecuteTimeout < 
System.currentTimeMillis();
     }
   }
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index d6e1a2a73..a3f3e2ccd 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -508,6 +508,10 @@ public class OptimizingQueue extends PersistentBase {
                 taskRuntime.getFailReason());
             retryTask(taskRuntime);
           } else {
+            LOG.info(
+                "Task {} has reached the max execute retry count. Process {} 
failed.",
+                taskRuntime.getTaskId(),
+                processId);
             this.failedReason = taskRuntime.getFailReason();
             this.status = ProcessStatus.FAILED;
             this.endTime = taskRuntime.getEndTime();
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index 6cf8c6bda..175772cb2 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -38,6 +38,8 @@ public abstract class AMSServiceTestBase extends 
AMSManagerTestBase {
     try {
       Configurations configurations = new Configurations();
       configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, 
Duration.ofMillis(800L));
+      configurations.set(
+          AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, 
Duration.ofMillis(30000L));
       TABLE_SERVICE = new DefaultTableService(new Configurations(), 
CATALOG_MANAGER);
       OPTIMIZING_SERVICE =
           new DefaultOptimizingService(
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 10ed01f8e..9c6051d44 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -311,6 +311,36 @@ public class TestDefaultOptimizingService extends 
AMSTableTestBase {
     assertTaskCompleted(taskRuntime);
   }
 
+  @Test
+  public void testExecuteTaskTimeOutAndRetry() throws InterruptedException {
+    OptimizingTask task = optimizingService().pollTask(token, THREAD_ID);
+    Assertions.assertNotNull(task);
+
+    optimizingService().ackTask(token, THREAD_ID, task.getTaskId());
+
+    TaskRuntime taskRuntime =
+        optimizingService().listTasks(defaultResourceGroup().getName()).get(0);
+    assertTaskStatus(TaskRuntime.Status.ACKED);
+
+    // In this test, OPTIMIZER_TASK_EXECUTE_TIMEOUT is set to 30 seconds, so 
after waiting 45
+    // seconds the task will be considered suspended and retried
+    Thread.sleep(45000);
+
+    assertTaskStatus(TaskRuntime.Status.PLANNED);
+    OptimizingTask task2 = optimizingService().pollTask(token, THREAD_ID);
+    Assertions.assertNotNull(task2);
+    Assertions.assertEquals(task2.getTaskId(), task.getTaskId());
+    TableOptimizing.OptimizingInput input =
+        SerializationUtil.simpleDeserialize(task.getTaskInput());
+    TableOptimizing.OptimizingInput input2 =
+        SerializationUtil.simpleDeserialize(task2.getTaskInput());
+    Assertions.assertEquals(input2.toString(), input.toString());
+
+    optimizingService().ackTask(token, THREAD_ID, task2.getTaskId());
+    optimizingService().completeTask(token, 
buildOptimizingTaskResult(task2.getTaskId()));
+    assertTaskCompleted(taskRuntime);
+  }
+
   @Test
   public void testReloadScheduledTask() {
     // 1.poll task
diff --git a/dist/src/main/amoro-bin/conf/config.yaml 
b/dist/src/main/amoro-bin/conf/config.yaml
index d66f2a347..7503317d0 100644
--- a/dist/src/main/amoro-bin/conf/config.yaml
+++ b/dist/src/main/amoro-bin/conf/config.yaml
@@ -55,6 +55,7 @@ ams:
   optimizer:
     heart-beat-timeout: 1min # 60000
     task-ack-timeout: 30s # 30000
+    task-execute-timeout: 1h # 3600000
     polling-timeout: 3s # 3000
     max-planning-parallelism: 1 # default 1
 

Reply via email to