tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280433723
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##########
 @@ -501,76 +432,21 @@ public void acknowledgeCheckpoint(
                        final CheckpointMetrics checkpointMetrics,
                        final TaskStateSnapshot checkpointState) {
 
-               final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-               final AcknowledgeCheckpoint ackMessage = new 
AcknowledgeCheckpoint(
-                       jobID,
-                       executionAttemptID,
-                       checkpointId,
-                       checkpointMetrics,
-                       checkpointState);
-
-               if (checkpointCoordinator != null) {
-                       getRpcService().execute(() -> {
-                               try {
-                                       
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
-                               } catch (Throwable t) {
-                                       log.warn("Error while processing 
checkpoint acknowledgement message", t);
-                               }
-                       });
-               } else {
-                       String errorMessage = "Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator";
-                       if (executionGraph.getState() == JobStatus.RUNNING) {
-                               log.error(errorMessage, jobGraph.getJobID());
-                       } else {
-                               log.debug(errorMessage, jobGraph.getJobID());
-                       }
-               }
+               schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID, 
checkpointId, checkpointMetrics, checkpointState);
        }
 
        // TODO: This method needs a leader session ID
        @Override
        public void declineCheckpoint(DeclineCheckpoint decline) {
-               final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-
-               if (checkpointCoordinator != null) {
-                       getRpcService().execute(() -> {
-                               try {
-                                       
checkpointCoordinator.receiveDeclineMessage(decline);
-                               } catch (Exception e) {
-                                       log.error("Error in 
CheckpointCoordinator while processing {}", decline, e);
-                               }
-                       });
-               } else {
-                       String errorMessage = "Received DeclineCheckpoint 
message for job {} with no CheckpointCoordinator";
-                       if (executionGraph.getState() == JobStatus.RUNNING) {
-                               log.error(errorMessage, jobGraph.getJobID());
-                       } else {
-                               log.debug(errorMessage, jobGraph.getJobID());
-                       }
-               }
+               schedulerNG.declineCheckpoint(decline);
        }
 
        @Override
        public CompletableFuture<KvStateLocation> requestKvStateLocation(final 
JobID jobId, final String registrationName) {
-               // sanity check for the correct JobID
-               if (jobGraph.getJobID().equals(jobId)) {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Lookup key-value state for job {} 
with registration " +
-                                       "name {}.", jobGraph.getJobID(), 
registrationName);
-                       }
-
-                       final KvStateLocationRegistry registry = 
executionGraph.getKvStateLocationRegistry();
-                       final KvStateLocation location = 
registry.getKvStateLocation(registrationName);
-                       if (location != null) {
-                               return 
CompletableFuture.completedFuture(location);
-                       } else {
-                               return FutureUtils.completedExceptionally(new 
UnknownKvStateLocation(registrationName));
-                       }
-               } else {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Request of key-value state location 
for unknown job {} received.", jobId);
-                       }
-                       return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+               try {
+                       return 
CompletableFuture.completedFuture(schedulerNG.requestKvStateLocation(jobId, 
registrationName));
+               } catch (UnknownKvStateLocation | FlinkJobNotFoundException e) {
 
 Review comment:
   logging statement missing

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to