[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

Disable failing when not all creator properties are known

Move CheckpointStatsCache out of legacy package; Remove unused 
CheckpointingStatistics#generateCheckpointStatistics method

Remove JsonInclude.Include.NON_NULL from CheckpointStatistics; Pull null check 
out of CheckpointStatistics#generateCheckpointStatistics; Make 
CheckpointStatistics#checkpointStatisticsPerTask non-nullable; Add fail on 
missing creator property

This closes #4763.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a286d0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a286d0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a286d0f

Branch: refs/heads/master
Commit: 0a286d0ff98afa68034daff4634f526eaaf97897
Parents: 6b3fdc2
Author: Till Rohrmann <[email protected]>
Authored: Mon Oct 2 19:39:38 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue Oct 10 17:34:14 2017 +0200

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |   2 +-
 .../dispatcher/DispatcherRestEndpoint.java      |  29 +-
 .../rest/handler/RestHandlerConfiguration.java  |  18 +-
 .../job/AbstractExecutionGraphHandler.java      |  10 +-
 .../rest/handler/job/JobConfigHandler.java      |   5 +-
 .../checkpoints/AbstractCheckpointHandler.java  |  91 +++
 .../checkpoints/CheckpointConfigHandler.java    |   7 +-
 .../CheckpointStatisticDetailsHandler.java      |  54 ++
 .../CheckpointStatisticsHandler.java            | 181 -----
 .../job/checkpoints/CheckpointStatsCache.java   |  81 ++
 .../CheckpointingStatisticsHandler.java         | 153 ++++
 .../checkpoints/CheckpointConfigHandler.java    |   2 +-
 .../checkpoints/CheckpointStatsCache.java       |  81 --
 .../CheckpointStatsDetailsHandler.java          |   1 +
 .../CheckpointStatsDetailsSubtasksHandler.java  |   1 +
 .../checkpoints/CheckpointStatsHandler.java     |  81 +-
 .../rest/messages/CheckpointConfigHeaders.java  |  70 --
 .../rest/messages/CheckpointConfigInfo.java     | 151 ----
 .../rest/messages/CheckpointStatistics.java     | 763 -------------------
 .../messages/CheckpointStatisticsHeaders.java   |  68 --
 .../rest/messages/JobMessageParameters.java     |   2 +-
 .../checkpoints/CheckpointConfigHeaders.java    |  73 ++
 .../checkpoints/CheckpointConfigInfo.java       | 152 ++++
 .../checkpoints/CheckpointIdPathParameter.java  |  48 ++
 .../CheckpointMessageParameters.java            |  38 +
 .../CheckpointStatisticDetailsHeaders.java      |  72 ++
 .../checkpoints/CheckpointStatistics.java       | 537 +++++++++++++
 .../checkpoints/CheckpointingStatistics.java    | 478 ++++++++++++
 .../CheckpointingStatisticsHeaders.java         |  71 ++
 .../messages/json/JobVertexIDDeserializer.java  |  37 +
 .../messages/json/JobVertexIDSerializer.java    |  44 ++
 .../checkpoints/CheckpointStatsCacheTest.java   |   1 +
 .../CheckpointStatsDetailsHandlerTest.java      |   1 +
 ...heckpointStatsSubtaskDetailsHandlerTest.java |   1 +
 .../messages/CheckpointConfigInfoTest.java      |   2 +-
 .../messages/CheckpointStatisticsTest.java      | 104 ---
 .../messages/CheckpointingStatisticsTest.java   | 134 ++++
 37 files changed, 2164 insertions(+), 1480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 0bf6552..1a6178f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.WebHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler;
@@ -58,7 +59,6 @@ import 
org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import 
org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
 import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointConfigHandler;
-import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache;
 import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler;
 import 
org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler;

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index d64e649..2a2d9be 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -32,7 +32,9 @@ import 
org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
-import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
@@ -43,8 +45,6 @@ import 
org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpeci
 import 
org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfigurationInfo;
 import 
org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
 import 
org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
-import org.apache.flink.runtime.rest.messages.CheckpointConfigHeaders;
-import org.apache.flink.runtime.rest.messages.CheckpointStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import 
org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
@@ -52,6 +52,9 @@ import 
org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.FileUtils;
@@ -78,6 +81,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
        private final Executor executor;
 
        private final ExecutionGraphCache executionGraphCache;
+       private final CheckpointStatsCache checkpointStatsCache;
 
        public DispatcherRestEndpoint(
                        RestServerEndpointConfiguration endpointConfiguration,
@@ -94,6 +98,9 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                this.executionGraphCache = new ExecutionGraphCache(
                        restConfiguration.getTimeout(),
                        
Time.milliseconds(restConfiguration.getRefreshInterval()));
+
+               this.checkpointStatsCache = new CheckpointStatsCache(
+                       
restConfiguration.getMaxCheckpointStatisticCacheEntries());
        }
 
        @Override
@@ -162,14 +169,23 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        executionGraphCache,
                        executor);
 
-               CheckpointStatisticsHandler checkpointStatisticsHandler = new 
CheckpointStatisticsHandler(
+               CheckpointingStatisticsHandler checkpointStatisticsHandler = 
new CheckpointingStatisticsHandler(
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
-                       CheckpointStatisticsHeaders.getInstance(),
+                       CheckpointingStatisticsHeaders.getInstance(),
                        executionGraphCache,
                        executor);
 
+               CheckpointStatisticDetailsHandler 
checkpointStatisticDetailsHandler = new CheckpointStatisticDetailsHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       CheckpointStatisticDetailsHeaders.getInstance(),
+                       executionGraphCache,
+                       executor,
+                       checkpointStatsCache);
+
                final File tmpDir = restConfiguration.getTmpDir();
 
                Optional<StaticFileServerHandler<DispatcherGateway>> 
