This is an automated email from the ASF dual-hosted git repository.

klion26 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c74fd7f0 [AMORO-3637] Harden the task completion logic to avoid the 
task state in ACKED forever
4c74fd7f0 is described below

commit 4c74fd7f0d8bddcf0c88639cc8382b6fa11b97ad
Author: Zhuojun Jiang <[email protected]>
AuthorDate: Mon Jul 21 16:20:03 2025 +0800

    [AMORO-3637] Harden the task completion logic to avoid the task state in 
ACKED forever
    
    Before this commit, a task may stay in `ACKED` forever if the task 
execution fails.
    This commit hardens the task completion notification logic so that the 
server will receive the completion.
---
 .../amoro/server/DefaultOptimizingService.java     |  5 ++--
 .../amoro/optimizer/common/OptimizerExecutor.java  | 33 +++++++++++++++++-----
 2 files changed, 29 insertions(+), 9 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index a6bf70a6f..ffb680781 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -241,10 +241,11 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
   @Override
   public void completeTask(String authToken, OptimizingTaskResult taskResult) {
     LOG.info(
-        "Optimizer {} (threadId {}) complete task {}",
+        "Optimizer {} (threadId {}) complete task {} (status: {})",
         authToken,
         taskResult.getThreadId(),
-        taskResult.getTaskId());
+        taskResult.getTaskId(),
+        taskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL");
     OptimizingQueue queue = getQueueByToken(authToken);
     OptimizerThread thread =
         
getAuthenticatedOptimizer(authToken).getThread(taskResult.getThreadId());
diff --git 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
index 854c991b8..d40e7ec94 100644
--- 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
+++ 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
@@ -47,14 +47,31 @@ public class OptimizerExecutor extends 
AbstractOptimizerOperator {
 
   public void start() {
     while (isStarted()) {
+      OptimizingTask ackTask = null;
+      OptimizingTaskResult result = null;
       try {
         OptimizingTask task = pollTask();
         if (task != null && ackTask(task)) {
-          OptimizingTaskResult result = executeTask(task);
-          completeTask(result);
+          ackTask = task;
+          result = executeTask(task);
         }
       } catch (Throwable t) {
-        LOG.error("Optimizer executor[{}] got an unexpected error", threadId, 
t);
+        if (ackTask != null) {
+          LOG.error(
+              "Optimizer executor[{}] handling task[{}] failed and got an 
unknown error",
+              threadId,
+              ackTask.getTaskId(),
+              t);
+          String errorMessage = ExceptionUtil.getErrorMessage(t, 
ERROR_MESSAGE_MAX_LENGTH);
+          result = new OptimizingTaskResult(ackTask.getTaskId(), threadId);
+          result.setErrorMessage(errorMessage);
+        } else {
+          LOG.error("Optimizer executor[{}] got an unexpected error", 
threadId, t);
+        }
+      } finally {
+        if (result != null) {
+          completeTask(result);
+        }
       }
     }
   }
@@ -112,14 +129,16 @@ public class OptimizerExecutor extends 
AbstractOptimizerOperator {
             return null;
           });
       LOG.info(
-          "Optimizer executor[{}] completed task[{}] to ams",
+          "Optimizer executor[{}] completed task[{}](status: {}) to ams",
           threadId,
-          optimizingTaskResult.getTaskId());
-    } catch (TException exception) {
+          optimizingTaskResult.getTaskId(),
+          optimizingTaskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL");
+    } catch (Exception exception) {
       LOG.error(
-          "Optimizer executor[{}] completed task[{}] failed",
+          "Optimizer executor[{}] completed task[{}](status: {}) failed",
           threadId,
           optimizingTaskResult.getTaskId(),
+          optimizingTaskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL",
           exception);
     }
   }

Reply via email to