This is an automated email from the ASF dual-hosted git repository. xuba pushed a commit to branch v0.7.x-test-front in repository https://gitbox.apache.org/repos/asf/amoro.git
commit d38a201c6e2006d2ad95d6e56603e86fa694a9c2 Author: rfyu <[email protected]> AuthorDate: Sun Mar 24 15:42:57 2024 +0800 [AMORO-2612]Avoid task always being in scheduled state (#2613) * [AMORO-2612]Avoid task always being in scheduled state * Modify the filter conditions to allow resetting of abnormal tasks * modify condition * Modify test cases * add log --------- Co-authored-by: ZhouJinsong <[email protected]> (cherry picked from commit badf862a320a5c7382238c76d146af5eb0de0704) --- .../arctic/server/DefaultOptimizingService.java | 28 ++++++++++++---------- .../arctic/server/optimizing/OptimizingQueue.java | 4 ++++ .../server/TestDefaultOptimizingService.java | 5 +++- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java b/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java index 44dd82195..c6a1ee314 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java +++ b/ams/server/src/main/java/com/netease/arctic/server/DefaultOptimizingService.java @@ -52,6 +52,7 @@ import com.netease.arctic.server.table.TableRuntimeMeta; import com.netease.arctic.server.table.TableService; import com.netease.arctic.server.utils.Configurations; import com.netease.arctic.table.TableProperties; +import org.apache.commons.lang3.StringUtils; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -64,6 +65,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; @@ -220,7 +222,11 @@ public class DefaultOptimizingService extends StatedPersistentBase @Override public void completeTask(String authToken, OptimizingTaskResult taskResult) { - LOG.info("Optimizer {} complete task {}", authToken, taskResult.getTaskId()); + LOG.info( + "Optimizer {} (threadId {}) complete task {}", + authToken, + taskResult.getThreadId(), + taskResult.getTaskId()); OptimizingQueue queue = getQueueByToken(authToken); OptimizerThread thread = getAuthenticatedOptimizer(authToken).getThread(taskResult.getThreadId()); @@ -538,7 +544,7 @@ public class DefaultOptimizingService extends StatedPersistentBase .ifPresent( queue -> queue - .collectTasks(buildSuspendingPredication(token, isExpired)) + .collectTasks(buildSuspendingPredication(authOptimizers.keySet())) .forEach(task -> retryTask(task, queue))); if (isExpired) { LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer()); @@ -563,17 +569,13 @@ public class DefaultOptimizingService extends StatedPersistentBase queue.retryTask(task); } - private Predicate<TaskRuntime> buildSuspendingPredication( - String token, boolean isOptimizerExpired) { - return task -> { - if (isOptimizerExpired) { - return TaskRuntime.Status.SUCCESS != task.getStatus() && token.equals(task.getToken()); - } else { - return token.equals(task.getToken()) - && task.getStatus() == TaskRuntime.Status.SCHEDULED - && task.getStartTime() + taskAckTimeout < System.currentTimeMillis(); - } - }; + private Predicate<TaskRuntime> buildSuspendingPredication(Set<String> activeTokens) { + return task -> + StringUtils.isNotBlank(task.getToken()) + && !activeTokens.contains(task.getToken()) + && task.getStatus() != TaskRuntime.Status.SUCCESS + || task.getStatus() == TaskRuntime.Status.SCHEDULED + && task.getStartTime() + taskAckTimeout < System.currentTimeMillis(); } } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java index 43ddc411c..79cc66986 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/OptimizingQueue.java @@ -449,6 +449,10 @@ public class OptimizingQueue extends PersistentBase { } } else if (taskRuntime.getStatus() == TaskRuntime.Status.FAILED) { if (taskRuntime.getRetry() < tableRuntime.getMaxExecuteRetryCount()) { + LOG.info( + "Put task {} to retry queue, because {}", + taskRuntime.getTaskId(), + taskRuntime.getFailReason()); retryTask(taskRuntime); } else { clearProcess(this); diff --git a/ams/server/src/test/java/com/netease/arctic/server/TestDefaultOptimizingService.java b/ams/server/src/test/java/com/netease/arctic/server/TestDefaultOptimizingService.java index 0cb80c9d9..9a6d880aa 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/TestDefaultOptimizingService.java +++ b/ams/server/src/test/java/com/netease/arctic/server/TestDefaultOptimizingService.java @@ -231,8 +231,11 @@ public class TestDefaultOptimizingService extends AMSTableTestBase { Assertions.assertThrows(PluginRetryAuthException.class, () -> optimizingService().touch(token)); Assertions.assertThrows( PluginRetryAuthException.class, () -> optimizingService().pollTask(token, THREAD_ID)); - assertTaskStatus(TaskRuntime.Status.PLANNED); + assertTaskStatus(TaskRuntime.Status.SCHEDULED); token = optimizingService().authenticate(buildRegisterInfo()); + toucher = new Toucher(); + Thread.sleep(1000); + assertTaskStatus(TaskRuntime.Status.PLANNED); OptimizingTask task2 = optimizingService().pollTask(token, THREAD_ID); Assertions.assertEquals(task2, task); }
