[ https://issues.apache.org/jira/browse/FLINK-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709568#comment-15709568 ]

ASF GitHub Bot commented on FLINK-5069:
---------------------------------------

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2809#discussion_r90312273
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -957,4 +969,65 @@ public void run() {
                        }
                });
        }
    +
    +   // ------------------------------------------------------------------------
    +   //  pending checkpoints stats metrics
    +   // ------------------------------------------------------------------------
    +   private class PendingCheckpointStatGauge implements Gauge<List<PendingCheckpointStats>> {
    +
    +           @Override
    +           public List<PendingCheckpointStats> getValue() {
    +                   List<PendingCheckpointStats> pendingCheckpointStatsList = new ArrayList<>();
    +
    +                   for (PendingCheckpoint checkpoint : pendingCheckpoints.values()) {
    +                           long checkpointId = checkpoint.getCheckpointId();
    +                           long triggerTime = checkpoint.getCheckpointTimestamp();
    +                           int numberOfAckTasks = checkpoint.getNumberOfAcknowledgedTasks();
    +                           int numberOfNonAckTasks = checkpoint.getNumberOfNonAcknowledgedTasks();
    +
    +                           // Acknowledged tasks for double check NonAcknowledged tasks
    +                           Set<String> ackTasks = new HashSet<>();
    +                           for (TaskState taskState : checkpoint.getTaskStates().values()) {
    +                                   JobVertexID jobVertexID = taskState.getJobVertexID();
    +                                   for (int subtaskId : taskState.getSubtaskStates().keySet()) {
    +                                           ackTasks.add(jobVertexID + "_" + subtaskId);
    +                                   }
    +                           }
    +
    +                           // Not yet Acknowledged tasks
    +                           Map<JobVertexID, Set<Integer>> nonAckTasks = new HashMap<>();
    --- End diff --
    
    Can we make use of `PendingCheckpoint.notYetAcknowledgedTasks`?
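
One way to read the suggestion above: if the pending checkpoint already tracks which tasks have not acknowledged, the gauge can group that collection by vertex directly instead of first collecting the acknowledged tasks and then working out what is missing. The following is only a minimal sketch in plain Java. `PendingTasksSketch`, `PendingTask`, `groupByVertex` and the string ids are hypothetical stand-ins for illustration, not Flink's types or the pull request's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-ins for illustration only; these are not Flink classes.
final class PendingTasksSketch {

    /** A task that has not acknowledged the checkpoint yet: vertex id plus subtask index. */
    static final class PendingTask {
        final String vertexId;
        final int subtaskIndex;

        PendingTask(String vertexId, int subtaskIndex) {
            this.vertexId = vertexId;
            this.subtaskIndex = subtaskIndex;
        }
    }

    /**
     * Groups the not-yet-acknowledged tasks by vertex id by reading the map of
     * outstanding tasks directly. The map key (assumed here to be an attempt id)
     * is not needed for the grouping.
     */
    static Map<String, List<Integer>> groupByVertex(Map<String, PendingTask> notYetAcknowledged) {
        Map<String, List<Integer>> byVertex = new TreeMap<>();
        for (PendingTask task : notYetAcknowledged.values()) {
            byVertex.computeIfAbsent(task.vertexId, k -> new ArrayList<>()).add(task.subtaskIndex);
        }
        return byVertex;
    }
}
```

With this shape the gauge needs a single pass over the outstanding tasks, and the acknowledged set is no longer required for a double check.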


> Pending checkpoint statistics gauge
> -----------------------------------
>
>                 Key: FLINK-5069
>                 URL: https://issues.apache.org/jira/browse/FLINK-5069
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>            Reporter: zhuhaifeng
>            Assignee: zhuhaifeng
>            Priority: Minor
>             Fix For: 1.2.0
>
>
> Add the statistics of pending checkpoints as a gauge metric. When a 
> checkpoint appears not to complete, this metric would help to inspect the 
> state of the pending checkpoint, e.g. which task did not complete the 
> checkpoint.
> The statistics will be as follows:
> checkpointID,
> number of acknowledged tasks,
> number of not yet acknowledged tasks,
> details of the not yet acknowledged tasks (JobVertexID, taskID)
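
The fields listed in the description could be carried by a small immutable value object that the gauge returns per pending checkpoint. The sketch below assumes plain Java types. `PendingCheckpointSnapshot` and its members are hypothetical names chosen for illustration and are not the `PendingCheckpointStats` class from the pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical holder for the statistics listed in the issue; not a Flink class.
final class PendingCheckpointSnapshot {
    final long checkpointId;
    final int acknowledgedTasks;
    final int notYetAcknowledgedTasks;
    // vertex id -> subtask indices that have not acknowledged yet
    final Map<String, List<Integer>> notYetAcknowledgedByVertex;

    PendingCheckpointSnapshot(long checkpointId,
                              int acknowledgedTasks,
                              Map<String, List<Integer>> pendingByVertex) {
        this.checkpointId = checkpointId;
        this.acknowledgedTasks = acknowledgedTasks;

        // Defensive copy so a reporter reading the gauge sees a stable view.
        Map<String, List<Integer>> copy = new TreeMap<>();
        int pending = 0;
        for (Map.Entry<String, List<Integer>> e : pendingByVertex.entrySet()) {
            copy.put(e.getKey(), Collections.unmodifiableList(new ArrayList<>(e.getValue())));
            pending += e.getValue().size();
        }
        // The pending count is derived from the detail map, so the two cannot disagree.
        this.notYetAcknowledgedTasks = pending;
        this.notYetAcknowledgedByVertex = Collections.unmodifiableMap(copy);
    }

    @Override
    public String toString() {
        return "checkpoint " + checkpointId
                + ": acked=" + acknowledgedTasks
                + ", pending=" + notYetAcknowledgedTasks
                + " " + notYetAcknowledgedByVertex;
    }
}
```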



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
