This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch v0.7.x-test-front
in repository https://gitbox.apache.org/repos/asf/amoro.git

commit 5f029c9616335b4e8e6fc7d8246d03c021de96dd
Author: rfyu <[email protected]>
AuthorDate: Sun Mar 24 19:45:00 2024 +0800

    [AMORO-2662]After the token expires,the optimizer repeatedly commits tasks 
that have been reset to ams (#2663)
    
    * [AMORO-2662]After the token expires,the optimizer repeatedly commits 
tasks that have been reset to ams
    
    * fix bug
    
    * format
    
    * fix
    
    (cherry picked from commit 82670b68fc95720c7c3badc59100eeebc4001ca0)
---
 .../netease/arctic/server/exception/ArcticRuntimeException.java   | 2 +-
 .../{DuplicateRuntimeException.java => TaskRuntimeException.java} | 8 ++++++--
 .../com/netease/arctic/server/optimizing/OptimizingQueue.java     | 2 +-
 .../java/com/netease/arctic/server/optimizing/TaskRuntime.java    | 8 +++++---
 core/src/main/java/com/netease/arctic/ErrorCodes.java             | 2 +-
 .../test/java/com/netease/arctic/MockArcticMetastoreServer.java   | 2 +-
 6 files changed, 15 insertions(+), 9 deletions(-)

diff --git 
a/ams/server/src/main/java/com/netease/arctic/server/exception/ArcticRuntimeException.java
 
b/ams/server/src/main/java/com/netease/arctic/server/exception/ArcticRuntimeException.java
index b2b833f16..88ba36ef3 100644
--- 
a/ams/server/src/main/java/com/netease/arctic/server/exception/ArcticRuntimeException.java
+++ 
b/ams/server/src/main/java/com/netease/arctic/server/exception/ArcticRuntimeException.java
@@ -46,7 +46,7 @@ public class ArcticRuntimeException extends RuntimeException {
     CODE_MAP.put(ForbiddenException.class, ErrorCodes.FORBIDDEN_ERROR_CODE);
 
     CODE_MAP.put(TaskNotFoundException.class, 
ErrorCodes.TASK_NOT_FOUND_ERROR_CODE);
-    CODE_MAP.put(DuplicateRuntimeException.class, 
ErrorCodes.DUPLICATED_TASK_ERROR_CODE);
+    CODE_MAP.put(TaskRuntimeException.class, 
ErrorCodes.TASK_RUNTIME_ERROR_CODE);
     CODE_MAP.put(OptimizingClosedException.class, 
ErrorCodes.OPTIMIZING_CLOSED_ERROR_CODE);
     CODE_MAP.put(IllegalTaskStateException.class, 
ErrorCodes.ILLEGAL_TASK_STATE_ERROR_CODE);
     CODE_MAP.put(PluginAuthException.class, ErrorCodes.PLUGIN_AUTH_ERROR_CODE);
diff --git 
a/ams/server/src/main/java/com/netease/arctic/server/exception/DuplicateRuntimeException.java
 
b/ams/server/src/main/java/com/netease/arctic/server/exception/TaskRuntimeException.java
similarity index 79%
rename from 
ams/server/src/main/java/com/netease/arctic/server/exception/DuplicateRuntimeException.java
rename to 
ams/server/src/main/java/com/netease/arctic/server/exception/TaskRuntimeException.java
index b17347fcd..6d089fc61 100644
--- 
a/ams/server/src/main/java/com/netease/arctic/server/exception/DuplicateRuntimeException.java
+++ 
b/ams/server/src/main/java/com/netease/arctic/server/exception/TaskRuntimeException.java
@@ -18,9 +18,13 @@
 
 package com.netease.arctic.server.exception;
 
-public class DuplicateRuntimeException extends ArcticRuntimeException {
+public class TaskRuntimeException extends ArcticRuntimeException {
 
-  public DuplicateRuntimeException(String message) {
+  public TaskRuntimeException(String message) {
     super(message);
   }
+
+  public TaskRuntimeException(String format, Object... args) {
+    super(String.format(format, args));
+  }
 }
diff --git 
a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
index 79cc66986..12322fcba 100644
--- 
a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
+++ 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
@@ -551,7 +551,7 @@ public class OptimizingQueue extends PersistentBase {
         endTime = System.currentTimeMillis();
         persistProcessCompleted(true);
       } catch (Exception e) {
-        LOG.warn("{} Commit optimizing failed ", 
tableRuntime.getTableIdentifier(), e);
+        LOG.error("{} Commit optimizing failed ", 
tableRuntime.getTableIdentifier(), e);
         status = Status.FAILED;
         failedReason = ExceptionUtil.getErrorMessage(e, 4000);
         endTime = System.currentTimeMillis();
diff --git 
a/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java
 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java
index 122c0028d..eab409f20 100644
--- 
a/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java
+++ 
b/ams/server/src/main/java/com/netease/arctic/server/optimizing/TaskRuntime.java
@@ -27,9 +27,9 @@ import com.netease.arctic.optimizing.RewriteFilesInput;
 import com.netease.arctic.optimizing.RewriteFilesOutput;
 import com.netease.arctic.server.ArcticServiceConstants;
 import com.netease.arctic.server.dashboard.utils.OptimizingUtil;
-import com.netease.arctic.server.exception.DuplicateRuntimeException;
 import com.netease.arctic.server.exception.IllegalTaskStateException;
 import com.netease.arctic.server.exception.OptimizingClosedException;
+import com.netease.arctic.server.exception.TaskRuntimeException;
 import com.netease.arctic.server.optimizing.plan.TaskDescriptor;
 import com.netease.arctic.server.persistence.StatedPersistentBase;
 import com.netease.arctic.server.persistence.TaskFilesPersistence;
@@ -299,10 +299,12 @@ public class TaskRuntime extends StatedPersistentBase {
 
   private void validThread(OptimizerThread thread) {
     if (token == null) {
-      throw new IllegalStateException("Task not scheduled yet, taskId:" + 
taskId);
+      throw new TaskRuntimeException("Task has been reset or not yet 
scheduled, taskId:%s", taskId);
     }
     if (!thread.getToken().equals(getToken()) || thread.getThreadId() != 
threadId) {
-      throw new DuplicateRuntimeException("Task already acked by optimizer 
thread + " + thread);
+      throw new TaskRuntimeException(
+          "The optimizer thread does not match, the thread in the task is 
OptimizerThread(token=%s, threadId=%s), and the thread in the request is 
OptimizerThread(token=%s, threadId=%s).",
+          getToken(), threadId, thread.getToken(), thread.getThreadId());
     }
   }
 
diff --git a/core/src/main/java/com/netease/arctic/ErrorCodes.java 
b/core/src/main/java/com/netease/arctic/ErrorCodes.java
index 25ebef2dd..b2d01f1bf 100644
--- a/core/src/main/java/com/netease/arctic/ErrorCodes.java
+++ b/core/src/main/java/com/netease/arctic/ErrorCodes.java
@@ -29,7 +29,7 @@ public final class ErrorCodes {
   public static final int FORBIDDEN_ERROR_CODE = 1004;
 
   public static final int TASK_NOT_FOUND_ERROR_CODE = 2001;
-  public static final int DUPLICATED_TASK_ERROR_CODE = 2002;
+  public static final int TASK_RUNTIME_ERROR_CODE = 2002;
   public static final int OPTIMIZING_CLOSED_ERROR_CODE = 2003;
   public static final int ILLEGAL_TASK_STATE_ERROR_CODE = 2004;
   public static final int PLUGIN_AUTH_ERROR_CODE = 2005;
diff --git 
a/core/src/test/java/com/netease/arctic/MockArcticMetastoreServer.java 
b/core/src/test/java/com/netease/arctic/MockArcticMetastoreServer.java
index b530033e1..f1e2bdafa 100644
--- a/core/src/test/java/com/netease/arctic/MockArcticMetastoreServer.java
+++ b/core/src/test/java/com/netease/arctic/MockArcticMetastoreServer.java
@@ -411,7 +411,7 @@ public class MockArcticMetastoreServer implements Runnable {
       Map<Integer, OptimizingTaskId> executingTasksMap = 
executingTasks.get(authToken);
       if (executingTasksMap.containsKey(threadId)) {
         throw new ArcticException(
-            ErrorCodes.DUPLICATED_TASK_ERROR_CODE,
+            ErrorCodes.TASK_RUNTIME_ERROR_CODE,
             "DuplicateTask",
             String.format(
                 "Optimizer:%s" + " thread:%d is executing another task", 
authToken, threadId));

Reply via email to