optWebContent;
@@ -192,7 +208,8 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), 
jobTerminationHandler));
                handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), 
jobConfigHandler));
                handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), 
checkpointConfigHandler));
-               
handlers.add(Tuple2.of(CheckpointStatisticsHeaders.getInstance(), 
checkpointStatisticsHandler));
+               
handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), 
checkpointStatisticsHandler));
+               
handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), 
checkpointStatisticDetailsHandler));
 
                BlobServerPortHandler blobServerPortHandler = new 
BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
                
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), 
blobServerPortHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index 9220bd9..0344597 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -32,14 +32,22 @@ public class RestHandlerConfiguration {
 
        private final long refreshInterval;
 
+       private final int maxCheckpointStatisticCacheEntries;
+
        private final Time timeout;
 
        private final File tmpDir;
 
-       public RestHandlerConfiguration(long refreshInterval, Time timeout, 
File tmpDir) {
+       public RestHandlerConfiguration(
+                       long refreshInterval,
+                       int maxCheckpointStatisticCacheEntries,
+                       Time timeout,
+                       File tmpDir) {
                Preconditions.checkArgument(refreshInterval > 0L, "The refresh 
interval (ms) should be larger than 0.");
                this.refreshInterval = refreshInterval;
 
+               this.maxCheckpointStatisticCacheEntries = 
maxCheckpointStatisticCacheEntries;
+
                this.timeout = Preconditions.checkNotNull(timeout);
                this.tmpDir = Preconditions.checkNotNull(tmpDir);
        }
@@ -48,6 +56,10 @@ public class RestHandlerConfiguration {
                return refreshInterval;
        }
 
+       public int getMaxCheckpointStatisticCacheEntries() {
+               return maxCheckpointStatisticCacheEntries;
+       }
+
        public Time getTimeout() {
                return timeout;
        }
@@ -59,10 +71,12 @@ public class RestHandlerConfiguration {
        public static RestHandlerConfiguration fromConfiguration(Configuration 
configuration) {
                final long refreshInterval = 
configuration.getLong(WebOptions.REFRESH_INTERVAL);
 
+               final int maxCheckpointStatisticCacheEntries = 
configuration.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
+
                final Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
 
                final File tmpDir = new 
File(configuration.getString(WebOptions.TMP_DIR));
 
-               return new RestHandlerConfiguration(refreshInterval, timeout, 
tmpDir);
+               return new RestHandlerConfiguration(refreshInterval, 
maxCheckpointStatisticCacheEntries, timeout, tmpDir);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index f2b1ac8..5348b55 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -45,7 +45,7 @@ import java.util.concurrent.Executor;
  *
  * @param <R> response type
  */
-public abstract class AbstractExecutionGraphHandler<R extends ResponseBody> 
extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, R, 
JobMessageParameters> {
+public abstract class AbstractExecutionGraphHandler<R extends ResponseBody, M 
extends JobMessageParameters> extends AbstractRestHandler<RestfulGateway, 
EmptyRequestBody, R, M> {
 
        private final ExecutionGraphCache executionGraphCache;
 
@@ -55,7 +55,7 @@ public abstract class AbstractExecutionGraphHandler<R extends 
ResponseBody> exte
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout,
-                       MessageHeaders<EmptyRequestBody, R, 
JobMessageParameters> messageHeaders,
+                       MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor) {
                super(localRestAddress, leaderRetriever, timeout, 
messageHeaders);
@@ -65,7 +65,7 @@ public abstract class AbstractExecutionGraphHandler<R extends 
ResponseBody> exte
        }
 
        @Override
-       protected CompletableFuture<R> handleRequest(@Nonnull 
HandlerRequest<EmptyRequestBody, JobMessageParameters> request, @Nonnull 
RestfulGateway gateway) throws RestHandlerException {
+       protected CompletableFuture<R> handleRequest(@Nonnull 
HandlerRequest<EmptyRequestBody, M> request, @Nonnull RestfulGateway gateway) 
throws RestHandlerException {
                JobID jobId = 
request.getPathParameter(JobIDPathParameter.class);
 
                CompletableFuture<AccessExecutionGraph> executionGraphFuture = 
executionGraphCache.getExecutionGraph(jobId, gateway);
@@ -73,7 +73,7 @@ public abstract class AbstractExecutionGraphHandler<R extends 
ResponseBody> exte
                return executionGraphFuture.thenApplyAsync(
                        executionGraph -> {
                                try {
-                                       return handleRequest(executionGraph);
+                                       return handleRequest(request, 
executionGraph);
                                } catch (RestHandlerException rhe) {
                                        throw new CompletionException(rhe);
                                }
@@ -81,5 +81,5 @@ public abstract class AbstractExecutionGraphHandler<R extends 
ResponseBody> exte
                        executor);
        }
 
-       protected abstract R handleRequest(AccessExecutionGraph executionGraph) 
throws RestHandlerException;
+       protected abstract R handleRequest(HandlerRequest<EmptyRequestBody, M> 
request, AccessExecutionGraph executionGraph) throws RestHandlerException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
index bbe4eef..f27d84f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobConfigInfo;
@@ -35,7 +36,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler serving the job configuration.
  */
-public class JobConfigHandler extends 
AbstractExecutionGraphHandler<JobConfigInfo> {
+public class JobConfigHandler extends 
AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> {
 
        public JobConfigHandler(
                        CompletableFuture<String> localRestAddress,
@@ -55,7 +56,7 @@ public class JobConfigHandler extends 
AbstractExecutionGraphHandler<JobConfigInf
        }
 
        @Override
-       protected JobConfigInfo handleRequest(AccessExecutionGraph 
executionGraph) {
+       protected JobConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, 
JobMessageParameters> request, AccessExecutionGraph executionGraph) {
                final ArchivedExecutionConfig executionConfig = 
executionGraph.getArchivedExecutionConfig();
                final JobConfigInfo.ExecutionConfigInfo executionConfigInfo;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
new file mode 100644
index 0000000..62ed1a4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for checkpoint related REST handler.
+ *
+ * @param <R> type of the response
+ */
+public abstract class AbstractCheckpointHandler<R extends ResponseBody> 
extends AbstractExecutionGraphHandler<R, CheckpointMessageParameters> {
+
+       private final CheckpointStatsCache checkpointStatsCache;
+
+       protected AbstractCheckpointHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Time timeout,
+                       MessageHeaders<EmptyRequestBody, R, 
CheckpointMessageParameters> messageHeaders,
+                       ExecutionGraphCache executionGraphCache,
+                       Executor executor,
+                       CheckpointStatsCache checkpointStatsCache) {
+               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor);
+
+               this.checkpointStatsCache = 
Preconditions.checkNotNull(checkpointStatsCache);
+       }
+
+       @Override
+       protected R handleRequest(HandlerRequest<EmptyRequestBody, 
CheckpointMessageParameters> request, AccessExecutionGraph executionGraph) 
throws RestHandlerException {
+               final long checkpointId = 
request.getPathParameter(CheckpointIdPathParameter.class);
+
+               final CheckpointStatsSnapshot checkpointStatsSnapshot = 
executionGraph.getCheckpointStatsSnapshot();
+
+               if (checkpointStatsSnapshot != null) {
+                       AbstractCheckpointStats checkpointStats = 
checkpointStatsSnapshot.getHistory().getCheckpointById(checkpointId);
+
+                       if (checkpointStats != null) {
+                               checkpointStatsCache.tryAdd(checkpointStats);
+                       } else {
+                               checkpointStats = 
checkpointStatsCache.tryGet(checkpointId);
+                       }
+
+                       if (checkpointStats != null) {
+                               return handleCheckpointRequest(checkpointStats);
+                       } else {
+                               throw new RestHandlerException("Could not find 
checkpointing statistics for checkpoint " + checkpointId + '.', 
HttpResponseStatus.NOT_FOUND);
+                       }
+               } else {
+                       throw new RestHandlerException("Checkpointing was not 
enabled for job " + executionGraph.getJobID() + '.', 
HttpResponseStatus.NOT_FOUND);
+               }
+       }
+
+       protected abstract R handleCheckpointRequest(AbstractCheckpointStats 
checkpointStats);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index 94646eb..1efa7af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -22,13 +22,14 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
-import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
@@ -40,7 +41,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler which serves the checkpoint configuration.
  */
-public class CheckpointConfigHandler extends 
AbstractExecutionGraphHandler<CheckpointConfigInfo> {
+public class CheckpointConfigHandler extends 
AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> {
 
        public CheckpointConfigHandler(
                        CompletableFuture<String> localRestAddress,
@@ -59,7 +60,7 @@ public class CheckpointConfigHandler extends 
AbstractExecutionGraphHandler<Check
        }
 
        @Override
-       protected CheckpointConfigInfo handleRequest(AccessExecutionGraph 
executionGraph) throws RestHandlerException {
+       protected CheckpointConfigInfo 
handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, 
AccessExecutionGraph executionGraph) throws RestHandlerException {
                final CheckpointCoordinatorConfiguration 
checkpointCoordinatorConfiguration = 
executionGraph.getCheckpointCoordinatorConfiguration();
 
                if (checkpointCoordinatorConfiguration == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
new file mode 100644
index 0000000..2fc3008
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * REST handler which returns the details for a checkpoint.
+ */
+public class CheckpointStatisticDetailsHandler extends 
AbstractCheckpointHandler<CheckpointStatistics> {
+
+       public CheckpointStatisticDetailsHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Time timeout,
+                       MessageHeaders<EmptyRequestBody, CheckpointStatistics, 
CheckpointMessageParameters> messageHeaders,
+                       ExecutionGraphCache executionGraphCache,
+                       Executor executor,
+                       CheckpointStatsCache checkpointStatsCache) {
+               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor, checkpointStatsCache);
+       }
+
+       @Override
+       protected CheckpointStatistics 
handleCheckpointRequest(AbstractCheckpointStats checkpointStats) {
+               return 
CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
deleted file mode 100644
index 21ded78..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.job.checkpoints;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
-import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
-import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
-import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Handler which serves the checkpoint statistics.
- */
-public class CheckpointStatisticsHandler extends 
AbstractExecutionGraphHandler<CheckpointStatistics> {
-
-       public CheckpointStatisticsHandler(
-                       CompletableFuture<String> localRestAddress,
-                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
-                       Time timeout,
-                       MessageHeaders<EmptyRequestBody, CheckpointStatistics, 
JobMessageParameters> messageHeaders,
-                       ExecutionGraphCache executionGraphCache,
-                       Executor executor) {
-               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor);
-       }
-
-       @Override
-       protected CheckpointStatistics handleRequest(AccessExecutionGraph 
executionGraph) throws RestHandlerException {
-
-               final CheckpointStatsSnapshot checkpointStatsSnapshot = 
executionGraph.getCheckpointStatsSnapshot();
-
-               if (checkpointStatsSnapshot == null) {
-                       throw new RestHandlerException("Checkpointing has not 
been enabled.", HttpResponseStatus.NOT_FOUND);
-               } else {
-                       final CheckpointStatsCounts checkpointStatsCounts = 
checkpointStatsSnapshot.getCounts();
-
-                       final CheckpointStatistics.Counts counts = new 
CheckpointStatistics.Counts(
-                               
checkpointStatsCounts.getNumberOfRestoredCheckpoints(),
-                               
checkpointStatsCounts.getTotalNumberOfCheckpoints(),
-                               
checkpointStatsCounts.getNumberOfInProgressCheckpoints(),
-                               
checkpointStatsCounts.getNumberOfCompletedCheckpoints(),
-                               
checkpointStatsCounts.getNumberOfFailedCheckpoints());
-
-                       final CompletedCheckpointStatsSummary 
checkpointStatsSummary = checkpointStatsSnapshot.getSummaryStats();
-                       final MinMaxAvgStats stateSize = 
checkpointStatsSummary.getStateSizeStats();
-                       final MinMaxAvgStats duration = 
checkpointStatsSummary.getEndToEndDurationStats();
-                       final MinMaxAvgStats alignment = 
checkpointStatsSummary.getAlignmentBufferedStats();
-
-                       final CheckpointStatistics.Summary summary = new 
CheckpointStatistics.Summary(
-                               new CheckpointStatistics.MinMaxAvgStatistics(
-                                       stateSize.getMinimum(),
-                                       stateSize.getMaximum(),
-                                       stateSize.getAverage()),
-                               new CheckpointStatistics.MinMaxAvgStatistics(
-                                       duration.getMinimum(),
-                                       duration.getMaximum(),
-                                       duration.getAverage()),
-                               new CheckpointStatistics.MinMaxAvgStatistics(
-                                       alignment.getMinimum(),
-                                       alignment.getMaximum(),
-                                       alignment.getAverage()));
-
-                       final CheckpointStatsHistory checkpointStatsHistory = 
checkpointStatsSnapshot.getHistory();
-
-                       final 
CheckpointStatistics.CompletedCheckpointStatistics completed = 
(CheckpointStatistics.CompletedCheckpointStatistics) 
generateCheckpointStatistics(checkpointStatsHistory.getLatestCompletedCheckpoint());
-                       final 
CheckpointStatistics.CompletedCheckpointStatistics savepoint = 
(CheckpointStatistics.CompletedCheckpointStatistics) 
generateCheckpointStatistics(checkpointStatsHistory.getLatestSavepoint());
-                       final CheckpointStatistics.FailedCheckpointStatistics 
failed = (CheckpointStatistics.FailedCheckpointStatistics) 
generateCheckpointStatistics(checkpointStatsHistory.getLatestFailedCheckpoint());
-
-                       final RestoredCheckpointStats restoredCheckpointStats = 
checkpointStatsSnapshot.getLatestRestoredCheckpoint();
-
-                       final CheckpointStatistics.RestoredCheckpointStatistics 
restored;
-
-                       if (restoredCheckpointStats == null) {
-                               restored = null;
-                       } else {
-                               restored = new 
CheckpointStatistics.RestoredCheckpointStatistics(
-                                       
restoredCheckpointStats.getCheckpointId(),
-                                       
restoredCheckpointStats.getRestoreTimestamp(),
-                                       
restoredCheckpointStats.getProperties().isSavepoint(),
-                                       
restoredCheckpointStats.getExternalPath());
-                       }
-
-                       final CheckpointStatistics.LatestCheckpoints 
latestCheckpoints = new CheckpointStatistics.LatestCheckpoints(
-                               completed,
-                               savepoint,
-                               failed,
-                               restored);
-
-                       final 
List<CheckpointStatistics.BaseCheckpointStatistics> history = new 
ArrayList<>(16);
-
-                       for (AbstractCheckpointStats abstractCheckpointStats : 
checkpointStatsSnapshot.getHistory().getCheckpoints()) {
-                               
history.add(generateCheckpointStatistics(abstractCheckpointStats));
-                       }
-
-                       return new CheckpointStatistics(
-                               counts,
-                               summary,
-                               latestCheckpoints,
-                               history);
-               }
-       }
-
-       private static CheckpointStatistics.BaseCheckpointStatistics 
generateCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
-               if (checkpointStats != null) {
-                       if (checkpointStats instanceof 
CompletedCheckpointStats) {
-                               final CompletedCheckpointStats 
completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
-
-                               return new 
CheckpointStatistics.CompletedCheckpointStatistics(
-                                       
completedCheckpointStats.getCheckpointId(),
-                                       completedCheckpointStats.getStatus(),
-                                       
completedCheckpointStats.getProperties().isSavepoint(),
-                                       
completedCheckpointStats.getTriggerTimestamp(),
-                                       
completedCheckpointStats.getLatestAckTimestamp(),
-                                       completedCheckpointStats.getStateSize(),
-                                       
completedCheckpointStats.getEndToEndDuration(),
-                                       
completedCheckpointStats.getAlignmentBuffered(),
-                                       
completedCheckpointStats.getNumberOfSubtasks(),
-                                       
completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
-                                       
completedCheckpointStats.getExternalPath(),
-                                       completedCheckpointStats.isDiscarded());
-                       } else if (checkpointStats instanceof 
FailedCheckpointStats) {
-                               final FailedCheckpointStats 
failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
-
-                               return new 
CheckpointStatistics.FailedCheckpointStatistics(
-                                       failedCheckpointStats.getCheckpointId(),
-                                       failedCheckpointStats.getStatus(),
-                                       
failedCheckpointStats.getProperties().isSavepoint(),
-                                       
failedCheckpointStats.getTriggerTimestamp(),
-                                       
failedCheckpointStats.getLatestAckTimestamp(),
-                                       failedCheckpointStats.getStateSize(),
-                                       
failedCheckpointStats.getEndToEndDuration(),
-                                       
failedCheckpointStats.getAlignmentBuffered(),
-                                       
failedCheckpointStats.getNumberOfSubtasks(),
-                                       
failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
-                                       
failedCheckpointStats.getFailureTimestamp(),
-                                       
failedCheckpointStats.getFailureMessage());
-                       } else {
-                               throw new IllegalArgumentException("Given 
checkpoint stats object of type " + checkpointStats.getClass().getName() + " 
cannot be converted.");
-                       }
-               } else {
-                       return null;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java
new file mode 100644
index 0000000..dcd36b0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatsCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+/**
+ * A size-based cache of accessed checkpoints for completed and failed
+ * checkpoints.
+ *
+ * <p>Having this cache in place for accessed stats improves the user
+ * experience quite a bit as accessed checkpoint stats stay available
+ * and don't expire. For example, if you manage to click on the last
+ * checkpoint in the history, it is no longer available via the stats as soon
+ * as another checkpoint is triggered. With the cache in place, the
+ * checkpoint will still be available for investigation.
+ */
+public class CheckpointStatsCache {
+
+       @Nullable
+       private final Cache<Long, AbstractCheckpointStats> cache;
+
+       public CheckpointStatsCache(int maxNumEntries) {
+               if (maxNumEntries > 0) {
+                       this.cache = CacheBuilder.<Long, 
AbstractCheckpointStats>newBuilder()
+                               .maximumSize(maxNumEntries)
+                               .build();
+               } else {
+                       this.cache = null;
+               }
+       }
+
+       /**
+        * Try to add the checkpoint to the cache.
+        *
+        * @param checkpoint Checkpoint to be added.
+        */
+       public void tryAdd(AbstractCheckpointStats checkpoint) {
+               // Don't add in progress checkpoints as they will be replaced 
by their
+               // completed/failed version eventually.
+               if (cache != null && checkpoint != null && 
!checkpoint.getStatus().isInProgress()) {
+                       cache.put(checkpoint.getCheckpointId(), checkpoint);
+               }
+       }
+
+       /**
+        * Try to look up a checkpoint by its ID in the cache.
+        *
+        * @param checkpointId ID of the checkpoint to look up.
+        * @return The checkpoint or <code>null</code> if checkpoint not found.
+        */
+       public AbstractCheckpointStats tryGet(long checkpointId) {
+               if (cache != null) {
+                       return cache.getIfPresent(checkpointId);
+               } else {
+                       return null;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
new file mode 100644
index 0000000..1c5762e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler which serves the checkpoint statistics.
+ */
+public class CheckpointingStatisticsHandler extends 
AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> {
+
+       public CheckpointingStatisticsHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Time timeout,
+                       MessageHeaders<EmptyRequestBody, 
CheckpointingStatistics, JobMessageParameters> messageHeaders,
+                       ExecutionGraphCache executionGraphCache,
+                       Executor executor) {
+               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor);
+       }
+
+       @Override
+       protected CheckpointingStatistics 
handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, 
AccessExecutionGraph executionGraph) throws RestHandlerException {
+
+               final CheckpointStatsSnapshot checkpointStatsSnapshot = 
executionGraph.getCheckpointStatsSnapshot();
+
+               if (checkpointStatsSnapshot == null) {
+                       throw new RestHandlerException("Checkpointing has not 
been enabled.", HttpResponseStatus.NOT_FOUND);
+               } else {
+                       final CheckpointStatsCounts checkpointStatsCounts = 
checkpointStatsSnapshot.getCounts();
+
+                       final CheckpointingStatistics.Counts counts = new 
CheckpointingStatistics.Counts(
+                               
checkpointStatsCounts.getNumberOfRestoredCheckpoints(),
+                               
checkpointStatsCounts.getTotalNumberOfCheckpoints(),
+                               
checkpointStatsCounts.getNumberOfInProgressCheckpoints(),
+                               
checkpointStatsCounts.getNumberOfCompletedCheckpoints(),
+                               
checkpointStatsCounts.getNumberOfFailedCheckpoints());
+
+                       final CompletedCheckpointStatsSummary 
checkpointStatsSummary = checkpointStatsSnapshot.getSummaryStats();
+                       final MinMaxAvgStats stateSize = 
checkpointStatsSummary.getStateSizeStats();
+                       final MinMaxAvgStats duration = 
checkpointStatsSummary.getEndToEndDurationStats();
+                       final MinMaxAvgStats alignment = 
checkpointStatsSummary.getAlignmentBufferedStats();
+
+                       final CheckpointingStatistics.Summary summary = new 
CheckpointingStatistics.Summary(
+                               new CheckpointingStatistics.MinMaxAvgStatistics(
+                                       stateSize.getMinimum(),
+                                       stateSize.getMaximum(),
+                                       stateSize.getAverage()),
+                               new CheckpointingStatistics.MinMaxAvgStatistics(
+                                       duration.getMinimum(),
+                                       duration.getMaximum(),
+                                       duration.getAverage()),
+                               new CheckpointingStatistics.MinMaxAvgStatistics(
+                                       alignment.getMinimum(),
+                                       alignment.getMaximum(),
+                                       alignment.getAverage()));
+
+                       final CheckpointStatsHistory checkpointStatsHistory = 
checkpointStatsSnapshot.getHistory();
+
+                       final 
CheckpointStatistics.CompletedCheckpointStatistics completed = 
checkpointStatsHistory.getLatestCompletedCheckpoint() != null ?
+                               
(CheckpointStatistics.CompletedCheckpointStatistics) 
CheckpointStatistics.generateCheckpointStatistics(
+                                       
checkpointStatsHistory.getLatestCompletedCheckpoint(),
+                                       false) :
+                               null;
+
+                       final 
CheckpointStatistics.CompletedCheckpointStatistics savepoint = 
checkpointStatsHistory.getLatestSavepoint() != null ?
+                               
(CheckpointStatistics.CompletedCheckpointStatistics) 
CheckpointStatistics.generateCheckpointStatistics(
+                                       
checkpointStatsHistory.getLatestSavepoint(),
+                                       false) :
+                               null;
+
+                       final CheckpointStatistics.FailedCheckpointStatistics 
failed = checkpointStatsHistory.getLatestFailedCheckpoint() != null ?
+                               
(CheckpointStatistics.FailedCheckpointStatistics) 
CheckpointStatistics.generateCheckpointStatistics(
+                                       
checkpointStatsHistory.getLatestFailedCheckpoint(),
+                                       false) :
+                               null;
+
+                       final RestoredCheckpointStats restoredCheckpointStats = 
checkpointStatsSnapshot.getLatestRestoredCheckpoint();
+
+                       final 
CheckpointingStatistics.RestoredCheckpointStatistics restored;
+
+                       if (restoredCheckpointStats == null) {
+                               restored = null;
+                       } else {
+                               restored = new 
CheckpointingStatistics.RestoredCheckpointStatistics(
+                                       
restoredCheckpointStats.getCheckpointId(),
+                                       
restoredCheckpointStats.getRestoreTimestamp(),
+                                       
restoredCheckpointStats.getProperties().isSavepoint(),
+                                       
restoredCheckpointStats.getExternalPath());
+                       }
+
+                       final CheckpointingStatistics.LatestCheckpoints 
latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints(
+                               completed,
+                               savepoint,
+                               failed,
+                               restored);
+
+                       final List<CheckpointStatistics> history = new 
ArrayList<>(16);
+
+                       for (AbstractCheckpointStats abstractCheckpointStats : 
checkpointStatsSnapshot.getHistory().getCheckpoints()) {
+                               
history.add(CheckpointStatistics.generateCheckpointStatistics(abstractCheckpointStats,
 false));
+                       }
+
+                       return new CheckpointingStatistics(
+                               counts,
+                               summary,
+                               latestCheckpoints,
+                               history);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
index f50c42d..60b9799 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import 
org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
-import org.apache.flink.runtime.rest.messages.CheckpointConfigInfo;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
deleted file mode 100644
index f21fc76..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
-
-import javax.annotation.Nullable;
-
-/**
- * A size-based cache of accessed checkpoints for completed and failed
- * checkpoints.
- *
- * <p>Having this cache in place for accessed stats improves the user
- * experience quite a bit as accessed checkpoint stats stay available
- * and don't expire. For example if you manage to click on the last
- * checkpoint in the history, it is not available via the stats as soon
- * as another checkpoint is triggered. With the cache in place, the
- * checkpoint will still be available for investigation.
- */
-public class CheckpointStatsCache {
-
-       @Nullable
-       private final Cache<Long, AbstractCheckpointStats> cache;
-
-       public CheckpointStatsCache(int maxNumEntries) {
-               if (maxNumEntries > 0) {
-                       this.cache = CacheBuilder.<Long, 
AbstractCheckpointStats>newBuilder()
-                               .maximumSize(maxNumEntries)
-                               .build();
-               } else {
-                       this.cache = null;
-               }
-       }
-
-       /**
-        * Try to add the checkpoint to the cache.
-        *
-        * @param checkpoint Checkpoint to be added.
-        */
-       void tryAdd(AbstractCheckpointStats checkpoint) {
-               // Don't add in progress checkpoints as they will be replaced 
by their
-               // completed/failed version eventually.
-               if (cache != null && checkpoint != null && 
!checkpoint.getStatus().isInProgress()) {
-                       cache.put(checkpoint.getCheckpointId(), checkpoint);
-               }
-       }
-
-       /**
-        * Try to look up a checkpoint by it's ID in the cache.
-        *
-        * @param checkpointId ID of the checkpoint to look up.
-        * @return The checkpoint or <code>null</code> if checkpoint not found.
-        */
-       AbstractCheckpointStats tryGet(long checkpointId) {
-               if (cache != null) {
-                       return cache.getIfPresent(checkpointId);
-               } else {
-                       return null;
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
index e277971..dce1641 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index 5420cf4..1421fb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
index 5b35c7f..b6c86be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
@@ -31,7 +31,8 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
-import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
@@ -129,37 +130,37 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
        }
 
        private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
-               gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_COUNTS);
-               gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints());
-               gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints());
-               gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints());
-               gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints());
-               gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints());
+               gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_COUNTS);
+               gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints());
+               gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints());
+               gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints());
+               gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints());
+               gen.writeNumberField(CheckpointingStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints());
                gen.writeEndObject();
        }
 
        private static void writeSummary(
                JsonGenerator gen,
                CompletedCheckpointStatsSummary summary) throws IOException {
-               
gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_SUMMARY);
-               
gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_STATE_SIZE);
+               
gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_SUMMARY);
+               
gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_STATE_SIZE);
                writeMinMaxAvg(gen, summary.getStateSizeStats());
                gen.writeEndObject();
 
-               
gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_DURATION);
+               
gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_DURATION);
                writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
                gen.writeEndObject();
 
-               
gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED);
+               
gen.writeObjectFieldStart(CheckpointingStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED);
                writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
                gen.writeEndObject();
                gen.writeEndObject();
        }
 
        static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) 
throws IOException {
-               
gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM,
 minMaxAvg.getMinimum());
-               
gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM,
 minMaxAvg.getMaximum());
-               
gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE,
 minMaxAvg.getAverage());
+               
gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM,
 minMaxAvg.getMinimum());
