[GitHub] [flink] zhuzhurk commented on a diff in pull request #22506: [FLINK-31890][runtime] Introduce SchedulerBase per-task failure enrichment/labeling
zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1192155460

## flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java: ##

@@ -1289,4 +1293,59 @@ public static <T> CompletableFuture<T> switchExecutor(
                 },
                 executor);
     }
+
+    /**
+     * A serializable implementation of CompletableFuture.
+     *
+     * This class extends CompletableFuture and implements the Serializable interface to allow it
+     * to be serialized and deserialized. The result of the CompletableFuture is extracted and
+     * serialized when the object is written to a stream, and the result is set using the complete()
+     * method when the object is read from a stream.
+     *
+     * @param <T> the type of the result of the CompletableFuture
+     */
+    public static class SerializableCompletableFuture<T> extends CompletableFuture<T>
+            implements Serializable {
+        private static final long serialVersionUID = 1L;
+        private transient T result;
+
+        public SerializableCompletableFuture(T value) {
+            this.result = value;
+            this.complete(value);
+        }
+
+        /**
+         * Writes this object to the given OutputStream. The result of the CompletableFuture is
+         * extracted and serialized along with the object.
+         *
+         * @param out the ObjectOutputStream to write to
+         * @throws IOException if an I/O error occurs
+         */
+        private void writeObject(ObjectOutputStream out)
+                throws IOException, ExecutionException, InterruptedException {
+            out.defaultWriteObject();
+            if (result == null) {
+                result = this.get();

Review Comment:
This `get()` can block the main thread.
Maybe we do not need to introduce a `SerializableCompletableFuture` and can instead modify `ErrorInfo` to achieve the goal, e.g.:
* introduce two fields to `ErrorInfo`: `transient CompletableFuture<Map<String, String>> labelsFuture` as well as `Map<String, String> labels`
* `labels` will be set as soon as `labelsFuture` is completed
* `ErrorInfo#getLabels()` returns a `Map<String, String>`, which can be empty if `labels` is null and `labelsFuture` is not completed

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
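The `ErrorInfo` proposal in the review comment above could look roughly like the following. This is only a minimal sketch, not the code from the PR: `LabeledErrorInfo` is a hypothetical stand-in for Flink's `ErrorInfo`, and the `Map<String, String>` label type is an assumption. The point it illustrates is that the future stays `transient` while the resolved labels live in a plain serializable field, so serialization never has to call `get()`.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for ErrorInfo; only the label handling is sketched. */
final class LabeledErrorInfo implements Serializable {
    private static final long serialVersionUID = 1L;

    // Not serialized: a pending future has no meaningful serialized form.
    // After deserialization this field is null and is never read.
    private final transient CompletableFuture<Map<String, String>> labelsFuture;

    // Serialized: stays null until labelsFuture completes normally.
    private volatile Map<String, String> labels;

    LabeledErrorInfo(CompletableFuture<Map<String, String>> labelsFuture) {
        this.labelsFuture = labelsFuture;
        // Copy the result into the serializable field as soon as it is available.
        // If the future fails, labels simply stays null.
        this.labelsFuture.thenAccept(result -> this.labels = result);
    }

    /** Never blocks: returns an empty map while the labels are still pending. */
    Map<String, String> getLabels() {
        Map<String, String> current = labels;
        return current != null ? current : Collections.emptyMap();
    }

    public static void main(String[] args) {
        CompletableFuture<Map<String, String>> future = new CompletableFuture<>();
        LabeledErrorInfo info = new LabeledErrorInfo(future);
        System.out.println(info.getLabels()); // prints {}
        future.complete(Collections.singletonMap("type", "user"));
        System.out.println(info.getLabels()); // prints {type=user}
    }
}
```

With this shape, an instance serialized before the labels arrive simply carries no labels, which matches the "can be empty" behavior described in the last bullet.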
zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1191985909

## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java: ##

@@ -121,6 +122,7 @@ private FailureHandlingResult handleFailure(
                     failedExecution,
                     new JobException("The failure is not recoverable", cause),
                     timestamp,
+                    FailureEnricherUtils.EMPTY_LABELS,
                     globalFailure);
         }

Review Comment:
How about doing the failure labeling in this method? All failures of DefaultScheduler go through this path.
Extra benefits are:
1. FailureHandlingResult can host all kinds of labels, regardless of whether it is a global or a task failure.
2. It simplifies the modification of Execution. No need to modify fail()/markFailed()/etc.
3. No need to change TaskExecutionStateTransition to temporarily host the labels.

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java: ##

@@ -113,14 +115,15 @@ public AdaptiveBatchScheduler(
             final CheckpointRecoveryFactory checkpointRecoveryFactory,
             final JobManagerJobMetricGroup jobManagerJobMetricGroup,
             final SchedulingStrategyFactory schedulingStrategyFactory,
-            final FailoverStrategy.Factory failoverStrategyFactory,
+            final Factory failoverStrategyFactory,

Review Comment:
I feel that this change may make the code harder to understand.
If we want to make this change, it's better to factor the `Factory` out of `FailoverStrategy` and rename it to `FailoverStrategyFactory`. However, that may break current custom implementations of `FailoverStrategy.Factory`. Although it is not a public interface of Flink, I prefer to keep it as is if the change does not bring much benefit.
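To make the first suggestion above (labeling inside `handleFailure`) more concrete, here is a minimal sketch of a single entry point that attaches labels to every failure, global or per-task. All names are hypothetical: `CommonPathLabeling` and `Result` only stand in for `ExecutionFailureHandler` and `FailureHandlingResult`, and enrichers are modeled as plain functions rather than Flink's actual enricher interface.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch: one common path that labels every failure. */
final class CommonPathLabeling {

    /** Stand-in for FailureHandlingResult: hosts labels for global and task failures alike. */
    static final class Result {
        final Throwable cause;
        final boolean globalFailure;
        final CompletableFuture<Map<String, String>> labels;

        Result(Throwable cause, boolean globalFailure,
               CompletableFuture<Map<String, String>> labels) {
            this.cause = cause;
            this.globalFailure = globalFailure;
            this.labels = labels;
        }
    }

    // Each enricher maps a failure cause to (possibly asynchronously computed) labels.
    private final List<Function<Throwable, CompletableFuture<Map<String, String>>>> enrichers;

    CommonPathLabeling(
            List<Function<Throwable, CompletableFuture<Map<String, String>>>> enrichers) {
        this.enrichers = enrichers;
    }

    /** Single entry point: callers do not need to carry labels themselves. */
    Result handleFailure(Throwable cause, boolean globalFailure) {
        CompletableFuture<Map<String, String>> labels =
                CompletableFuture.completedFuture(Collections.<String, String>emptyMap());
        for (Function<Throwable, CompletableFuture<Map<String, String>>> enricher : enrichers) {
            // Merge each enricher's labels into the accumulated map without blocking.
            labels = labels.thenCombine(enricher.apply(cause), CommonPathLabeling::merge);
        }
        return new Result(cause, globalFailure, labels);
    }

    private static Map<String, String> merge(Map<String, String> a, Map<String, String> b) {
        Map<String, String> merged = new HashMap<>(a);
        merged.putAll(b);
        return merged;
    }

    public static void main(String[] args) {
        CommonPathLabeling handler = new CommonPathLabeling(Collections.singletonList(
                t -> CompletableFuture.completedFuture(
                        Collections.singletonMap("message", String.valueOf(t.getMessage())))));
        Result result = handler.handleFailure(new RuntimeException("boom"), false);
        System.out.println(result.labels.join());
    }
}
```

Because the labels are computed where the result is built, the callers in this sketch never pass labels through intermediate objects, which is the simplification points 2 and 3 describe.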

## flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java: ##

@@ -366,7 +367,7 @@ void testAccumulatorsAndMetricsForwarding() throws Exception {
     /**
      * Verifies that {@link Execution#completeCancelling(Map, IOMetrics, boolean)} and {@link
-     * Execution#markFailed(Throwable, boolean, Map, IOMetrics, boolean, boolean)} store the given
+     * Execution#markFailed(ErrorInfo, boolean, Map, IOMetrics, boolean, boolean)} store the given

Review Comment:
Seems to be a mistake?
zhuzhurk commented on code in PR #22506:
URL: https://github.com/apache/flink/pull/22506#discussion_r1189766188

## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ##

@@ -473,26 +500,50 @@ public CompletableFuture<Acknowledge> cancel(Time timeout) {
     @Override
     public CompletableFuture<Acknowledge> updateTaskExecutionState(
             final TaskExecutionState taskExecutionState) {
-        FlinkException taskExecutionException;
+        checkNotNull(taskExecutionState, "taskExecutionState");
+        // Use the main/caller thread for all updates to make sure they are processed in order.
+        // (MainThreadExecutor i.e., the akka thread pool does not guarantee that)
+        // Only detach for a FAILED state update that is terminal and may perform io heavy labeling.
+        if (ExecutionState.FAILED.equals(taskExecutionState.getExecutionState())) {
+            return labelFailure(taskExecutionState)
+                    .thenApplyAsync(
+                            taskStateWithLabels -> {
+                                try {
+                                    return doUpdateTaskExecutionState(taskStateWithLabels);
+                                } catch (FlinkException e) {
+                                    throw new CompletionException(e);
+                                }
+                            },
+                            getMainThreadExecutor());
+        }
         try {
-            checkNotNull(taskExecutionState, "taskExecutionState");
+            return CompletableFuture.completedFuture(
+                    doUpdateTaskExecutionState(taskExecutionState));
+        } catch (FlinkException e) {
+            return FutureUtils.completedExceptionally(e);
+        }
+    }
+    private Acknowledge doUpdateTaskExecutionState(final TaskExecutionState taskExecutionState)
+            throws FlinkException {
+        @Nullable FlinkException taskExecutionException;
+        try {
             if (schedulerNG.updateTaskExecutionState(taskExecutionState)) {

Review Comment:
Thanks for updating the PR. @pgaref
I'm actually not against restart strategies using failure labels. I just hope that we can avoid some complication if possible.
Totally +1 to the concern about the risk of changing some synchronous processes into asynchronous ones. But given that this is unavoidable when labeling failures from within the JM, I prefer to do it in a common path. We will certainly need to review and test it carefully later.
Sorry that I do not have time to review it right now. I may take a look tomorrow.
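The synchronous-versus-asynchronous trade-off discussed in this thread can be illustrated with a small standalone sketch. It mirrors the shape of the diff (only `FAILED` updates are detached for labeling, then re-enter a dedicated executor), but every name here is hypothetical: `label` stands in for the IO-heavy labeling step, `apply` for `doUpdateTaskExecutionState`, and states are plain strings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: label off-thread, then apply the update on a dedicated executor. */
final class AsyncLabelThenUpdate {

    /** Stand-in for the potentially IO-heavy labeling step. */
    static String label(String state) {
        return state + "[labeled]";
    }

    /** Stand-in for the state update; records which thread applied it. */
    static String apply(String state) {
        return state + "@" + Thread.currentThread().getName();
    }

    static CompletableFuture<String> update(
            String state, Executor ioExecutor, Executor mainThreadExecutor) {
        if (!"FAILED".equals(state)) {
            // Non-failure updates are applied synchronously on the caller thread.
            return CompletableFuture.completedFuture(apply(state));
        }
        // FAILED updates are labeled asynchronously and only then applied, so they
        // may be applied later than updates that arrived after them.
        return CompletableFuture.supplyAsync(() -> label(state), ioExecutor)
                .thenApplyAsync(AsyncLabelThenUpdate::apply, mainThreadExecutor);
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newSingleThreadExecutor();
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            System.out.println(update("FAILED", io, mainThread).join());
            System.out.println(update("RUNNING", io, mainThread).join());
        } finally {
            io.shutdown();
            mainThread.shutdown();
        }
    }
}
```

The sketch makes the ordering risk visible: once one kind of update takes the asynchronous route, nothing in this code guarantees that it is applied before a later synchronous one, which is why careful review and testing are called for.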