GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ##########
@@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 	@Override
 	public void onAddedJobGraph(final JobID jobId) {
-		final CompletableFuture<SubmittedJobGraph> recoveredJob = getRpcService().execute(
-			() -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-		final CompletableFuture<Acknowledge> submissionFuture = recoveredJob.thenComposeAsync(
-			(SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-			getMainThreadExecutor());
-
-		submissionFuture.whenComplete(
-			(Acknowledge acknowledge, Throwable throwable) -> {
-				if (throwable != null) {
-					onFatalError(
-						new DispatcherException(
-							String.format("Could not start the added job %s", jobId),
-							ExceptionUtils.stripCompletionException(throwable)));
+		runAsync(
+			() -> {
+				if (!jobManagerRunners.containsKey(jobId)) {
+					final CompletableFuture<JobGraph> recoveredJob = recoveryOperation.thenApplyAsync(
+						ignored -> {
+							try {
+								return recoverJob(jobId);
+							} catch (Exception e) {
+								ExceptionUtils.rethrow(e);
+							}
+							return null;
+						},
+						getRpcService().getExecutor());
+
+					final DispatcherId dispatcherId = getFencingToken();
+					final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
+						(FunctionWithThrowable<JobGraph, CompletableFuture<Void>, Exception>) (JobGraph jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+							.thenAcceptAsync(
+								(ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {

Review comment:
IMO we are doing this wrong. The code would be much more readable with static factory methods:

```
import java.util.function.Consumer;

import org.apache.flink.util.ExceptionUtils;

/**
 * {@link Consumer} that can throw checked exceptions.
 */
@FunctionalInterface
public interface CheckedConsumer<T> {

	void checkedAccept(T t) throws Exception;

	static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
		return (t) -> {
			try {
				checkedConsumer.checkedAccept(t);
			} catch (Exception e) {
				// Rethrows unchecked exceptions as-is and wraps checked ones in a RuntimeException.
				ExceptionUtils.rethrow(e);
			}
		};
	}
}
```

This allows for:

```
.thenAcceptAsync(CheckedConsumer.unchecked(isRecoveredJobRunning -> {
	...
}));
```

No casts are required. Also, when interacting with the Java API, it does not matter exactly which type of exception can be thrown; what matters is that the checked exception becomes an unchecked one. We do not need a generic exception type parameter in `ConsumerWithException`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
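The following is a minimal stand-alone sketch of the static-factory approach proposed in the review comment. It compiles and runs without Flink on the classpath. It applies the same pattern to both spots in the diff: the `thenApplyAsync` lambda that wraps `recoverJob` in try/catch, and the cast `thenAcceptAsync` consumer.

It rests on these assumptions:
- `asUnchecked` is a local stand-in for Flink's `ExceptionUtils.rethrow`.
- `CheckedFunction` is a hypothetical `Function` counterpart of `CheckedConsumer`.
- `recoverJob`, `startJob` and `recoverAndStart` are hypothetical helpers for illustration only. They are not the Dispatcher's real methods.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Stand-alone sketch of the static-factory approach. Only the shape of CheckedConsumer
 * comes from the review; CheckedFunction, recoverJob, startJob and recoverAndStart are
 * hypothetical, and asUnchecked stands in for Flink's ExceptionUtils.rethrow.
 */
public class UncheckedFactories {

    @FunctionalInterface
    interface CheckedConsumer<T> {
        void checkedAccept(T t) throws Exception;

        static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
            return t -> {
                try {
                    checkedConsumer.checkedAccept(t);
                } catch (Exception e) {
                    throw asUnchecked(e);
                }
            };
        }
    }

    /** Hypothetical Function counterpart, replacing the try/catch/"return null" lambda. */
    @FunctionalInterface
    interface CheckedFunction<T, R> {
        R checkedApply(T t) throws Exception;

        static <T, R> Function<T, R> unchecked(CheckedFunction<T, R> checkedFunction) {
            return t -> {
                try {
                    return checkedFunction.checkedApply(t);
                } catch (Exception e) {
                    throw asUnchecked(e);
                }
            };
        }
    }

    /** Passes RuntimeExceptions through unchanged and wraps checked exceptions. */
    static RuntimeException asUnchecked(Exception e) {
        return e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
    }

    /** Hypothetical stand-in for recoverJob(jobId); declares a checked exception. */
    static String recoverJob(String jobId) throws IOException {
        if (jobId.isEmpty()) {
            throw new IOException("no job graph for empty id");
        }
        return "JobGraph(" + jobId + ")";
    }

    /** Hypothetical consumer body that also declares a checked exception. */
    static void startJob(String jobGraph) throws IOException {
        if (jobGraph.contains("broken")) {
            throw new IOException("cannot start " + jobGraph);
        }
    }

    /** Returns "ok", or the message of the checked exception that failed the pipeline. */
    static String recoverAndStart(String jobId) {
        CompletableFuture<Void> recoveryOperation = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> done = recoveryOperation
            // No try/catch and no "return null" inside the lambda.
            .thenApplyAsync(CheckedFunction.unchecked(ignored -> recoverJob(jobId)))
            // No cast to a ConsumerWithException-style type.
            .thenAcceptAsync(CheckedConsumer.unchecked(UncheckedFactories::startJob));
        try {
            done.join();
            return "ok";
        } catch (CompletionException e) {
            // Cause chain: CompletionException -> RuntimeException (asUnchecked) -> IOException.
            return e.getCause().getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(recoverAndStart("job-1"));      // ok
        System.out.println(recoverAndStart(""));           // no job graph for empty id
        System.out.println(recoverAndStart("broken-job")); // cannot start JobGraph(broken-job)
    }
}
```

Each `unchecked` factory returns a plain `java.util.function.Consumer` or `Function`. The lambda's target type is therefore inferred from the factory parameter, so the call site needs no cast. The checked exception reaches the caller as the cause of the `RuntimeException` inside the `CompletionException` thrown by `join()`. Nothing along the chain needs to know its concrete type, which is consistent with the reviewer's point that a generic exception type parameter adds nothing here.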