zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1184723091


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time timeout) {
     @Override
     public CompletableFuture<Acknowledge> updateTaskExecutionState(
             final TaskExecutionState taskExecutionState) {
-        FlinkException taskExecutionException;
+        checkNotNull(taskExecutionState, "taskExecutionState");
+        // Use the main/caller thread for all updates to make sure they are processed in order.
+        // (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+        // Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+        if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+            return labelFailure(taskExecutionState)
+                    .thenApplyAsync(
+                            taskStateWithLabels -> {
+                                try {
+                                    return doUpdateTaskExecutionState(taskStateWithLabels);
+                                } catch (FlinkException e) {
+                                    throw new CompletionException(e);
+                                }
+                            },
+                            getMainThreadExecutor());
+        }
         try {
-            checkNotNull(taskExecutionState, "taskExecutionState");
+            return CompletableFuture.completedFuture(
+                    doUpdateTaskExecutionState(taskExecutionState));
+        } catch (FlinkException e) {
+            return FutureUtils.completedExceptionally(e);
+        }
+    }
 
+    private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+            throws FlinkException {
+        @Nullable FlinkException taskExecutionException;
+        try {
             if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
   This is not the only entry point for notifying the scheduler of a task failure. There are others, e.g. `UpdateSchedulerNgOnInternalFailuresListener#notifyTaskFailure(...)`.

   These task failures may originate from within the JobMaster, e.g. from the source coordinators or from the scheduling process. It seems these failures are not labeled.
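   For illustration, a minimal sketch of routing every failure path through one shared labeling step before it reaches the scheduler. All names here (`FailureLabelingFunnel`, `FailureLabeler`, `FailureSink`, `onTaskStateFailed`, `onInternalTaskFailure`) are hypothetical and are not Flink APIs; the sketch only assumes that labeling is asynchronous (it may do I/O) and that the scheduler must be called on the main thread executor, as in the diff above.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
 * Hypothetical sketch: the TaskExecutionState path and the internal failure paths
 * (e.g. a notifyTaskFailure-style listener) share one labeling step.
 */
final class FailureLabelingFunnel {

    /** Hypothetical labeler; may perform I/O, so it returns a future. */
    interface FailureLabeler {
        CompletableFuture<Map<String, String>> label(Throwable cause);
    }

    /** Hypothetical scheduler-facing sink that receives the failure plus its labels. */
    interface FailureSink {
        void onFailure(String executionId, Throwable cause, Map<String, String> labels);
    }

    private final FailureLabeler labeler;
    private final FailureSink sink;
    private final Executor mainThreadExecutor;

    FailureLabelingFunnel(FailureLabeler labeler, FailureSink sink, Executor mainThreadExecutor) {
        this.labeler = labeler;
        this.sink = sink;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    /** Entry point for FAILED task execution state updates reported by TaskExecutors. */
    CompletableFuture<Void> onTaskStateFailed(String executionId, Throwable cause) {
        return labelAndForward(executionId, cause);
    }

    /** Entry point for failures raised inside the JobMaster (coordinators, scheduling). */
    CompletableFuture<Void> onInternalTaskFailure(String executionId, Throwable cause) {
        return labelAndForward(executionId, cause);
    }

    private CompletableFuture<Void> labelAndForward(String executionId, Throwable cause) {
        return labeler.label(cause)
                // A failing labeler must not swallow the original task failure.
                .exceptionally(labelingError -> Collections.<String, String>emptyMap())
                // Hand the labeled failure to the scheduler on the main thread.
                .thenAcceptAsync(
                        labels -> sink.onFailure(executionId, cause, labels),
                        mainThreadExecutor);
    }
}
```

   Because both entry points delegate to the same private method, a failure cannot reach the scheduler unlabeled regardless of where it originated.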



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java:
##########
@@ -98,4 +112,13 @@ public String getExceptionAsString() {
     public long getTimestamp() {
         return timestamp;
     }
+
+    /**
+     * Returns the labels associated with the exception.
+     *
+     * @return Map of exception labels
+     */
+    public Map<String, String> getLabels() {
+        return labels;

Review Comment:
   It would be better to return an unmodifiable map here (e.g. via `Collections.unmodifiableMap`).
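   A minimal sketch of that suggestion, using a hypothetical trimmed-down class (`LabeledErrorInfo`) rather than the real `ErrorInfo`: copy the incoming map once and wrap it once in the constructor, so the getter can return the field directly.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, trimmed-down stand-in for ErrorInfo that only models the labels. */
final class LabeledErrorInfo {
    private final Map<String, String> labels;

    LabeledErrorInfo(Map<String, String> labels) {
        // Copy first so later changes to the caller's map do not leak in,
        // then wrap once so every getter call returns the same read-only view.
        this.labels = Collections.unmodifiableMap(new HashMap<>(labels));
    }

    /** Returns the labels; any mutation attempt throws UnsupportedOperationException. */
    Map<String, String> getLabels() {
        return labels;
    }
}
```

   `Map.copyOf` (Java 10+) would be an alternative, but it rejects null keys and values, so the copy-and-wrap form is the more permissive choice.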



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -778,7 +778,7 @@ private static PartitionInfo createFinishedPartitionInfo(
      */
     @Override
     public void fail(Throwable t) {
-        processFail(t, true);
+        processFail(t, true, Collections.emptyMap());

Review Comment:
   Why not enrich this failure as well?



-- 
This is an automated message from the Apache Git Service.