[FLINK-4720] Implement archived ExecutionGraph

This closes #2577.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21e8e2dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21e8e2dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21e8e2dc

Branch: refs/heads/master
Commit: 21e8e2dcf77f9d0dc3a74204626b776d87a9cd15
Parents: f6d8668
Author: zentol <ches...@apache.org>
Authored: Thu Sep 22 14:02:22 2016 +0200
Committer: zentol <ches...@apache.org>
Committed: Fri Oct 14 13:55:28 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/api/common/Archiveable.java    |  24 +
 .../api/common/ArchivedExecutionConfig.java     |  73 ++++
 .../flink/api/common/ExecutionConfig.java       |   7 +-
 .../webmonitor/ExecutionGraphHolder.java        |   9 +-
 .../AbstractExecutionGraphRequestHandler.java   |   6 +-
 .../AbstractJobVertexRequestHandler.java        |  10 +-
 .../AbstractSubtaskAttemptRequestHandler.java   |  12 +-
 .../handlers/AbstractSubtaskRequestHandler.java |  10 +-
 .../handlers/JobAccumulatorsHandler.java        |   4 +-
 .../handlers/JobCheckpointsHandler.java         |   4 +-
 .../webmonitor/handlers/JobConfigHandler.java   |  10 +-
 .../webmonitor/handlers/JobDetailsHandler.java  |  13 +-
 .../handlers/JobExceptionsHandler.java          |  19 +-
 .../webmonitor/handlers/JobPlanHandler.java     |   4 +-
 .../handlers/JobVertexAccumulatorsHandler.java  |   4 +-
 .../handlers/JobVertexBackPressureHandler.java  |   9 +-
 .../handlers/JobVertexCheckpointsHandler.java   |  42 +-
 .../handlers/JobVertexDetailsHandler.java       |  10 +-
 .../handlers/JobVertexTaskManagersHandler.java  |  21 +-
 .../SubtaskCurrentAttemptDetailsHandler.java    |   4 +-
 ...taskExecutionAttemptAccumulatorsHandler.java |   6 +-
 .../SubtaskExecutionAttemptDetailsHandler.java  |   6 +-
 .../SubtasksAllAccumulatorsHandler.java         |   8 +-
 .../handlers/SubtasksTimesHandler.java          |  10 +-
 .../BackPressureStatsTrackerITCase.java         |   3 +-
 .../StackTraceSampleCoordinatorITCase.java      |   3 +-
 .../JobVertexCheckpointsHandlerTest.java        |  22 +-
 .../ArchivedCheckpointStatsTracker.java         |  53 +++
 .../checkpoint/stats/CheckpointStats.java       |   4 +-
 .../checkpoint/stats/JobCheckpointStats.java    |   3 +-
 .../stats/OperatorCheckpointStats.java          |   2 +
 .../stats/SimpleCheckpointStatsTracker.java     |   2 +
 .../runtime/executiongraph/AccessExecution.java | 105 +++++
 .../executiongraph/AccessExecutionGraph.java    | 161 +++++++
 .../AccessExecutionJobVertex.java               |  98 +++++
 .../executiongraph/AccessExecutionVertex.java   |  85 ++++
 .../executiongraph/ArchivedExecution.java       | 118 +++++
 .../executiongraph/ArchivedExecutionGraph.java  | 297 +++++++++++++
 .../ArchivedExecutionJobVertex.java             | 136 ++++++
 .../executiongraph/ArchivedExecutionVertex.java |  96 ++++
 .../flink/runtime/executiongraph/Execution.java |  41 +-
 .../runtime/executiongraph/ExecutionGraph.java  | 154 ++++---
 .../executiongraph/ExecutionJobVertex.java      |  61 ++-
 .../runtime/executiongraph/ExecutionVertex.java |  50 +--
 .../archive/ExecutionConfigSummary.java         |  75 ----
 .../runtime/webmonitor/WebMonitorUtils.java     |  14 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   7 +-
 .../runtime/jobmanager/MemoryArchivist.scala    |   4 +-
 .../runtime/messages/ArchiveMessages.scala      |  12 +-
 .../runtime/messages/JobManagerMessages.scala   |   4 +-
 .../checkpoint/CoordinatorShutdownTest.java     |   4 +-
 .../ArchivedExecutionGraphTest.java             | 434 +++++++++++++++++++
 .../runtime/jobmanager/JobManagerTest.java      |   2 +-
 .../LeaderChangeJobRecoveryTest.java            |   2 +-
 .../TestingJobManagerMessages.scala             |   4 +-
 .../flink/test/query/QueryableStateITCase.java  |  14 +-
 56 files changed, 2014 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java b/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
