[1/3] flink git commit: [FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable

2017-05-13 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/release-1.3 625da00a5 -> 4eebf21e9


[FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable

This closes #3882.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7de22122
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7de22122
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7de22122

Branch: refs/heads/release-1.3
Commit: 7de221224ebf179581228ae2db7bd685468189da
Parents: 625da00
Author: Tzu-Li (Gordon) Tai 
Authored: Fri May 12 19:11:25 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Sat May 13 17:05:35 2017 +0800

--
 .../state/DefaultOperatorStateBackend.java  |  17 +++
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   8 ++
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   6 +
 .../state/heap/HeapKeyedStateBackend.java   |  17 +++
 .../runtime/state/MemoryStateBackendTest.java   | 135 +++
 .../runtime/state/OperatorStateBackendTest.java |  70 +-
 .../runtime/state/StateBackendTestBase.java |   2 +-
 7 files changed, 251 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7de22122/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index ab0c1f0..1d3af72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -293,6 +294,22 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
// Recreate all PartitionableListStates from 
the meta info
for 
(RegisteredOperatorBackendStateMetaInfo.Snapshot restoredMetaInfo : 
restoredMetaInfoSnapshots) {
+
+   if 
(restoredMetaInfo.getPartitionStateSerializer() == null ||
+   
restoredMetaInfo.getPartitionStateSerializer()
+   instanceof 
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) {
+
+   // must fail now if the 
previous serializer cannot be restored because there is no serializer
+   // capable of reading previous 
state
+   // TODO when eager state 
registration is in place, we can try to get a convert deserializer
+   // TODO from the newly 
registered serializer instead of simply failing here
+
+   throw new IOException("Unable 
to restore operator state [" + restoredMetaInfo.getName() + "]." +
+   " The previous 
serializer of the operator state must be present; the serializer could" +
+   " have been removed 
from the classpath, or its implementation have changed and could" +
+   " not be loaded. This 
is a temporary restriction that will be fixed in future versions.");
+   }
+
PartitionableListState listState = 
registeredStates.get(restoredMetaInfo.getName());
 
if (null == listState) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7de22122/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
 

[1/3] flink git commit: [FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable

2017-05-13 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/master 7173774d0 -> 947c44e86


[FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable

This closes #3882.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c594af09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c594af09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c594af09

Branch: refs/heads/master
Commit: c594af09767e2ef1e74dd8db187985460761b724
Parents: 7173774
Author: Tzu-Li (Gordon) Tai 
Authored: Fri May 12 19:11:25 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Sat May 13 14:37:49 2017 +0800

--
 .../state/DefaultOperatorStateBackend.java  |  17 +++
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   8 ++
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   6 +
 .../state/heap/HeapKeyedStateBackend.java   |  17 +++
 .../runtime/state/MemoryStateBackendTest.java   | 135 +++
 .../runtime/state/OperatorStateBackendTest.java |  70 +-
 .../runtime/state/StateBackendTestBase.java |   2 +-
 7 files changed, 251 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index ab0c1f0..1d3af72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -293,6 +294,22 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
// Recreate all PartitionableListStates from 
the meta info
for 
(RegisteredOperatorBackendStateMetaInfo.Snapshot restoredMetaInfo : 
restoredMetaInfoSnapshots) {
+
+   if 
(restoredMetaInfo.getPartitionStateSerializer() == null ||
+   
restoredMetaInfo.getPartitionStateSerializer()
+   instanceof 
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) {
+
+   // must fail now if the 
previous serializer cannot be restored because there is no serializer
+   // capable of reading previous 
state
+   // TODO when eager state 
registration is in place, we can try to get a convert deserializer
+   // TODO from the newly 
registered serializer instead of simply failing here
+
+   throw new IOException("Unable 
to restore operator state [" + restoredMetaInfo.getName() + "]." +
+   " The previous 
serializer of the operator state must be present; the serializer could" +
+   " have been removed 
from the classpath, or its implementation have changed and could" +
+   " not be loaded. This 
is a temporary restriction that will be fixed in future versions.");
+   }
+
PartitionableListState listState = 
registeredStates.get(restoredMetaInfo.getName());
 
if (null == listState) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java