Repository: flink
Updated Branches:
  refs/heads/master f6d866817 -> 21e8e2dcf


http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
new file mode 100644
index 0000000..d0566d9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.SerializedValue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class ArchivedExecutionGraphTest {
+       private static JobVertexID v1ID = new JobVertexID();
+       private static JobVertexID v2ID = new JobVertexID();
+
+       private static ExecutionAttemptID executionWithAccumulatorsID;
+
+       private static ExecutionGraph runtimeGraph;
+
+       @BeforeClass
+       public static void setupExecutionGraph() throws Exception {
+               // 
-------------------------------------------------------------------------------------------------------------
+               // Setup
+               // 
-------------------------------------------------------------------------------------------------------------
+
+               v1ID = new JobVertexID();
+               v2ID = new JobVertexID();
+
+               JobVertex v1 = new JobVertex("v1", v1ID);
+               JobVertex v2 = new JobVertex("v2", v2ID);
+
+               v1.setParallelism(1);
+               v2.setParallelism(2);
+
+               List<JobVertex> vertices = new 
ArrayList<JobVertex>(Arrays.asList(v1, v2));
+
+               ExecutionConfig config = new ExecutionConfig();
+
+               config.setExecutionMode(ExecutionMode.BATCH_FORCED);
+               config.setRestartStrategy(new 
RestartStrategies.NoRestartStrategyConfiguration());
+               config.setParallelism(4);
+               config.enableObjectReuse();
+               config.setGlobalJobParameters(new TestJobParameters());
+
+               runtimeGraph = new ExecutionGraph(
+                       TestingUtils.defaultExecutionContext(),
+                       new JobID(),
+                       "test job",
+                       new Configuration(),
+                       new SerializedValue<>(config),
+                       AkkaUtils.getDefaultTimeout(),
+                       new NoRestartStrategy());
+               runtimeGraph.attachJobGraph(vertices);
+
+               runtimeGraph.enableSnapshotCheckpointing(
+                       100,
+                       100,
+                       100,
+                       1,
+                       Collections.<ExecutionJobVertex>emptyList(),
+                       Collections.<ExecutionJobVertex>emptyList(),
+                       Collections.<ExecutionJobVertex>emptyList(),
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(1, null),
+                       new HeapSavepointStore(),
+                       new TestCheckpointStatsTracker());
+
+               Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
flinkAccumulators = new HashMap<>();
+               flinkAccumulators.put(AccumulatorRegistry.Metric.NUM_BYTES_IN, 
new LongCounter(32));
+
+               Map<String, Accumulator<?, ?>> userAccumulators = new 
HashMap<>();
+               userAccumulators.put("userAcc", new LongCounter(64));
+
+               Execution executionWithAccumulators = 
runtimeGraph.getJobVertex(v1ID).getTaskVertices()[0].getCurrentExecutionAttempt();
+               executionWithAccumulators.setAccumulators(flinkAccumulators, 
userAccumulators);
+               executionWithAccumulatorsID = 
executionWithAccumulators.getAttemptId();
+
+               
runtimeGraph.getJobVertex(v2ID).getTaskVertices()[0].getCurrentExecutionAttempt().fail(new
 RuntimeException("This exception was thrown on purpose."));
+       }
+
+       @Test
+       public void testArchive() throws IOException, ClassNotFoundException {
+               ArchivedExecutionGraph archivedGraph = runtimeGraph.archive();
+
+               compareExecutionGraph(runtimeGraph, archivedGraph);
+       }
+
+       @Test
+       public void testSerialization() throws IOException, 
ClassNotFoundException {
+               ArchivedExecutionGraph archivedGraph = runtimeGraph.archive();
+
+               verifySerializability(archivedGraph);
+       }
+
+       private static void compareExecutionGraph(AccessExecutionGraph 
runtimeGraph, AccessExecutionGraph archivedGraph) throws IOException, 
ClassNotFoundException {
+               assertTrue(archivedGraph.isArchived());
+               // 
-------------------------------------------------------------------------------------------------------------
+               // ExecutionGraph
+               // 
-------------------------------------------------------------------------------------------------------------
+               assertEquals(runtimeGraph.getJsonPlan(), 
archivedGraph.getJsonPlan());
+               assertEquals(runtimeGraph.getJobID(), archivedGraph.getJobID());
+               assertEquals(runtimeGraph.getJobName(), 
archivedGraph.getJobName());
+               assertEquals(runtimeGraph.getState(), archivedGraph.getState());
+               assertEquals(runtimeGraph.getFailureCauseAsString(), 
archivedGraph.getFailureCauseAsString());
+               
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CREATED), 
archivedGraph.getStatusTimestamp(JobStatus.CREATED));
+               
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RUNNING), 
archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
+               
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILING), 
archivedGraph.getStatusTimestamp(JobStatus.FAILING));
+               assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FAILED), 
archivedGraph.getStatusTimestamp(JobStatus.FAILED));
+               
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CANCELLING), 
archivedGraph.getStatusTimestamp(JobStatus.CANCELLING));
+               
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.CANCELED), 
archivedGraph.getStatusTimestamp(JobStatus.CANCELED));
+               
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.FINISHED), 
archivedGraph.getStatusTimestamp(JobStatus.FINISHED));
+               
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING), 
archivedGraph.getStatusTimestamp(JobStatus.RESTARTING));
+               
assertEquals(runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED), 
archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED));
+               assertEquals(runtimeGraph.isStoppable(), 
archivedGraph.isStoppable());
+
+               // 
-------------------------------------------------------------------------------------------------------------
+               // JobCheckpointStats
+               // 
-------------------------------------------------------------------------------------------------------------
+               JobCheckpointStats runtimeStats = 
runtimeGraph.getCheckpointStatsTracker().getJobStats().get();
+               JobCheckpointStats archivedStats = 
archivedGraph.getCheckpointStatsTracker().getJobStats().get();
+
+               assertEquals(runtimeStats.getAverageDuration(), 
archivedStats.getAverageDuration());
+               assertEquals(runtimeStats.getMinDuration(), 
archivedStats.getMinDuration());
+               assertEquals(runtimeStats.getMaxDuration(), 
archivedStats.getMaxDuration());
+               assertEquals(runtimeStats.getAverageStateSize(), 
archivedStats.getAverageStateSize());
+               assertEquals(runtimeStats.getMinStateSize(), 
archivedStats.getMinStateSize());
+               assertEquals(runtimeStats.getMaxStateSize(), 
archivedStats.getMaxStateSize());
+               assertEquals(runtimeStats.getCount(), archivedStats.getCount());
+               assertEquals(runtimeStats.getRecentHistory(), 
archivedStats.getRecentHistory());
+
+               // 
-------------------------------------------------------------------------------------------------------------
+               // ArchivedExecutionConfig
+               // 
-------------------------------------------------------------------------------------------------------------
+               ArchivedExecutionConfig runtimeConfig = 
runtimeGraph.getArchivedExecutionConfig();
+               ArchivedExecutionConfig archivedConfig = 
archivedGraph.getArchivedExecutionConfig();
+
+               assertEquals(runtimeConfig.getExecutionMode(), 
archivedConfig.getExecutionMode());
+               assertEquals(runtimeConfig.getParallelism(), 
archivedConfig.getParallelism());
+               assertEquals(runtimeConfig.getObjectReuseEnabled(), 
archivedConfig.getObjectReuseEnabled());
+               assertEquals(runtimeConfig.getRestartStrategyDescription(), 
archivedConfig.getRestartStrategyDescription());
+               
assertNotNull(archivedConfig.getGlobalJobParameters().get("hello"));
+               
assertEquals(runtimeConfig.getGlobalJobParameters().get("hello"), 
archivedConfig.getGlobalJobParameters().get("hello"));
+
+               // 
-------------------------------------------------------------------------------------------------------------
+               // StringifiedAccumulators
+               // 
-------------------------------------------------------------------------------------------------------------
+               
compareStringifiedAccumulators(runtimeGraph.getAccumulatorResultsStringified(), 
archivedGraph.getAccumulatorResultsStringified());
+               
compareSerializedAccumulators(runtimeGraph.getAccumulatorsSerialized(), 
archivedGraph.getAccumulatorsSerialized());
+               
compareFlinkAccumulators(runtimeGraph.getFlinkAccumulators().get(executionWithAccumulatorsID),
 archivedGraph.getFlinkAccumulators().get(executionWithAccumulatorsID));
