zhuzhurk commented on code in PR #24771: URL: https://github.com/apache/flink/pull/24771#discussion_r1611406034
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ########## @@ -229,11 +231,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> private final BlocklistHandler blocklistHandler; - private final List<CompletableFuture<Collection<PartitionWithMetrics>>> - partitionWithMetricsOnTaskManagerFutures = new ArrayList<>(); + private final Map<ResultPartitionID, PartitionWithMetrics> partitionWithMetricsOnTaskManagers = + new HashMap<>(); private boolean fetchAndRetainPartitions = false; Review Comment: Please add a comment explaining in which scenario this field is used. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ########## @@ -1001,17 +1037,54 @@ private void fetchAndRetainPartitionWithMetricsOnTaskManager(ResourceID resource TaskManagerRegistration taskManager = registeredTaskManagers.get(resourceId); checkNotNull(taskManager); - partitionWithMetricsOnTaskManagerFutures.add( - taskManager - .getTaskExecutorGateway() - .getAndRetainPartitionWithMetrics(jobGraph.getJobID())); + taskManager + .getTaskExecutorGateway() + .getAndRetainPartitionWithMetrics(jobGraph.getJobID()) + .thenAccept( + partitionWithMetrics -> { + if (fetchAndRetainPartitions) { + for (PartitionWithMetrics partitionWithMetric : + partitionWithMetrics) { + partitionWithMetricsOnTaskManagers.put( + partitionWithMetric + .getPartition() + .getResultPartitionID(), + partitionWithMetric); + } + checkPartitionOnTaskManagerReportFinished(); + } else { + log.info( + "Received partition metrics from {} later. Releasing it.", Review Comment: Received partition metrics from {} later. Releasing it. -> Received late report of partition metrics from {}. Release the partitions. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ########## @@ -1001,17 +1037,54 @@ private void fetchAndRetainPartitionWithMetricsOnTaskManager(ResourceID resource TaskManagerRegistration taskManager = registeredTaskManagers.get(resourceId); checkNotNull(taskManager); - partitionWithMetricsOnTaskManagerFutures.add( - taskManager - .getTaskExecutorGateway() - .getAndRetainPartitionWithMetrics(jobGraph.getJobID())); + taskManager + .getTaskExecutorGateway() + .getAndRetainPartitionWithMetrics(jobGraph.getJobID()) + .thenAccept( + partitionWithMetrics -> { + if (fetchAndRetainPartitions) { + for (PartitionWithMetrics partitionWithMetric : + partitionWithMetrics) { + partitionWithMetricsOnTaskManagers.put( + partitionWithMetric + .getPartition() + .getResultPartitionID(), + partitionWithMetric); + } + checkPartitionOnTaskManagerReportFinished(); + } else { + log.info( + "Received partition metrics from {} later. Releasing it.", + resourceId); + + taskManager + .getTaskExecutorGateway() + .releasePartitions( + jobGraph.getJobID(), + partitionWithMetrics.stream() + .map(PartitionWithMetrics::getPartition) + .map( + ShuffleDescriptor + ::getResultPartitionID) + .collect(Collectors.toSet())); + } + }); } - @Override - public void stopFetchAndRetainPartitionWithMetricsOnTaskManager() { + private void stopFetchAndRetainPartitionWithMetricsOnTaskManager() { fetchAndRetainPartitions = false; } + private void checkPartitionOnTaskManagerReportFinished() { + if (partitionOnTaskManagerReportFuture != null) { + if (partitionWithMetricsOnTaskManagers.keySet().equals(expectedReportPartitionIds) Review Comment: It does not need to be `equals`. The received partitions can be a superset of `expectedReportPartitionIds`, so checking containment instead of equality lets the report stage finish earlier. It seems this case is not covered by tests? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ########## @@ -229,11 +231,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> private final BlocklistHandler blocklistHandler; - private final List<CompletableFuture<Collection<PartitionWithMetrics>>> - partitionWithMetricsOnTaskManagerFutures = new ArrayList<>(); + private final Map<ResultPartitionID, PartitionWithMetrics> partitionWithMetricsOnTaskManagers = + new HashMap<>(); private boolean fetchAndRetainPartitions = false; + private Set<ResultPartitionID> expectedReportPartitionIds; Review Comment: These names need to be refined. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org