[1/3] flink git commit: [FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable
Repository: flink Updated Branches: refs/heads/release-1.3 625da00a5 -> 4eebf21e9 [FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable This closes #3882. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7de22122 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7de22122 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7de22122 Branch: refs/heads/release-1.3 Commit: 7de221224ebf179581228ae2db7bd685468189da Parents: 625da00 Author: Tzu-Li (Gordon) TaiAuthored: Fri May 12 19:11:25 2017 +0800 Committer: Tzu-Li (Gordon) Tai Committed: Sat May 13 17:05:35 2017 +0800 -- .../state/DefaultOperatorStateBackend.java | 17 +++ ...ckendStateMetaInfoSnapshotReaderWriters.java | 8 ++ ...ckendStateMetaInfoSnapshotReaderWriters.java | 6 + .../state/heap/HeapKeyedStateBackend.java | 17 +++ .../runtime/state/MemoryStateBackendTest.java | 135 +++ .../runtime/state/OperatorStateBackendTest.java | 70 +- .../runtime/state/StateBackendTestBase.java | 2 +- 7 files changed, 251 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/7de22122/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index ab0c1f0..1d3af72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; @@ -293,6 +294,22 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { // Recreate all PartitionableListStates from the meta info for (RegisteredOperatorBackendStateMetaInfo.Snapshot restoredMetaInfo : restoredMetaInfoSnapshots) { + + if (restoredMetaInfo.getPartitionStateSerializer() == null || + restoredMetaInfo.getPartitionStateSerializer() + instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) { + + // must fail now if the previous serializer cannot be restored because there is no serializer + // capable of reading previous state + // TODO when eager state registration is in place, we can try to get a convert deserializer + // TODO from the newly registered serializer instead of simply failing here + + throw new IOException("Unable to restore operator state [" + restoredMetaInfo.getName() + "]." + + " The previous serializer of the operator state must be present; the serializer could" + + " have been removed from the classpath, or its implementation have changed and could" + + " not be loaded. This is a temporary restriction that will be fixed in future versions."); + } + PartitionableListState listState = registeredStates.get(restoredMetaInfo.getName()); if (null == listState) { http://git-wip-us.apache.org/repos/asf/flink/blob/7de22122/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
[1/3] flink git commit: [FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable
Repository: flink Updated Branches: refs/heads/master 7173774d0 -> 947c44e86 [FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable This closes #3882. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c594af09 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c594af09 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c594af09 Branch: refs/heads/master Commit: c594af09767e2ef1e74dd8db187985460761b724 Parents: 7173774 Author: Tzu-Li (Gordon) TaiAuthored: Fri May 12 19:11:25 2017 +0800 Committer: Tzu-Li (Gordon) Tai Committed: Sat May 13 14:37:49 2017 +0800 -- .../state/DefaultOperatorStateBackend.java | 17 +++ ...ckendStateMetaInfoSnapshotReaderWriters.java | 8 ++ ...ckendStateMetaInfoSnapshotReaderWriters.java | 6 + .../state/heap/HeapKeyedStateBackend.java | 17 +++ .../runtime/state/MemoryStateBackendTest.java | 135 +++ .../runtime/state/OperatorStateBackendTest.java | 70 +- .../runtime/state/StateBackendTestBase.java | 2 +- 7 files changed, 251 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index ab0c1f0..1d3af72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; @@ -293,6 +294,22 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { // Recreate all PartitionableListStates from the meta info for (RegisteredOperatorBackendStateMetaInfo.Snapshot restoredMetaInfo : restoredMetaInfoSnapshots) { + + if (restoredMetaInfo.getPartitionStateSerializer() == null || + restoredMetaInfo.getPartitionStateSerializer() + instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) { + + // must fail now if the previous serializer cannot be restored because there is no serializer + // capable of reading previous state + // TODO when eager state registration is in place, we can try to get a convert deserializer + // TODO from the newly registered serializer instead of simply failing here + + throw new IOException("Unable to restore operator state [" + restoredMetaInfo.getName() + "]." + + " The previous serializer of the operator state must be present; the serializer could" + + " have been removed from the classpath, or its implementation have changed and could" + + " not be loaded. This is a temporary restriction that will be fixed in future versions."); + } + PartitionableListState listState = registeredStates.get(restoredMetaInfo.getName()); if (null == listState) { http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java