+               
gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM,
 minMaxAvg.getMaximum());
+               
gen.writeNumberField(CheckpointingStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE,
 minMaxAvg.getAverage());
        }
 
        private static void writeLatestCheckpoints(
@@ -169,10 +170,10 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
                @Nullable FailedCheckpointStats failed,
                @Nullable RestoredCheckpointStats restored) throws IOException {
 
-               
gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_LATEST_CHECKPOINTS);
+               
gen.writeObjectFieldStart(CheckpointingStatistics.FIELD_NAME_LATEST_CHECKPOINTS);
                // Completed checkpoint
                if (completed != null) {
-                       
gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED);
+                       
gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED);
                        writeCheckpoint(gen, completed);
 
                        String externalPath = completed.getExternalPath();
@@ -185,7 +186,7 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
 
                // Completed savepoint
                if (savepoint != null) {
-                       
gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT);
+                       
gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT);
                        writeCheckpoint(gen, savepoint);
 
                        String externalPath = savepoint.getExternalPath();
@@ -197,7 +198,7 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
 
                // Failed checkpoint
                if (failed != null) {
-                       
gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_FAILED);
+                       
gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_FAILED);
                        writeCheckpoint(gen, failed);
 
                        
gen.writeNumberField(CheckpointStatistics.FailedCheckpointStatistics.FIELD_NAME_FAILURE_TIMESTAMP,
 failed.getFailureTimestamp());
