[FLINK-6207] Duplicate TypeSerializers for async snapshots of CopyOnWriteStateTable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89866a5a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89866a5a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89866a5a Branch: refs/heads/table-retraction Commit: 89866a5ad0673dd448b3cff186e6d663189a60e3 Parents: ad21a44 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Tue Mar 28 16:56:27 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Tue Mar 28 19:20:53 2017 +0200 ---------------------------------------------------------------------- .../heap/CopyOnWriteStateTableSnapshot.java | 34 +++- .../state/heap/CopyOnWriteStateTableTest.java | 171 ++++++++++++++++++- 2 files changed, 195 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/89866a5a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java index c83fce0..2ac88b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableSnapshot.java @@ -71,6 +71,21 @@ public class CopyOnWriteStateTableSnapshot<K, N, S> private int[] keyGroupOffsets; /** + * A local duplicate of the table's key serializer. + */ + private final TypeSerializer<K> localKeySerializer; + + /** + * A local duplicate of the table's namespace serializer. + */ + private final TypeSerializer<N> localNamespaceSerializer; + + /** + * A local duplicate of the table's state serializer. + */ + private final TypeSerializer<S> localStateSerializer; + + /** * Creates a new {@link CopyOnWriteStateTableSnapshot}. * * @param owningStateTable the {@link CopyOnWriteStateTable} for which this object represents a snapshot. @@ -81,6 +96,13 @@ public class CopyOnWriteStateTableSnapshot<K, N, S> this.snapshotData = owningStateTable.snapshotTableArrays(); this.snapshotVersion = owningStateTable.getStateTableVersion(); this.stateTableSize = owningStateTable.size(); + + // We create duplicates of the serializers for the async snapshot, because TypeSerializer + // might be stateful and shared with the event processing thread. + this.localKeySerializer = owningStateTable.keyContext.getKeySerializer().duplicate(); + this.localNamespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer().duplicate(); + this.localStateSerializer = owningStateTable.metaInfo.getStateSerializer().duplicate(); + this.keyGroupOffsets = null; } @@ -162,10 +184,6 @@ public class CopyOnWriteStateTableSnapshot<K, N, S> int startOffset = keyGroupOffsetIdx < 0 ? 0 : keyGroupOffsets[keyGroupOffsetIdx]; int endOffset = keyGroupOffsets[keyGroupOffsetIdx + 1]; - TypeSerializer<K> keySerializer = owningStateTable.keyContext.getKeySerializer(); - TypeSerializer<N> namespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer(); - TypeSerializer<S> stateSerializer = owningStateTable.metaInfo.getStateSerializer(); - // write number of mappings in key-group dov.writeInt(endOffset - startOffset); @@ -173,9 +191,9 @@ public class CopyOnWriteStateTableSnapshot<K, N, S> for (int i = startOffset; i < endOffset; ++i) { CopyOnWriteStateTable.StateTableEntry<K, N, S> toWrite = groupedOut[i]; groupedOut[i] = null; // free asap for GC - namespaceSerializer.serialize(toWrite.namespace, dov); - keySerializer.serialize(toWrite.key, dov); - stateSerializer.serialize(toWrite.state, dov); + localNamespaceSerializer.serialize(toWrite.namespace, dov); + localKeySerializer.serialize(toWrite.key, dov); + localStateSerializer.serialize(toWrite.state, dov); } } @@ -185,4 +203,4 @@ public class CopyOnWriteStateTableSnapshot<K, N, S> boolean isOwner(CopyOnWriteStateTable<K, N, S> stateTable) { return stateTable == owningStateTable; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/89866a5a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java index 08896da..976b9aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java @@ -23,13 +23,19 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.ArrayListSerializer; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; @@ -37,7 +43,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Random; -public class CopyOnWriteStateTableTest { +public class CopyOnWriteStateTableTest extends TestLogger { /** * Testing the basic map operations. @@ -380,6 +386,77 @@ public class CopyOnWriteStateTableTest { Assert.assertTrue(originalState5 == stateTable.get(5, 1)); } + /** + * This tests that serializers used for snapshots are duplicates of the ones used in + * processing to avoid race conditions in stateful serializers. + */ + @Test + public void testSerializerDuplicationInSnapshot() throws IOException { + + final TestDuplicateSerializer namespaceSerializer = new TestDuplicateSerializer(); + final TestDuplicateSerializer stateSerializer = new TestDuplicateSerializer();; + final TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();; + + RegisteredBackendStateMetaInfo<Integer, Integer> metaInfo = + new RegisteredBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, + "test", + namespaceSerializer, + stateSerializer); + + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0); + InternalKeyContext<Integer> mockKeyContext = new InternalKeyContext<Integer>() { + @Override + public Integer getCurrentKey() { + return 0; + } + + @Override + public int getCurrentKeyGroupIndex() { + return 0; + } + + @Override + public int getNumberOfKeyGroups() { + return 1; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + @Override + public TypeSerializer<Integer> getKeySerializer() { + return keySerializer; + } + }; + + CopyOnWriteStateTable<Integer, Integer, Integer> table = + new CopyOnWriteStateTable<>(mockKeyContext, metaInfo); + + table.put(0, 0, 0, 0); + table.put(1, 0, 0, 1); + table.put(2, 0, 1, 2); + + + CopyOnWriteStateTableSnapshot<Integer, Integer, Integer> snapshot = table.createSnapshot(); + + try { + + namespaceSerializer.disable(); + keySerializer.disable(); + stateSerializer.disable(); + + snapshot.writeMappingsInKeyGroup( + new DataOutputViewStreamWrapper( + new ByteArrayOutputStreamWithPos(1024)), 0); + + } finally { + table.releaseSnapshot(snapshot); + } + } + @SuppressWarnings("unchecked") private static <K, N, S> Tuple3<K, N, S>[] convert(CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshot, int mapSize) { @@ -483,4 +560,94 @@ public class CopyOnWriteStateTableTest { return serializer; } } -} \ No newline at end of file + + /** + * Serializer that can be disabled. Duplicates are still enabled, so we can check that + * serializers are duplicated. + */ + static class TestDuplicateSerializer extends TypeSerializer<Integer> { + + private static final long serialVersionUID = 1L; + + private static final Integer ZERO = 0; + + private boolean disabled; + + public TestDuplicateSerializer() { + this.disabled = false; + } + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public TypeSerializer<Integer> duplicate() { + return new TestDuplicateSerializer(); + } + + @Override + public Integer createInstance() { + return ZERO; + } + + @Override + public Integer copy(Integer from) { + return from; + } + + @Override + public Integer copy(Integer from, Integer reuse) { + return from; + } + + @Override + public int getLength() { + return 4; + } + + @Override + public void serialize(Integer record, DataOutputView target) throws IOException { + Assert.assertFalse(disabled); + target.writeInt(record); + } + + @Override + public Integer deserialize(DataInputView source) throws IOException { + Assert.assertFalse(disabled); + return source.readInt(); + } + + @Override + public Integer deserialize(Integer reuse, DataInputView source) throws IOException { + Assert.assertFalse(disabled); + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + Assert.assertFalse(disabled); + target.writeInt(source.readInt()); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof TestDuplicateSerializer; + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof TestDuplicateSerializer; + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + + public void disable() { + this.disabled = true; + } + } +}