This is an automated email from the ASF dual-hosted git repository.
klion26 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 4c74fd7f0 [AMORO-3637] Harden the task completion logic to avoid the
task state in ACKED forever
4c74fd7f0 is described below
commit 4c74fd7f0d8bddcf0c88639cc8382b6fa11b97ad
Author: Zhuojun Jiang <[email protected]>
AuthorDate: Mon Jul 21 16:20:03 2025 +0800
[AMORO-3637] Harden the task completion logic to avoid the task state in
ACKED forever
Before this commit, a task could stay in the `ACKED` state forever if the
task execution failed.
This commit hardens the task completion notification logic so that the
server will receive the completion.
---
.../amoro/server/DefaultOptimizingService.java | 5 ++--
.../amoro/optimizer/common/OptimizerExecutor.java | 33 +++++++++++++++++-----
2 files changed, 29 insertions(+), 9 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index a6bf70a6f..ffb680781 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -241,10 +241,11 @@ public class DefaultOptimizingService extends
StatedPersistentBase
@Override
public void completeTask(String authToken, OptimizingTaskResult taskResult) {
LOG.info(
- "Optimizer {} (threadId {}) complete task {}",
+ "Optimizer {} (threadId {}) complete task {} (status: {})",
authToken,
taskResult.getThreadId(),
- taskResult.getTaskId());
+ taskResult.getTaskId(),
+ taskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL");
OptimizingQueue queue = getQueueByToken(authToken);
OptimizerThread thread =
getAuthenticatedOptimizer(authToken).getThread(taskResult.getThreadId());
diff --git
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
index 854c991b8..d40e7ec94 100644
---
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
+++
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
@@ -47,14 +47,31 @@ public class OptimizerExecutor extends
AbstractOptimizerOperator {
public void start() {
while (isStarted()) {
+ OptimizingTask ackTask = null;
+ OptimizingTaskResult result = null;
try {
OptimizingTask task = pollTask();
if (task != null && ackTask(task)) {
- OptimizingTaskResult result = executeTask(task);
- completeTask(result);
+ ackTask = task;
+ result = executeTask(task);
}
} catch (Throwable t) {
- LOG.error("Optimizer executor[{}] got an unexpected error", threadId,
t);
+ if (ackTask != null) {
+ LOG.error(
+ "Optimizer executor[{}] handling task[{}] failed and got an
unknown error",
+ threadId,
+ ackTask.getTaskId(),
+ t);
+ String errorMessage = ExceptionUtil.getErrorMessage(t,
ERROR_MESSAGE_MAX_LENGTH);
+ result = new OptimizingTaskResult(ackTask.getTaskId(), threadId);
+ result.setErrorMessage(errorMessage);
+ } else {
+ LOG.error("Optimizer executor[{}] got an unexpected error",
threadId, t);
+ }
+ } finally {
+ if (result != null) {
+ completeTask(result);
+ }
}
}
}
@@ -112,14 +129,16 @@ public class OptimizerExecutor extends
AbstractOptimizerOperator {
return null;
});
LOG.info(
- "Optimizer executor[{}] completed task[{}] to ams",
+ "Optimizer executor[{}] completed task[{}](status: {}) to ams",
threadId,
- optimizingTaskResult.getTaskId());
- } catch (TException exception) {
+ optimizingTaskResult.getTaskId(),
+ optimizingTaskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL");
+ } catch (Exception exception) {
LOG.error(
- "Optimizer executor[{}] completed task[{}] failed",
+ "Optimizer executor[{}] completed task[{}](status: {}) failed",
threadId,
optimizingTaskResult.getTaskId(),
+ optimizingTaskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL",
exception);
}
}