zhuzhurk commented on code in PR #24771:
URL: https://github.com/apache/flink/pull/24771#discussion_r1611406034


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -229,11 +231,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
 
     private final BlocklistHandler blocklistHandler;
 
-    private final List<CompletableFuture<Collection<PartitionWithMetrics>>>
-            partitionWithMetricsOnTaskManagerFutures = new ArrayList<>();
+    private final Map<ResultPartitionID, PartitionWithMetrics> partitionWithMetricsOnTaskManagers =
+            new HashMap<>();
 
     private boolean fetchAndRetainPartitions = false;

Review Comment:
   A comment is needed to explain in what scenario `fetchAndRetainPartitions` is used.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -1001,17 +1037,54 @@ private void fetchAndRetainPartitionWithMetricsOnTaskManager(ResourceID resource
         TaskManagerRegistration taskManager = registeredTaskManagers.get(resourceId);
         checkNotNull(taskManager);
 
-        partitionWithMetricsOnTaskManagerFutures.add(
-                taskManager
-                        .getTaskExecutorGateway()
-                        .getAndRetainPartitionWithMetrics(jobGraph.getJobID()));
+        taskManager
+                .getTaskExecutorGateway()
+                .getAndRetainPartitionWithMetrics(jobGraph.getJobID())
+                .thenAccept(
+                        partitionWithMetrics -> {
+                            if (fetchAndRetainPartitions) {
+                                for (PartitionWithMetrics partitionWithMetric :
+                                        partitionWithMetrics) {
+                                    partitionWithMetricsOnTaskManagers.put(
+                                            partitionWithMetric
+                                                    .getPartition()
+                                                    .getResultPartitionID(),
+                                            partitionWithMetric);
+                                }
+                                checkPartitionOnTaskManagerReportFinished();
+                            } else {
+                                log.info(
+                                        "Received partition metrics from {} later. Releasing it.",

Review Comment:
   Suggested wording:
   "Received partition metrics from {} later. Releasing it." ->
   "Received late report of partition metrics from {}. Release the partitions."



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -1001,17 +1037,54 @@ private void fetchAndRetainPartitionWithMetricsOnTaskManager(ResourceID resource
         TaskManagerRegistration taskManager = registeredTaskManagers.get(resourceId);
         checkNotNull(taskManager);
 
-        partitionWithMetricsOnTaskManagerFutures.add(
-                taskManager
-                        .getTaskExecutorGateway()
-                        .getAndRetainPartitionWithMetrics(jobGraph.getJobID()));
+        taskManager
+                .getTaskExecutorGateway()
+                .getAndRetainPartitionWithMetrics(jobGraph.getJobID())
+                .thenAccept(
+                        partitionWithMetrics -> {
+                            if (fetchAndRetainPartitions) {
+                                for (PartitionWithMetrics partitionWithMetric :
+                                        partitionWithMetrics) {
+                                    partitionWithMetricsOnTaskManagers.put(
+                                            partitionWithMetric
+                                                    .getPartition()
+                                                    .getResultPartitionID(),
+                                            partitionWithMetric);
+                                }
+                                checkPartitionOnTaskManagerReportFinished();
+                            } else {
+                                log.info(
+                                        "Received partition metrics from {} later. Releasing it.",
+                                        resourceId);
+
+                                taskManager
+                                        .getTaskExecutorGateway()
+                                        .releasePartitions(
+                                                jobGraph.getJobID(),
+                                                partitionWithMetrics.stream()
+                                                        .map(PartitionWithMetrics::getPartition)
+                                                        .map(
+                                                                ShuffleDescriptor
+                                                                        ::getResultPartitionID)
+                                                        .collect(Collectors.toSet()));
+                            }
+                        });
     }
 
-    @Override
-    public void stopFetchAndRetainPartitionWithMetricsOnTaskManager() {
+    private void stopFetchAndRetainPartitionWithMetricsOnTaskManager() {
         fetchAndRetainPartitions = false;
     }
 
+    private void checkPartitionOnTaskManagerReportFinished() {
+        if (partitionOnTaskManagerReportFuture != null) {
+            if (partitionWithMetricsOnTaskManagers.keySet().equals(expectedReportPartitionIds)

Review Comment:
   It does not need to be `equals`. The report stage can finish as soon as the
   received partitions are a superset of `expectedReportPartitionIds`, which lets
   it finish earlier.
   
   It seems this case is not covered by tests?
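
   To illustrate the difference, here is a minimal, self-contained sketch of a
   superset check. It is not the PR's code: `ReportCheckSketch` and
   `reportFinished` are hypothetical names, and `String` / `Object` stand in
   for `ResultPartitionID` / `PartitionWithMetrics`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: String stands in for ResultPartitionID,
// Object stands in for PartitionWithMetrics.
class ReportCheckSketch {

    // Returns true once every expected partition has been reported.
    // Extra (unexpected) partitions do not block completion, unlike equals().
    static boolean reportFinished(Map<String, Object> received, Set<String> expected) {
        return received.keySet().containsAll(expected);
    }

    public static void main(String[] args) {
        Map<String, Object> received = new HashMap<>();
        received.put("p1", new Object());
        received.put("p2", new Object());
        received.put("p3", new Object()); // reported, but not in the expected set

        Set<String> expected = Set.of("p1", "p2");

        // Compare the superset check with the strict equality check.
        System.out.println("containsAll: " + reportFinished(received, expected));
        System.out.println("equals:      " + received.keySet().equals(expected));
    }
}
```

   A test for this case could report one partition more than the expected set
   and assert that the report stage still completes.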



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -229,11 +231,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
 
     private final BlocklistHandler blocklistHandler;
 
-    private final List<CompletableFuture<Collection<PartitionWithMetrics>>>
-            partitionWithMetricsOnTaskManagerFutures = new ArrayList<>();
+    private final Map<ResultPartitionID, PartitionWithMetrics> partitionWithMetricsOnTaskManagers =
+            new HashMap<>();
 
     private boolean fetchAndRetainPartitions = false;
 
+    private Set<ResultPartitionID> expectedReportPartitionIds;

Review Comment:
   These names need to be refined.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.