[FLINK-6207] Duplicate TypeSerializers for async snapshots of CopyOnWriteStateTable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89866a5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89866a5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89866a5a

Branch: refs/heads/table-retraction
Commit: 89866a5ad0673dd448b3cff186e6d663189a60e3
Parents: ad21a44
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Tue Mar 28 16:56:27 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Mar 28 19:20:53 2017 +0200

----------------------------------------------------------------------
 .../heap/CopyOnWriteStateTableSnapshot.java     |  34 +++-
 .../state/heap/CopyOnWriteStateTableTest.java   | 171 ++++++++++++++++++-
 2 files changed, 195 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89866a5a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
index c83fce0..2ac88b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java
@@ -71,6 +71,21 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
        private int[] keyGroupOffsets;
 
        /**
+        * A local duplicate of the table's key serializer.
+        */
+       private final TypeSerializer<K> localKeySerializer;
+
+       /**
+        * A local duplicate of the table's namespace serializer.
+        */
+       private final TypeSerializer<N> localNamespaceSerializer;
+
+       /**
+        * A local duplicate of the table's state serializer.
+        */
+       private final TypeSerializer<S> localStateSerializer;
+
+       /**
         * Creates a new {@link CopyOnWriteStateTableSnapshot}.
         *
         * @param owningStateTable the {@link CopyOnWriteStateTable} for which 
this object represents a snapshot.
@@ -81,6 +96,13 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
                this.snapshotData = owningStateTable.snapshotTableArrays();
                this.snapshotVersion = owningStateTable.getStateTableVersion();
                this.stateTableSize = owningStateTable.size();
+
+               // We create duplicates of the serializers for the async 
snapshot, because TypeSerializer
+               // might be stateful and shared with the event processing 
thread.
+               this.localKeySerializer = 
owningStateTable.keyContext.getKeySerializer().duplicate();
+               this.localNamespaceSerializer = 
owningStateTable.metaInfo.getNamespaceSerializer().duplicate();
+               this.localStateSerializer = 
owningStateTable.metaInfo.getStateSerializer().duplicate();
+
                this.keyGroupOffsets = null;
        }
 
@@ -162,10 +184,6 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
                int startOffset = keyGroupOffsetIdx < 0 ? 0 : 
keyGroupOffsets[keyGroupOffsetIdx];
                int endOffset = keyGroupOffsets[keyGroupOffsetIdx + 1];
 
-               TypeSerializer<K> keySerializer = 
owningStateTable.keyContext.getKeySerializer();
-               TypeSerializer<N> namespaceSerializer = 
owningStateTable.metaInfo.getNamespaceSerializer();
-               TypeSerializer<S> stateSerializer = 
owningStateTable.metaInfo.getStateSerializer();
-
                // write number of mappings in key-group
                dov.writeInt(endOffset - startOffset);
 
@@ -173,9 +191,9 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
                for (int i = startOffset; i < endOffset; ++i) {
                        CopyOnWriteStateTable.StateTableEntry<K, N, S> toWrite 
= groupedOut[i];
                        groupedOut[i] = null; // free asap for GC
-                       namespaceSerializer.serialize(toWrite.namespace, dov);
-                       keySerializer.serialize(toWrite.key, dov);
-                       stateSerializer.serialize(toWrite.state, dov);
+                       localNamespaceSerializer.serialize(toWrite.namespace, 
dov);
+                       localKeySerializer.serialize(toWrite.key, dov);
+                       localStateSerializer.serialize(toWrite.state, dov);
                }
        }
 
@@ -185,4 +203,4 @@ public class CopyOnWriteStateTableSnapshot<K, N, S>
        boolean isOwner(CopyOnWriteStateTable<K, N, S> stateTable) {
                return stateTable == owningStateTable;
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/89866a5a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
index 08896da..976b9aa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java
@@ -23,13 +23,19 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -37,7 +43,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
-public class CopyOnWriteStateTableTest {
+public class CopyOnWriteStateTableTest extends TestLogger {
 
        /**
         * Testing the basic map operations.
@@ -380,6 +386,77 @@ public class CopyOnWriteStateTableTest {
                Assert.assertTrue(originalState5 == stateTable.get(5, 1));
        }
 
+       /**
+        * This tests that serializers used for snapshots are duplicates of the 
ones used in
+        * processing to avoid race conditions in stateful serializers.
+        */
+       @Test
+       public void testSerializerDuplicationInSnapshot() throws IOException {
+
+               final TestDuplicateSerializer namespaceSerializer = new 
TestDuplicateSerializer();
+               final TestDuplicateSerializer stateSerializer = new 
TestDuplicateSerializer();;
+               final TestDuplicateSerializer keySerializer = new 
TestDuplicateSerializer();;
+
+               RegisteredBackendStateMetaInfo<Integer, Integer> metaInfo =
+                       new RegisteredBackendStateMetaInfo<>(
+                               StateDescriptor.Type.VALUE,
+                               "test",
+                               namespaceSerializer,
+                               stateSerializer);
+
+               final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
+               InternalKeyContext<Integer> mockKeyContext = new 
InternalKeyContext<Integer>() {
+                       @Override
+                       public Integer getCurrentKey() {
+                               return 0;
+                       }
+
+                       @Override
+                       public int getCurrentKeyGroupIndex() {
+                               return 0;
+                       }
+
+                       @Override
+                       public int getNumberOfKeyGroups() {
+                               return 1;
+                       }
+
+                       @Override
+                       public KeyGroupRange getKeyGroupRange() {
+                               return keyGroupRange;
+                       }
+
+                       @Override
+                       public TypeSerializer<Integer> getKeySerializer() {
+                               return keySerializer;
+                       }
+               };
+
+               CopyOnWriteStateTable<Integer, Integer, Integer> table =
+                       new CopyOnWriteStateTable<>(mockKeyContext, metaInfo);
+
+               table.put(0, 0, 0, 0);
+               table.put(1, 0, 0, 1);
+               table.put(2, 0, 1, 2);
+
+
+               CopyOnWriteStateTableSnapshot<Integer, Integer, Integer> 
snapshot = table.createSnapshot();
+
+               try {
+
+                       namespaceSerializer.disable();
+                       keySerializer.disable();
+                       stateSerializer.disable();
+
+                       snapshot.writeMappingsInKeyGroup(
+                               new DataOutputViewStreamWrapper(
+                                       new 
ByteArrayOutputStreamWithPos(1024)), 0);
+
+               } finally {
+                       table.releaseSnapshot(snapshot);
+               }
+       }
+
        @SuppressWarnings("unchecked")
        private static <K, N, S> Tuple3<K, N, S>[] 