new file mode 100644
index 0000000..09a3a0c
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Archiveable.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.io.Serializable;
+
+public interface Archiveable<T extends Serializable> {
+       T archive();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
new file mode 100644
index 0000000..faf920d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Serializable class which is created when archiving the job.
+ * It can be used to display job information on the web interface
+ * without having to keep the classloader around after job completion.
+ */
+public class ArchivedExecutionConfig implements Serializable {
+
+       private final String executionMode;
+       private final String restartStrategyDescription;
+       private final int parallelism;
+       private final boolean objectReuseEnabled;
+       private final Map<String, String> globalJobParameters;
+
+       public ArchivedExecutionConfig(ExecutionConfig ec) {
+               executionMode = ec.getExecutionMode().name();
+               if (ec.getRestartStrategy() != null) {
+                       restartStrategyDescription = ec.getRestartStrategy().getDescription();
+               } else {
+                       restartStrategyDescription = "default";
+               }
+               parallelism = ec.getParallelism();
+               objectReuseEnabled = ec.isObjectReuseEnabled();
+               if (ec.getGlobalJobParameters() != null
+                               && ec.getGlobalJobParameters().toMap() != null) {
+                       globalJobParameters = ec.getGlobalJobParameters().toMap();
+               } else {
+                       globalJobParameters = Collections.emptyMap();
+               }
+       }
+
+       public String getExecutionMode() {
+               return executionMode;
+       }
+
+       public String getRestartStrategyDescription() {
+               return restartStrategyDescription;
+       }
+
+       public int getParallelism() {
+               return parallelism;
+       }
+
+       public boolean getObjectReuseEnabled() {
+               return objectReuseEnabled;
+       }
+
+       public Map<String, String> getGlobalJobParameters() {
+               return globalJobParameters;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index aadf867..a0a63b1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -58,7 +58,7 @@ import java.util.Objects;
  * </ul>
  */
 @Public
-public class ExecutionConfig implements Serializable {
+public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecutionConfig> {
 
        private static final long serialVersionUID = 1L;
 
@@ -770,6 +770,11 @@ public class ExecutionConfig implements Serializable {
        public boolean canEqual(Object obj) {
                return obj instanceof ExecutionConfig;
        }
+       
+       @Override
+       public ArchivedExecutionConfig archive() {
+               return new ArchivedExecutionConfig(this);
+       }
 
 
        // ------------------------------ Utilities  ----------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 7691874..3d0cfc0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -47,7 +48,7 @@ public class ExecutionGraphHolder {
 
        private final FiniteDuration timeout;
 
-       private final WeakHashMap<JobID, ExecutionGraph> cache = new WeakHashMap<JobID, ExecutionGraph>();
+       private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();
 
        public ExecutionGraphHolder() {
                this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
@@ -63,8 +64,8 @@ public class ExecutionGraphHolder {
         * @param jid jobID of the execution graph to be retrieved
         * @return the retrieved execution graph or null if it is not retrievable
         */
-       public ExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
-               ExecutionGraph cached = cache.get(jid);
+       public AccessExecutionGraph getExecutionGraph(JobID jid, ActorGateway jobManager) {
+               AccessExecutionGraph cached = cache.get(jid);
                if (cached != null) {
                        return cached;
                }
@@ -78,7 +79,7 @@ public class ExecutionGraphHolder {
                                        return null;
                                }
                                else if (result instanceof JobManagerMessages.JobFound) {
-                                       ExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
+                                       AccessExecutionGraph eg = ((JobManagerMessages.JobFound) result).executionGraph();
                                        cache.put(jid, eg);
                                        return eg;
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index 16cfb1a..ff28d4e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.NotFoundException;
@@ -53,7 +53,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
                        throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage()); 
                }
                
-               ExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
+               AccessExecutionGraph eg = executionGraphHolder.getExecutionGraph(jid, jobManager);
                if (eg == null) {
                        throw new NotFoundException("Could not find job with id " + jid);
                }
@@ -61,5 +61,5 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan
                return handleRequest(eg, pathParams);
        }
        
-       public abstract String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception;
+       public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
index 5b12907..a36f94a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -36,7 +36,7 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
        }
 
        @Override
-       public final String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+       public final String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
                final String vidString = params.get("vertexid");
                if (vidString == null) {
                        throw new IllegalArgumentException("vertexId parameter missing");
@@ -50,7 +50,7 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
                        throw new IllegalArgumentException("Invalid JobVertexID string '" + vidString + "': " + e.getMessage());
                }
 
-               final ExecutionJobVertex jobVertex = graph.getJobVertex(vid);
+               final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid);
                if (jobVertex == null) {
                        throw new IllegalArgumentException("No vertex with ID '" + vidString + "' exists.");
                }
@@ -58,5 +58,5 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
                return handleRequest(jobVertex, params);
        }
        
-       public abstract String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
+       public abstract String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index 672df16..f3a5059 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.util.Map;
@@ -37,7 +37,7 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
        }
        
        @Override
-       public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
                final String attemptNumberString = params.get("attempt");
                if (attemptNumberString == null) {
                        throw new RuntimeException("Attempt number parameter missing");
@@ -51,12 +51,12 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
                        throw new RuntimeException("Invalid attempt number parameter");
                }
                
-               final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
+               final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
                if (attempt == currentAttempt.getAttemptNumber()) {
                        return handleRequest(currentAttempt, params);
                }
                else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
-                       Execution exec = vertex.getPriorExecutionAttempt(attempt);
+                       AccessExecution exec = vertex.getPriorExecutionAttempt(attempt);
                        return handleRequest(exec, params);
                }
                else {
@@ -64,5 +64,5 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
                }
        }
 
-       public abstract String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception;
+       public abstract String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
index 90866c6..d6b279c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.util.Map;
@@ -36,7 +36,7 @@ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexReq
        }
 
        @Override
