tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280432648
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##########
 @@ -437,43 +391,20 @@ public JobMaster(
                        final IntermediateDataSetID intermediateResultId,
                        final ResultPartitionID resultPartitionId) {
 
-               final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
-               if (execution != null) {
-                       return CompletableFuture.completedFuture(execution.getState());
-               }
-               else {
-                       final IntermediateResult intermediateResult =
-                                       executionGraph.getAllIntermediateResults().get(intermediateResultId);
-
-                       if (intermediateResult != null) {
-                               // Try to find the producing execution
-                               Execution producerExecution = intermediateResult
-                                               .getPartitionById(resultPartitionId.getPartitionId())
-                                               .getProducer()
-                                               .getCurrentExecutionAttempt();
-
-                               if (producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
-                                       return CompletableFuture.completedFuture(producerExecution.getState());
-                               } else {
-                                       return FutureUtils.completedExceptionally(new PartitionProducerDisposedException(resultPartitionId));
-                               }
-                       } else {
-                               return FutureUtils.completedExceptionally(new IllegalArgumentException("Intermediate data set with ID "
-                                               + intermediateResultId + " not found."));
-                       }
+               try {
+                       return CompletableFuture.completedFuture(schedulerNG.requestPartitionState(intermediateResultId, resultPartitionId));
+               } catch (PartitionProducerDisposedException e) {
 
 Review comment:
   I would also log the exception here before it leaves the component.
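
   A minimal sketch of what I have in mind, not the actual JobMaster code. The
   diff is cut off right after the catch clause, so what follows it in the PR
   is not shown; the body below is an assumption. To keep the example
   self-contained, `Scheduler` and `ProducerDisposedException` are hypothetical
   stand-ins for `schedulerNG` and `PartitionProducerDisposedException`, IDs are
   plain strings, and `java.util.logging` replaces whatever logger the class
   actually uses.

```java
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

public class PartitionStateRequestSketch {

    private static final Logger LOG =
            Logger.getLogger(PartitionStateRequestSketch.class.getName());

    /** Hypothetical stand-in for PartitionProducerDisposedException. */
    public static class ProducerDisposedException extends Exception {
        public ProducerDisposedException(String partitionId) {
            super("Producer of partition " + partitionId + " has been disposed.");
        }
    }

    /** Hypothetical stand-in for the scheduler call used in the diff. */
    public interface Scheduler {
        String requestPartitionState(String partitionId) throws ProducerDisposedException;
    }

    public static CompletableFuture<String> requestPartitionState(
            Scheduler scheduler, String partitionId) {
        try {
            return CompletableFuture.completedFuture(
                    scheduler.requestPartitionState(partitionId));
        } catch (ProducerDisposedException e) {
            // Log before the exception leaves the component, so the failure
            // is visible locally even if the caller drops the future.
            LOG.log(Level.INFO, "Error while requesting partition state", e);

            // Assumed handling: hand the exception to the caller through
            // an exceptionally completed future.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) {
        // Successful lookup: the future completes with the state.
        System.out.println(requestPartitionState(id -> "RUNNING", "p-1").join());

        // Disposed producer: the exception is logged, then the future fails.
        CompletableFuture<String> failed = requestPartitionState(
                id -> { throw new ProducerDisposedException(id); }, "p-2");
        System.out.println(failed.isCompletedExceptionally());
    }
}
```

   The log level is a judgment call; the point is only that the exception gets
   recorded on this side before it is propagated.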
