[
https://issues.apache.org/jira/browse/FLINK-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709571#comment-15709571
]
ASF GitHub Bot commented on FLINK-5069:
---------------------------------------
Github user greghogan commented on a diff in the pull request:
https://github.com/apache/flink/pull/2809#discussion_r90305721
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/PendingCheckpointStats.java
---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.stats;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * Statistics for a pending checkpoint.
+ */
+public class PendingCheckpointStats implements Serializable {
+
+ /** ID of the checkpoint. */
+ private final long checkpointId;
+
+ /** Timestamp when the checkpoint was triggered. */
+ private final long triggerTimestamp;
+
+ /** Number Of Acknowledged Tasks. */
+ private final int numberOfAcknowledgedTasks;
+
+ /** Number Of Not yet Acknowledged Tasks. */
+ private final int numberOfNonAcknowledgedTasks;
+
+ /** Not yet Acknowledged Tasks. */
+ private final Map<JobVertexID, Set<Integer>> notYetAcknowledgedTasks;
+
+ /**
+ * Creates a pending checkpoint statistic.
+ *
+ * @param checkpointId Checkpoint ID
+ * @param triggerTimestamp Timestamp when the checkpoint
was triggered
+ * @param numberOfAcknowledgedTasks Number Of Acknowledged Tasks
+ * @param numberOfNonAcknowledgedTasks Number Of Not yet Acknowledged
Tasks
+ * @param notYetAcknowledgedTasks Not yet Acknowledged Tasks
+ */
+ public PendingCheckpointStats(
+ long checkpointId,
+ long triggerTimestamp,
+ int numberOfAcknowledgedTasks,
+ int numberOfNonAcknowledgedTasks,
+ Map<JobVertexID, Set<Integer>> notYetAcknowledgedTasks) {
+
+ this.checkpointId = checkpointId;
+ this.triggerTimestamp = triggerTimestamp;
+ this.numberOfAcknowledgedTasks = numberOfAcknowledgedTasks;
+ this.numberOfNonAcknowledgedTasks =
numberOfNonAcknowledgedTasks;
+ this.notYetAcknowledgedTasks = notYetAcknowledgedTasks;
+ }
+
+ /**
+ * Returns the ID of the checkpoint.
+ *
+ * @return ID of the checkpoint.
+ */
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
+ /**
+ * Returns the timestamp when the checkpoint was triggered.
+ *
+ * @return Timestamp when the checkpoint was triggered.
+ */
+ public long getTriggerTimestamp() {
+ return triggerTimestamp;
+ }
+
+ /**
+ * Returns the number of acknowledged tasks of the checkpoint.
+ *
+ * @return Number Of acknowledged tasks of the checkpoint.
+ */
+ public int getNumberOfAcknowledgedTasks() {
+ return numberOfAcknowledgedTasks;
+ }
+
+ /**
+ * Returns the number of not yet acknowledged tasks of the checkpoint.
+ *
+ * @return Number of not yet acknowledged tasks of the checkpoint.
+ */
+ public int getNumberOfNonAcknowledgedTasks() {
+ return numberOfNonAcknowledgedTasks;
+ }
+
+ /**
+ * Returns the not yet acknowledged tasks of the checkpoint.
+ *
+ * @return Not yet acknowledged tasks of the checkpoint.
+ */
+ public Map<JobVertexID, Set<Integer>> getNotYetAcknowledgedTasks() {
+ return notYetAcknowledgedTasks;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Checkpoint{ID=").append(checkpointId).
+ append(",triggerTime=").append(triggerTimestamp).
+
append(",numOfAckTasks=").append(numberOfAcknowledgedTasks).
+
append(",numOfNonAckTasks=").append(numberOfNonAcknowledgedTasks).
+ append(",nonAckTasks=[");
+
+ Iterator<Entry<JobVertexID, Set<Integer>>> nonAckTasks =
notYetAcknowledgedTasks.entrySet().iterator();
+ while (nonAckTasks.hasNext()) {
+ Entry<JobVertexID, Set<Integer>> jobVertex =
nonAckTasks.next();
+
+ sb.append("JobVertex{ID=").append(jobVertex.getKey()).
+
append(",subtaskIDs=").append(jobVertex.getValue()).append("}");
+
+ if (nonAckTasks.hasNext()) {
+ sb.append(",");
+ }
+ }
+
+ sb.append("]}");
+
+ return sb.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
--- End diff --
Why override equals / hashCode?
> Pending checkpoint statistics gauge
> -----------------------------------
>
> Key: FLINK-5069
> URL: https://issues.apache.org/jira/browse/FLINK-5069
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Reporter: zhuhaifeng
> Assignee: zhuhaifeng
> Priority: Minor
> Fix For: 1.2.0
>
>
> Add the statistics of pending checkpoints as a gauge metric. When the
> checkpoint appears not to be completed, this metric would help to get the
> state of a pending checkpoint , e.g which task did not complete the
> checkpoint.
> The statistic will be as the follows:
> checkpointID,
> Number Of Acknowledged Tasks,
> Number Of Not yet Acknowledged Tasks,
> The detail of not yet acknowledged JobVertexID, taskID
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)