gaoyunhaii commented on a change in pull request #14740:
URL: https://github.com/apache/flink/pull/14740#discussion_r570712494



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2132,23 +2123,41 @@ public boolean isForce() {
     }
 
     private void reportToStatsTracker(
-            PendingCheckpoint checkpoint, Map<ExecutionAttemptID, 
ExecutionVertex> tasks) {
+            PendingCheckpoint checkpoint,
+            Map<ExecutionAttemptID, ExecutionVertex> tasks,
+            List<Execution> finishedTasks) {
         if (statsTracker == null) {
             return;
         }
         Map<JobVertexID, Integer> vertices =
-                tasks.values().stream()
+                Stream.concat(
+                                tasks.values().stream(),
+                                
finishedTasks.stream().map(Execution::getVertex))
                         .map(ExecutionVertex::getJobVertex)
                         .distinct()
                         .collect(
                                 toMap(
                                         ExecutionJobVertex::getJobVertexId,
                                         ExecutionJobVertex::getParallelism));
-        checkpoint.setStatsCallback(
+
+        PendingCheckpointStats pendingCheckpointStats =
                 statsTracker.reportPendingCheckpoint(
                         checkpoint.getCheckpointID(),
                         checkpoint.getCheckpointTimestamp(),
                         checkpoint.getProps(),
-                        vertices));
+                        vertices);
+        checkpoint.setStatsCallback(pendingCheckpointStats);
+
+        reportFinishedTasks(pendingCheckpointStats, finishedTasks);
+    }
+
+    private void reportFinishedTasks(
+            PendingCheckpointStats pendingCheckpointStats, List<Execution> 
finishedTasks) {
+        long now = System.currentTimeMillis();
+        finishedTasks.forEach(
+                execution ->
+                        pendingCheckpointStats.reportSubtaskStats(

Review comment:
       Yes, currently it would report 0 for the metrics of finished tasks. 
   
   I think it would be desired since if we do not report these tasks, users 
would be not easy to know which tasks are finished when the checkpoint trigger, 
thus he could not easily distinguish the finished tasks with the tasks that 
indeed not report snapshot for some reason. We may also consider add another 
flag to indicate if a task is finished when triggering checkpoints in a 
separate issue. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to