tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it URL: https://github.com/apache/flink/pull/8318#discussion_r280433723
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ########## @@ -501,76 +432,21 @@ public void acknowledgeCheckpoint( final CheckpointMetrics checkpointMetrics, final TaskStateSnapshot checkpointState) { - final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint( - jobID, - executionAttemptID, - checkpointId, - checkpointMetrics, - checkpointState); - - if (checkpointCoordinator != null) { - getRpcService().execute(() -> { - try { - checkpointCoordinator.receiveAcknowledgeMessage(ackMessage); - } catch (Throwable t) { - log.warn("Error while processing checkpoint acknowledgement message", t); - } - }); - } else { - String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator"; - if (executionGraph.getState() == JobStatus.RUNNING) { - log.error(errorMessage, jobGraph.getJobID()); - } else { - log.debug(errorMessage, jobGraph.getJobID()); - } - } + schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState); } // TODO: This method needs a leader session ID @Override public void declineCheckpoint(DeclineCheckpoint decline) { - final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - - if (checkpointCoordinator != null) { - getRpcService().execute(() -> { - try { - checkpointCoordinator.receiveDeclineMessage(decline); - } catch (Exception e) { - log.error("Error in CheckpointCoordinator while processing {}", decline, e); - } - }); - } else { - String errorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator"; - if (executionGraph.getState() == JobStatus.RUNNING) { - log.error(errorMessage, jobGraph.getJobID()); - } else { - log.debug(errorMessage, jobGraph.getJobID()); - } - } + schedulerNG.declineCheckpoint(decline); } @Override public CompletableFuture<KvStateLocation> requestKvStateLocation(final JobID jobId, final String registrationName) { - // sanity check for the correct JobID - if (jobGraph.getJobID().equals(jobId)) { - if (log.isDebugEnabled()) { - log.debug("Lookup key-value state for job {} with registration " + - "name {}.", jobGraph.getJobID(), registrationName); - } - - final KvStateLocationRegistry registry = executionGraph.getKvStateLocationRegistry(); - final KvStateLocation location = registry.getKvStateLocation(registrationName); - if (location != null) { - return CompletableFuture.completedFuture(location); - } else { - return FutureUtils.completedExceptionally(new UnknownKvStateLocation(registrationName)); - } - } else { - if (log.isDebugEnabled()) { - log.debug("Request of key-value state location for unknown job {} received.", jobId); - } - return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + try { + return CompletableFuture.completedFuture(schedulerNG.requestKvStateLocation(jobId, registrationName)); + } catch (UnknownKvStateLocation | FlinkJobNotFoundException e) { Review comment: logging statement missing ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services