convert(CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshot, int mapSize) 
{
 
@@ -483,4 +560,94 @@ public class CopyOnWriteStateTableTest {
                        return serializer;
                }
        }
-}
\ No newline at end of file
+
+       /**
+        * Serializer that can be disabled. Duplicates are still enabled, so we 
can check that
+        * serializers are duplicated.
+        */
+       static class TestDuplicateSerializer extends TypeSerializer<Integer> {
+
+               private static final long serialVersionUID = 1L;
+
+               private static final Integer ZERO = 0;
+
+               private boolean disabled;
+
+               public TestDuplicateSerializer() {
+                       this.disabled = false;
+               }
+
+               @Override
+               public boolean isImmutableType() {
+                       return true;
+               }
+
+               @Override
+               public TypeSerializer<Integer> duplicate() {
+                       return new TestDuplicateSerializer();
+               }
+
+               @Override
+               public Integer createInstance() {
+                       return ZERO;
+               }
+
+               @Override
+               public Integer copy(Integer from) {
+                       return from;
+               }
+
+               @Override
+               public Integer copy(Integer from, Integer reuse) {
+                       return from;
+               }
+
+               @Override
+               public int getLength() {
+                       return 4;
+               }
+
+               @Override
+               public void serialize(Integer record, DataOutputView target) 
throws IOException {
+                       Assert.assertFalse(disabled);
+                       target.writeInt(record);
+               }
+
+               @Override
+               public Integer deserialize(DataInputView source) throws 
IOException {
+                       Assert.assertFalse(disabled);
+                       return source.readInt();
+               }
+
+               @Override
+               public Integer deserialize(Integer reuse, DataInputView source) 
throws IOException {
+                       Assert.assertFalse(disabled);
+                       return deserialize(source);
+               }
+
+               @Override
+               public void copy(DataInputView source, DataOutputView target) 
throws IOException {
+                       Assert.assertFalse(disabled);
+                       target.writeInt(source.readInt());
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       return obj instanceof TestDuplicateSerializer;
+               }
+
+               @Override
+               public boolean canEqual(Object obj) {
+                       return obj instanceof TestDuplicateSerializer;
+               }
+
+               @Override
+               public int hashCode() {
+                       return getClass().hashCode();
+               }
+
+               public void disable() {
+                       this.disabled = true;
+               }
+       }
+}

Reply via email to