[FLINK-8890] Compare checkpoints with order in CompletedCheckpoint.checkpointsMatch()

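This method is used, among other things, to check whether the list of
restored checkpoints is stable across several restore attempts in the
ZooKeeper checkpoint store. The order of the checkpoints matters because
the latest checkpoint must remain the latest checkpoint. The comparison
therefore now works on ordered lists (with an up-front size check) instead
of on sets, which ignored both ordering and duplicates.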


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1432092f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1432092f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1432092f

Branch: refs/heads/release-1.3.3-rc1
Commit: 1432092f29c548c55af562ff7b4a7973fedd2e22
Parents: df37d7a
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Mar 7 11:58:07 2018 +0100
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Mon Mar 12 18:40:37 2018 +0800

----------------------------------------------------------------------
 .../runtime/checkpoint/CompletedCheckpoint.java |  12 +-
 .../checkpoint/CompletedCheckpointTest.java     | 134 +++++++++++++++++++
 2 files changed, 140 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1432092f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 5e7a76a..ee14e80 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -35,9 +35,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -284,17 +283,18 @@ public class CompletedCheckpoint implements Serializable {
        public static boolean checkpointsMatch(
                Collection<CompletedCheckpoint> first,
                Collection<CompletedCheckpoint> second) {
+               if (first.size() != second.size()) {
+                       return false;
+               }
 
-               Set<Tuple2<Long, JobID>> firstInterestingFields =
-                       new HashSet<>();
+               List<Tuple2<Long, JobID>> firstInterestingFields = new 
ArrayList<>(first.size());
 
                for (CompletedCheckpoint checkpoint : first) {
                        firstInterestingFields.add(
                                new Tuple2<>(checkpoint.getCheckpointID(), 
checkpoint.getJobId()));
                }
 
-               Set<Tuple2<Long, JobID>> secondInterestingFields =
-                       new HashSet<>();
+               List<Tuple2<Long, JobID>> secondInterestingFields = new 
ArrayList<>(second.size());
 
                for (CompletedCheckpoint checkpoint : second) {
                        secondInterestingFields.add(

http://git-wip-us.apache.org/repos/asf/flink/blob/1432092f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 4846879..7fcef46 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -26,17 +26,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -46,6 +51,135 @@ public class CompletedCheckpointTest {
        @Rule
        public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
+       @Test
+       public void testCompareCheckpointsWithDifferentOrder() {
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       new JobID(), 0, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       new JobID(), 1, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+               checkpoints1.add(checkpoint2);
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+               checkpoints2.add(checkpoint1);
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+       }
+
+       @Test
+       public void testCompareCheckpointsWithSameOrder() {
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       new JobID(), 0, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       new JobID(), 1, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+               checkpoints1.add(checkpoint2);
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint1);
+               checkpoints2.add(checkpoint2);
+               checkpoints2.add(checkpoint1);
+
+               assertTrue(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+       }
+
+       /**
+        * Verify that both JobID and checkpoint id are taken into account when 
comparing.
+        */
+       @Test
+       public void testCompareCheckpointsWithSameJobID() {
+               JobID jobID = new JobID();
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       jobID, 0, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       jobID, 1, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+       }
+
+       /**
+        * Verify that both JobID and checkpoint id are taken into account when 
comparing.
+        */
+       @Test
+       public void testCompareCheckpointsWithSameCheckpointId() {
+               JobID jobID1 = new JobID();
+               JobID jobID2 = new JobID();
+
+               CompletedCheckpoint checkpoint1 = new CompletedCheckpoint(
+                       jobID1, 0, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               CompletedCheckpoint checkpoint2 = new CompletedCheckpoint(
+                       jobID2, 0, 0, 1,
+                       new HashMap<OperatorID, OperatorState>(),
+                       null,
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
+
+               List<CompletedCheckpoint> checkpoints1= new ArrayList<>();
+               checkpoints1.add(checkpoint1);
+
+               List<CompletedCheckpoint> checkpoints2 = new ArrayList<>();
+               checkpoints2.add(checkpoint2);
+
+               assertFalse(CompletedCheckpoint.checkpointsMatch(checkpoints1, 
checkpoints2));
+       }
+
        /**
         * Tests that persistent checkpoints discard their header file.
         */

Reply via email to