Repository: flink Updated Branches: refs/heads/master 2bfead7d9 -> d8a467b01
[FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints This commit adds the config snapshot of the key serializer of keyed backends to its checkpoints. This allows the opportunity to upgrade key serializers, as well as state migration in the future in the case of incompatible old and new key serializers. This closes #3925. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8a467b0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8a467b0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8a467b0 Branch: refs/heads/master Commit: d8a467b01ab63127dbf563b6aa8c68fe5d9c85d4 Parents: 2bfead7 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Wed May 17 01:15:57 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Wed May 17 22:46:51 2017 +0800 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 30 ++++++++++ .../state/KeyedBackendSerializationProxy.java | 61 ++++++++++++++++++-- .../state/heap/HeapKeyedStateBackend.java | 15 +++++ .../runtime/state/SerializationProxiesTest.java | 54 ++++++++++++++++- .../runtime/state/StateBackendTestBase.java | 42 ++++++++++++++ 5 files changed, 197 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 4bd94fd..ddc7e17 100644 --- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -1116,6 +1116,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { * @throws ClassNotFoundException * @throws RocksDBException */ + @SuppressWarnings("unchecked") private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException { KeyedBackendSerializationProxy serializationProxy = @@ -1123,6 +1124,20 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { serializationProxy.read(currentStateHandleInView); + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (StateMigrationUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + (TypeSerializer) rocksDBKeyedStateBackend.keySerializer) + .isRequiresMigration()) { + + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new RuntimeException("The new key serializer is not compatible to read previous keys. 
" + + "Aborting now since state migration is currently not available"); + } + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); @@ -1214,6 +1229,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.stateBackend = stateBackend; } + @SuppressWarnings("unchecked") private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData( StreamStateHandle metaStateHandle) throws Exception { @@ -1228,6 +1244,20 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { DataInputView in = new DataInputViewStreamWrapper(inputStream); serializationProxy.read(in); + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (StateMigrationUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + (TypeSerializer) stateBackend.keySerializer) + .isRequiresMigration()) { + + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new RuntimeException("The new key serializer is not compatible to read previous keys. 
" + + "Aborting now since state migration is currently not available"); + } + return serializationProxy.getStateMetaInfoSnapshots(); } finally { if (inputStream != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java index a20628c..94fb9f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java @@ -19,10 +19,16 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; +import org.apache.flink.api.common.typeutils.TypeSerializerUtil; import org.apache.flink.core.io.VersionedIOReadableWritable; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -38,6 +44,8 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable public static final int VERSION = 3; private TypeSerializer<?> keySerializer; + private TypeSerializerConfigSnapshot keySerializerConfigSnapshot; + private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
stateMetaInfoSnapshots; private ClassLoader userCodeClassLoader; @@ -51,6 +59,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) { this.keySerializer = Preconditions.checkNotNull(keySerializer); + this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializer.snapshotConfiguration()); Preconditions.checkNotNull(stateMetaInfoSnapshots); Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE); @@ -65,6 +74,10 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable return keySerializer; } + public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() { + return keySerializerConfigSnapshot; + } + @Override public int getVersion() { return VERSION; @@ -80,10 +93,24 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable public void write(DataOutputView out) throws IOException { super.write(out); - new TypeSerializerSerializationProxy<>(keySerializer).write(out); + // write in a way to be fault tolerant of read failures when deserializing the key serializer + try ( + ByteArrayOutputStreamWithPos buffer = new ByteArrayOutputStreamWithPos(); + DataOutputViewStreamWrapper bufferWrapper = new DataOutputViewStreamWrapper(buffer)){ - out.writeShort(stateMetaInfoSnapshots.size()); + new TypeSerializerSerializationProxy<>(keySerializer).write(bufferWrapper); + + // write offset of key serializer's configuration snapshot + out.writeInt(buffer.getPosition()); + TypeSerializerUtil.writeSerializerConfigSnapshot(bufferWrapper, keySerializerConfigSnapshot); + // flush buffer + out.writeInt(buffer.getPosition()); + out.write(buffer.getBuf(), 0, buffer.getPosition()); + } + + // write individual registered keyed state metainfos + out.writeShort(stateMetaInfoSnapshots.size()); for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo : stateMetaInfoSnapshots) { 
KeyedBackendStateMetaInfoSnapshotReaderWriters .getWriterForVersion(VERSION, metaInfo) @@ -97,8 +124,34 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable final TypeSerializerSerializationProxy<?> keySerializerProxy = new TypeSerializerSerializationProxy<>(userCodeClassLoader); - keySerializerProxy.read(in); - this.keySerializer = keySerializerProxy.getTypeSerializer(); + + // only starting from version 3, we have the key serializer and its config snapshot written + if (getReadVersion() >= 3) { + int keySerializerConfigSnapshotOffset = in.readInt(); + int numBufferedBytes = in.readInt(); + byte[] keySerializerAndConfigBytes = new byte[numBufferedBytes]; + in.readFully(keySerializerAndConfigBytes); + + try ( + ByteArrayInputStreamWithPos buffer = new ByteArrayInputStreamWithPos(keySerializerAndConfigBytes); + DataInputViewStreamWrapper bufferWrapper = new DataInputViewStreamWrapper(buffer)) { + + try { + keySerializerProxy.read(bufferWrapper); + this.keySerializer = keySerializerProxy.getTypeSerializer(); + } catch (IOException e) { + this.keySerializer = null; + } + + buffer.setPosition(keySerializerConfigSnapshotOffset); + this.keySerializerConfigSnapshot = + TypeSerializerUtil.readSerializerConfigSnapshot(bufferWrapper, userCodeClassLoader); + } + } else { + keySerializerProxy.read(in); + this.keySerializer = keySerializerProxy.getTypeSerializer(); + this.keySerializerConfigSnapshot = null; + } int numKvStates = in.readShort(); stateMetaInfoSnapshots = new ArrayList<>(numKvStates); http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 11e7760..8d3d8a0 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateMigrationUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalFoldingState; @@ -385,6 +386,20 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { serializationProxy.read(inView); + // check for key serializer compatibility; this also reconfigures the + // key serializer to be compatible, if it is required and is possible + if (StateMigrationUtil.resolveCompatibilityResult( + serializationProxy.getKeySerializer(), + TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class, + serializationProxy.getKeySerializerConfigSnapshot(), + (TypeSerializer) keySerializer) + .isRequiresMigration()) { + + // TODO replace with state migration; note that key hash codes need to remain the same after migration + throw new RuntimeException("The new key serializer is not compatible to read previous keys. 
" + + "Aborting now since state migration is currently not available"); + } + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java index 02b4d62..8bbbd5f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java @@ -44,7 +44,10 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @RunWith(PowerMockRunner.class) -@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, OperatorBackendStateMetaInfoSnapshotReaderWriters.class}) +@PrepareForTest({ + KeyedBackendSerializationProxy.class, + KeyedBackendStateMetaInfoSnapshotReaderWriters.class, + OperatorBackendStateMetaInfoSnapshotReaderWriters.class}) public class SerializationProxiesTest { @Test @@ -80,10 +83,59 @@ public class SerializationProxiesTest { } Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer()); + Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot()); Assert.assertEquals(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots()); } @Test + public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializationFailures() throws Exception { + + TypeSerializer<?> keySerializer = IntSerializer.INSTANCE; + TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE; + + 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoList = new ArrayList<>(); + + stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer).snapshot()); + stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer).snapshot()); + stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>( + StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot()); + + KeyedBackendSerializationProxy serializationProxy = + new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList); + + byte[] serialized; + try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { + serializationProxy.write(new DataOutputViewStreamWrapper(out)); + serialized = out.toByteArray(); + } + + serializationProxy = + new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader()); + + // mock failure when deserializing serializers + TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); + PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + + try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) { + serializationProxy.read(new DataInputViewStreamWrapper(in)); + } + + Assert.assertEquals(null, serializationProxy.getKeySerializer()); + Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot()); + + for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> meta : serializationProxy.getStateMetaInfoSnapshots()) { + Assert.assertEquals(null, meta.getNamespaceSerializer()); + Assert.assertEquals(null, meta.getStateSerializer()); + Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), 
meta.getNamespaceSerializerConfigSnapshot()); + Assert.assertEquals(stateSerializer.snapshotConfiguration(), meta.getStateSerializerConfigSnapshot()); + } + } + + @Test public void testKeyedStateMetaInfoSerialization() throws Exception { String name = "test"; http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index ca66ffb..b1927f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -38,6 +38,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.DoubleSerializer; import org.apache.flink.api.common.typeutils.base.FloatSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; @@ -1805,6 +1806,47 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } @Test + public void testRestoreWithWrongKeySerializer() { + try { + CheckpointStreamFactory streamFactory = createStreamFactory(); + + // use an IntSerializer at first + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class); + + ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + // write some state + 
backend.setCurrentKey(1); + state.update("1"); + backend.setCurrentKey(2); + state.update("2"); + + // draw a snapshot + KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + + backend.dispose(); + + // restore with the wrong key serializer + try { + restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1); + + fail("should recognize wrong key serializer"); + } catch (RuntimeException e) { + if (!e.getMessage().contains("The new key serializer is not compatible")) { + fail("wrong exception " + e); + } + // expected + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test @SuppressWarnings("unchecked") public void testValueStateRestoreWithWrongSerializers() { try {
