Repository: flink
Updated Branches:
  refs/heads/master 8fa313c39 -> 70e71c161


[FLINK-4684] [checkpoints] Remove redundant class loader from CheckpointCoordinator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70e71c16
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70e71c16
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70e71c16

Branch: refs/heads/master
Commit: 70e71c16177b40c2418e6a8ca0838bf117f6a926
Parents: 8fa313c
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Sep 26 12:32:10 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 26 18:05:01 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   5 -
 .../flink/runtime/checkpoint/TaskState.java     |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   1 -
 .../apache/flink/runtime/state/StateUtil.java   |   2 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 101 +++++++------------
 .../checkpoint/CheckpointStateRestoreTest.java  |   7 +-
 6 files changed, 38 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index fc40911..6a43ddf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -112,9 +112,6 @@ public class CheckpointCoordinator {
         * need to be ascending across job managers. */
        private final CheckpointIDCounter checkpointIdCounter;
 
-       /** Class loader used to deserialize the state handles (as they may be 
user-defined) */
-       private final ClassLoader userClassLoader;
-
        /** The base checkpoint interval. Actual trigger time may be affected 
by the
         * max concurrent checkpoints and minimum-pause values */
        private final long baseInterval;
@@ -167,7 +164,6 @@ public class CheckpointCoordinator {
                        ExecutionVertex[] tasksToTrigger,
                        ExecutionVertex[] tasksToWaitFor,
                        ExecutionVertex[] tasksToCommitTo,
-                       ClassLoader userClassLoader,
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore completedCheckpointStore,
                        SavepointStore savepointStore,
@@ -198,7 +194,6 @@ public class CheckpointCoordinator {
                this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
                this.savepointStore = checkNotNull(savepointStore);
                this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
-               this.userClassLoader = checkNotNull(userClassLoader);
                this.statsTracker = checkNotNull(statsTracker);
 
                this.timer = new Timer("Checkpoint Timer", true);

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 657dd60..f5e3618 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -62,7 +62,7 @@ public class TaskState implements StateObject {
                                "Parallelism " + parallelism + " is not smaller 
or equal to max parallelism " + maxParallelism + ".");
 
                this.jobVertexID = jobVertexID;
-               //preallocate lists of the required size, so that we can 
randomly set values to indexes
+
                this.subtaskStates = new HashMap<>(parallelism);
                this.keyGroupsStateHandles = new HashMap<>(parallelism);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index c3cf297..7c3fa0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -390,7 +390,6 @@ public class ExecutionGraph {
                                tasksToTrigger,
                                tasksToWaitFor,
                                tasksToCommitTo,
-                               userClassLoader,
                                checkpointIDCounter,
                                checkpointStore,
                                savepointStore,

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index 3c5157e..aa28404 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -69,7 +69,7 @@ public class StateUtil {
         * occurring exceptions are suppressed and collected until the 
iteration is over and emitted as a single exception.
         *
         * @param handlesToDiscard State handles to discard. Passed iterable is 
allowed to deliver null values.
-        * @throws Exception exception that is a collection of all suppressed 
exceptions that were caught during iteration
+        * @throws IOException exception that is a collection of all suppressed 
exceptions that were caught during iteration
         */
        public static void bestEffortCloseAllStateObjects(
                        Iterable<? extends StateObject> handlesToDiscard) 
throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index bc61742..9adaa86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -42,10 +42,13 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+
 import org.junit.Assert;
 import org.junit.Test;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
@@ -109,7 +112,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
                                        new ExecutionVertex[] {},
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(1, cl),
                                        new HeapSavepointStore(),
@@ -162,7 +164,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
                                        new ExecutionVertex[] {},
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(1, cl),
                                        new HeapSavepointStore(),
@@ -213,7 +214,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
                                        new ExecutionVertex[] {},
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(1, cl),
                                        new HeapSavepointStore(),
@@ -265,7 +265,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(1, cl),
                                        new HeapSavepointStore(),
@@ -390,7 +389,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(1, cl),
                                        new HeapSavepointStore(),
@@ -511,7 +509,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(1, cl),
                                        new HeapSavepointStore(),
@@ -662,7 +659,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2, ackVertex3 },
                                        new ExecutionVertex[] { commitVertex },
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(2, cl),
                                        new HeapSavepointStore(),
@@ -798,7 +794,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2, ackVertex3 },
                                        new ExecutionVertex[] { commitVertex },
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(10, cl),
                                        new HeapSavepointStore(),
@@ -920,7 +915,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
                                        new ExecutionVertex[] { commitVertex },
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(2, cl),
                                        new HeapSavepointStore(),
@@ -989,7 +983,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
                                        new ExecutionVertex[] { commitVertex },
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(2, cl),
                                        new HeapSavepointStore(),
@@ -1069,7 +1062,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex },
                                        new ExecutionVertex[] { commitVertex },
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(2, cl),
                                        new HeapSavepointStore(),
@@ -1161,7 +1153,6 @@ public class CheckpointCoordinatorTest {
                                        new ExecutionVertex[] { vertex1 },
                                        new ExecutionVertex[] { vertex1 },
                                        new ExecutionVertex[] { vertex1 },
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(2, cl),
                                        new HeapSavepointStore(),
@@ -1246,7 +1237,6 @@ public class CheckpointCoordinatorTest {
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
-                               cl,
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1, cl),
                                new HeapSavepointStore(),
@@ -1384,7 +1374,6 @@ public class CheckpointCoordinatorTest {
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
-                               cl,
                                counter,
                                new StandaloneCompletedCheckpointStore(10, cl),
                                new HeapSavepointStore(),
@@ -1470,8 +1459,9 @@ public class CheckpointCoordinatorTest {
                                        maxConcurrentAttempts,
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex },
-                                       new ExecutionVertex[] { commitVertex }, 
cl, new StandaloneCheckpointIDCounter
-                                       (), new 
StandaloneCompletedCheckpointStore(2, cl),
+                                       new ExecutionVertex[] { commitVertex }, 
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(2, cl),
                                        new HeapSavepointStore(),
                                        new DisabledCheckpointStatsTracker());
 
@@ -1541,8 +1531,9 @@ public class CheckpointCoordinatorTest {
                                        maxConcurrentAttempts, // max two 
concurrent checkpoints
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex },
-                                       new ExecutionVertex[] { commitVertex }, 
cl, new StandaloneCheckpointIDCounter
-                                       (), new 
StandaloneCompletedCheckpointStore(2, cl),
+                                       new ExecutionVertex[] { commitVertex }, 
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(2, cl),
                                        new HeapSavepointStore(),
                                        new DisabledCheckpointStatsTracker());
 
@@ -1621,7 +1612,8 @@ public class CheckpointCoordinatorTest {
                                        2, // max two concurrent checkpoints
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex },
-                                       new ExecutionVertex[] { commitVertex }, 
cl, new StandaloneCheckpointIDCounter(),
+                                       new ExecutionVertex[] { commitVertex },
+                                       new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(2, cl),
                                        new HeapSavepointStore(),
                                        new DisabledCheckpointStatsTracker());
@@ -1672,7 +1664,6 @@ public class CheckpointCoordinatorTest {
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
-                               cl,
                                checkpointIDCounter,
                                new StandaloneCompletedCheckpointStore(2, cl),
                                new HeapSavepointStore(),
@@ -1723,7 +1714,6 @@ public class CheckpointCoordinatorTest {
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
-                               cl,
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2, cl),
                                new HeapSavepointStore(),
@@ -1772,7 +1762,8 @@ public class CheckpointCoordinatorTest {
                
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
                
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
-               ExecutionVertex[] arrayExecutionVertices = 
allExecutionVertices.toArray(new ExecutionVertex[0]);
+               ExecutionVertex[] arrayExecutionVertices = 
+                               allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -1784,7 +1775,6 @@ public class CheckpointCoordinatorTest {
                        arrayExecutionVertices,
                        arrayExecutionVertices,
                        arrayExecutionVertices,
-                       cl,
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1, cl),
                        new HeapSavepointStore(),
@@ -1796,8 +1786,8 @@ public class CheckpointCoordinatorTest {
                assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
                long checkpointId = 
Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
-               List<KeyGroupRange> keyGroupPartitions1 = 
coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
-               List<KeyGroupRange> keyGroupPartitions2 = 
coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+               List<KeyGroupRange> keyGroupPartitions1 = 
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+               List<KeyGroupRange> keyGroupPartitions2 = 
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
                for (int index = 0; index < jobVertex1.getParallelism(); 
index++) {
                        ChainedStateHandle<StreamStateHandle> 
nonPartitionedState = generateStateForVertex(jobVertexID1, index);
@@ -1876,7 +1866,7 @@ public class CheckpointCoordinatorTest {
                
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
                
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
-               ExecutionVertex[] arrayExecutionVertices = 
allExecutionVertices.toArray(new ExecutionVertex[0]);
+               ExecutionVertex[] arrayExecutionVertices = 
allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -1888,7 +1878,6 @@ public class CheckpointCoordinatorTest {
                        arrayExecutionVertices,
                        arrayExecutionVertices,
                        arrayExecutionVertices,
-                       cl,
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1, cl),
                        new HeapSavepointStore(),
@@ -1900,8 +1889,8 @@ public class CheckpointCoordinatorTest {
                assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
                long checkpointId = 
Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
-               List<KeyGroupRange> keyGroupPartitions1 = 
coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
-               List<KeyGroupRange> keyGroupPartitions2 = 
coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+               List<KeyGroupRange> keyGroupPartitions1 = 
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+               List<KeyGroupRange> keyGroupPartitions2 = 
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
                for (int index = 0; index < jobVertex1.getParallelism(); 
index++) {
                        ChainedStateHandle<StreamStateHandle> valueSizeTuple = 
generateStateForVertex(jobVertexID1, index);
@@ -1991,7 +1980,8 @@ public class CheckpointCoordinatorTest {
                
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
                
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
-               ExecutionVertex[] arrayExecutionVertices = 
allExecutionVertices.toArray(new ExecutionVertex[0]);
+               ExecutionVertex[] arrayExecutionVertices = 
+                               allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2003,7 +1993,6 @@ public class CheckpointCoordinatorTest {
                        arrayExecutionVertices,
                        arrayExecutionVertices,
                        arrayExecutionVertices,
-                       cl,
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1, cl),
                        new HeapSavepointStore(),
@@ -2015,8 +2004,10 @@ public class CheckpointCoordinatorTest {
                assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
                long checkpointId = 
Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
-               List<KeyGroupRange> keyGroupPartitions1 = 
coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
-               List<KeyGroupRange> keyGroupPartitions2 = 
coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+               List<KeyGroupRange> keyGroupPartitions1 = 
+                               
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+               List<KeyGroupRange> keyGroupPartitions2 = 
+                               
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
                for (int index = 0; index < jobVertex1.getParallelism(); 
index++) {
                        ChainedStateHandle<StreamStateHandle> valueSizeTuple = 
generateStateForVertex(jobVertexID1, index);
@@ -2110,7 +2101,8 @@ public class CheckpointCoordinatorTest {
                
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
                
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
-               ExecutionVertex[] arrayExecutionVertices = 
allExecutionVertices.toArray(new ExecutionVertex[0]);
+               ExecutionVertex[] arrayExecutionVertices = 
+                               allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2122,7 +2114,6 @@ public class CheckpointCoordinatorTest {
                                arrayExecutionVertices,
                                arrayExecutionVertices,
                                arrayExecutionVertices,
-                               cl,
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1, cl),
                                new HeapSavepointStore(),
@@ -2134,8 +2125,10 @@ public class CheckpointCoordinatorTest {
                assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
                long checkpointId = 
Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
 
-               List<KeyGroupRange> keyGroupPartitions1 = 
coord.createKeyGroupPartitions(maxParallelism1, parallelism1);
-               List<KeyGroupRange> keyGroupPartitions2 = 
coord.createKeyGroupPartitions(maxParallelism2, parallelism2);
+               List<KeyGroupRange> keyGroupPartitions1 = 
+                               
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism1, parallelism1);
+               List<KeyGroupRange> keyGroupPartitions2 = 
+                               
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
                for (int index = 0; index < jobVertex1.getParallelism(); 
index++) {
                        ChainedStateHandle<StreamStateHandle> valueSizeTuple = 
generateStateForVertex(jobVertexID1, index);
@@ -2173,7 +2166,8 @@ public class CheckpointCoordinatorTest {
 
                int newParallelism2 = 13;
 
-               List<KeyGroupRange> newKeyGroupPartitions2 = 
coord.createKeyGroupPartitions(maxParallelism2, newParallelism2);
+               List<KeyGroupRange> newKeyGroupPartitions2 = 
+                               
CheckpointCoordinator.createKeyGroupPartitions(maxParallelism2, 
newParallelism2);
 
                final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
                                jobVertexID1,
@@ -2207,37 +2201,12 @@ public class CheckpointCoordinatorTest {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       static void sendAckMessageToCoordinator(
-                       CheckpointCoordinator coord,
-                       long checkpointId, JobID jid,
-                       ExecutionJobVertex jobVertex,
-                       JobVertexID jobVertexID,
-                       List<KeyGroupRange> keyGroupPartitions) throws 
Exception {
-
-               for (int index = 0; index < jobVertex.getParallelism(); 
index++) {
-                       ChainedStateHandle<StreamStateHandle> state = 
generateStateForVertex(jobVertexID, index);
-                       List<KeyGroupsStateHandle> keyGroupState = 
generateKeyGroupState(
-                                       jobVertexID,
-                                       keyGroupPartitions.get(index));
-
-                       AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId,
-                                       state,
-                                       keyGroupState);
-
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
-               }
-       }
-
        public static List<KeyGroupsStateHandle> generateKeyGroupState(
                        JobVertexID jobVertexID,
                        KeyGroupRange keyGroupPartition) throws IOException {
 
-               KeyGroupRangeOffsets keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(keyGroupPartition);
                List<Integer> testStatesLists = new 
ArrayList<>(keyGroupPartition.getNumberOfKeyGroups());
-               int runningGroupsOffset = 0;
+
                // generate state for one keygroup
                for (int keyGroupIndex : keyGroupPartition) {
                        Random random = new Random(jobVertexID.hashCode() + 
keyGroupIndex);
@@ -2270,8 +2239,7 @@ public class CheckpointCoordinatorTest {
                //write all generated values in a single byte array, which is 
index by groupOffsetsInFinalByteArray
                byte[] allSerializedValuesConcatenated = new 
byte[runningGroupsOffset];
                runningGroupsOffset = 0;
-               byte[] old = null;
-               for(byte[] serializedGroupValue : serializedGroupValues) {
+               for (byte[] serializedGroupValue : serializedGroupValues) {
                        System.arraycopy(
                                        serializedGroupValue,
                                        0,
@@ -2279,7 +2247,6 @@ public class CheckpointCoordinatorTest {
                                        runningGroupsOffset,
                                        serializedGroupValue.length);
                        runningGroupsOffset += serializedGroupValue.length;
-                       old = serializedGroupValue;
                }
 
                ByteStreamStateHandle allSerializedStatesHandle = new 
ByteStreamStateHandle(

http://git-wip-us.apache.org/repos/asf/flink/blob/70e71c16/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 5416292..a4896aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -32,12 +32,11 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-
 import org.apache.flink.runtime.util.SerializableObject;
+
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -101,7 +100,6 @@ public class CheckpointStateRestoreTest {
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[0],
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(1, cl),
                                        new HeapSavepointStore(),
@@ -182,7 +180,6 @@ public class CheckpointStateRestoreTest {
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[0],
-                                       cl,
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(1, cl),
                                        new HeapSavepointStore(),
@@ -231,7 +228,7 @@ public class CheckpointStateRestoreTest {
                                        Integer.MAX_VALUE,
                                        new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
                                        new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
-                                       new ExecutionVertex[0], cl,
+                                       new ExecutionVertex[0],
                                        new StandaloneCheckpointIDCounter(),
                                        new 
StandaloneCompletedCheckpointStore(1, cl),
                                        new HeapSavepointStore(),

Reply via email to