TisonKun commented on a change in pull request #7918: [FLINK-11846] Don't delete HA job files in case of duplicate job submission URL: https://github.com/apache/flink/pull/7918#discussion_r263179412
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -260,40 +260,50 @@ private void stopDispatcherServices() throws Exception { @Override public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) { - return internalSubmitJob(jobGraph).whenCompleteAsync((acknowledge, throwable) -> { - if (throwable != null) { - cleanUpJobData(jobGraph.getJobID(), true); + log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName()); + + try { + if (checkDuplicateJob(jobGraph.getJobID())) { + return FutureUtils.completedExceptionally( + new JobSubmissionException(jobGraph.getJobID(), "Job has already been submitted.")); + } else { + return internalSubmitJob(jobGraph); } - }, getRpcService().getExecutor()); + } catch (FlinkException e) { + return FutureUtils.completedExceptionally(e); + } } - private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) { - final JobID jobId = jobGraph.getJobID(); - - log.info("Submitting job {} ({}).", jobId, jobGraph.getName()); + private boolean checkDuplicateJob(JobID jobId) throws FlinkException { Review comment: It would be better to add Javadoc describing the meaning of the return value. (It is a bit unintuitive whether `true` means that the job is not a duplicate or that a duplicate exists.) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services