zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1186678825


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time 
timeout) {
     @Override
     public CompletableFuture<Acknowledge> updateTaskExecutionState(
             final TaskExecutionState taskExecutionState) {
-        FlinkException taskExecutionException;
+        checkNotNull(taskExecutionState, "taskExecutionState");
+        // Use the main/caller thread for all updates to make sure they are 
processed in order.
+        // (MainThreadExecutor i.e., the akka thread pool does not guarantee 
that)
+        // Only detach for a FAILED state update that is terminal and may 
perform io heavy labeling.
+        if 
(ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+            return labelFailure(taskExecutionState)
+                    .thenApplyAsync(
+                            taskStateWithLabels -> {
+                                try {
+                                    return 
doUpdateTaskExecutionState(taskStateWithLabels);
+                                } catch (FlinkException e) {
+                                    throw new CompletionException(e);
+                                }
+                            },
+                            getMainThreadExecutor());
+        }
         try {
-            checkNotNull(taskExecutionState, "taskExecutionState");
+            return CompletableFuture.completedFuture(
+                    doUpdateTaskExecutionState(taskExecutionState));
+        } catch (FlinkException e) {
+            return FutureUtils.completedExceptionally(e);
+        }
+    }
 
+    private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState 
taskExecutionState)
+            throws FlinkException {
+        @Nullable FlinkException taskExecutionException;
+        try {
             if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
   > There has been a consensus that people want to use this for implementing 
custom restart strategies 
   
   If we expect failure enricher to be a crucial part, I would doubt whether we 
should make it an asynchronous process. Before the enrichment is done, a failed 
job cannot recover, which may last for minutes. 
   
   Also considering future custom restart strategy, one open question is it 
should act in an asynchronous way, or in a synchronous way to simplify 
implementation? If taking the former option, a simpler implementation("not 
treating failure labeling a critical process at the moment") may be more 
friendlier for future development(less likely to be changed over and over). If 
taking the latter option, should the failure enrichment also be synchronous?
   
   Therefore I hesitate to complicate the implementation at the moment before 
the FLIP of custom restart strategy is finalized.
   The concern was raised by not answered because custom restart strategy was 
excluded from the FLIP discussion.
   https://lists.apache.org/thread/zoyltjb3k4v9zsbczm8yb4zrw659540v



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to