Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5107#discussion_r155245759 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java --- @@ -507,6 +514,41 @@ public void handleError(final Exception exception) { onFatalError(new DispatcherException("Received an error from the LeaderElectionService.", exception)); } + //------------------------------------------------------ + // SubmittedJobGraphListener + //------------------------------------------------------ + + @Override + public void onAddedJobGraph(final JobID jobId) { + getRpcService().execute(() -> { + final SubmittedJobGraph submittedJobGraph; + try { + submittedJobGraph = submittedJobGraphStore.recoverJobGraph(jobId); + } catch (final Exception e) { + log.error("Could not recover job graph for job {}.", jobId, e); + return; + } + runAsync(() -> { + if (!jobManagerRunners.containsKey(jobId)) { --- End diff -- Removed.
---