Repository: flink
Updated Branches:
  refs/heads/master 30bb958a7 -> ab014ef94


http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
new file mode 100644
index 0000000..08896da
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+public class CopyOnWriteStateTableTest {
+
+       /**
+        * Testing the basic map operations.
+        */
+       @Test
+       public void testPutGetRemoveContainsTransform() throws Exception {
+               RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+                               new RegisteredBackendStateMetaInfo<>(
+                                               StateDescriptor.Type.UNKNOWN,
+                                               "test",
+                                               IntSerializer.INSTANCE,
+                                               new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+               final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+               final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+                               new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+               ArrayList<Integer> state_1_1 = new ArrayList<>();
+               state_1_1.add(41);
+               ArrayList<Integer> state_2_1 = new ArrayList<>();
+               state_2_1.add(42);
+               ArrayList<Integer> state_1_2 = new ArrayList<>();
+               state_1_2.add(43);
+
+               Assert.assertNull(stateTable.putAndGetOld(1, 1, state_1_1));
+               Assert.assertEquals(state_1_1, stateTable.get(1, 1));
+               Assert.assertEquals(1, stateTable.size());
+
+               Assert.assertNull(stateTable.putAndGetOld(2, 1, state_2_1));
+               Assert.assertEquals(state_2_1, stateTable.get(2, 1));
+               Assert.assertEquals(2, stateTable.size());
+
+               Assert.assertNull(stateTable.putAndGetOld(1, 2, state_1_2));
+               Assert.assertEquals(state_1_2, stateTable.get(1, 2));
+               Assert.assertEquals(3, stateTable.size());
+
+               Assert.assertTrue(stateTable.containsKey(2, 1));
+               Assert.assertFalse(stateTable.containsKey(3, 1));
+               Assert.assertFalse(stateTable.containsKey(2, 3));
+               stateTable.put(2, 1, null);
+               Assert.assertTrue(stateTable.containsKey(2, 1));
+               Assert.assertEquals(3, stateTable.size());
+               Assert.assertNull(stateTable.get(2, 1));
+               stateTable.put(2, 1, state_2_1);
+               Assert.assertEquals(3, stateTable.size());
+
+               Assert.assertEquals(state_2_1, stateTable.removeAndGetOld(2, 1));
+               Assert.assertFalse(stateTable.containsKey(2, 1));
+               Assert.assertEquals(2, stateTable.size());
+
+               stateTable.remove(1, 2);
+               Assert.assertFalse(stateTable.containsKey(1, 2));
+               Assert.assertEquals(1, stateTable.size());
+
+               Assert.assertNull(stateTable.removeAndGetOld(4, 2));
+               Assert.assertEquals(1, stateTable.size());
+
+               StateTransformationFunction<ArrayList<Integer>, Integer> function =
+                               new StateTransformationFunction<ArrayList<Integer>, Integer>() {
+                                       @Override
+                                       public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
+                                               previousState.add(value);
+                                               return previousState;
+                                       }
+                               };
+
+               final int value = 4711;
+               stateTable.transform(1, 1, value, function);
+               state_1_1 = function.apply(state_1_1, value);
+               Assert.assertEquals(state_1_1, stateTable.get(1, 1));
+       }
+
+       /**
+        * This test triggers incremental rehash and tests for corruptions.
+        */
+       @Test
+       public void testIncrementalRehash() {
+               RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+                               new RegisteredBackendStateMetaInfo<>(
+                                               StateDescriptor.Type.UNKNOWN,
+                                               "test",
+                                               IntSerializer.INSTANCE,
+                                               new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+               final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+               final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+                               new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+               int insert = 0;
+               int remove = 0;
+               while (!stateTable.isRehashing()) {
+                       stateTable.put(insert++, 0, new ArrayList<Integer>());
+                       if (insert % 8 == 0) {
+                               stateTable.remove(remove++, 0);
+                       }
+               }
+               Assert.assertEquals(insert - remove, stateTable.size());
+               while (stateTable.isRehashing()) {
+                       stateTable.put(insert++, 0, new ArrayList<Integer>());
+                       if (insert % 8 == 0) {
+                               stateTable.remove(remove++, 0);
+                       }
+               }
+               Assert.assertEquals(insert - remove, stateTable.size());
+
+               for (int i = 0; i < insert; ++i) {
+                       if (i < remove) {
+                               Assert.assertFalse(stateTable.containsKey(i, 0));
+                       } else {
+                               Assert.assertTrue(stateTable.containsKey(i, 0));
+                       }
+               }
+       }
+
+       /**
+        * This test does some random modifications to a state table and a reference (hash map). Then draws snapshots,
+        * performs more modifications and checks snapshot integrity.
+        */
+       @Test
+       public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception {
+
+               final RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+                               new RegisteredBackendStateMetaInfo<>(
+                                               StateDescriptor.Type.UNKNOWN,
+                                               "test",
+                                               IntSerializer.INSTANCE,
+                                               new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+               final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+               final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+                               new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+               final HashMap<Tuple2<Integer, Integer>, ArrayList<Integer>> referenceMap = new HashMap<>();
+
+               final Random random = new Random(42);
+
+               // holds snapshots from the map under test
+               CopyOnWriteStateTable.StateTableEntry<Integer, Integer, ArrayList<Integer>>[] snapshot = null;
+               int snapshotSize = 0;
+
+               // holds a reference snapshot from our reference map that we compare against
+               Tuple3<Integer, Integer, ArrayList<Integer>>[] reference = null;
+
+               int val = 0;
+
+
+               int snapshotCounter = 0;
+               int referencedSnapshotId = 0;
+
+               final StateTransformationFunction<ArrayList<Integer>, Integer> transformationFunction =
+                               new StateTransformationFunction<ArrayList<Integer>, Integer>() {
+                                       @Override
+                                       public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception {
+                                               if (previousState == null) {
+                                                       previousState = new ArrayList<>();
+                                               }
+                                               previousState.add(value);
+                                               // we give back the original, attempting to spot errors in the copy-on-write logic
+                                               return previousState;
+                                       }
+                               };
+
+               // the main loop for modifications
+               for (int i = 0; i < 10_000_000; ++i) {
+
+                       int key = random.nextInt(20);
+                       int namespace = random.nextInt(4);
+                       Tuple2<Integer, Integer> compositeKey = new Tuple2<>(key, namespace);
+
+                       int op = random.nextInt(7);
+
+                       ArrayList<Integer> state = null;
+                       ArrayList<Integer> referenceState = null;
+
+                       switch (op) {
+                               case 0:
+                               case 1: {
+                                       state = stateTable.get(key, namespace);
+                                       referenceState = referenceMap.get(compositeKey);
+                                       if (null == state) {
+                                               state = new ArrayList<>();
+                                               stateTable.put(key, namespace, state);
+                                               referenceState = new ArrayList<>();
+                                               referenceMap.put(compositeKey, referenceState);
+                                       }
+                                       break;
+                               }
+                               case 2: {
+                                       stateTable.put(key, namespace, new ArrayList<Integer>());
+                                       referenceMap.put(compositeKey, new ArrayList<Integer>());
+                                       break;
+                               }
+                               case 3: {
+                                       state = stateTable.putAndGetOld(key, namespace, new ArrayList<Integer>());
+                                       referenceState = referenceMap.put(compositeKey, new ArrayList<Integer>());
+                                       break;
+                               }
+                               case 4: {
+                                       stateTable.remove(key, namespace);
+                                       referenceMap.remove(compositeKey);
+                                       break;
+                               }
+                               case 5: {
+                                       state = stateTable.removeAndGetOld(key, namespace);
+                                       referenceState = referenceMap.remove(compositeKey);
+                                       break;
+                               }
+                               case 6: {
+                                       final int updateValue = random.nextInt(1000);
+                                       stateTable.transform(key, namespace, updateValue, transformationFunction);
+                                       referenceMap.put(compositeKey, transformationFunction.apply(
+                                                       referenceMap.remove(compositeKey), updateValue));
+                                       break;
+                               }
+                               default: {
+                                       Assert.fail("Unknown op-code " + op);
+                               }
+                       }
+
+                       Assert.assertEquals(referenceMap.size(), stateTable.size());
+
+                       if (state != null) {
+                               // mutate the states a bit...
+                               if (random.nextBoolean() && !state.isEmpty()) {
+                                       state.remove(state.size() - 1);
+                                       referenceState.remove(referenceState.size() - 1);
+                               } else {
+                                       state.add(val);
+                                       referenceState.add(val);
+                                       ++val;
+                               }
+                       }
+
+                       Assert.assertEquals(referenceState, state);
+
+                       // snapshot triggering / comparison / release
+                       if (i > 0 && i % 500 == 0) {
+
+                               if (snapshot != null) {
+                                       // check our referenced snapshot
+                                       deepCheck(reference, convert(snapshot, snapshotSize));
+
+                                       if (i % 1_000 == 0) {
+                                               // draw and release some other snapshot while holding on to the old snapshot
+                                               ++snapshotCounter;
+                                               stateTable.snapshotTableArrays();
+                                               stateTable.releaseSnapshot(snapshotCounter);
+                                       }
+
+                                       // release the snapshot after some time
+                                       if (i % 5_000 == 0) {
+                                               snapshot = null;
+                                               reference = null;
+                                               snapshotSize = 0;
+                                               stateTable.releaseSnapshot(referencedSnapshotId);
+                                       }
+
+                               } else {
+                                       // if there is no more referenced snapshot, we create one
+                                       ++snapshotCounter;
+                                       referencedSnapshotId = snapshotCounter;
+                                       snapshot = stateTable.snapshotTableArrays();
+                                       snapshotSize = stateTable.size();
+                                       reference = manualDeepDump(referenceMap);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * This tests the copy-on-write contracts, e.g. it ensures that no copy-on-write is active after all snapshots are
+        * released.
+        */
+       @Test
+       public void testCopyOnWriteContracts() {
+               RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+                               new RegisteredBackendStateMetaInfo<>(
+                                               StateDescriptor.Type.UNKNOWN,
+                                               "test",
+                                               IntSerializer.INSTANCE,
+                                               new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects.
+
+               final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+               final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable =
+                               new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+               ArrayList<Integer> originalState1 = new ArrayList<>(1);
+               ArrayList<Integer> originalState2 = new ArrayList<>(1);
+               ArrayList<Integer> originalState3 = new ArrayList<>(1);
+               ArrayList<Integer> originalState4 = new ArrayList<>(1);
+               ArrayList<Integer> originalState5 = new ArrayList<>(1);
+
+               originalState1.add(1);
+               originalState2.add(2);
+               originalState3.add(3);
+               originalState4.add(4);
+               originalState5.add(5);
+
+               stateTable.put(1, 1, originalState1);
+               stateTable.put(2, 1, originalState2);
+               stateTable.put(4, 1, originalState4);
+               stateTable.put(5, 1, originalState5);
+
+               // no snapshot taken, we get the original back
+               Assert.assertTrue(stateTable.get(1, 1) == originalState1);
+               CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot1 = stateTable.createSnapshot();
+               // after snapshot1 is taken, we get a copy...
+               final ArrayList<Integer> copyState = stateTable.get(1, 1);
+               Assert.assertFalse(copyState == originalState1);
+               // ...and the copy is equal
+               Assert.assertEquals(originalState1, copyState);
+
+               // we make an insert AFTER snapshot1
+               stateTable.put(3, 1, originalState3);
+
+               // on repeated lookups, we get the same copy because no further snapshot was taken
+               Assert.assertTrue(copyState == stateTable.get(1, 1));
+
+               // we take snapshot2
+               CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot2 = stateTable.createSnapshot();
+               // after the second snapshot, copy-on-write is active again for old entries
+               Assert.assertFalse(copyState == stateTable.get(1, 1));
+               // and equality still holds
+               Assert.assertEquals(copyState, stateTable.get(1, 1));
+
+               // after releasing snapshot2
+               stateTable.releaseSnapshot(snapshot2);
+               // we still get the original of the untouched late insert (after snapshot1)
+               Assert.assertTrue(originalState3 == stateTable.get(3, 1));
+               // but copy-on-write is still active for older inserts (before snapshot1)
+               Assert.assertFalse(originalState4 == stateTable.get(4, 1));
+
+               // after releasing snapshot1
+               stateTable.releaseSnapshot(snapshot1);
+               // no copy-on-write is active
+               Assert.assertTrue(originalState5 == stateTable.get(5, 1));
+       }
+
+       @SuppressWarnings("unchecked")
+       private static <K, N, S> Tuple3<K, N, S>[] convert(CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshot, int mapSize) {
+
+               Tuple3<K, N, S>[] result = new Tuple3[mapSize];
+               int pos = 0;
+               for (CopyOnWriteStateTable.StateTableEntry<K, N, S> entry : snapshot) {
+                       while (null != entry) {
+                               result[pos++] = new Tuple3<>(entry.getKey(), entry.getNamespace(), entry.getState());
+                               entry = entry.next;
+                       }
+               }
+               Assert.assertEquals(mapSize, pos);
+               return result;
+       }
+
+       @SuppressWarnings("unchecked")
+       private Tuple3<Integer, Integer, ArrayList<Integer>>[] manualDeepDump(
+                       HashMap<Tuple2<Integer, Integer>,
+                                       ArrayList<Integer>> map) {
+
+               Tuple3<Integer, Integer, ArrayList<Integer>>[] result = new Tuple3[map.size()];
+               int pos = 0;
+               for (Map.Entry<Tuple2<Integer, Integer>, ArrayList<Integer>> entry : map.entrySet()) {
+                       Integer key = entry.getKey().f0;
+                       Integer namespace = entry.getKey().f1;
+                       result[pos++] = new Tuple3<>(key, namespace, new ArrayList<>(entry.getValue()));
+               }
+               return result;
+       }
+
+       private void deepCheck(
+                       Tuple3<Integer, Integer, ArrayList<Integer>>[] a,
+                       Tuple3<Integer, Integer, ArrayList<Integer>>[] b) {
+
+               if (a == b) {
+                       return;
+               }
+
+               Assert.assertEquals(a.length, b.length);
+
+               Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>> comparator =
+                               new Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>>() {
+
+                                       @Override
+                                       public int compare(Tuple3<Integer, Integer, ArrayList<Integer>> o1, Tuple3<Integer, Integer, ArrayList<Integer>> o2) {
+                                               int namespaceDiff = o1.f1 - o2.f1;
+                                               return namespaceDiff != 0 ? namespaceDiff : o1.f0 - o2.f0;
+                                       }
+                               };
+
+               Arrays.sort(a, comparator);
+               Arrays.sort(b, comparator);
+
+               for (int i = 0; i < a.length; ++i) {
+                       Tuple3<Integer, Integer, ArrayList<Integer>> av = a[i];
+                       Tuple3<Integer, Integer, ArrayList<Integer>> bv = b[i];
+
+                       Assert.assertEquals(av.f0, bv.f0);
+                       Assert.assertEquals(av.f1, bv.f1);
+                       Assert.assertEquals(av.f2, bv.f2);
+               }
+       }
+
+       static class MockInternalKeyContext<T> implements InternalKeyContext<T> {
+
+               private T key;
+               private final TypeSerializer<T> serializer;
+               private final KeyGroupRange keyGroupRange;
+
+               public MockInternalKeyContext(TypeSerializer<T> serializer) {
+                       this.serializer = serializer;
+                       this.keyGroupRange = new KeyGroupRange(0, 0);
+               }
+
+               public void setKey(T key) {
+                       this.key = key;
+               }
+
+               @Override
+               public T getCurrentKey() {
+                       return key;
+               }
+
+               @Override
+               public int getCurrentKeyGroupIndex() {
+                       return 0;
+               }
+
+               @Override
+               public int getNumberOfKeyGroups() {
+                       return 1;
+               }
+
+               @Override
+               public KeyGroupRange getKeyGroupRange() {
+                       return keyGroupRange;
+               }
+
+               @Override
+               public TypeSerializer<T> getKeySerializer() {
+                       return serializer;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
index 735b5f5..cb4e403 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
@@ -23,25 +23,20 @@ import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-
 import org.junit.Test;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the simple Java heap objects implementation of the {@link AggregatingState}.
  */
-public class HeapAggregatingStateTest {
+public class HeapAggregatingStateTest extends HeapStateBackendTestBase {
 
        @Test
        public void testAddAndGet() throws Exception {
@@ -227,20 +222,6 @@ public class HeapAggregatingStateTest {
        }
 
        // ------------------------------------------------------------------------
-       //  utilities
-       // ------------------------------------------------------------------------
-
-       private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
-               return new HeapKeyedStateBackend<>(
-                               mock(TaskKvStateRegistry.class),
-                               StringSerializer.INSTANCE,
-                               HeapAggregatingStateTest.class.getClassLoader(),
-                               16,
-                               new KeyGroupRange(0, 15),
-                               new ExecutionConfig());
-       }
-
-       // ------------------------------------------------------------------------
        //  test functions
        // ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
new file mode 100644
index 0000000..da0666a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.net.URL;
+import java.util.Collections;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests backwards compatibility in the serialization format of heap-based KeyedStateBackends.
+ */
+public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackendTestBase {
+
+       /**
+        * [FLINK-5979]
+        *
+        * This test takes a snapshot that was created with Flink 1.2 and tries to restore it in master to check
+        * the backwards compatibility of the serialization format of {@link StateTable}s.
+        */
+       @Test
+       public void testRestore1_2ToMaster() throws Exception {
+
+               ClassLoader cl = getClass().getClassLoader();
+               URL resource = cl.getResource("heap_keyed_statebackend_1_2.snapshot");
+
+               Preconditions.checkNotNull(resource, "Binary snapshot resource not found!");
+
+               final Integer namespace1 = 1;
+               final Integer namespace2 = 2;
+               final Integer namespace3 = 3;
+
+               try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend()) {
+                       final KeyGroupsStateHandle stateHandle;
+                       try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) {
+                               stateHandle = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader());
+                       }
+                       keyedBackend.restore(Collections.singleton(stateHandle));
+                       final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+                       stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+
+                       InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+
+                       assertEquals(7, keyedBackend.numStateEntries());
+
+                       keyedBackend.setCurrentKey("abc");
+                       state.setCurrentNamespace(namespace1);
+                       assertEquals(asList(33L, 55L), state.get());
+                       state.setCurrentNamespace(namespace2);
+                       assertEquals(asList(22L, 11L), state.get());
+                       state.setCurrentNamespace(namespace3);
+                       assertEquals(Collections.singletonList(44L), state.get());
+
+                       keyedBackend.setCurrentKey("def");
+                       state.setCurrentNamespace(namespace1);
+                       assertEquals(asList(11L, 44L), state.get());
+
+                       state.setCurrentNamespace(namespace3);
+                       assertEquals(asList(22L, 55L, 33L), state.get());
+
+                       keyedBackend.setCurrentKey("jkl");
+                       state.setCurrentNamespace(namespace1);
+                       assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get());
+
+                       keyedBackend.setCurrentKey("mno");
+                       state.setCurrentNamespace(namespace3);
+                       assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get());
+               }
+       }
+
+//     /**
+//      * This code was used to create the binary file of the old version's snapshot used by this test. If you need to
+//      * recreate the binary, you can uncomment this method and run it.
+//      */
+//     private void createBinarySnapshot() throws Exception {
+//
+//             final String pathToWrite = "/PATH/TO/WRITE";
+//
+//             final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
+//             stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
+//
+//             final Integer namespace1 = 1;
+//             final Integer namespace2 = 2;
+//             final Integer namespace3 = 3;
+//
+//             final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend();
+//
+//             try {
+//                     InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr);
+//
+//                     keyedBackend.setCurrentKey("abc");
+//                     state.setCurrentNamespace(namespace1);
+//                     state.add(33L);
+//                     state.add(55L);
+//
+//                     state.setCurrentNamespace(namespace2);
+//                     state.add(22L);
+//                     state.add(11L);
+//
+//                     state.setCurrentNamespace(namespace3);
+//                     state.add(44L);
+//
+//                     keyedBackend.setCurrentKey("def");
+//                     state.setCurrentNamespace(namespace1);
+//                     state.add(11L);
+//                     state.add(44L);
+//
+//                     state.setCurrentNamespace(namespace3);
+//                     state.add(22L);
+//                     state.add(55L);
+//                     state.add(33L);
+//
+//                     keyedBackend.setCurrentKey("jkl");
+//                     state.setCurrentNamespace(namespace1);
+//                     state.add(11L);
+//                     state.add(22L);
+//                     state.add(33L);
+//                     state.add(44L);
+//                     state.add(55L);
+//
+//                     keyedBackend.setCurrentKey("mno");
+//                     state.setCurrentNamespace(namespace3);
+//                     state.add(11L);
+//                     state.add(22L);
+//                     state.add(33L);
+//                     state.add(44L);
+//                     state.add(55L);
+//                     RunnableFuture<KeyGroupsStateHandle> snapshot = keyedBackend.snapshot(
+//                                     0L,
+//                                     0L,
+//                                     new MemCheckpointStreamFactory(4 * 1024 * 1024),
+//                                     CheckpointOptions.forFullCheckpoint());
+//
+//                     snapshot.run();
+//
+//                     try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(pathToWrite))) {
+//                             InstantiationUtil.serializeObject(bos, snapshot.get());
+//                     }
+//
+//             } finally {
+//                     keyedBackend.close();
+//                     keyedBackend.dispose();
+//             }
+//     }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index c36a48b..7705c19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -22,13 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
-
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -39,12 +35,11 @@ import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the simple Java heap objects implementation of the {@link ListState}.
  */
-public class HeapListStateTest {
+public class HeapListStateTest extends HeapStateBackendTestBase {
 
        @Test
        public void testAddAndGet() throws Exception {
@@ -225,16 +220,6 @@ public class HeapListStateTest {
                        keyedBackend.dispose();
                }
        }
-
-       private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
-               return new HeapKeyedStateBackend<>(
-                               mock(TaskKvStateRegistry.class),
-                               StringSerializer.INSTANCE,
-                               HeapListStateTest.class.getClassLoader(),
-                               16,
-                               new KeyGroupRange(0, 15),
-                               new ExecutionConfig());
-       }
        
        private static <T> void validateResult(Iterable<T> values, Set<T> expected) {
                int num = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
index 63eec04..928eaec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
@@ -23,25 +23,20 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalReducingState;
-
 import org.junit.Test;
 
 import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
 
 /**
  * Tests for the simple Java heap objects implementation of the {@link ReducingState}.
  */
-public class HeapReducingStateTest {
+public class HeapReducingStateTest extends HeapStateBackendTestBase {
 
        @Test
        public void testAddAndGet() throws Exception {
@@ -214,7 +209,7 @@ public class HeapReducingStateTest {
                        keyedBackend.setCurrentKey("mno");
                        state.setCurrentNamespace(namespace1);
                        state.clear();
-                       
+
                        StateTable<String, Integer, Long> stateTable =
                                        ((HeapReducingState<String, Integer, Long>) state).stateTable;
 
@@ -227,20 +222,6 @@ public class HeapReducingStateTest {
        }
 
        // ------------------------------------------------------------------------
-       //  utilities
-       // ------------------------------------------------------------------------
-
-       private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
-               return new HeapKeyedStateBackend<>(
-                               mock(TaskKvStateRegistry.class),
-                               StringSerializer.INSTANCE,
-                               HeapReducingStateTest.class.getClassLoader(),
-                               16,
-                               new KeyGroupRange(0, 15),
-                               new ExecutionConfig());
-       }
-
-       // ------------------------------------------------------------------------
        //  test functions
        // ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
new file mode 100644
index 0000000..e6adef8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+@RunWith(Parameterized.class)
+public abstract class HeapStateBackendTestBase {
+
+       @Parameterized.Parameters
+       public static Collection<Boolean> parameters() {
+               return Arrays.asList(false, true);
+       }
+
+       @Parameterized.Parameter
+       public boolean async;
+
+       public HeapKeyedStateBackend<String> createKeyedBackend() throws Exception {
+               return new HeapKeyedStateBackend<>(
+                               mock(TaskKvStateRegistry.class),
+                               StringSerializer.INSTANCE,
+                               HeapReducingStateTest.class.getClassLoader(),
+                               16,
+                               new KeyGroupRange(0, 15),
+                               async,
+                               new ExecutionConfig());
+       }
+}
\ No newline at end of file
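
With the parameterized base class above, each test in a subclass runs twice, once with async = false and once with async = true. A minimal, hypothetical subclass sketch (the real subclasses in this change are HeapListStateTest, HeapReducingStateTest, HeapAggregatingStateTest and HeapKeyedStateBackendSnapshotMigrationTest; the class and method names below are made up):

    public class MyHeapStateTest extends HeapStateBackendTestBase {

            @Test
            public void testSomething() throws Exception {
                    // createKeyedBackend() picks up the async flag injected by the Parameterized runner
                    try (HeapKeyedStateBackend<String> backend = createKeyedBackend()) {
                            // ... exercise keyed state against the backend here ...
                    }
            }
    }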

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
new file mode 100644
index 0000000..6fd94f7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.ArrayListSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+
+public class StateTableSnapshotCompatibilityTest {
+
+       /**
+        * This test ensures that different implementations of {@link StateTable} are compatible in their serialization
+        * format.
+        */
+       @Test
+       public void checkCompatibleSerializationFormats() throws IOException {
+               final Random r = new Random(42);
+               RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo =
+                               new RegisteredBackendStateMetaInfo<>(
+                                               StateDescriptor.Type.UNKNOWN,
+                                               "test",
+                                               IntSerializer.INSTANCE,
+                                               new ArrayListSerializer<>(IntSerializer.INSTANCE));
+
+               final CopyOnWriteStateTableTest.MockInternalKeyContext<Integer> keyContext =
+                               new CopyOnWriteStateTableTest.MockInternalKeyContext<>(IntSerializer.INSTANCE);
+
+               CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> cowStateTable =
+                               new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+               for (int i = 0; i < 100; ++i) {
+                       ArrayList<Integer> list = new ArrayList<>(5);
+                       int end = r.nextInt(5);
+                       for (int j = 0; j < end; ++j) {
+                               list.add(r.nextInt(100));
+                       }
+
+                       cowStateTable.put(r.nextInt(10), r.nextInt(2), list);
+               }
+
+               StateTableSnapshot snapshot = cowStateTable.createSnapshot();
+
+               final NestedMapsStateTable<Integer, Integer, ArrayList<Integer>> nestedMapsStateTable =
+                               new NestedMapsStateTable<>(keyContext, metaInfo);
+
+               restoreStateTableFromSnapshot(nestedMapsStateTable, snapshot, keyContext.getKeyGroupRange());
+               snapshot.release();
+
+
+               Assert.assertEquals(cowStateTable.size(), nestedMapsStateTable.size());
+               for (StateEntry<Integer, Integer, ArrayList<Integer>> entry : cowStateTable) {
+                       Assert.assertEquals(entry.getState(), nestedMapsStateTable.get(entry.getKey(), entry.getNamespace()));
+               }
+
+               snapshot = nestedMapsStateTable.createSnapshot();
+               cowStateTable = new CopyOnWriteStateTable<>(keyContext, metaInfo);
+
+               restoreStateTableFromSnapshot(cowStateTable, snapshot, keyContext.getKeyGroupRange());
+               snapshot.release();
+
+               Assert.assertEquals(nestedMapsStateTable.size(), cowStateTable.size());
+               for (StateEntry<Integer, Integer, ArrayList<Integer>> entry : cowStateTable) {
+                       Assert.assertEquals(nestedMapsStateTable.get(entry.getKey(), entry.getNamespace()), entry.getState());
+               }
+       }
+
+       private static <K, N, S> void restoreStateTableFromSnapshot(
+                       StateTable<K, N, S> stateTable,
+                       StateTableSnapshot snapshot,
+                       KeyGroupRange keyGroupRange) throws IOException {
+
+               final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(1024 * 1024);
+               final DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
+
+               for (Integer keyGroup : keyGroupRange) {
+                       snapshot.writeMappingsInKeyGroup(dov, keyGroup);
+               }
+
+               final ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(out.getBuf());
+               final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
+
+               final StateTableByKeyGroupReader keyGroupReader =
+                               StateTableByKeyGroupReaders.readerForVersion(stateTable, KeyedBackendSerializationProxy.VERSION);
+
+               for (Integer keyGroup : keyGroupRange) {
+                       keyGroupReader.readMappingsInKeyGroup(div, keyGroup);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
new file mode 100644
index 0000000..291f3ed
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import java.io.IOException;
+
+/**
+ * A {@link CheckpointStreamFactory} for tests that allows testing cancellation in async IO.
+ */
+@VisibleForTesting
+@Internal
+public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
+
+       private final int maxSize;
+       private volatile int afterNumberInvocations;
+       private volatile OneShotLatch blocker;
+       private volatile OneShotLatch waiter;
+
+       MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;
+
+       public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() {
+               return lastCreatedStream;
+       }
+
+       public BlockerCheckpointStreamFactory(int maxSize) {
+               this.maxSize = maxSize;
+       }
+
+       public void setAfterNumberInvocations(int afterNumberInvocations) {
+               this.afterNumberInvocations = afterNumberInvocations;
+       }
+
+       public void setBlockerLatch(OneShotLatch latch) {
+               this.blocker = latch;
+       }
+
+       public void setWaiterLatch(OneShotLatch latch) {
+               this.waiter = latch;
+       }
+
+       @Override
+       public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+               this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
+
+                       private int afterNInvocations = afterNumberInvocations;
+                       private final OneShotLatch streamBlocker = blocker;
+                       private final OneShotLatch streamWaiter = waiter;
+
+                       @Override
+                       public void write(int b) throws IOException {
+
+                               if (null != waiter) {
+                                       waiter.trigger();
+                               }
+
+                               if (afterNInvocations > 0) {
+                                       --afterNInvocations;
+                               }
+
+                               if (0 == afterNInvocations && null != streamBlocker) {
+                                       try {
+                                               streamBlocker.await();
+                                       } catch (InterruptedException ignored) {
+                                       }
+                               }
+                               try {
+                                       super.write(b);
+                               } catch (IOException ex) {
+                                       if (null != streamWaiter) {
+                                               streamWaiter.trigger();
+                                       }
+                                       throw ex;
+                               }
+
+                               if (0 == afterNInvocations && null != streamWaiter) {
+                                       streamWaiter.trigger();
+                               }
+                       }
+
+                       @Override
+                       public void close() {
+                               super.close();
+                               if (null != streamWaiter) {
+                                       streamWaiter.trigger();
+                               }
+                       }
+               };
+
+               return lastCreatedStream;
+       }
+
+       @Override
+       public void close() throws Exception {
+
+       }
+}
\ No newline at end of file
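
A minimal usage sketch for the blocker/waiter latches above (hypothetical test code, not part of this commit; it assumes only the setters shown here plus OneShotLatch's await() and trigger()):

    BlockerCheckpointStreamFactory factory = new BlockerCheckpointStreamFactory(64 * 1024);
    OneShotLatch waiter = new OneShotLatch();
    OneShotLatch blocker = new OneShotLatch();
    factory.setWaiterLatch(waiter);
    factory.setBlockerLatch(blocker);
    factory.setAfterNumberInvocations(10);

    // ... start an async snapshot that writes through this factory ...

    waiter.await();    // returns once the stream has started writing
    // ... e.g. cancel the snapshot here to exercise the cancellation path ...
    blocker.trigger(); // unblocks the stream so the test can finish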

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot 
b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot
new file mode 100644
index 0000000..b9171bc
Binary files /dev/null and 
b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 05c89dd..781c320 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.util.MathUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -98,9 +99,7 @@ public class TimeWindow extends Window {
 
        @Override
        public int hashCode() {
-               int result = (int) (start ^ (start >>> 32));
-               result = 31 * result + (int) (end ^ (end >>> 32));
-               return result;
+               return MathUtils.longToIntWithBitMixing(start + end);
        }
 
        @Override
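
The hashCode() change above replaces the usual 31-based combination of start and end with a
single bit-mixed hash of their sum. Since window boundaries are normally aligned to the window
size and slide, the old scheme yields regularly spaced hash codes that can cluster in
power-of-two hash tables; bit mixing is presumably meant to spread them more evenly. Distinct
windows with the same start + end now share a hash code, which equals() still disambiguates.
A small sketch of the difference:

    // illustration only: hash codes of consecutive 100 ms tumbling windows
    // (old formula vs. the new bit-mixed variant from org.apache.flink.util.MathUtils)
    for (long start = 0; start < 500; start += 100) {
        long end = start + 100;
        int oldHash = 31 * (int) (start ^ (start >>> 32)) + (int) (end ^ (end >>> 32));
        int newHash = MathUtils.longToIntWithBitMixing(start + end);
        System.out.println(oldHash + " -> " + newHash);
    }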

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 1911f44..5e966d1 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -91,7 +91,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
        }
 
        enum StateBackendEnum {
-               MEM, FILE, ROCKSDB_FULLY_ASYNC
+               MEM, FILE, ROCKSDB_FULLY_ASYNC, MEM_ASYNC, FILE_ASYNC
        }
 
        @BeforeClass
@@ -116,11 +116,19 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
        public void initStateBackend() throws IOException {
                switch (stateBackendEnum) {
                        case MEM:
-                               this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE);
+                               this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
                                break;
                        case FILE: {
                                String backups = tempFolder.newFolder().getAbsolutePath();
-                               this.stateBackend = new FsStateBackend("file://" + backups);
+                               this.stateBackend = new FsStateBackend("file://" + backups, false);
+                               break;
+                       }
+                       case MEM_ASYNC:
+                               this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true);
+                               break;
+                       case FILE_ASYNC: {
+                               String backups = tempFolder.newFolder().getAbsolutePath();
+                               this.stateBackend = new FsStateBackend("file://" + backups, true);
                                break;
                        }
                        case ROCKSDB_FULLY_ASYNC: {
@@ -138,9 +146,9 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
        @Test
        public void testTumblingTimeWindow() {
-               final int NUM_ELEMENTS_PER_KEY = 3000;
-               final int WINDOW_SIZE = 100;
-               final int NUM_KEYS = 100;
+               final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+               final int WINDOW_SIZE = windowSize();
+               final int NUM_KEYS = numKeys();
                FailingSource.reset();
                
                try {
@@ -211,9 +219,9 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
        }
 
        public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
-               final int NUM_ELEMENTS_PER_KEY = 3000;
-               final int WINDOW_SIZE = 100;
-               final int NUM_KEYS = 100;
+               final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+               final int WINDOW_SIZE = windowSize();
+               final int NUM_KEYS = numKeys();
                FailingSource.reset();
 
                try {
@@ -280,10 +288,10 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
        @Test
        public void testSlidingTimeWindow() {
-               final int NUM_ELEMENTS_PER_KEY = 3000;
-               final int WINDOW_SIZE = 1000;
-               final int WINDOW_SLIDE = 100;
-               final int NUM_KEYS = 100;
+               final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+               final int WINDOW_SIZE = windowSize();
+               final int WINDOW_SLIDE = windowSlide();
+               final int NUM_KEYS = numKeys();
                FailingSource.reset();
 
                try {
@@ -346,9 +354,9 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
        @Test
        public void testPreAggregatedTumblingTimeWindow() {
-               final int NUM_ELEMENTS_PER_KEY = 3000;
-               final int WINDOW_SIZE = 100;
-               final int NUM_KEYS = 100;
+               final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+               final int WINDOW_SIZE = windowSize();
+               final int NUM_KEYS = numKeys();
                FailingSource.reset();
 
                try {
@@ -418,10 +426,10 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
        @Test
        public void testPreAggregatedSlidingTimeWindow() {
-               final int NUM_ELEMENTS_PER_KEY = 3000;
-               final int WINDOW_SIZE = 1000;
-               final int WINDOW_SLIDE = 100;
-               final int NUM_KEYS = 100;
+               final int NUM_ELEMENTS_PER_KEY = numElementsPerKey();
+               final int WINDOW_SIZE = windowSize();
+               final int WINDOW_SLIDE = windowSlide();
+               final int NUM_KEYS = numKeys();
                FailingSource.reset();
 
                try {
@@ -790,4 +798,20 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
                public IntType(int value) { this.value = value; }
        }
+
+       protected int numElementsPerKey() {
+               return 300;
+       }
+
+       protected int windowSize() {
+               return 100;
+       }
+
+       protected int windowSlide() {
+               return 100;
+       }
+
+       protected int numKeys() {
+               return 20;
+       }
 }
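
The refactoring above turns the previously hard-coded test parameters into overridable
protected methods with lighter defaults (300 elements per key, 20 keys), and the new
MEM_ASYNC / FILE_ASYNC values construct the memory and file backends with the second
constructor argument set to true (the asynchronous-snapshots flag, judging by the enum
names). A hypothetical subclass, not part of this commit, could combine an async backend
with a heavier workload like this:

    // hypothetical example: an async-file variant with a heavier workload than the default
    public class HeavyAsyncFileEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {

        public HeavyAsyncFileEventTimeWindowCheckpointingITCase() {
            super(StateBackendEnum.FILE_ASYNC);
        }

        @Override
        protected int numElementsPerKey() {
            return 3000; // raise the lighter default of 300
        }
    }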

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..a5bf10c
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+       public AsyncFileBackendEventTimeWindowCheckpointingITCase() {
+               super(StateBackendEnum.FILE_ASYNC);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..ef9ad37
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase {
+
+       public AsyncMemBackendEventTimeWindowCheckpointingITCase() {
+               super(StateBackendEnum.MEM_ASYNC);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
index 14feb78..da2bbc7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -23,4 +23,24 @@ public class 
RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEv
        public RocksDbBackendEventTimeWindowCheckpointingITCase() {
                super(StateBackendEnum.ROCKSDB_FULLY_ASYNC);
        }
+
+       @Override
+       protected int numElementsPerKey() {
+               return 3000;
+       }
+
+       @Override
+       protected int windowSize() {
+               return 1000;
+       }
+
+       @Override
+       protected int windowSlide() {
+               return 100;
+       }
+
+       @Override
+       protected int numKeys() {
+               return 100;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
index 456861a..cbb56d0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java
@@ -33,6 +33,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.util.Random;
+
 /**
  * A collection of manual tests that serve to assess the performance of windowed operations. These
  * run in local mode with parallelism 1 with a source that emits data as fast as possible. Thus,
@@ -241,11 +243,10 @@ public class ManualWindowSpeedITCase extends 
StreamingMultipleProgramsTestBase {
 
                @Override
                public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception {
-                       long index = 0;
+                       Random random = new Random(42);
                        while (running) {
-                       Tuple2<String, Integer> tuple = new Tuple2<String, Integer>("Tuple " + (index % numKeys), 1);
+                       Tuple2<String, Integer> tuple = new Tuple2<String, Integer>("Tuple " + (random.nextInt(numKeys)), 1);
                                out.collect(tuple);
-                               index++;
                        }
                }
 

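The source change above replaces the round-robin key pattern ("Tuple 0", "Tuple 1", ...) with
keys drawn from a fixed-seed Random, so the key sequence is irregular but still reproducible
across runs. A tiny sketch of the effect (numKeys is a stand-in for the source's configured
key count):

    // illustration only: a fixed seed produces the same "random" key sequence on every run
    int numKeys = 1000;              // stand-in for the source's configured key count
    Random random = new Random(42);
    for (int i = 0; i < 5; i++) {
        System.out.println("Tuple " + random.nextInt(numKeys));
    }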