GJL commented on a change in pull request #6678: [FLINK-10255] Only react to onAddedJobGraph signal when being leader
URL: https://github.com/apache/flink/pull/6678#discussion_r217082586
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 ##########
 @@ -879,24 +917,66 @@ public void handleError(final Exception exception) {
 
        @Override
        public void onAddedJobGraph(final JobID jobId) {
-               final CompletableFuture<SubmittedJobGraph> recoveredJob = getRpcService().execute(
-                       () -> submittedJobGraphStore.recoverJobGraph(jobId));
-
-               final CompletableFuture<Acknowledge> submissionFuture = recoveredJob.thenComposeAsync(
-                       (SubmittedJobGraph submittedJobGraph) -> submitJob(submittedJobGraph.getJobGraph(), RpcUtils.INF_TIMEOUT),
-                       getMainThreadExecutor());
-
-               submissionFuture.whenComplete(
-                       (Acknowledge acknowledge, Throwable throwable) -> {
-                               if (throwable != null) {
-                                       onFatalError(
-                                               new DispatcherException(
-                                                       String.format("Could not start the added job %s", jobId),
-                                                       ExceptionUtils.stripCompletionException(throwable)));
+               runAsync(
+                       () -> {
+                               if (!jobManagerRunners.containsKey(jobId)) {
+                                       final CompletableFuture<JobGraph> recoveredJob = recoveryOperation.thenApplyAsync(
+                                               ignored -> {
+                                                       try {
+                                                               return recoverJob(jobId);
+                                                       } catch (Exception e) {
+                                                               ExceptionUtils.rethrow(e);
+                                                       }
+                                                       return null;
+                                               },
+                                               getRpcService().getExecutor());
+
+                                       final DispatcherId dispatcherId = getFencingToken();
+                                       final CompletableFuture<Void> submissionFuture = recoveredJob.thenComposeAsync(
+                                               (FunctionWithThrowable<JobGraph, CompletableFuture<Void>, Exception>) (JobGraph jobGraph) -> tryRunRecoveredJobGraph(jobGraph, dispatcherId)
+                                                       .thenAcceptAsync(
+                                                               (ConsumerWithException<Boolean, Exception>) (Boolean isRecoveredJobRunning) -> {
 
 Review comment:
   Imo we are doing this wrong. The code would be much more readable with static factory methods:
   
   ```
   
   /**
    * {@link Consumer} that can throw checked exceptions.
    */
   @FunctionalInterface
   public interface CheckedConsumer<T> {
   
        void checkedAccept(T t) throws Exception;
   
        static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
                return (t) -> {
                        try {
                                checkedConsumer.checkedAccept(t);
                        } catch (Exception e) {
                                ExceptionUtils.rethrow(e);
                        }
                };
        }
   }
   ```
   This allows for:
   ```
   .thenAcceptAsync(CheckedConsumer.unchecked(isRecoveredJobRunning -> {
        ...
   }));
   ...
   ``` 
   No casts are required. Also, when interacting with the Java API, it does not matter what exact type of exception can be thrown – what matters is that the checked exception becomes unchecked. We do not need to generify the exception type in `ConsumerWithException`.
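
For reference, a minimal self-contained sketch of how this could behave end to end with a plain `CompletableFuture`. It makes several assumptions that are not part of the PR: `CheckedConsumerDemo` and `runAndReport` are hypothetical names used only for illustration, and wrapping in a `RuntimeException` is a stand-in for `ExceptionUtils.rethrow(e)` so that the snippet compiles without Flink on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

/** {@link Consumer} that can throw checked exceptions (same shape as proposed above). */
@FunctionalInterface
interface CheckedConsumer<T> {

    void checkedAccept(T t) throws Exception;

    static <T> Consumer<T> unchecked(CheckedConsumer<T> checkedConsumer) {
        return t -> {
            try {
                checkedConsumer.checkedAccept(t);
            } catch (RuntimeException e) {
                // Already unchecked, pass it through unchanged.
                throw e;
            } catch (Exception e) {
                // Assumption: stand-in for Flink's ExceptionUtils.rethrow(e).
                throw new RuntimeException(e);
            }
        };
    }
}

/** Hypothetical demo class, not part of the PR. */
final class CheckedConsumerDemo {

    /** Returns null if the consumer succeeded, else the message of the checked exception. */
    static String runAndReport(boolean isRecoveredJobRunning) {
        CompletableFuture<Void> future = CompletableFuture
            .completedFuture(isRecoveredJobRunning)
            // No cast and no explicit lambda parameter type needed here.
            .thenAccept(CheckedConsumer.unchecked(running -> {
                if (!running) {
                    throw new Exception("job is not running");
                }
            }));
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            // Cause chain: CompletionException -> RuntimeException -> original Exception.
            return e.getCause().getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndReport(true));
        System.out.println(runAndReport(false));
    }
}
```

The call site only sees a `java.util.function.Consumer`, so the concrete checked exception type never appears in a signature; it travels as the cause of the unchecked wrapper.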

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