+
+               // 
-------------------------------------------------------------------------------------------------------------
+               // JobVertices
+               // 
-------------------------------------------------------------------------------------------------------------
+               Map<JobVertexID, ? extends AccessExecutionJobVertex> 
runtimeVertices = runtimeGraph.getAllVertices();
+               Map<JobVertexID, ? extends AccessExecutionJobVertex> 
archivedVertices = archivedGraph.getAllVertices();
+
+               for (Map.Entry<JobVertexID, ? extends AccessExecutionJobVertex> 
vertex : runtimeVertices.entrySet()) {
+                       compareExecutionJobVertex(vertex.getValue(), 
archivedVertices.get(vertex.getKey()));
+               }
+
+               Iterator<? extends AccessExecutionJobVertex> 
runtimeTopologicalVertices = runtimeGraph.getVerticesTopologically().iterator();
+               Iterator<? extends AccessExecutionJobVertex> 
archiveTopologicaldVertices = 
archivedGraph.getVerticesTopologically().iterator();
+
+               while (runtimeTopologicalVertices.hasNext()) {
+                       assertTrue(archiveTopologicaldVertices.hasNext());
+                       
compareExecutionJobVertex(runtimeTopologicalVertices.next(), 
archiveTopologicaldVertices.next());
+               }
+
+               // 
-------------------------------------------------------------------------------------------------------------
+               // OperatorCheckpointStats
+               // 
-------------------------------------------------------------------------------------------------------------
+               CheckpointStatsTracker runtimeTracker = 
runtimeGraph.getCheckpointStatsTracker();
+               CheckpointStatsTracker archivedTracker = 
archivedGraph.getCheckpointStatsTracker();
+               
compareOperatorCheckpointStats(runtimeTracker.getOperatorStats(v1ID).get(), 
archivedTracker.getOperatorStats(v1ID).get());
+               
compareOperatorCheckpointStats(runtimeTracker.getOperatorStats(v2ID).get(), 
archivedTracker.getOperatorStats(v2ID).get());
+
+               // 
-------------------------------------------------------------------------------------------------------------
+               // ExecutionVertices
+               // 
-------------------------------------------------------------------------------------------------------------
+               Iterator<? extends AccessExecutionVertex> 
runtimeExecutionVertices = runtimeGraph.getAllExecutionVertices().iterator();
+               Iterator<? extends AccessExecutionVertex> 
archivedExecutionVertices = archivedGraph.getAllExecutionVertices().iterator();
+
+               while (runtimeExecutionVertices.hasNext()) {
+                       assertTrue(archivedExecutionVertices.hasNext());
+                       compareExecutionVertex(runtimeExecutionVertices.next(), 
archivedExecutionVertices.next());
+               }
+       }
+
+       private static void compareExecutionJobVertex(AccessExecutionJobVertex 
runtimeJobVertex, AccessExecutionJobVertex archivedJobVertex) {
+               assertEquals(runtimeJobVertex.getName(), 
archivedJobVertex.getName());
+               assertEquals(runtimeJobVertex.getParallelism(), 
archivedJobVertex.getParallelism());
+               assertEquals(runtimeJobVertex.getMaxParallelism(), 
archivedJobVertex.getMaxParallelism());
+               assertEquals(runtimeJobVertex.getJobVertexId(), 
archivedJobVertex.getJobVertexId());
+               assertEquals(runtimeJobVertex.getAggregateState(), 
archivedJobVertex.getAggregateState());
+
+               
compareOperatorCheckpointStats(runtimeJobVertex.getCheckpointStats().get(), 
archivedJobVertex.getCheckpointStats().get());
+
+               
compareStringifiedAccumulators(runtimeJobVertex.getAggregatedUserAccumulatorsStringified(),
 archivedJobVertex.getAggregatedUserAccumulatorsStringified());
+               
compareFlinkAccumulators(runtimeJobVertex.getAggregatedMetricAccumulators(), 
archivedJobVertex.getAggregatedMetricAccumulators());
+
+               AccessExecutionVertex[] runtimeExecutionVertices = 
runtimeJobVertex.getTaskVertices();
+               AccessExecutionVertex[] archivedExecutionVertices = 
archivedJobVertex.getTaskVertices();
+               assertEquals(runtimeExecutionVertices.length, 
archivedExecutionVertices.length);
+               for (int x = 0; x < runtimeExecutionVertices.length; x++) {
+                       compareExecutionVertex(runtimeExecutionVertices[x], 
archivedExecutionVertices[x]);
+               }
+       }
+
+       private static void compareExecutionVertex(AccessExecutionVertex 
runtimeVertex, AccessExecutionVertex archivedVertex) {
+               assertEquals(runtimeVertex.getTaskNameWithSubtaskIndex(), 
archivedVertex.getTaskNameWithSubtaskIndex());
+               assertEquals(runtimeVertex.getParallelSubtaskIndex(), 
archivedVertex.getParallelSubtaskIndex());
+               assertEquals(runtimeVertex.getExecutionState(), 
archivedVertex.getExecutionState());
+               
assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.CREATED), 
archivedVertex.getStateTimestamp(ExecutionState.CREATED));
+               
assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.SCHEDULED), 
archivedVertex.getStateTimestamp(ExecutionState.SCHEDULED));
+               
assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.DEPLOYING), 
archivedVertex.getStateTimestamp(ExecutionState.DEPLOYING));
+               
assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.RUNNING), 
archivedVertex.getStateTimestamp(ExecutionState.RUNNING));
+               
assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.FINISHED), 
archivedVertex.getStateTimestamp(ExecutionState.FINISHED));
+               
assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.CANCELING), 
archivedVertex.getStateTimestamp(ExecutionState.CANCELING));
+               
assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.CANCELED), 
archivedVertex.getStateTimestamp(ExecutionState.CANCELED));
+               
assertEquals(runtimeVertex.getStateTimestamp(ExecutionState.FAILED), 
archivedVertex.getStateTimestamp(ExecutionState.FAILED));
+               assertEquals(runtimeVertex.getFailureCauseAsString(), 
archivedVertex.getFailureCauseAsString());
+               
assertEquals(runtimeVertex.getCurrentAssignedResourceLocation(), 
archivedVertex.getCurrentAssignedResourceLocation());
+
+               compareExecution(runtimeVertex.getCurrentExecutionAttempt(), 
archivedVertex.getCurrentExecutionAttempt());
+       }
+
+       private static void compareExecution(AccessExecution runtimeExecution, 
AccessExecution archivedExecution) {
+               assertEquals(runtimeExecution.getAttemptId(), 
archivedExecution.getAttemptId());
+               assertEquals(runtimeExecution.getAttemptNumber(), 
archivedExecution.getAttemptNumber());
+               assertArrayEquals(runtimeExecution.getStateTimestamps(), 
archivedExecution.getStateTimestamps());
+               assertEquals(runtimeExecution.getState(), 
archivedExecution.getState());
+               assertEquals(runtimeExecution.getAssignedResourceLocation(), 
archivedExecution.getAssignedResourceLocation());
+               assertEquals(runtimeExecution.getFailureCauseAsString(), 
archivedExecution.getFailureCauseAsString());
+               
assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.CREATED), 
archivedExecution.getStateTimestamp(ExecutionState.CREATED));
+               
assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.SCHEDULED), 
archivedExecution.getStateTimestamp(ExecutionState.SCHEDULED));
+               
assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.DEPLOYING), 
archivedExecution.getStateTimestamp(ExecutionState.DEPLOYING));
+               
assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.RUNNING), 
archivedExecution.getStateTimestamp(ExecutionState.RUNNING));
+               
assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.FINISHED), 
archivedExecution.getStateTimestamp(ExecutionState.FINISHED));
+               
assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.CANCELING), 
archivedExecution.getStateTimestamp(ExecutionState.CANCELING));
+               
assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.CANCELED), 
archivedExecution.getStateTimestamp(ExecutionState.CANCELED));
+               
assertEquals(runtimeExecution.getStateTimestamp(ExecutionState.FAILED), 
archivedExecution.getStateTimestamp(ExecutionState.FAILED));
+               
compareStringifiedAccumulators(runtimeExecution.getUserAccumulatorsStringified(),
 archivedExecution.getUserAccumulatorsStringified());
