This is an automated email from the ASF dual-hosted git repository. xuba pushed a commit to branch v0.7.x-test-front in repository https://gitbox.apache.org/repos/asf/amoro.git
commit 5f029c9616335b4e8e6fc7d8246d03c021de96dd Author: rfyu <[email protected]> AuthorDate: Sun Mar 24 19:45:00 2024 +0800 [AMORO-2662] After the token expires, the optimizer repeatedly commits tasks that have been reset to ams (#2663) * [AMORO-2662] After the token expires, the optimizer repeatedly commits tasks that have been reset to ams * fix bug * format * fix (cherry picked from commit 82670b68fc95720c7c3badc59100eeebc4001ca0) --- .../netease/arctic/server/exception/ArcticRuntimeException.java | 2 +- .../{DuplicateRuntimeException.java => TaskRuntimeException.java} | 8 ++++++-- .../com/netease/arctic/server/optimizing/OptimizingQueue.java | 2 +- .../java/com/netease/arctic/server/optimizing/TaskRuntime.java | 8 +++++--- core/src/main/java/com/netease/arctic/ErrorCodes.java | 2 +- .../test/java/com/netease/arctic/MockArcticMetastoreServer.java | 2 +- 6 files changed, 15 insertions(+), 9 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/exception/ArcticRuntimeException.java b/ams/server/src/main/java/com/netease/arctic/server/exception/ArcticRuntimeException.java index b2b833f16..88ba36ef3 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/exception/ArcticRuntimeException.java +++ b/ams/server/src/main/java/com/netease/arctic/server/exception/ArcticRuntimeException.java @@ -46,7 +46,7 @@ public class ArcticRuntimeException extends RuntimeException { CODE_MAP.put(ForbiddenException.class, ErrorCodes.FORBIDDEN_ERROR_CODE); CODE_MAP.put(TaskNotFoundException.class, ErrorCodes.TASK_NOT_FOUND_ERROR_CODE); - CODE_MAP.put(DuplicateRuntimeException.class, ErrorCodes.DUPLICATED_TASK_ERROR_CODE); + CODE_MAP.put(TaskRuntimeException.class, ErrorCodes.TASK_RUNTIME_ERROR_CODE); CODE_MAP.put(OptimizingClosedException.class, ErrorCodes.OPTIMIZING_CLOSED_ERROR_CODE); CODE_MAP.put(IllegalTaskStateException.class, ErrorCodes.ILLEGAL_TASK_STATE_ERROR_CODE); CODE_MAP.put(PluginAuthException.class, ErrorCodes.PLUGIN_AUTH_ERROR_CODE); diff --git 
a/ams/server/src/main/java/com/netease/arctic/server/exception/DuplicateRuntimeException.java b/ams/server/src/main/java/com/netease/arctic/server/exception/TaskRuntimeException.java similarity index 79% rename from ams/server/src/main/java/com/netease/arctic/server/exception/DuplicateRuntimeException.java rename to ams/server/src/main/java/com/netease/arctic/server/exception/TaskRuntimeException.java index b17347fcd..6d089fc61 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/exception/DuplicateRuntimeException.java +++ b/ams/server/src/main/java/com/netease/arctic/server/exception/TaskRuntimeException.java @@ -18,9 +18,13 @@ package com.netease.arctic.server.exception; -public class DuplicateRuntimeException extends ArcticRuntimeException { +public class TaskRuntimeException extends ArcticRuntimeException { - public DuplicateRuntimeException(String message) { + public TaskRuntimeException(String message) { super(message); } + + public TaskRuntimeException(String format, Object... 
args) { + super(String.format(format, args)); + } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java index 79cc66986..12322fcba 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java @@ -551,7 +551,7 @@ public class OptimizingQueue extends PersistentBase { endTime = System.currentTimeMillis(); persistProcessCompleted(true); } catch (Exception e) { - LOG.warn("{} Commit optimizing failed ", tableRuntime.getTableIdentifier(), e); + LOG.error("{} Commit optimizing failed ", tableRuntime.getTableIdentifier(), e); status = Status.FAILED; failedReason = ExceptionUtil.getErrorMessage(e, 4000); endTime = System.currentTimeMillis(); diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java index 122c0028d..eab409f20 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java @@ -27,9 +27,9 @@ import com.netease.arctic.optimizing.RewriteFilesInput; import com.netease.arctic.optimizing.RewriteFilesOutput; import com.netease.arctic.server.ArcticServiceConstants; import com.netease.arctic.server.dashboard.utils.OptimizingUtil; -import com.netease.arctic.server.exception.DuplicateRuntimeException; import com.netease.arctic.server.exception.IllegalTaskStateException; import com.netease.arctic.server.exception.OptimizingClosedException; +import com.netease.arctic.server.exception.TaskRuntimeException; import com.netease.arctic.server.optimizing.plan.TaskDescriptor; import com.netease.arctic.server.persistence.StatedPersistentBase; import com.netease.arctic.server.persistence.TaskFilesPersistence; 
@@ -299,10 +299,12 @@ public class TaskRuntime extends StatedPersistentBase { private void validThread(OptimizerThread thread) { if (token == null) { - throw new IllegalStateException("Task not scheduled yet, taskId:" + taskId); + throw new TaskRuntimeException("Task has been reset or not yet scheduled, taskId:%s", taskId); } if (!thread.getToken().equals(getToken()) || thread.getThreadId() != threadId) { - throw new DuplicateRuntimeException("Task already acked by optimizer thread + " + thread); + throw new TaskRuntimeException( + "The optimizer thread does not match, the thread in the task is OptimizerThread(token=%s, threadId=%s), and the thread in the request is OptimizerThread(token=%s, threadId=%s).", + getToken(), threadId, thread.getToken(), thread.getThreadId()); } } diff --git a/core/src/main/java/com/netease/arctic/ErrorCodes.java b/core/src/main/java/com/netease/arctic/ErrorCodes.java index 25ebef2dd..b2d01f1bf 100644 --- a/core/src/main/java/com/netease/arctic/ErrorCodes.java +++ b/core/src/main/java/com/netease/arctic/ErrorCodes.java @@ -29,7 +29,7 @@ public final class ErrorCodes { public static final int FORBIDDEN_ERROR_CODE = 1004; public static final int TASK_NOT_FOUND_ERROR_CODE = 2001; - public static final int DUPLICATED_TASK_ERROR_CODE = 2002; + public static final int TASK_RUNTIME_ERROR_CODE = 2002; public static final int OPTIMIZING_CLOSED_ERROR_CODE = 2003; public static final int ILLEGAL_TASK_STATE_ERROR_CODE = 2004; public static final int PLUGIN_AUTH_ERROR_CODE = 2005; diff --git a/core/src/test/java/com/netease/arctic/MockArcticMetastoreServer.java b/core/src/test/java/com/netease/arctic/MockArcticMetastoreServer.java index b530033e1..f1e2bdafa 100644 --- a/core/src/test/java/com/netease/arctic/MockArcticMetastoreServer.java +++ b/core/src/test/java/com/netease/arctic/MockArcticMetastoreServer.java @@ -411,7 +411,7 @@ public class MockArcticMetastoreServer implements Runnable { Map<Integer, OptimizingTaskId> executingTasksMap = 
executingTasks.get(authToken); if (executingTasksMap.containsKey(threadId)) { throw new ArcticException( - ErrorCodes.DUPLICATED_TASK_ERROR_CODE, + ErrorCodes.TASK_RUNTIME_ERROR_CODE, "DuplicateTask", String.format( "Optimizer:%s" + " thread:%d is executing another task", authToken, threadId));
