[GitHub] [flink] zhuzhurk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce SchedulerBase per-task failure enrichment/labeling

2023-05-12 Thread via GitHub


zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1192155460


##
flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java:
##
@@ -1289,4 +1293,59 @@ public static <T> CompletableFuture<T> switchExecutor(
 },
 executor);
 }
+
+/**
+ * A serializable implementation of CompletableFuture.
+ *
+ * This class extends CompletableFuture and implements the Serializable interface to allow it
+ * to be serialized and deserialized. The result of the CompletableFuture is extracted and
+ * serialized when the object is written to a stream, and the result is set using the complete()
+ * method when the object is read from a stream.
+ *
+ * @param <T> the type of the result of the CompletableFuture
+ */
+public static class SerializableCompletableFuture<T> extends CompletableFuture<T>
+implements Serializable {
+private static final long serialVersionUID = 1L;
+private transient T result;
+
+public SerializableCompletableFuture(T value) {
+this.result = value;
+this.complete(value);
+}
+
+/**
+ * Writes this object to the given OutputStream. The result of the CompletableFuture is
+ * extracted and serialized along with the object.
+ *
+ * @param out the ObjectOutputStream to write to
+ * @throws IOException if an I/O error occurs
+ */
+private void writeObject(ObjectOutputStream out)
+throws IOException, ExecutionException, InterruptedException {
+out.defaultWriteObject();
+if (result == null) {
+result = this.get();

Review Comment:
   This `get()` may block the main thread. Maybe we do not need to introduce a `SerializableCompletableFuture`, but can instead modify the ErrorInfo to achieve the goal, e.g.
   * introduce two fields to ErrorInfo: `transient CompletableFuture<Map<String, String>> labelsFuture` as well as a `Map<String, String> labels`
   * the `labels` will be set as soon as `labelsFuture` is completed
   * `ErrorInfo#getLabels()` returns a `Map<String, String>`, which can be empty if `labels` is null and `labelsFuture` is not completed yet
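A minimal sketch of the proposed non-blocking approach, assuming labels are `Map<String, String>`. `LabeledErrorInfo` is a hypothetical stand-in, not Flink's actual `ErrorInfo`: the labels future is `transient`, its result is copied into a serializable field once it completes, and `getLabels()` returns an empty map until then, so neither serialization nor callers ever block on `get()`.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch (not Flink's ErrorInfo): labels arrive asynchronously,
 * but serialization and getLabels() never wait for them.
 */
public class LabeledErrorInfo implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Throwable exception;
    private final long timestamp;

    // Not serialized; null again after deserialization.
    private final transient CompletableFuture<Map<String, String>> labelsFuture;

    // Serialized; stays null until labelsFuture completes normally.
    private volatile Map<String, String> labels;

    public LabeledErrorInfo(
            Throwable exception,
            long timestamp,
            CompletableFuture<Map<String, String>> labelsFuture) {
        this.exception = exception;
        this.timestamp = timestamp;
        this.labelsFuture = labelsFuture;
        // Copy the labels into the serializable field as soon as they are available.
        // If labeling completes exceptionally, labels simply stay unset in this sketch.
        labelsFuture.thenAccept(
                result -> this.labels = Collections.unmodifiableMap(new HashMap<>(result)));
    }

    /** Returns the labels if already known, otherwise an empty map; never blocks. */
    public Map<String, String> getLabels() {
        Map<String, String> current = labels;
        return current == null ? Collections.<String, String>emptyMap() : current;
    }

    /** The pending labels future; null after deserialization. */
    public CompletableFuture<Map<String, String>> getLabelsFuture() {
        return labelsFuture;
    }

    public Throwable getException() {
        return exception;
    }

    public long getTimestamp() {
        return timestamp;
    }
}
```

How a failed labeling future should be surfaced is left open here; the sketch only shows that reading labels never has to wait.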



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhuzhurk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce SchedulerBase per-task failure enrichment/labeling

2023-05-12 Thread via GitHub


zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1191985909


##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java:
##
@@ -121,6 +122,7 @@ private FailureHandlingResult handleFailure(
 failedExecution,
 new JobException("The failure is not recoverable", cause),
 timestamp,
+FailureEnricherUtils.EMPTY_LABELS,
 globalFailure);
 }
 

Review Comment:
   How about doing failure labeling in this method? All failures of DefaultScheduler go through this path. Extra benefits are:
   1. FailureHandlingResult can host all kinds of labels, whether the failure is global or a task failure.
   2. It simplifies the modification of Execution: no need to modify fail()/markFailed()/etc.
   3. No need to change TaskExecutionStateTransition to temporarily host the labels.
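A rough sketch of labeling in one common `handleFailure` path, assuming each labeler returns a `CompletableFuture<Map<String, String>>`. `CommonPathLabeling`, `Labeler` and `Result` are hypothetical simplifications, not Flink's `ExecutionFailureHandler`, `FailureEnricher` or `FailureHandlingResult`; the point is only that global and task failures share one method, so the result carries labels for both and nothing upstream has to thread labels through.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: every failure is labeled once, in a single handleFailure() path. */
public class CommonPathLabeling {

    /** Stand-in for a pluggable labeler; not Flink's actual FailureEnricher API. */
    public interface Labeler {
        CompletableFuture<Map<String, String>> label(Throwable cause);
    }

    /** Simplified stand-in for a handling result that also carries the labels. */
    public static final class Result {
        public final Throwable cause;
        public final boolean globalFailure;
        public final CompletableFuture<Map<String, String>> labelsFuture;

        Result(
                Throwable cause,
                boolean globalFailure,
                CompletableFuture<Map<String, String>> labelsFuture) {
            this.cause = cause;
            this.globalFailure = globalFailure;
            this.labelsFuture = labelsFuture;
        }
    }

    private final List<Labeler> labelers;

    public CommonPathLabeling(List<Labeler> labelers) {
        this.labelers = new ArrayList<>(labelers);
    }

    /** Global and task failures both end up here, so labeling happens in one place. */
    public Result handleFailure(Throwable cause, boolean globalFailure) {
        return new Result(cause, globalFailure, labelAll(cause));
    }

    /** Merges all labelers' outputs; later labelers win on key clashes, and any failure fails the whole future. */
    private CompletableFuture<Map<String, String>> labelAll(Throwable cause) {
        Map<String, String> empty = new HashMap<>();
        CompletableFuture<Map<String, String>> merged = CompletableFuture.completedFuture(empty);
        for (Labeler labeler : labelers) {
            merged =
                    merged.thenCombine(
                            labeler.label(cause),
                            (acc, next) -> {
                                acc.putAll(next);
                                return acc;
                            });
        }
        return merged;
    }
}
```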



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java:
##
@@ -113,14 +115,15 @@ public AdaptiveBatchScheduler(
 final CheckpointRecoveryFactory checkpointRecoveryFactory,
 final JobManagerJobMetricGroup jobManagerJobMetricGroup,
 final SchedulingStrategyFactory schedulingStrategyFactory,
-final FailoverStrategy.Factory failoverStrategyFactory,
+final Factory failoverStrategyFactory,

Review Comment:
   I feel this change may make the code harder to understand. If we want to make it, it's better to factor the `Factory` out of `FailoverStrategy` and rename it to `FailoverStrategyFactory`.
   However, that may break existing custom implementations of `FailoverStrategy.Factory`. Although it is not a public interface of Flink, I prefer to keep it as is if the change does not bring much benefit.



##
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java:
##
@@ -366,7 +367,7 @@ void testAccumulatorsAndMetricsForwarding() throws Exception {
 
 /**
  * Verifies that {@link Execution#completeCancelling(Map, IOMetrics, boolean)} and {@link
- * Execution#markFailed(Throwable, boolean, Map, IOMetrics, boolean, boolean)} store the given
+ * Execution#markFailed(ErrorInfo, boolean, Map, IOMetrics, boolean, boolean)} store the given

Review Comment:
   Seems to be a mistake?






[GitHub] [flink] zhuzhurk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce SchedulerBase per-task failure enrichment/labeling

2023-05-10 Thread via GitHub


zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1189766188


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time timeout) {
 @Override
 public CompletableFuture<Acknowledge> updateTaskExecutionState(
 final TaskExecutionState taskExecutionState) {
-FlinkException taskExecutionException;
+checkNotNull(taskExecutionState, "taskExecutionState");
+// Use the main/caller thread for all updates to make sure they are processed in order.
+// (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+// Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+return labelFailure(taskExecutionState)
+.thenApplyAsync(
+taskStateWithLabels -> {
+try {
+return doUpdateTaskExecutionState(taskStateWithLabels);
+} catch (FlinkException e) {
+throw new CompletionException(e);
+}
+},
+getMainThreadExecutor());
+}
 try {
-checkNotNull(taskExecutionState, "taskExecutionState");
+return CompletableFuture.completedFuture(
+doUpdateTaskExecutionState(taskExecutionState));
+} catch (FlinkException e) {
+return FutureUtils.completedExceptionally(e);
+}
+}
 
+private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+throws FlinkException {
+@Nullable FlinkException taskExecutionException;
+try {
 if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
   Thanks for updating the PR, @pgaref.
   I'm actually not against restart strategies using failure labels. I just hope that we can avoid some complications if possible. Totally +1 to the concern about the risk of changing some synchronous processes into asynchronous ones. But given that this is unavoidable when labeling failures within the JM, I prefer to do it in a common path. We will certainly need to review and test it carefully later.
   Sorry that I do not have time to review it right now. I may take a look tomorrow.
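To make the ordering concern concrete, here is a hypothetical sketch (`AsyncLabelingOrder` is not Flink code) in which a single-threaded executor plays the role of the JobMaster main thread and only FAILED updates wait for labeling, roughly mirroring the diff above. If the labeling for an earlier FAILED update finishes after a later update has arrived, the later update is applied first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical sketch of the ordering risk when only FAILED updates detour through
 * asynchronous labeling before being applied on a single "main thread" executor.
 */
public class AsyncLabelingOrder {

    public static List<String> run() {
        List<String> applied = new ArrayList<>(); // only mutated on the main executor
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            // Task A reports FAILED first; its (possibly I/O heavy) labeling is still pending.
            CompletableFuture<Map<String, String>> labeling = new CompletableFuture<>();
            CompletableFuture<Void> failedUpdate =
                    labeling.thenAcceptAsync(labels -> applied.add("A FAILED " + labels), mainThread);

            // Task B reports FINISHED afterwards and is applied without waiting for anything.
            CompletableFuture<Void> finishedUpdate =
                    CompletableFuture.runAsync(() -> applied.add("B FINISHED"), mainThread);
            finishedUpdate.join();

            // Labeling completes only now, so A's earlier update is applied after B's.
            labeling.complete(Collections.singletonMap("type", "user"));
            failedUpdate.join();
            return applied;
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [B FINISHED, A FAILED {type=user}]
    }
}
```

Whether such reordering matters depends on which updates can follow a FAILED one for the same job, which is exactly what the review and tests would need to cover.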


