Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4763#discussion_r143505801
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointStatistics.java
 ---
    @@ -0,0 +1,534 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.checkpoints;
    +
    +import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
    +import org.apache.flink.runtime.checkpoint.TaskStateStats;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
    +import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
    +import org.apache.flink.util.Preconditions;
    +
    +import com.fasterxml.jackson.annotation.JsonCreator;
    +import com.fasterxml.jackson.annotation.JsonInclude;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonSubTypes;
    +import com.fasterxml.jackson.annotation.JsonTypeInfo;
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
    +import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    +
    +import javax.annotation.Nullable;
    +
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +
    +/**
    + * Statistics for a checkpoint.
    + */
    +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = 
JsonTypeInfo.As.PROPERTY, property = "@class")
    +@JsonSubTypes({
    +   @JsonSubTypes.Type(value = 
CheckpointStatistics.CompletedCheckpointStatistics.class, name = "completed"),
    +   @JsonSubTypes.Type(value = 
CheckpointStatistics.FailedCheckpointStatistics.class, name = "failed")})
    +@JsonInclude(JsonInclude.Include.NON_NULL)
    +public class CheckpointStatistics implements ResponseBody {
    +
    +   public static final String FIELD_NAME_ID = "id";
    +
    +   public static final String FIELD_NAME_STATUS = "status";
    +
    +   public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
    +
    +   public static final String FIELD_NAME_TRIGGER_TIMESTAMP = 
"trigger_timestamp";
    +
    +   public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = 
"latest_ack_timestamp";
    +
    +   public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +   public static final String FIELD_NAME_DURATION = "end_to_end_duration";
    +
    +   public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
    +
    +   public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
    +
    +   public static final String FIELD_NAME_NUM_ACK_SUBTASKS = 
"num_acknowledged_subtasks";
    +
    +   public static final String FIELD_NAME_TASKS = "tasks";
    +
    +   @JsonProperty(FIELD_NAME_ID)
    +   private final long id;
    +
    +   @JsonProperty(FIELD_NAME_STATUS)
    +   private final CheckpointStatsStatus status;
    +
    +   @JsonProperty(FIELD_NAME_IS_SAVEPOINT)
    +   private final boolean savepoint;
    +
    +   @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
    +   private final long triggerTimestamp;
    +
    +   @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +   private final long latestAckTimestamp;
    +
    +   @JsonProperty(FIELD_NAME_STATE_SIZE)
    +   private final long stateSize;
    +
    +   @JsonProperty(FIELD_NAME_DURATION)
    +   private final long duration;
    +
    +   @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +   private final long alignmentBuffered;
    +
    +   @JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +   private final int numSubtasks;
    +
    +   @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +   private final int numAckSubtasks;
    +
    +   @JsonProperty(FIELD_NAME_TASKS)
    +   @JsonSerialize(keyUsing = JobVertexIDSerializer.class)
    +   @Nullable
    +   private final Map<JobVertexID, TaskCheckpointStatistics> 
checkpointStatisticsPerTask;
    +
    +   @JsonCreator
    +   private CheckpointStatistics(
    +                   @JsonProperty(FIELD_NAME_ID) long id,
    +                   @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus 
status,
    +                   @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
    +                   @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long 
triggerTimestamp,
    +                   @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long 
latestAckTimestamp,
    +                   @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +                   @JsonProperty(FIELD_NAME_DURATION) long duration,
    +                   @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long 
alignmentBuffered,
    +                   @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +                   @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
    +                   @JsonDeserialize(keyUsing = 
JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable 
Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask) {
    +           this.id = id;
    +           this.status = Preconditions.checkNotNull(status);
    +           this.savepoint = savepoint;
    +           this.triggerTimestamp = triggerTimestamp;
    +           this.latestAckTimestamp = latestAckTimestamp;
    +           this.stateSize = stateSize;
    +           this.duration = duration;
    +           this.alignmentBuffered = alignmentBuffered;
    +           this.numSubtasks = numSubtasks;
    +           this.numAckSubtasks = numAckSubtasks;
    +           this.checkpointStatisticsPerTask = checkpointStatisticsPerTask;
    +   }
    +
    +   public long getId() {
    +           return id;
    +   }
    +
    +   public CheckpointStatsStatus getStatus() {
    +           return status;
    +   }
    +
    +   public boolean isSavepoint() {
    +           return savepoint;
    +   }
    +
    +   public long getTriggerTimestamp() {
    +           return triggerTimestamp;
    +   }
    +
    +   public long getLatestAckTimestamp() {
    +           return latestAckTimestamp;
    +   }
    +
    +   public long getStateSize() {
    +           return stateSize;
    +   }
    +
    +   public long getDuration() {
    +           return duration;
    +   }
    +
    +   public long getAlignmentBuffered() {
    +           return alignmentBuffered;
    +   }
    +
    +   public int getNumSubtasks() {
    +           return numSubtasks;
    +   }
    +
    +   public int getNumAckSubtasks() {
    +           return numAckSubtasks;
    +   }
    +
    +   @Nullable
    +   public Map<JobVertexID, TaskCheckpointStatistics> 
getCheckpointStatisticsPerTask() {
    +           return checkpointStatisticsPerTask;
    +   }
    +
    +   @Override
    +   public boolean equals(Object o) {
    +           if (this == o) {
    +                   return true;
    +           }
    +           if (o == null || getClass() != o.getClass()) {
    +                   return false;
    +           }
    +           CheckpointStatistics that = (CheckpointStatistics) o;
    +           return id == that.id &&
    +                   savepoint == that.savepoint &&
    +                   triggerTimestamp == that.triggerTimestamp &&
    +                   latestAckTimestamp == that.latestAckTimestamp &&
    +                   stateSize == that.stateSize &&
    +                   duration == that.duration &&
    +                   alignmentBuffered == that.alignmentBuffered &&
    +                   numSubtasks == that.numSubtasks &&
    +                   numAckSubtasks == that.numAckSubtasks &&
    +                   status == that.status &&
    +                   Objects.equals(checkpointStatisticsPerTask, 
that.checkpointStatisticsPerTask);
    +   }
    +
    +   @Override
    +   public int hashCode() {
    +           return Objects.hash(id, status, savepoint, triggerTimestamp, 
latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, 
numAckSubtasks, checkpointStatisticsPerTask);
    +   }
    +
    +   public static CheckpointStatistics 
generateCheckpointStatistics(AbstractCheckpointStats checkpointStats, boolean 
includeTaskCheckpointStatistics) {
    +           if (checkpointStats != null) {
    +
    +                   Map<JobVertexID, TaskCheckpointStatistics> 
checkpointStatisticsPerTask;
    +
    +                   if (includeTaskCheckpointStatistics) {
    +                           Collection<TaskStateStats> taskStateStats = 
checkpointStats.getAllTaskStateStats();
    +
    +                           checkpointStatisticsPerTask = new 
HashMap<>(taskStateStats.size());
    +
    +                           for (TaskStateStats taskStateStat : 
taskStateStats) {
    +                                   checkpointStatisticsPerTask.put(
    +                                           taskStateStat.getJobVertexId(),
    +                                           new TaskCheckpointStatistics(
    +                                                   
taskStateStat.getLatestAckTimestamp(),
    +                                                   
taskStateStat.getStateSize(),
    +                                                   
taskStateStat.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
    +                                                   
taskStateStat.getAlignmentBuffered(),
    +                                                   
taskStateStat.getNumberOfSubtasks(),
    +                                                   
taskStateStat.getNumberOfAcknowledgedSubtasks()));
    +                           }
    +                   } else {
    +                           checkpointStatisticsPerTask = null;
    +                   }
    +
    +                   if (checkpointStats instanceof 
CompletedCheckpointStats) {
    +                           final CompletedCheckpointStats 
completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
    +
    +                           return new 
CheckpointStatistics.CompletedCheckpointStatistics(
    +                                   
completedCheckpointStats.getCheckpointId(),
    +                                   completedCheckpointStats.getStatus(),
    +                                   
completedCheckpointStats.getProperties().isSavepoint(),
    +                                   
completedCheckpointStats.getTriggerTimestamp(),
    +                                   
completedCheckpointStats.getLatestAckTimestamp(),
    +                                   completedCheckpointStats.getStateSize(),
    +                                   
completedCheckpointStats.getEndToEndDuration(),
    +                                   
completedCheckpointStats.getAlignmentBuffered(),
    +                                   
completedCheckpointStats.getNumberOfSubtasks(),
    +                                   
completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +                                   checkpointStatisticsPerTask,
    +                                   
completedCheckpointStats.getExternalPath(),
    +                                   completedCheckpointStats.isDiscarded());
    +                   } else if (checkpointStats instanceof 
FailedCheckpointStats) {
    +                           final FailedCheckpointStats 
failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
    +
    +                           return new 
CheckpointStatistics.FailedCheckpointStatistics(
    +                                   failedCheckpointStats.getCheckpointId(),
    +                                   failedCheckpointStats.getStatus(),
    +                                   
failedCheckpointStats.getProperties().isSavepoint(),
    +                                   
failedCheckpointStats.getTriggerTimestamp(),
    +                                   
failedCheckpointStats.getLatestAckTimestamp(),
    +                                   failedCheckpointStats.getStateSize(),
    +                                   
failedCheckpointStats.getEndToEndDuration(),
    +                                   
failedCheckpointStats.getAlignmentBuffered(),
    +                                   
failedCheckpointStats.getNumberOfSubtasks(),
    +                                   
failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
    +                                   checkpointStatisticsPerTask,
    +                                   
failedCheckpointStats.getFailureTimestamp(),
    +                                   
failedCheckpointStats.getFailureMessage());
    +                   } else {
    +                           throw new IllegalArgumentException("Given 
checkpoint stats object of type " + checkpointStats.getClass().getName() + " 
cannot be converted.");
    +                   }
    +           } else {
    +                   return null;
    +           }
    +   }
    +
    +   /**
    +    * Checkpoint statistics for a single task.
    +    */
    +   public static final class TaskCheckpointStatistics {
    +
    +           public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = 
"latest_ack_timestamp";
    +
    +           public static final String FIELD_NAME_STATE_SIZE = "state_size";
    +
    +           public static final String FIELD_NAME_DURATION = 
"end_to_end_duration";
    +
    +           public static final String FIELD_NAME_ALIGNMENT_BUFFERED = 
"alignment_buffered";
    +
    +           public static final String FIELD_NAME_NUM_SUBTASKS = 
"num_subtasks";
    +
    +           public static final String FIELD_NAME_NUM_ACK_SUBTASKS = 
"num_acknowledged_subtasks";
    +
    +           @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
    +           private final long latestAckTimestamp;
    +
    +           @JsonProperty(FIELD_NAME_STATE_SIZE)
    +           private final long stateSize;
    +
    +           @JsonProperty(FIELD_NAME_DURATION)
    +           private final long duration;
    +
    +           @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
    +           private final long alignmentBuffered;
    +
    +           @JsonProperty(FIELD_NAME_NUM_SUBTASKS)
    +           private final int numSubtasks;
    +
    +           @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
    +           private final int numAckSubtasks;
    +
    +           @JsonCreator
    +           public TaskCheckpointStatistics(
    +                   @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long 
latestAckTimestamp,
    +                   @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +                   @JsonProperty(FIELD_NAME_DURATION) long duration,
    +                   @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long 
alignmentBuffered,
    +                   @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +                   @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks) {
    +                   this.latestAckTimestamp = latestAckTimestamp;
    +                   this.stateSize = stateSize;
    +                   this.duration = duration;
    +                   this.alignmentBuffered = alignmentBuffered;
    +                   this.numSubtasks = numSubtasks;
    +                   this.numAckSubtasks = numAckSubtasks;
    +           }
    +
    +           public long getLatestAckTimestamp() {
    +                   return latestAckTimestamp;
    +           }
    +
    +           public long getStateSize() {
    +                   return stateSize;
    +           }
    +
    +           public long getDuration() {
    +                   return duration;
    +           }
    +
    +           public long getAlignmentBuffered() {
    +                   return alignmentBuffered;
    +           }
    +
    +           public int getNumSubtasks() {
    +                   return numSubtasks;
    +           }
    +
    +           public int getNumAckSubtasks() {
    +                   return numAckSubtasks;
    +           }
    +
    +           @Override
    +           public boolean equals(Object o) {
    +                   if (this == o) {
    +                           return true;
    +                   }
    +                   if (o == null || getClass() != o.getClass()) {
    +                           return false;
    +                   }
    +                   TaskCheckpointStatistics that = 
(TaskCheckpointStatistics) o;
    +                   return latestAckTimestamp == that.latestAckTimestamp &&
    +                           stateSize == that.stateSize &&
    +                           duration == that.duration &&
    +                           alignmentBuffered == that.alignmentBuffered &&
    +                           numSubtasks == that.numSubtasks &&
    +                           numAckSubtasks == that.numAckSubtasks;
    +           }
    +
    +           @Override
    +           public int hashCode() {
    +                   return Objects.hash(latestAckTimestamp, stateSize, 
duration, alignmentBuffered, numSubtasks, numAckSubtasks);
    +           }
    +   }
    +
    +   /**
    +    * Statistics for a completed checkpoint.
    +    */
    +   public static final class CompletedCheckpointStatistics extends 
CheckpointStatistics {
    +
    +           public static final String FIELD_NAME_EXTERNAL_PATH = 
"external_path";
    +
    +           public static final String FIELD_NAME_DISCARDED = "discarded";
    +
    +           @JsonProperty(FIELD_NAME_EXTERNAL_PATH)
    +           @Nullable
    +           private final String externalPath;
    +
    +           @JsonProperty(FIELD_NAME_DISCARDED)
    +           private final boolean discarded;
    +
    +           @JsonCreator
    +           public CompletedCheckpointStatistics(
    +                   @JsonProperty(FIELD_NAME_ID) long id,
    +                   @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus 
status,
    +                   @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
    +                   @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long 
triggerTimestamp,
    +                   @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long 
latestAckTimestamp,
    +                   @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +                   @JsonProperty(FIELD_NAME_DURATION) long duration,
    +                   @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long 
alignmentBuffered,
    +                   @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +                   @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
    +                   @JsonDeserialize(keyUsing = 
JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable 
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
    +                   @JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable 
String externalPath,
    +                   @JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
    +                   super(
    +                           id,
    +                           status,
    +                           savepoint,
    +                           triggerTimestamp,
    +                           latestAckTimestamp,
    +                           stateSize,
    +                           duration,
    +                           alignmentBuffered,
    +                           numSubtasks,
    +                           numAckSubtasks,
    +                           checkpointingStatisticsPerTask);
    +
    +                   this.externalPath = externalPath;
    +                   this.discarded = discarded;
    +           }
    +
    +           @Nullable
    +           public String getExternalPath() {
    +                   return externalPath;
    +           }
    +
    +           public boolean isDiscarded() {
    +                   return discarded;
    +           }
    +
    +           @Override
    +           public boolean equals(Object o) {
    +                   if (this == o) {
    +                           return true;
    +                   }
    +                   if (o == null || getClass() != o.getClass()) {
    +                           return false;
    +                   }
    +                   if (!super.equals(o)) {
    +                           return false;
    +                   }
    +                   CompletedCheckpointStatistics that = 
(CompletedCheckpointStatistics) o;
    +                   return discarded == that.discarded &&
    +                           Objects.equals(externalPath, that.externalPath);
    +           }
    +
    +           @Override
    +           public int hashCode() {
    +                   return Objects.hash(super.hashCode(), externalPath, 
discarded);
    +           }
    +   }
    +
    +   /**
    +    * Statistics for a failed checkpoint.
    +    */
    +   public static final class FailedCheckpointStatistics extends 
CheckpointStatistics {
    +
    +           public static final String FIELD_NAME_FAILURE_TIMESTAMP = 
"failure_timestamp";
    +
    +           public static final String FIELD_NAME_FAILURE_MESSAGE = 
"failure_message";
    +
    +           @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
    +           private final long failureTimestamp;
    +
    +           @JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
    +           @Nullable
    +           private final String failureMessage;
    +
    +           @JsonCreator
    +           public FailedCheckpointStatistics(
    +                   @JsonProperty(FIELD_NAME_ID) long id,
    +                   @JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus 
status,
    +                   @JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean 
savepoint,
    +                   @JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long 
triggerTimestamp,
    +                   @JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long 
latestAckTimestamp,
    +                   @JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
    +                   @JsonProperty(FIELD_NAME_DURATION) long duration,
    +                   @JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long 
alignmentBuffered,
    +                   @JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
    +                   @JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int 
numAckSubtasks,
    +                   @JsonDeserialize(keyUsing = 
JobVertexIDDeserializer.class) @JsonProperty(FIELD_NAME_TASKS) @Nullable 
Map<JobVertexID, TaskCheckpointStatistics> checkpointingStatisticsPerTask,
    +                   @JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long 
failureTimestamp,
    +                   @JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable 
String failureMessage) {
    +                   super(
    +                           id,
    +                           status,
    +                           savepoint,
    +                           triggerTimestamp,
    +                           latestAckTimestamp,
    +                           stateSize,
    +                           duration,
    +                           alignmentBuffered,
    +                           numSubtasks,
    +                           numAckSubtasks,
    +                           checkpointingStatisticsPerTask);
    +
    +                   this.failureTimestamp = failureTimestamp;
    --- End diff --
    
    Should be a primitive.


---

Reply via email to