@@ -210,14 +211,14 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
 
                // Restored checkpoint
                if (restored != null) {
-                       
gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_RESTORED);
-                       
gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID,
 restored.getCheckpointId());
-                       
gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP,
 restored.getRestoreTimestamp());
-                       
gen.writeBooleanField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT,
 restored.getProperties().isSavepoint());
+                       
gen.writeObjectFieldStart(CheckpointingStatistics.LatestCheckpoints.FIELD_NAME_RESTORED);
+                       
gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID,
 restored.getCheckpointId());
+                       
gen.writeNumberField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP,
 restored.getRestoreTimestamp());
+                       
gen.writeBooleanField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT,
 restored.getProperties().isSavepoint());
 
                        String externalPath = restored.getExternalPath();
                        if (externalPath != null) {
-                               
gen.writeStringField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH,
 externalPath);
+                               
gen.writeStringField(CheckpointingStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH,
 externalPath);
                        }
                        gen.writeEndObject();
                }
@@ -225,29 +226,29 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
        }
 
        private static void writeCheckpoint(JsonGenerator gen, 
AbstractCheckpointStats checkpoint) throws IOException {
-               
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID,
 checkpoint.getCheckpointId());
-               
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP,
 checkpoint.getTriggerTimestamp());
-               
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP,
 checkpoint.getLatestAckTimestamp());