-       public final String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+       public final String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
                final String subtaskNumberString = params.get("subtasknum");
                if (subtaskNumberString == null) {
                        throw new RuntimeException("Subtask number parameter missing");
@@ -54,9 +54,9 @@ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexReq
                        throw new RuntimeException("subtask does not exist: " + subtask); 
                }
                
-               final ExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
+               final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
                return handleRequest(vertex, params);
        }
 
-       public abstract String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception;
+       public abstract String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index c5418b3..29613a0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
@@ -36,7 +36,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
        }
 
        @Override
-       public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
                StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
                
                StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
index b63ab0e..404a14e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCheckpointsHandler.java
@@ -22,7 +22,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import scala.Option;
 
@@ -39,7 +39,7 @@ public class JobCheckpointsHandler extends AbstractExecutionGraphRequestHandler
        }
 
        @Override
-       public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 75389b1..21639ef 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -22,8 +22,8 @@ import java.io.StringWriter;
 import java.util.Map;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 /**
@@ -36,7 +36,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
        }
 
        @Override
-       public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
@@ -45,7 +45,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
                gen.writeStringField("jid", graph.getJobID().toString());
                gen.writeStringField("name", graph.getJobName());
 
-               final ExecutionConfigSummary summary = graph.getExecutionConfigSummary();
+               final ArchivedExecutionConfig summary = graph.getArchivedExecutionConfig();
 
                if (summary != null) {
                        gen.writeObjectFieldStart("execution-config");
@@ -59,7 +59,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
                        Map<String, String> ucVals = summary.getGlobalJobParameters();
                        if (ucVals != null) {
                                gen.writeObjectFieldStart("user-config");
-
+                               
                                for (Map.Entry<String, String> ucVal : ucVals.entrySet()) {
                                        gen.writeStringField(ucVal.getKey(), ucVal.getValue());
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 884b859..e7a2a8c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -24,9 +24,10 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -50,7 +51,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
        }
 
        @Override
-       public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
                final StringWriter writer = new StringWriter();
                final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -84,13 +85,13 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
                int[] jobVerticesPerState = new int[ExecutionState.values().length];
                gen.writeArrayFieldStart("vertices");
 
-               for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
+               for (AccessExecutionJobVertex ejv : graph.getVerticesTopologically()) {
                        int[] tasksPerState = new int[ExecutionState.values().length];
                        long startTime = Long.MAX_VALUE;
                        long endTime = 0;
                        boolean allFinished = true;
                        
-                       for (ExecutionVertex vertex : ejv.getTaskVertices()) {
+                       for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
                                final ExecutionState state = vertex.getExecutionState();
                                tasksPerState[state.ordinal()]++;
 
@@ -133,7 +134,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 
                        gen.writeStartObject();
                        gen.writeStringField("id", ejv.getJobVertexId().toString());
-                       gen.writeStringField("name", ejv.getJobVertex().getName());
+                       gen.writeStringField("name", ejv.getName());
                        gen.writeNumberField("parallelism", ejv.getParallelism());
                        gen.writeStringField("status", jobVertexState.name());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index ce154e3..90197d0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -19,11 +19,10 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.util.ExceptionUtils;
 
 import java.io.StringWriter;
 import java.util.Map;
@@ -40,16 +39,16 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
        }
 
        @Override
-       public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
                gen.writeStartObject();
                
                // most important is the root failure cause
-               Throwable rootException = graph.getFailureCause();
+               String rootException = graph.getFailureCauseAsString();
                if (rootException != null) {
-                       gen.writeStringField("root-exception", ExceptionUtils.stringifyException(rootException));
+                       gen.writeStringField("root-exception", rootException);
                }
 
                // we additionally collect all exceptions (up to a limit) that occurred in the individual tasks
@@ -58,8 +57,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
                int numExceptionsSoFar = 0;
                boolean truncated = false;
                
-               for (ExecutionVertex task : graph.getAllExecutionVertices()) {
-                       Throwable t = task.getFailureCause();
+               for (AccessExecutionVertex task : graph.getAllExecutionVertices()) {
+                       String t = task.getFailureCauseAsString();
                        if (t != null) {
                                if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
                                        truncated = true;
@@ -71,8 +70,8 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
                                                location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)";
 
                                gen.writeStartObject();
-                               gen.writeStringField("exception", ExceptionUtils.stringifyException(t));
-                               gen.writeStringField("task", task.getSimpleName());
+                               gen.writeStringField("exception", t);
+                               gen.writeStringField("task", task.getTaskNameWithSubtaskIndex());
                                gen.writeStringField("location", locationString);
                                gen.writeEndObject();
                                numExceptionsSoFar++;

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index 0389f5a..64f7000 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.util.Map;
@@ -34,7 +34,7 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
        }
 
        @Override
-       public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
                return graph.getJsonPlan();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index 5df565a..ad4e207 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
@@ -35,7 +35,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
        }
 
        @Override
-       public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
                StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
                
                StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
index 65f82a3..c5bacf2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -56,9 +58,12 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
 
        @Override
        public String handleRequest(
-                       ExecutionJobVertex jobVertex,
+                       AccessExecutionJobVertex accessJobVertex,
                        Map<String, String> params) throws Exception {
-
+               if (accessJobVertex instanceof ArchivedExecutionJobVertex) {
+                       return "";
+               }
+               ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex;
                try (StringWriter writer = new StringWriter();
                                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
index 6522de5..8a68ffa 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandler.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import scala.Option;
 
@@ -38,36 +37,31 @@ public class JobVertexCheckpointsHandler extends AbstractJobVertexRequestHandler
        }
 
        @Override
-       public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
                gen.writeStartObject();
 
-               CheckpointStatsTracker tracker = jobVertex.getGraph().getCheckpointStatsTracker();
+               Option<OperatorCheckpointStats> statsOption = jobVertex.getCheckpointStats();
 
-               if (tracker != null) {
-                       Option<OperatorCheckpointStats> statsOption = tracker
-                                       .getOperatorStats(jobVertex.getJobVertexId());
+               if (statsOption.isDefined()) {
+                       OperatorCheckpointStats stats = statsOption.get();
 
-                       if (statsOption.isDefined()) {
-                               OperatorCheckpointStats stats = statsOption.get();
+                       gen.writeNumberField("id", stats.getCheckpointId());
+                       gen.writeNumberField("timestamp", stats.getTriggerTimestamp());
+                       gen.writeNumberField("duration", stats.getDuration());
+                       gen.writeNumberField("size", stats.getStateSize());
+                       gen.writeNumberField("parallelism", stats.getNumberOfSubTasks());
 
-                               gen.writeNumberField("id", stats.getCheckpointId());
-                               gen.writeNumberField("timestamp", stats.getTriggerTimestamp());
-                               gen.writeNumberField("duration", stats.getDuration());
-                               gen.writeNumberField("size", stats.getStateSize());
-                               gen.writeNumberField("parallelism", stats.getNumberOfSubTasks());
-
-                               gen.writeArrayFieldStart("subtasks");
-                               for (int i = 0; i < stats.getNumberOfSubTasks(); i++) {
-                                       gen.writeStartObject();
-                                       gen.writeNumberField("subtask", i);
-                                       gen.writeNumberField("duration", stats.getSubTaskDuration(i));
-                                       gen.writeNumberField("size", stats.getSubTaskStateSize(i));
-                                       gen.writeEndObject();
-                               }
-                               gen.writeEndArray();
+                       gen.writeArrayFieldStart("subtasks");
+                       for (int i = 0; i < stats.getNumberOfSubTasks(); i++) {
+                               gen.writeStartObject();
+                               gen.writeNumberField("subtask", i);
+                               gen.writeNumberField("duration", stats.getSubTaskDuration(i));
+                               gen.writeNumberField("size", stats.getSubTaskStateSize(i));
+                               gen.writeEndObject();
                        }
+                       gen.writeEndArray();
                }
 
                gen.writeEndObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 813ecb8..fbdd86b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -43,7 +43,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
        }
 
        @Override
-       public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
                final long now = System.currentTimeMillis();
                
                StringWriter writer = new StringWriter();
@@ -52,13 +52,13 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
                gen.writeStartObject();
 
                gen.writeStringField("id", jobVertex.getJobVertexId().toString());
-               gen.writeStringField("name", jobVertex.getJobVertex().getName());
+               gen.writeStringField("name", jobVertex.getName());
                gen.writeNumberField("parallelism", jobVertex.getParallelism());
                gen.writeNumberField("now", now);
 
                gen.writeArrayFieldStart("subtasks");
                int num = 0;
-               for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+               for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
                        final ExecutionState status = vertex.getExecutionState();
                        
                        TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index cbdb87f..0e94334 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -23,8 +23,9 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -46,18 +47,18 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
        }
 
        @Override
-       public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
                // Build a map that groups tasks by TaskManager
-               Map<String, List<ExecutionVertex>> taskManagerVertices = new HashMap<>();
+               Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
 
-               for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+               for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
                        TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
                        String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
 
-                       List<ExecutionVertex> vertices = taskManagerVertices.get(taskManager);
+                       List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager);
 
                        if (vertices == null) {
-                               vertices = new ArrayList<ExecutionVertex>();
+                               vertices = new ArrayList<>();
                                taskManagerVertices.put(taskManager, vertices);
                        }
 
@@ -73,13 +74,13 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
                gen.writeStartObject();
 
                gen.writeStringField("id", jobVertex.getJobVertexId().toString());
-               gen.writeStringField("name", jobVertex.getJobVertex().getName());
+               gen.writeStringField("name", jobVertex.getName());
                gen.writeNumberField("now", now);
 
                gen.writeArrayFieldStart("taskmanagers");
-               for (Entry<String, List<ExecutionVertex>> entry : taskManagerVertices.entrySet()) {
+               for (Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
                        String host = entry.getKey();
-                       List<ExecutionVertex> taskVertices = entry.getValue();
+                       List<AccessExecutionVertex> taskVertices = entry.getValue();
 
                        int[] tasksPerState = new int[ExecutionState.values().length];
 
@@ -92,7 +93,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
                        LongCounter tmReadRecords = new LongCounter();
                        LongCounter tmWriteRecords = new LongCounter();
 
-                       for (ExecutionVertex vertex : taskVertices) {
+                       for (AccessExecutionVertex vertex : taskVertices) {
                                final ExecutionState state = vertex.getExecutionState();
                                tasksPerState[state.ordinal()]++;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
index d301bd1..811bea6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.util.Map;
@@ -33,7 +33,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
        }
 
        @Override
-       public String handleRequest(ExecutionVertex vertex, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
                return handleRequest(vertex.getCurrentExecutionAttempt(), params);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index 14ccc0c..786f5e8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.io.StringWriter;
@@ -38,7 +38,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
        }
 
        @Override
-       public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
                final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
                
                StringWriter writer = new StringWriter();
@@ -46,7 +46,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
                gen.writeStartObject();
 
-               gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex());
+               gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex());
                gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
                gen.writeStringField("id", execAttempt.getAttemptId().toString());
                

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index a1e6d0e..3cc7376 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -41,7 +41,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
        }
 
        @Override
-       public String handleRequest(Execution execAttempt, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
                final ExecutionState status = execAttempt.getState();
                final long now = System.currentTimeMillis();
 
@@ -78,7 +78,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
                gen.writeStartObject();
-               gen.writeNumberField("subtask", execAttempt.getVertex().getParallelSubtaskIndex());
+               gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex());
                gen.writeStringField("status", status.name());
                gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
                gen.writeStringField("host", locationString);

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 780bd4b..892a606 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -39,7 +39,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
        }
 
        @Override
-       public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+       public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -50,7 +50,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
                gen.writeArrayFieldStart("subtasks");
                
                int num = 0;
-               for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+               for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
 
                        TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
                        String locationString = location == null ? "(unassigned)" : location.getHostname();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 9e6276d..76349ee 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
@@ -41,7 +41,7 @@ public class SubtasksTimesHandler extends 
AbstractJobVertexRequestHandler {
        }
 
        @Override
-       public String handleRequest(ExecutionJobVertex jobVertex, Map<String, 
String> params) throws Exception {
+       public String handleRequest(AccessExecutionJobVertex jobVertex, 
Map<String, String> params) throws Exception {
                final long now = System.currentTimeMillis();
 
                StringWriter writer = new StringWriter();
@@ -50,13 +50,13 @@ public class SubtasksTimesHandler extends 
AbstractJobVertexRequestHandler {
                gen.writeStartObject();
 
                gen.writeStringField("id", 
jobVertex.getJobVertexId().toString());
-               gen.writeStringField("name", 
jobVertex.getJobVertex().getName());
+               gen.writeStringField("name", jobVertex.getName());
                gen.writeNumberField("now", now);
                
                gen.writeArrayFieldStart("subtasks");
 
                int num = 0;
-               for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+               for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
                        
                        long[] timestamps = 
vertex.getCurrentExecutionAttempt().getStateTimestamps();
                        ExecutionState status = vertex.getExecutionState();

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 25dc189..507c977 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
@@ -153,7 +152,7 @@ public class BackPressureStatsTrackerITCase extends 
TestLogger {
                                                        ExecutionGraphFound 
executionGraphResponse =
                                                                        
expectMsgClass(ExecutionGraphFound.class);
 
-                                                       ExecutionGraph 
executionGraph = executionGraphResponse.executionGraph();
+                                                       ExecutionGraph 
executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
                                                        ExecutionJobVertex 
vertex = executionGraph.getJobVertex(task.getID());
 
                                                        
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 9b1f608..c4ce9d1 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -125,7 +124,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                                                                jm.tell(new 
RequestExecutionGraph(jobGraph.getJobID()), testActor);
                                                                
ExecutionGraphFound executionGraphResponse =
                                                                                
expectMsgClass(ExecutionGraphFound.class);
-                                                               ExecutionGraph 
executionGraph = executionGraphResponse.executionGraph();
+                                                               ExecutionGraph 
executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
                                                                
ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
 
                                                                
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
index f882663..18aae35 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexCheckpointsHandlerTest.java
@@ -20,9 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -37,8 +35,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -49,10 +45,9 @@ public class JobVertexCheckpointsHandlerTest {
                JobVertexCheckpointsHandler handler = new 
JobVertexCheckpointsHandler(
                                mock(ExecutionGraphHolder.class));
 
-               ExecutionGraph graph = mock(ExecutionGraph.class);
                ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-
-               when(vertex.getGraph()).thenReturn(graph);
+               when(vertex.getCheckpointStats())
+                       .thenReturn(Option.<OperatorCheckpointStats>empty());
 
                String response = handler.handleRequest(vertex, 
Collections.<String, String>emptyMap());
 
@@ -65,15 +60,10 @@ public class JobVertexCheckpointsHandlerTest {
                JobVertexCheckpointsHandler handler = new 
JobVertexCheckpointsHandler(
                                mock(ExecutionGraphHolder.class));
 
-               ExecutionGraph graph = mock(ExecutionGraph.class);
                ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-               CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
-
-               when(vertex.getGraph()).thenReturn(graph);
-               when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
 
                // No stats
-               when(tracker.getOperatorStats(any(JobVertexID.class)))
+               when(vertex.getCheckpointStats())
                                
.thenReturn(Option.<OperatorCheckpointStats>empty());
 
                String response = handler.handleRequest(vertex, 
Collections.<String, String>emptyMap());
@@ -89,13 +79,9 @@ public class JobVertexCheckpointsHandlerTest {
 
                JobVertexID vertexId = new JobVertexID();
 
-               ExecutionGraph graph = mock(ExecutionGraph.class);
                ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-               CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
 
                when(vertex.getJobVertexId()).thenReturn(vertexId);
-               when(vertex.getGraph()).thenReturn(graph);
-               when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
 
                long[][] subTaskStats = new long[][] {
                                new long[] { 1, 10 },
@@ -113,7 +99,7 @@ public class JobVertexCheckpointsHandlerTest {
                OperatorCheckpointStats stats = new OperatorCheckpointStats(
                                3, 6812, 2800, 1024, subTaskStats);
 
-               when(tracker.getOperatorStats(eq(vertexId)))
+               when(vertex.getCheckpointStats())
                                .thenReturn(Option.apply(stats));
 
                // Request stats

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
new file mode 100644
index 0000000..92df7d7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ArchivedCheckpointStatsTracker.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import scala.Option;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class ArchivedCheckpointStatsTracker implements CheckpointStatsTracker, 
Serializable {
+       private static final long serialVersionUID = 1469003563086353555L;
+
+       private final Option<JobCheckpointStats> jobStats;
+       private final Map<JobVertexID, OperatorCheckpointStats> operatorStats;
+
+       public ArchivedCheckpointStatsTracker(Option<JobCheckpointStats> 
jobStats, Map<JobVertexID, OperatorCheckpointStats> operatorStats) {
+               this.jobStats = jobStats;
+               this.operatorStats = operatorStats;
+       }
+
+       @Override
+       public void onCompletedCheckpoint(CompletedCheckpoint checkpoint) {
+       }
+
+       @Override
+       public Option<JobCheckpointStats> getJobStats() {
+               return jobStats;
+       }
+
+       @Override
+       public Option<OperatorCheckpointStats> getOperatorStats(JobVertexID 
operatorId) {
+               return Option.apply(operatorStats.get(operatorId));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
index dc239dd..64f17d4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/CheckpointStats.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.checkpoint.stats;
 
+import java.io.Serializable;
+
 /**
  * Statistics for a specific checkpoint.
  */