+               
compareFlinkAccumulators(runtimeExecution.getFlinkAccumulators(), 
archivedExecution.getFlinkAccumulators());
+               assertEquals(runtimeExecution.getParallelSubtaskIndex(), 
archivedExecution.getParallelSubtaskIndex());
+       }
+
+       private static void 
compareStringifiedAccumulators(StringifiedAccumulatorResult[] runtimeAccs, 
StringifiedAccumulatorResult[] archivedAccs) {
+               assertEquals(runtimeAccs.length, archivedAccs.length);
+
+               for (int x = 0; x < runtimeAccs.length; x++) {
+                       StringifiedAccumulatorResult runtimeResult = 
runtimeAccs[x];
+                       StringifiedAccumulatorResult archivedResult = 
archivedAccs[x];
+
+                       assertEquals(runtimeResult.getName(), 
archivedResult.getName());
+                       assertEquals(runtimeResult.getType(), 
archivedResult.getType());
+                       assertEquals(runtimeResult.getValue(), 
archivedResult.getValue());
+               }
+       }
+
+       private static void compareSerializedAccumulators(Map<String, 
SerializedValue<Object>> runtimeAccs, Map<String, SerializedValue<Object>> 
archivedAccs) throws IOException, ClassNotFoundException {
+               assertEquals(runtimeAccs.size(), archivedAccs.size());
+               for (Map.Entry<String, SerializedValue<Object>> runtimeAcc : 
runtimeAccs.entrySet()) {
+                       long runtimeUserAcc = (long) 
runtimeAcc.getValue().deserializeValue(ClassLoader.getSystemClassLoader());
+                       long archivedUserAcc = (long) 
archivedAccs.get(runtimeAcc.getKey()).deserializeValue(ClassLoader.getSystemClassLoader());
+
+                       assertEquals(runtimeUserAcc, archivedUserAcc);
+               }
+       }
+
+       private static void 
compareFlinkAccumulators(Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> 
runtimeAccs, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> archivedAccs) {
+               assertEquals(runtimeAccs == null, archivedAccs == null);
+               if (runtimeAccs != null && archivedAccs != null) {
+                       assertEquals(runtimeAccs.size(), archivedAccs.size());
+                       for (Map.Entry<AccumulatorRegistry.Metric, 
Accumulator<?, ?>> runtimeAcc : runtimeAccs.entrySet()) {
+                               Accumulator<?, ?> archivedAcc = 
archivedAccs.get(runtimeAcc.getKey());
+
+                               
assertEquals(runtimeAcc.getValue().getLocalValue(), 
archivedAcc.getLocalValue());
+                       }
+               }
+       }
+
+       private static void 
compareOperatorCheckpointStats(OperatorCheckpointStats runtimeStats, 
OperatorCheckpointStats archivedStats) {
+               assertEquals(runtimeStats.getNumberOfSubTasks(), 
archivedStats.getNumberOfSubTasks());
+               assertEquals(runtimeStats.getCheckpointId(), 
archivedStats.getCheckpointId());
+               assertEquals(runtimeStats.getDuration(), 
archivedStats.getDuration());
+               assertEquals(runtimeStats.getStateSize(), 
archivedStats.getStateSize());
+               assertEquals(runtimeStats.getTriggerTimestamp(), 
archivedStats.getTriggerTimestamp());
+               assertEquals(runtimeStats.getSubTaskDuration(0), 
archivedStats.getSubTaskDuration(0));
+               assertEquals(runtimeStats.getSubTaskStateSize(0), 
archivedStats.getSubTaskStateSize(0));
+       }
+
+       private static void verifySerializability(ArchivedExecutionGraph graph) 
throws IOException, ClassNotFoundException {
+               ArchivedExecutionGraph copy = 
CommonTestUtils.createCopySerializable(graph);
+               compareExecutionGraph(graph, copy);
+       }
+
+
+       private static class TestCheckpointStatsTracker implements 
CheckpointStatsTracker {
+
+               @Override
+               public void onCompletedCheckpoint(CompletedCheckpoint 
checkpoint) {
+               }
+
+               @Override
+               public Option<JobCheckpointStats> getJobStats() {
+                       return Option.<JobCheckpointStats>apply(new 
TestJobCheckpointStats());
+               }
+
+               @Override
+               public Option<OperatorCheckpointStats> 
getOperatorStats(JobVertexID operatorId) {
+                       return Option.<OperatorCheckpointStats>apply(new 
TestOperatorCheckpointStats(operatorId.getUpperPart()));
+               }
+       }
+
+       private static class TestJobCheckpointStats implements 
JobCheckpointStats {
+               private static final long serialVersionUID = 
-2630234917947292836L;
+
+               @Override
+               public List<CheckpointStats> getRecentHistory() {
+                       return Collections.emptyList();
+               }
+
+               @Override
+               public long getCount() {
+                       return 1;
+               }
+
+               @Override
+               public long getMinDuration() {
+                       return 2;
+               }
+
+               @Override
+               public long getMaxDuration() {
+                       return 4;
+               }
+
+               @Override
+               public long getAverageDuration() {
+                       return 3;
+               }
+
+               @Override
+               public long getMinStateSize() {
+                       return 5;
+               }
+
+               @Override
+               public long getMaxStateSize() {
+                       return 7;
+               }
+
+               @Override
+               public long getAverageStateSize() {
+                       return 6;
+               }
+       }
+
+       private static class TestOperatorCheckpointStats extends 
OperatorCheckpointStats {
+               private static final long serialVersionUID = 
-2798640928349528644L;
+
+               public TestOperatorCheckpointStats(long offset) {
+                       super(1 + offset, 2 + offset, 3 + offset, 4 + offset, 
new long[][]{new long[]{5 + offset, 6 + offset}});
+               }
+       }
+
+       private static class TestJobParameters extends 
ExecutionConfig.GlobalJobParameters {
+               private static final long serialVersionUID = 
-8118611781035212808L;
+               private Map<String, String> parameters;
+
+               private TestJobParameters() {
+                       this.parameters = new HashMap<>();
+                       this.parameters.put("hello", "world");
+               }
+
+               @Override
+               public Map<String, String> toMap() {
+                       return parameters;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 183477a..3c6ae9d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -201,7 +201,7 @@ public class JobManagerTest {
                                                // Request the execution graph 
to get the runtime info
                                                jobManagerGateway.tell(new 
RequestExecutionGraph(jid), testActorGateway);
 
-                                               final ExecutionGraph eg = 
expectMsgClass(ExecutionGraphFound.class)
+                                               final ExecutionGraph eg = 
(ExecutionGraph) expectMsgClass(ExecutionGraphFound.class)
                                                        .executionGraph();
 
                                                final ExecutionVertex vertex = 
eg.getJobVertex(sender.getID())

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 450f9fb..8a9a4ce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -111,7 +111,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger 
{
 
                assertTrue(responseExecutionGraph instanceof 
TestingJobManagerMessages.ExecutionGraphFound);
 
-               ExecutionGraph executionGraph = 
((TestingJobManagerMessages.ExecutionGraphFound) 
responseExecutionGraph).executionGraph();
+               ExecutionGraph executionGraph = (ExecutionGraph) 
((TestingJobManagerMessages.ExecutionGraphFound) 
responseExecutionGraph).executionGraph();
 
                TestJobStatusListener testListener = new 
TestJobStatusListener();
                executionGraph.registerJobStatusListener(testListener);

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index d07c48f..72cf58b 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.accumulators.Accumulator
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, 
ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, 
ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.ActorGateway
 import org.apache.flink.runtime.jobgraph.JobStatus
 
@@ -37,7 +37,7 @@ object TestingJobManagerMessages {
     def jobID: JobID
   }
 
-  case class ExecutionGraphFound(jobID: JobID, executionGraph: ExecutionGraph) 
extends
+  case class ExecutionGraphFound(jobID: JobID, executionGraph: 
AccessExecutionGraph) extends
   ResponseExecutionGraph
 
   case class ExecutionGraphNotFound(jobID: JobID) extends 
ResponseExecutionGraph

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 1259460..e0e6ecd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -40,7 +40,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -396,7 +395,7 @@ public class QueryableStateITCase extends TestLogger {
                                        .map(new 
Mapper<TestingJobManagerMessages.ExecutionGraphFound, ExecutionGraph>() {
                                                @Override
                                                public ExecutionGraph 
apply(ExecutionGraphFound found) {
-                                                       return 
found.executionGraph();
+                                                       return (ExecutionGraph) 
found.executionGraph();
                                                }
                                        }, TEST_ACTOR_SYSTEM.dispatcher());
                        ExecutionGraph eg = Await.result(egFuture, 
deadline.timeLeft());
@@ -553,12 +552,13 @@ public class QueryableStateITCase extends TestLogger {
                                                        
.mapTo(ClassTag$.MODULE$.<JobFound>apply(JobFound.class)),
                                        deadline.timeLeft());
 
-                       Throwable failureCause = 
jobFound.executionGraph().getFailureCause();
+                       String failureCause = 
jobFound.executionGraph().getFailureCauseAsString();
 
-                       assertTrue("Not instance of SuppressRestartsException", 
failureCause instanceof SuppressRestartsException);
-                       assertTrue("Not caused by IllegalStateException", 
failureCause.getCause() instanceof IllegalStateException);
-                       Throwable duplicateException = failureCause.getCause();
-                       assertTrue("Exception does not contain registration 
name", duplicateException.getMessage().contains(queryName));
+                       assertTrue("Not instance of SuppressRestartsException", 
failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
+                       int causedByIndex = failureCause.indexOf("Caused by: ");
+                       String subFailureCause = 
failureCause.substring(causedByIndex + "Caused by: ".length());
+                       assertTrue("Not caused by IllegalStateException", 
subFailureCause.startsWith("java.lang.IllegalStateException"));
+                       assertTrue("Exception does not contain registration 
name", subFailureCause.contains(queryName));
                } finally {
                        // Free cluster resources
                        if (jobId != null) {

Reply via email to