pgaref commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1188101108


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time timeout) {
     @Override
     public CompletableFuture<Acknowledge> updateTaskExecutionState(
             final TaskExecutionState taskExecutionState) {
-        FlinkException taskExecutionException;
+        checkNotNull(taskExecutionState, "taskExecutionState");
+        // Use the main/caller thread for all updates to make sure they are processed in order.
+        // (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+        // Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+        if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+            return labelFailure(taskExecutionState)
+                    .thenApplyAsync(
+                            taskStateWithLabels -> {
+                                try {
+                                    return doUpdateTaskExecutionState(taskStateWithLabels);
+                                } catch (FlinkException e) {
+                                    throw new CompletionException(e);
+                                }
+                            },
+                            getMainThreadExecutor());
+        }
         try {
-            checkNotNull(taskExecutionState, "taskExecutionState");
+            return CompletableFuture.completedFuture(
+                    doUpdateTaskExecutionState(taskExecutionState));
+        } catch (FlinkException e) {
+            return FutureUtils.completedExceptionally(e);
+        }
+    }
 
+    private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+            throws FlinkException {
+        @Nullable FlinkException taskExecutionException;
+        try {
             if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
   It looks like the main decision we have to make here is whether the failure
labels are going to be used by restart strategies or not. Relying on enrichment
for restarts is what makes this decision crucial.
   
   The existing implementation was
[based on the assumption](https://lists.apache.org/thread/tq8yrncg7zqtpc8ddpxrkxfpovs1wkkw)
that labels are going to be used by custom restart strategies in the future.
Since we wanted labeling to be asynchronous, the least risky way was to go
through existing async calls, e.g., `JobMaster#updateTaskExecutionState`, and
probably to modify the InternalFailuresListener (rather than changing the
SchedulerNG state update to async).
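
   As a rough illustration of the asynchronous option, here is a minimal,
self-contained sketch. It does not use Flink classes: `labelFailure`,
`shouldRestart`, and the two executors are hypothetical stand-ins, and the
single-threaded executor only approximates the JobMaster main thread.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncLabelSketch {

    // Hypothetical stand-in for the (possibly IO-heavy) failure enrichment step.
    static CompletableFuture<Map<String, String>> labelFailure(
            Throwable cause, ExecutorService ioExecutor) {
        return CompletableFuture.supplyAsync(
                () -> Collections.singletonMap("type", cause.getClass().getSimpleName()),
                ioExecutor);
    }

    // Hypothetical restart decision that consumes the labels.
    static boolean shouldRestart(Map<String, String> labels) {
        return !"IllegalStateException".equals(labels.get("type"));
    }

    // Label off the main thread, then hop back to a single-threaded executor
    // (assumed here to play the role of the main thread) before deciding.
    static boolean handleFailure(Throwable cause) throws Exception {
        ExecutorService ioExecutor = Executors.newCachedThreadPool();
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            return labelFailure(cause, ioExecutor)
                    .thenApplyAsync(AsyncLabelSketch::shouldRestart, mainThread)
                    .get();
        } finally {
            ioExecutor.shutdown();
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(handleFailure(new RuntimeException("boom")));
        System.out.println(handleFailure(new IllegalStateException("fatal")));
    }
}
```

   The restart decision only runs once the labels are available, but other
state updates submitted to the main thread in the meantime can overtake the
FAILED update, which is the ordering concern discussed in this thread.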
   
   Deciding that failure enrichment is crucial enough to be synchronous, maybe
as part of `DefaultScheduler#restartTasksWithDelay`, is also an option.
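
   For comparison, a minimal sketch of the synchronous option. The
`restartDelayMillis` method, the `severity` label, and the `-1` convention are
all hypothetical and only illustrate the trade-off; they are not taken from
`DefaultScheduler`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Function;

public class SyncLabelSketch {

    // Hypothetical restart handler: labels are computed inline, so the
    // restart decision always sees them, at the cost of blocking the caller.
    static long restartDelayMillis(
            Throwable cause, Function<Throwable, Map<String, String>> labeler) {
        Map<String, String> labels = labeler.apply(cause); // blocks the calling thread
        // Hypothetical policy: -1 means "do not restart".
        return "fatal".equals(labels.get("severity")) ? -1L : 1000L;
    }

    public static void main(String[] args) {
        // Hypothetical labeler: treat JVM errors as fatal, everything else as recoverable.
        Function<Throwable, Map<String, String>> labeler =
                t -> Collections.singletonMap(
                        "severity", t instanceof Error ? "fatal" : "recoverable");
        System.out.println(restartDelayMillis(new RuntimeException("boom"), labeler));
        System.out.println(restartDelayMillis(new OutOfMemoryError("oom"), labeler));
    }
}
```

   Ordering is trivially preserved here because nothing leaves the calling
thread, but a slow labeler would stall every other update on that thread.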
   
   However, decoupling failure labels completely from restart strategies sounds 
like a step back here.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