-public class CheckpointStats {
+public class CheckpointStats implements Serializable {
 
        /** ID of the checkpoint. */
        private final long checkpointId;

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
index b1d7ff2..e156c8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
@@ -20,12 +20,13 @@ package org.apache.flink.runtime.checkpoint.stats;
 
 import org.apache.flink.configuration.ConfigConstants;
 
+import java.io.Serializable;
 import java.util.List;
 
 /**
  * Snapshot of checkpoint statistics for a job.
  */
-public interface JobCheckpointStats {
+public interface JobCheckpointStats extends Serializable {
 
        // 
------------------------------------------------------------------------
        // General stats

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
index 5b113d8..6c2d497 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/OperatorCheckpointStats.java
@@ -27,6 +27,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class OperatorCheckpointStats extends CheckpointStats {
 
+       private static final long serialVersionUID = -1594736655739376140L;
+
        /** Duration in milliseconds and state sizes in bytes per sub task. */
        private final long[][] subTaskStats;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index db8a0e0..39fbad5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -348,6 +348,8 @@ public class SimpleCheckpointStatsTracker implements 
CheckpointStatsTracker {
         */
        private static class JobCheckpointStatsSnapshot implements 
JobCheckpointStats {
 
+               private static final long serialVersionUID = 
7558212015099742418L;
+
                // General
                private final List<CheckpointStats> recentHistory;
                private final long count;

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
new file mode 100644
index 0000000..aefc17d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.Map;
+
+/**
+ * Common interface for the runtime {@link Execution} and {@link 
ArchivedExecution}.
+ */
+public interface AccessExecution {
+       /**
+        * Returns the {@link ExecutionAttemptID} for this Execution.
+        *
+        * @return ExecutionAttemptID for this execution
+        */
+       ExecutionAttemptID getAttemptId();
+
+       /**
+        * Returns the attempt number for this execution.
+        *
+        * @return attempt number for this execution.
+        */
+       int getAttemptNumber();
+
+       /**
+        * Returns the timestamps for every {@link ExecutionState}.
+        *
+        * @return timestamps for each state
+        */
+       long[] getStateTimestamps();
+
+       /**
+        * Returns the current {@link ExecutionState} for this execution.
+        *
+        * @return execution state for this execution
+        */
+       ExecutionState getState();
+
+       /**
+        * Returns the {@link TaskManagerLocation} for this execution.
+        *
+        * @return taskmanager location for this execution.
+        */
+       TaskManagerLocation getAssignedResourceLocation();
+
+       /**
+        * Returns the exception that caused the job to fail. This is the first 
root exception
+        * that was not recoverable and triggered job failure.
+        *
+        * @return failure exception as a string, or {@code "(null)"}
+        */
+       String getFailureCauseAsString();
+
+       /**
+        * Returns the timestamp for the given {@link ExecutionState}.
+        *
+        * @param state state for which the timestamp should be returned
+        * @return timestamp for the given state
+        */
+       long getStateTimestamp(ExecutionState state);
+
+       /**
+        * Returns the user-defined accumulators as strings.
+        *
+        * @return user-defined accumulators as strings.
+        */
+       StringifiedAccumulatorResult[] getUserAccumulatorsStringified();
+
+       /**
+        * Returns the system-defined accumulators.
+        *
+        * @return system-defined accumulators.
+        * @deprecated Will be removed in FLINK-4527
+        */
+       @Deprecated
+       Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getFlinkAccumulators();
+
+       /**
+        * Returns the subtask index of this execution.
+        *
+        * @return subtask index of this execution.
+        */
+       int getParallelSubtaskIndex();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
new file mode 100644
index 0000000..0ff6ace
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Common interface for the runtime {@link ExecutionGraph} and {@link 
ArchivedExecutionGraph}.
+ */
+public interface AccessExecutionGraph {
+       /**
+        * Returns the job plan as a JSON string.
+        *
+        * @return job plan as a JSON string
+        */
+       String getJsonPlan();
+
+       /**
+        * Returns the {@link JobID} for this execution graph.
+        *
+        * @return job ID for this execution graph
+        */
+       JobID getJobID();
+
+       /**
+        * Returns the job name for this execution graph.
+        *
+        * @return job name for this execution graph
+        */
+       String getJobName();
+
+       /**
+        * Returns the current {@link JobStatus} for this execution graph.
+        *
+        * @return job status for this execution graph
+        */
+       JobStatus getState();
+
+       /**
+        * Returns the exception that caused the job to fail. This is the first 
root exception
+        * that was not recoverable and triggered job failure.
+        *
+        * @return failure causing exception as a string, or {@code "(null)"}
+        */
+       String getFailureCauseAsString();
+
+       /**
+        * Returns the job vertex for the given {@link JobVertexID}.
+        *
+        * @param id id of job vertex to be returned
+        * @return job vertex for the given id, or null
+        */
+       AccessExecutionJobVertex getJobVertex(JobVertexID id);
+
+       /**
+        * Returns a map containing all job vertices for this execution graph.
+        *
+        * @return map containing all job vertices for this execution graph
+        */
+       Map<JobVertexID, ? extends AccessExecutionJobVertex> getAllVertices();
+
+       /**
+        * Returns an iterable containing all job vertices for this execution 
graph in the order they were created.
+        *
+        * @return iterable containing all job vertices for this execution 
graph in the order they were created
+        */
+       Iterable<? extends AccessExecutionJobVertex> getVerticesTopologically();
+
+       /**
+        * Returns an iterable containing all execution vertices for this 
execution graph.
+        *
+        * @return iterable containing all execution vertices for this 
execution graph
+        */
+       Iterable<? extends AccessExecutionVertex> getAllExecutionVertices();
+
+       /**
+        * Returns the timestamp for the given {@link JobStatus}.
+        *
+        * @param status status for which the timestamp should be returned
+        * @return timestamp for the given job status
+        */
+       long getStatusTimestamp(JobStatus status);
+
+       /**
+        * Returns the {@link CheckpointStatsTracker} for this execution graph.
+        *
+        * @return CheckpointStatsTracker for this execution graph
+        */
+       CheckpointStatsTracker getCheckpointStatsTracker();
+
+       /**
+        * Returns the {@link ArchivedExecutionConfig} for this execution graph.
+        *
+        * @return execution config summary for this execution graph, or null 
in case of errors
+        */
+       ArchivedExecutionConfig getArchivedExecutionConfig();
+
+       /**
+        * Returns whether the job for this execution graph is stoppable.
+        *
+        * @return true, if all source tasks are stoppable, false otherwise
+        */
+       boolean isStoppable();
+
+       /**
+        * Returns the aggregated user-defined accumulators as strings.
+        *
+        * @return aggregated user-defined accumulators as strings.
+        */
+       StringifiedAccumulatorResult[] getAccumulatorResultsStringified();
+
+       /**
+        * Returns a map containing the serialized values of user-defined 
accumulators.
+        *
+        * @return map containing serialized values of user-defined accumulators
+        * @throws IOException indicates that the serialization has failed
+        */
+       Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws 
IOException;
+
+       /**
+        * Returns the aggregated system-defined accumulators.
+        *
+        * @return aggregated system-defined accumulators.
+        * @deprecated Will be removed in FLINK-4527
+        */
+       @Deprecated
+       Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, 
?>>> getFlinkAccumulators();
+
+       /**
+        * Returns whether this execution graph was archived.
+        *
+        * @return true, if the execution graph was archived, false otherwise
+        */
+       boolean isArchived();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
new file mode 100644
index 0000000..c9bf604
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionJobVertex.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import scala.Option;
+
+import java.util.Map;
+
+/**
+ * Common accessor interface for the runtime {@link ExecutionJobVertex} and {@link ArchivedExecutionJobVertex}.
+ */
+public interface AccessExecutionJobVertex {
+       /**
+        * Returns the name of this job vertex.
+        *
+        * @return name of this job vertex.
+        */
+       String getName();
+
+       /**
+        * Returns the parallelism of this job vertex.
+        *
+        * @return parallelism of this job vertex.
+        */
+       int getParallelism();
+
+       /**
+        * Returns the max parallelism of this job vertex.
+        *
+        * @return max parallelism of this job vertex.
+        */
+       int getMaxParallelism();
+
+       /**
+        * Returns the {@link JobVertexID} of this job vertex.
+        *
+        * @return JobVertexID of this job vertex.
+        */
+       JobVertexID getJobVertexId();
+
+       /**
+        * Returns all execution vertices of this job vertex.
+        *
+        * @return array of all execution vertices, typed as {@link AccessExecutionVertex}.
+        */
+       AccessExecutionVertex[] getTaskVertices();
+
+       /**
+        * Returns the aggregated {@link ExecutionState} of this job vertex.
+        *
+        * @return aggregated state of this job vertex.
+        */
+       ExecutionState getAggregateState();
+
+       /**
+        * Returns the {@link OperatorCheckpointStats} of this job vertex.
+        *
+        * @return checkpoint stats of this job vertex, wrapped in a scala {@link Option}.
+        */
+       Option<OperatorCheckpointStats> getCheckpointStats();
+
+       /**
+        * Returns the aggregated system-defined accumulators.
+        *
+        * @return aggregated system-defined accumulators, keyed by {@link AccumulatorRegistry.Metric}.
+        * @deprecated Will be removed in FLINK-4527
+        */
+       @Deprecated
+       Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
getAggregatedMetricAccumulators();
+
+       /**
+        * Returns the aggregated user-defined accumulators in stringified form.
+        *
+        * @return aggregated user-defined accumulators as {@link StringifiedAccumulatorResult} entries.
+        */
+       StringifiedAccumulatorResult[] 
getAggregatedUserAccumulatorsStringified();
+}

Reply via email to