Repository: flink
Updated Branches:
  refs/heads/master 2bfead7d9 -> d8a467b01


[FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints

This commit adds the config snapshot of the keyed backend's key serializer
to the backend's checkpoints. This makes it possible to upgrade key
serializers now, and in the future to perform state migration when the
old and new key serializers are incompatible.

This closes #3925.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8a467b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8a467b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8a467b0

Branch: refs/heads/master
Commit: d8a467b01ab63127dbf563b6aa8c68fe5d9c85d4
Parents: 2bfead7
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Wed May 17 01:15:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Wed May 17 22:46:51 2017 +0800

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 30 ++++++++++
 .../state/KeyedBackendSerializationProxy.java   | 61 ++++++++++++++++++--
 .../state/heap/HeapKeyedStateBackend.java       | 15 +++++
 .../runtime/state/SerializationProxiesTest.java | 54 ++++++++++++++++-
 .../runtime/state/StateBackendTestBase.java     | 42 ++++++++++++++
 5 files changed, 197 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 4bd94fd..ddc7e17 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1116,6 +1116,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * @throws ClassNotFoundException
                 * @throws RocksDBException
                 */
+               @SuppressWarnings("unchecked")
                private void restoreKVStateMetaData() throws IOException, 
ClassNotFoundException, RocksDBException {
 
                        KeyedBackendSerializationProxy serializationProxy =
@@ -1123,6 +1124,20 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        serializationProxy.read(currentStateHandleInView);
 
+                       // check for key serializer compatibility; this also 
reconfigures the
+                       // key serializer to be compatible, if it is required 
and is possible
+                       if (StateMigrationUtil.resolveCompatibilityResult(
+                                       serializationProxy.getKeySerializer(),
+                                       
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+                                       
serializationProxy.getKeySerializerConfigSnapshot(),
+                                       (TypeSerializer) 
rocksDBKeyedStateBackend.keySerializer)
+                               .isRequiresMigration()) {
+
+                               // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
+                               throw new RuntimeException("The new key 
serializer is not compatible to read previous keys. " +
+                                       "Aborting now since state migration is 
currently not available");
+                       }
+
                        List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
                                        
serializationProxy.getStateMetaInfoSnapshots();
 
@@ -1214,6 +1229,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        this.stateBackend = stateBackend;
                }
 
+               @SuppressWarnings("unchecked")
                private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> readMetaData(
                                StreamStateHandle metaStateHandle) throws 
Exception {
 
@@ -1228,6 +1244,20 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
                                serializationProxy.read(in);
 
+                               // check for key serializer compatibility; this 
also reconfigures the
+                               // key serializer to be compatible, if it is 
required and is possible
+                               if 
(StateMigrationUtil.resolveCompatibilityResult(
+                                               
serializationProxy.getKeySerializer(),
+                                               
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+                                               
serializationProxy.getKeySerializerConfigSnapshot(),
+                                               (TypeSerializer) 
stateBackend.keySerializer)
+                                       .isRequiresMigration()) {
+
+                                       // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
+                                       throw new RuntimeException("The new key 
serializer is not compatible to read previous keys. " +
+                                               "Aborting now since state 
migration is currently not available");
+                               }
+
                                return 
serializationProxy.getStateMetaInfoSnapshots();
                        } finally {
                                if (inputStream != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index a20628c..94fb9f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -19,10 +19,16 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -38,6 +44,8 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
        public static final int VERSION = 3;
 
        private TypeSerializer<?> keySerializer;
+       private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
+
        private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
stateMetaInfoSnapshots;
 
        private ClassLoader userCodeClassLoader;
@@ -51,6 +59,7 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
                        List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) {
 
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
+               this.keySerializerConfigSnapshot = 
Preconditions.checkNotNull(keySerializer.snapshotConfiguration());
 
                Preconditions.checkNotNull(stateMetaInfoSnapshots);
                Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= 
Short.MAX_VALUE);
@@ -65,6 +74,10 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
                return keySerializer;
        }
 
+       public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() {
+               return keySerializerConfigSnapshot;
+       }
+
        @Override
        public int getVersion() {
                return VERSION;
@@ -80,10 +93,24 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
        public void write(DataOutputView out) throws IOException {
                super.write(out);
 
-               new 
TypeSerializerSerializationProxy<>(keySerializer).write(out);
+               // write in a way to be fault tolerant of read failures when 
deserializing the key serializer
+               try (
+                       ByteArrayOutputStreamWithPos buffer = new 
ByteArrayOutputStreamWithPos();
+                       DataOutputViewStreamWrapper bufferWrapper = new 
DataOutputViewStreamWrapper(buffer)){
 
-               out.writeShort(stateMetaInfoSnapshots.size());
+                       new 
TypeSerializerSerializationProxy<>(keySerializer).write(bufferWrapper);
+
+                       // write offset of key serializer's configuration 
snapshot
+                       out.writeInt(buffer.getPosition());
+                       
TypeSerializerUtil.writeSerializerConfigSnapshot(bufferWrapper, 
keySerializerConfigSnapshot);
 
+                       // flush buffer
+                       out.writeInt(buffer.getPosition());
+                       out.write(buffer.getBuf(), 0, buffer.getPosition());
+               }
+
+               // write individual registered keyed state metainfos
+               out.writeShort(stateMetaInfoSnapshots.size());
                for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
metaInfo : stateMetaInfoSnapshots) {
                        KeyedBackendStateMetaInfoSnapshotReaderWriters
                                .getWriterForVersion(VERSION, metaInfo)
@@ -97,8 +124,34 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
 
                final TypeSerializerSerializationProxy<?> keySerializerProxy =
                        new 
TypeSerializerSerializationProxy<>(userCodeClassLoader);
-               keySerializerProxy.read(in);
-               this.keySerializer = keySerializerProxy.getTypeSerializer();
+
+               // only starting from version 3, we have the key serializer and 
its config snapshot written
+               if (getReadVersion() >= 3) {
+                       int keySerializerConfigSnapshotOffset = in.readInt();
+                       int numBufferedBytes = in.readInt();
+                       byte[] keySerializerAndConfigBytes = new 
byte[numBufferedBytes];
+                       in.readFully(keySerializerAndConfigBytes);
+
+                       try (
+                               ByteArrayInputStreamWithPos buffer = new 
ByteArrayInputStreamWithPos(keySerializerAndConfigBytes);
+                               DataInputViewStreamWrapper bufferWrapper = new 
DataInputViewStreamWrapper(buffer)) {
+
+                               try {
+                                       keySerializerProxy.read(bufferWrapper);
+                                       this.keySerializer = 
keySerializerProxy.getTypeSerializer();
+                               } catch (IOException e) {
+                                       this.keySerializer = null;
+                               }
+
+                               
buffer.setPosition(keySerializerConfigSnapshotOffset);
+                               this.keySerializerConfigSnapshot =
+                                       
TypeSerializerUtil.readSerializerConfigSnapshot(bufferWrapper, 
userCodeClassLoader);
+                       }
+               } else {
+                       keySerializerProxy.read(in);
+                       this.keySerializer = 
keySerializerProxy.getTypeSerializer();
+                       this.keySerializerConfigSnapshot = null;
+               }
 
                int numKvStates = in.readShort();
                stateMetaInfoSnapshots = new ArrayList<>(numKvStates);

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 11e7760..8d3d8a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -385,6 +386,20 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                serializationProxy.read(inView);
 
+                               // check for key serializer compatibility; this 
also reconfigures the
+                               // key serializer to be compatible, if it is 
required and is possible
+                               if 
(StateMigrationUtil.resolveCompatibilityResult(
+                                               
serializationProxy.getKeySerializer(),
+                                               
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+                                               
serializationProxy.getKeySerializerConfigSnapshot(),
+                                               (TypeSerializer) keySerializer)
+                                       .isRequiresMigration()) {
+
+                                       // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
+                                       throw new RuntimeException("The new key 
serializer is not compatible to read previous keys. " +
+                                               "Aborting now since state 
migration is currently not available");
+                               }
+
                                
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
                                                
serializationProxy.getStateMetaInfoSnapshots();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 02b4d62..8bbbd5f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -44,7 +44,10 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, 
OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
+@PrepareForTest({
+       KeyedBackendSerializationProxy.class,
+       KeyedBackendStateMetaInfoSnapshotReaderWriters.class,
+       OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
 public class SerializationProxiesTest {
 
        @Test
@@ -80,10 +83,59 @@ public class SerializationProxiesTest {
                }
 
                Assert.assertEquals(keySerializer, 
serializationProxy.getKeySerializer());
+               Assert.assertEquals(keySerializer.snapshotConfiguration(), 
serializationProxy.getKeySerializerConfigSnapshot());
                Assert.assertEquals(stateMetaInfoList, 
serializationProxy.getStateMetaInfoSnapshots());
        }
 
        @Test
+       public void 
testKeyedBackendSerializationProxyRoundtripWithSerializerSerializationFailures()
 throws Exception {
+
+               TypeSerializer<?> keySerializer = IntSerializer.INSTANCE;
+               TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
+               TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+               List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
stateMetaInfoList = new ArrayList<>();
+
+               stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+                       StateDescriptor.Type.VALUE, "a", namespaceSerializer, 
stateSerializer).snapshot());
+               stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+                       StateDescriptor.Type.VALUE, "b", namespaceSerializer, 
stateSerializer).snapshot());
+               stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+                       StateDescriptor.Type.VALUE, "c", namespaceSerializer, 
stateSerializer).snapshot());
+
+               KeyedBackendSerializationProxy serializationProxy =
+                       new KeyedBackendSerializationProxy(keySerializer, 
stateMetaInfoList);
+
+               byte[] serialized;
+               try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
+                       serializationProxy.write(new 
DataOutputViewStreamWrapper(out));
+                       serialized = out.toByteArray();
+               }
+
+               serializationProxy =
+                       new 
KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+
+               // mock failure when deserializing serializers
+               TypeSerializerSerializationProxy<?> mockProxy = 
mock(TypeSerializerSerializationProxy.class);
+               doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+               
PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+               try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
+                       serializationProxy.read(new 
DataInputViewStreamWrapper(in));
+               }
+
+               Assert.assertEquals(null, 
serializationProxy.getKeySerializer());
+               Assert.assertEquals(keySerializer.snapshotConfiguration(), 
serializationProxy.getKeySerializerConfigSnapshot());
+
+               for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> meta : 
serializationProxy.getStateMetaInfoSnapshots()) {
+                       Assert.assertEquals(null, 
meta.getNamespaceSerializer());
+                       Assert.assertEquals(null, meta.getStateSerializer());
+                       
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), 
meta.getNamespaceSerializerConfigSnapshot());
+                       
Assert.assertEquals(stateSerializer.snapshotConfiguration(), 
meta.getStateSerializerConfigSnapshot());
+               }
+       }
+
+       @Test
        public void testKeyedStateMetaInfoSerialization() throws Exception {
 
                String name = "test";

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index ca66ffb..b1927f1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -1805,6 +1806,47 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        }
 
        @Test
+       public void testRestoreWithWrongKeySerializer() {
+               try {
+                       CheckpointStreamFactory streamFactory = 
createStreamFactory();
+
+                       // use an IntSerializer at first
+                       AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+
+                       ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class);
+
+                       ValueState<String> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       // write some state
+                       backend.setCurrentKey(1);
+                       state.update("1");
+                       backend.setCurrentKey(2);
+                       state.update("2");
+
+                       // draw a snapshot
+                       KeyedStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forFullCheckpoint()));
+
+                       backend.dispose();
+
+                       // restore with the wrong key serializer
+                       try {
+                               restoreKeyedBackend(DoubleSerializer.INSTANCE, 
snapshot1);
+
+                               fail("should recognize wrong key serializer");
+                       } catch (RuntimeException e) {
+                               if (!e.getMessage().contains("The new key 
serializer is not compatible")) {
+                                       fail("wrong exception " + e);
+                               }
+                               // expected
+                       }
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
        @SuppressWarnings("unchecked")
        public void testValueStateRestoreWithWrongSerializers() {
                try {

Reply via email to