-               
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE,
 checkpoint.getStateSize());
-               
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION,
 checkpoint.getEndToEndDuration());
-               
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED,
 checkpoint.getAlignmentBuffered());
+               gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, 
checkpoint.getCheckpointId());
+               
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, 
checkpoint.getTriggerTimestamp());
+               
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, 
checkpoint.getLatestAckTimestamp());
+               
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, 
checkpoint.getStateSize());
+               gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, 
checkpoint.getEndToEndDuration());
+               
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, 
checkpoint.getAlignmentBuffered());
 
        }
 
        private static void writeHistory(JsonGenerator gen, 
CheckpointStatsHistory history) throws IOException {
-               
gen.writeArrayFieldStart(CheckpointStatistics.FIELD_NAME_HISTORY);
+               
gen.writeArrayFieldStart(CheckpointingStatistics.FIELD_NAME_HISTORY);
                for (AbstractCheckpointStats checkpoint : 
history.getCheckpoints()) {
                        gen.writeStartObject();
-                       
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID,
 checkpoint.getCheckpointId());
-                       
gen.writeStringField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATUS,
 checkpoint.getStatus().toString());
-                       
gen.writeBooleanField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT,
 checkpoint.getProperties().isSavepoint());
-                       
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP,
 checkpoint.getTriggerTimestamp());
