This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch v0.7.x-test-front
in repository https://gitbox.apache.org/repos/asf/amoro.git

commit d38a201c6e2006d2ad95d6e56603e86fa694a9c2
Author: rfyu <[email protected]>
AuthorDate: Sun Mar 24 15:42:57 2024 +0800

    [AMORO-2612] Avoid tasks getting stuck in the SCHEDULED state (#2613)
    
    * [AMORO-2612] Avoid tasks getting stuck in the SCHEDULED state
    
    * Modify the filter conditions so that abnormal tasks can be reset
    
    * Modify the condition
    
    * Modify test cases
    
    * Add logging
    
    ---------
    
    Co-authored-by: ZhouJinsong <[email protected]>
    (cherry picked from commit badf862a320a5c7382238c76d146af5eb0de0704)
---
 .../arctic/server/DefaultOptimizingService.java    | 28 ++++++++++++----------
 .../arctic/server/optimizing/OptimizingQueue.java  |  4 ++++
 .../server/TestDefaultOptimizingService.java       |  5 +++-
 3 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java b/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java
index 44dd82195..c6a1ee314 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java
@@ -52,6 +52,7 @@ import com.netease.arctic.server.table.TableRuntimeMeta;
 import com.netease.arctic.server.table.TableService;
 import com.netease.arctic.server.utils.Configurations;
 import com.netease.arctic.table.TableProperties;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -64,6 +65,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
@@ -220,7 +222,11 @@ public class DefaultOptimizingService extends StatedPersistentBase
 
   @Override
   public void completeTask(String authToken, OptimizingTaskResult taskResult) {
-    LOG.info("Optimizer {} complete task {}", authToken, 
taskResult.getTaskId());
+    LOG.info(
+        "Optimizer {} (threadId {}) complete task {}",
+        authToken,
+        taskResult.getThreadId(),
+        taskResult.getTaskId());
     OptimizingQueue queue = getQueueByToken(authToken);
     OptimizerThread thread =
         
getAuthenticatedOptimizer(authToken).getThread(taskResult.getThreadId());
@@ -538,7 +544,7 @@ public class DefaultOptimizingService extends StatedPersistentBase
               .ifPresent(
                   queue ->
                       queue
-                          .collectTasks(buildSuspendingPredication(token, 
isExpired))
+                          
.collectTasks(buildSuspendingPredication(authOptimizers.keySet()))
                           .forEach(task -> retryTask(task, queue)));
           if (isExpired) {
             LOG.info("Optimizer {} has been expired, unregister it", 
keepingTask.getOptimizer());
@@ -563,17 +569,13 @@ public class DefaultOptimizingService extends StatedPersistentBase
       queue.retryTask(task);
     }
 
-    private Predicate<TaskRuntime> buildSuspendingPredication(
-        String token, boolean isOptimizerExpired) {
-      return task -> {
-        if (isOptimizerExpired) {
-          return TaskRuntime.Status.SUCCESS != task.getStatus() && 
token.equals(task.getToken());
-        } else {
-          return token.equals(task.getToken())
-              && task.getStatus() == TaskRuntime.Status.SCHEDULED
-              && task.getStartTime() + taskAckTimeout < 
System.currentTimeMillis();
-        }
-      };
+    private Predicate<TaskRuntime> buildSuspendingPredication(Set<String> 
activeTokens) {
+      return task ->
+          StringUtils.isNotBlank(task.getToken())
+                  && !activeTokens.contains(task.getToken())
+                  && task.getStatus() != TaskRuntime.Status.SUCCESS
+              || task.getStatus() == TaskRuntime.Status.SCHEDULED
+                  && task.getStartTime() + taskAckTimeout < 
System.currentTimeMillis();
     }
   }
 }
diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
index 43ddc411c..79cc66986 100644
--- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
+++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java
@@ -449,6 +449,10 @@ public class OptimizingQueue extends PersistentBase {
           }
         } else if (taskRuntime.getStatus() == TaskRuntime.Status.FAILED) {
           if (taskRuntime.getRetry() < tableRuntime.getMaxExecuteRetryCount()) 
{
+            LOG.info(
+                "Put task {} to retry queue, because {}",
+                taskRuntime.getTaskId(),
+                taskRuntime.getFailReason());
             retryTask(taskRuntime);
           } else {
             clearProcess(this);
diff --git a/ams/server/src/test/java/com/netease/arctic/server/TestDefaultOptimizingService.java b/ams/server/src/test/java/com/netease/arctic/server/TestDefaultOptimizingService.java
index 0cb80c9d9..9a6d880aa 100644
--- a/ams/server/src/test/java/com/netease/arctic/server/TestDefaultOptimizingService.java
+++ b/ams/server/src/test/java/com/netease/arctic/server/TestDefaultOptimizingService.java
@@ -231,8 +231,11 @@ public class TestDefaultOptimizingService extends AMSTableTestBase {
     Assertions.assertThrows(PluginRetryAuthException.class, () -> 
optimizingService().touch(token));
     Assertions.assertThrows(
         PluginRetryAuthException.class, () -> 
optimizingService().pollTask(token, THREAD_ID));
-    assertTaskStatus(TaskRuntime.Status.PLANNED);
+    assertTaskStatus(TaskRuntime.Status.SCHEDULED);
     token = optimizingService().authenticate(buildRegisterInfo());
+    toucher = new Toucher();
+    Thread.sleep(1000);
+    assertTaskStatus(TaskRuntime.Status.PLANNED);
     OptimizingTask task2 = optimizingService().pollTask(token, THREAD_ID);
     Assertions.assertEquals(task2, task);
   }

Reply via email to