-                       
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP,
 checkpoint.getLatestAckTimestamp());
-                       
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE,
 checkpoint.getStateSize());
-                       
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION,
 checkpoint.getEndToEndDuration());
-                       
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED,
 checkpoint.getAlignmentBuffered());
-                       
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_SUBTASKS,
 checkpoint.getNumberOfSubtasks());
-                       
gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS,
 checkpoint.getNumberOfAcknowledgedSubtasks());
+                       
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ID, 
checkpoint.getCheckpointId());
+                       
gen.writeStringField(CheckpointStatistics.FIELD_NAME_STATUS, 
checkpoint.getStatus().toString());
+                       
gen.writeBooleanField(CheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, 
checkpoint.getProperties().isSavepoint());
+                       
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, 
checkpoint.getTriggerTimestamp());
+                       
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, 
checkpoint.getLatestAckTimestamp());
+                       
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_STATE_SIZE, 
checkpoint.getStateSize());
+                       
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_DURATION, 
checkpoint.getEndToEndDuration());
+                       
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, 
checkpoint.getAlignmentBuffered());
+                       
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, 
checkpoint.getNumberOfSubtasks());
+                       
gen.writeNumberField(CheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, 
checkpoint.getNumberOfAcknowledgedSubtasks());
 
                        if (checkpoint.getStatus().isCompleted()) {
                                // --- Completed ---

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
deleted file mode 100644
index bfc0b7a..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigHeaders.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
-
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * Message headers for the {@link CheckpointConfigHandler}.
- */
-public class CheckpointConfigHeaders implements MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> {
-
-       private static final CheckpointConfigHeaders INSTANCE = new CheckpointConfigHeaders();
-
-       public static final String URL = "/jobs/:jobid/checkpoints/config";
-
-       private CheckpointConfigHeaders() {}
-
-       @Override
-       public Class<EmptyRequestBody> getRequestClass() {
-               return EmptyRequestBody.class;
-       }
-
-       @Override
-       public Class<CheckpointConfigInfo> getResponseClass() {
-               return CheckpointConfigInfo.class;
-       }
-
-       @Override
-       public HttpResponseStatus getResponseStatusCode() {
-               return HttpResponseStatus.OK;
-       }
-
-       @Override
-       public JobMessageParameters getUnresolvedMessageParameters() {
-               return new JobMessageParameters();
-       }
-
-       @Override
-       public HttpMethodWrapper getHttpMethod() {
-               return HttpMethodWrapper.GET;
-       }
-
-       @Override
-       public String getTargetRestEndpointURL() {
-               return URL;
-       }
-
-       public static CheckpointConfigHeaders getInstance() {
-               return INSTANCE;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0a286d0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
deleted file mode 100644
index fbda12a..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointConfigInfo.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.messages;
-
-import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
-import org.apache.flink.util.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
-
-/**
- * Response class of the {@link CheckpointConfigHandler}.
- */
-public class CheckpointConfigInfo implements ResponseBody {
-
-       public static final String FIELD_NAME_PROCESSING_MODE = "mode";
-
-       public static final String FIELD_NAME_CHECKPOINT_INTERVAL = "interval";
-
-       public static final String FIELD_NAME_CHECKPOINT_TIMEOUT = "timeout";
-
-       public static final String FIELD_NAME_CHECKPOINT_MIN_PAUSE = 
"min_pause";
-
-       public static final String FIELD_NAME_CHECKPOINT_MAX_CONCURRENT = 
"max_concurrent";
-
-       public static final String FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG = 
"externalization";
-
-       @JsonProperty(FIELD_NAME_PROCESSING_MODE)
-       private final ProcessingMode processingMode;
-
-       @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL)
-       private final long checkpointInterval;
-
-       @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT)
-       private final long checkpointTimeout;
-
-       @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE)
-       private final long minPauseBetweenCheckpoints;
-
-       @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT)
-       private final long maxConcurrentCheckpoints;
-
-       @JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG)
-       private final ExternalizedCheckpointInfo externalizedCheckpointInfo;
-
-       @JsonCreator
-       public CheckpointConfigInfo(
-                       @JsonProperty(FIELD_NAME_PROCESSING_MODE) 
ProcessingMode processingMode,
-                       @JsonProperty(FIELD_NAME_CHECKPOINT_INTERVAL) long 
checkpointInterval,
-                       @JsonProperty(FIELD_NAME_CHECKPOINT_TIMEOUT) long 
checkpointTimeout,
-                       @JsonProperty(FIELD_NAME_CHECKPOINT_MIN_PAUSE) long 
minPauseBetweenCheckpoints,
-                       @JsonProperty(FIELD_NAME_CHECKPOINT_MAX_CONCURRENT) int 
maxConcurrentCheckpoints,
-                       
@JsonProperty(FIELD_NAME_EXTERNALIZED_CHECKPOINT_CONFIG) 
ExternalizedCheckpointInfo externalizedCheckpointInfo) {
-               this.processingMode = 
Preconditions.checkNotNull(processingMode);
-               this.checkpointInterval = checkpointInterval;
-               this.checkpointTimeout = checkpointTimeout;
-               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
-               this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
-               this.externalizedCheckpointInfo = 
Preconditions.checkNotNull(externalizedCheckpointInfo);
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-               CheckpointConfigInfo that = (CheckpointConfigInfo) o;
-               return checkpointInterval == that.checkpointInterval &&
-                       checkpointTimeout == that.checkpointTimeout &&
-                       minPauseBetweenCheckpoints == 
that.minPauseBetweenCheckpoints &&
-                       maxConcurrentCheckpoints == 
that.maxConcurrentCheckpoints &&
-                       processingMode == that.processingMode &&
-                       Objects.equals(externalizedCheckpointInfo, 
that.externalizedCheckpointInfo);
-       }
-
-       @Override
-       public int hashCode() {
-               return Objects.hash(processingMode, checkpointInterval, 
checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, 
externalizedCheckpointInfo);
-       }
-
-       /**
-        * Contains information about the externalized checkpoint configuration.
-        */
-       public static final class ExternalizedCheckpointInfo {
-
-               public static final String FIELD_NAME_ENABLED = "enabled";
-
-               public static final String FIELD_NAME_DELETE_ON_CANCELLATION = 
"delete_on_cancellation";
-
-               @JsonProperty(FIELD_NAME_ENABLED)
-               private final boolean enabled;
-
-               @JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION)
-               private final boolean deleteOnCancellation;
-
-               @JsonCreator
-               public ExternalizedCheckpointInfo(
-                               @JsonProperty(FIELD_NAME_ENABLED) boolean 
enabled,
-                               
@JsonProperty(FIELD_NAME_DELETE_ON_CANCELLATION) boolean deleteOnCancellation) {
-                       this.enabled = enabled;
-                       this.deleteOnCancellation = deleteOnCancellation;
-               }
-
-               @Override
-               public boolean equals(Object o) {
-                       if (this == o) {
-                               return true;
-                       }
-                       if (o == null || getClass() != o.getClass()) {
-                               return false;
-                       }
-                       ExternalizedCheckpointInfo that = 
(ExternalizedCheckpointInfo) o;
-                       return enabled == that.enabled &&
-                               deleteOnCancellation == 
that.deleteOnCancellation;
-               }
-
-               @Override
-               public int hashCode() {
-                       return Objects.hash(enabled, deleteOnCancellation);
-               }
-       }
-
-       /**
-        * Processing mode.
-        */
-       public enum ProcessingMode {
-               AT_LEAST_ONCE,
-               EXACTLY_ONCE
-       }
-}

Reply via email to