[FLINK-6566] [core] More restricted interface for VersionedIOReadableWritable 
hooks

This commit makes the method hooks for defining compatible
serialization versions of VersionedIOReadableWritables more restricted.

Functionally everything remains the same, but with less room for
error-prone user implementations. It also allows for a better error
message to indicate a version mismatch.

This closes #3883.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ac5f842
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ac5f842
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ac5f842

Branch: refs/heads/release-1.3
Commit: 1ac5f8421cb289c5e52172f1ecc854e1c9252b23
Parents: 7de2212
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Fri May 12 20:06:01 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Sat May 13 17:05:45 2017 +0800

----------------------------------------------------------------------
 .../typeutils/TypeSerializerConfigSnapshot.java | 23 ---------
 .../core/io/VersionedIOReadableWritable.java    | 53 ++++++++++++--------
 .../flink/core/io/VersionedIOWriteableTest.java | 16 ++----
 .../state/KeyedBackendSerializationProxy.java   | 20 ++------
 .../OperatorBackendSerializationProxy.java      | 17 ++-----
 .../state/heap/HeapKeyedStateBackend.java       |  2 +-
 6 files changed, 42 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ac5f842/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
index 27369b9..389d141 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.util.Preconditions;
 
@@ -51,22 +50,6 @@ public abstract class TypeSerializerConfigSnapshot extends 
VersionedIOReadableWr
        /** The user code class loader; only relevant if this configuration 
instance was deserialized from binary form. */
        private ClassLoader userCodeClassLoader;
 
-       /** The snapshot version of this configuration. */
-       private Integer snapshotVersion;
-
-       /**
-        * Returns the version of the configuration at the time its snapshot 
was taken.
-        *
-        * @return the snapshot configuration's version.
-        */
-       public int getSnapshotVersion() {
-               if (snapshotVersion == null) {
-                       return getVersion();
-               } else {
-                       return snapshotVersion;
-               }
-       }
-
        /**
         * Set the user code class loader.
         * Only relevant if this configuration instance was deserialized from 
binary form.
@@ -91,12 +74,6 @@ public abstract class TypeSerializerConfigSnapshot extends 
VersionedIOReadableWr
                return userCodeClassLoader;
        }
 
-       @Override
-       protected void resolveVersionRead(int foundVersion) throws 
VersionMismatchException {
-               super.resolveVersionRead(foundVersion);
-               this.snapshotVersion = foundVersion;
-       }
-
        public abstract boolean equals(Object obj);
 
        public abstract int hashCode();

http://git-wip-us.apache.org/repos/asf/flink/blob/1ac5f842/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
 
b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
index bad9cef..b4a0b2f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
@@ -18,20 +18,23 @@
 
 package org.apache.flink.core.io;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 /**
  * This is the abstract base class for {@link IOReadableWritable} which allows 
to differentiate between serialization
  * versions. Concrete subclasses should typically override the {@link 
#write(DataOutputView)} and
  * {@link #read(DataInputView)}, thereby calling super to ensure version 
checking.
  */
-@PublicEvolving
+@Internal
 public abstract class VersionedIOReadableWritable implements 
IOReadableWritable, Versioned {
 
+       private int readVersion = Integer.MIN_VALUE;
+
        @Override
        public void write(DataOutputView out) throws IOException {
                out.writeInt(getVersion());
@@ -39,34 +42,42 @@ public abstract class VersionedIOReadableWritable 
implements IOReadableWritable,
 
        @Override
        public void read(DataInputView in) throws IOException {
-               int foundVersion = in.readInt();
-               resolveVersionRead(foundVersion);
+               this.readVersion = in.readInt();
+               resolveVersionRead(readVersion);
        }
 
        /**
-        * This method is a hook to react on the version tag that we find 
during read. This can also be used to initialize
-        * further read logic w.r.t. the version at hand.
-        * Default implementation of this method just checks the compatibility 
of a version number against the own version.
+        * Returns the found serialization version. If this instance was not 
read from serialized bytes
+        * but simply instantiated, then the current version is returned.
         *
-        * @param foundVersion the version found from reading the input stream
-        * @throws VersionMismatchException thrown when serialization versions 
mismatch
+        * @return the read serialization version, or the current version if 
the instance was not read from bytes.
         */
-       protected void resolveVersionRead(int foundVersion) throws 
VersionMismatchException {
-               if (!isCompatibleVersion(foundVersion)) {
-                       int expectedVersion = getVersion();
-                       throw new VersionMismatchException(
-                                       "Incompatible version: found " + 
foundVersion + ", required " + expectedVersion);
-               }
+       public int getReadVersion() {
+               return (readVersion == Integer.MIN_VALUE) ? getVersion() : 
readVersion;
        }
 
        /**
-        * Checks for compatibility between this and the found version. 
Subclasses can override this methods in case of
-        * intended backwards backwards compatibility.
+        * Returns the compatible version values.
         *
-        * @param version version number to compare against.
-        * @return true, iff this is compatible to the passed version.
+        * <p>By default, the base implementation recognizes only the current 
version (identified by {@link #getVersion()})
+        * as compatible. This method can be used as a hook and may be 
overridden to identify more compatible versions.
+        *
+        * @return an array of integers representing the compatible version 
values.
         */
-       public boolean isCompatibleVersion(int version) {
-               return getVersion() == version;
+       public int[] getCompatibleVersions() {
+               return new int[] {getVersion()};
+       }
+
+       private void resolveVersionRead(int readVersion) throws 
VersionMismatchException {
+
+               int[] compatibleVersions = getCompatibleVersions();
+               for (int compatibleVersion : compatibleVersions) {
+                       if (compatibleVersion == readVersion) {
+                               return;
+                       }
+               }
+
+               throw new VersionMismatchException(
+                       "Incompatible version: found " + readVersion + ", 
compatible versions are " + Arrays.toString(compatibleVersions));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1ac5f842/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
index b7b6d6f..ec5f792 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
@@ -65,8 +65,8 @@ public class VersionedIOWriteableTest {
 
                testWriteable = new TestWriteable(2) {
                        @Override
-                       public boolean isCompatibleVersion(int version) {
-                               return getVersion() >= version;
+                       public int[] getCompatibleVersions() {
+                               return new int[] {1, 2};
                        }
                };
                try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
@@ -130,19 +130,9 @@ public class VersionedIOWriteableTest {
                        this.data = in.readUTF();
                }
 
-               @Override
-               protected void resolveVersionRead(int foundVersion) throws 
VersionMismatchException {
-                       super.resolveVersionRead(foundVersion);
-               }
-
-               @Override
-               public boolean isCompatibleVersion(int version) {
-                       return super.isCompatibleVersion(version);
-               }
-
                public String getData() {
                        return data;
                }
        }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1ac5f842/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index a389c4f..a20628c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
-import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -41,7 +40,6 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
        private TypeSerializer<?> keySerializer;
        private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
stateMetaInfoSnapshots;
 
-       private int restoredVersion;
        private ClassLoader userCodeClassLoader;
 
        public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
@@ -57,8 +55,6 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
                Preconditions.checkNotNull(stateMetaInfoSnapshots);
                Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= 
Short.MAX_VALUE);
                this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
-
-               this.restoredVersion = VERSION;
        }
 
        public List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> 
getStateMetaInfoSnapshots() {
@@ -74,20 +70,10 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
                return VERSION;
        }
 
-       public int getRestoredVersion() {
-               return restoredVersion;
-       }
-
-       @Override
-       protected void resolveVersionRead(int foundVersion) throws 
VersionMismatchException {
-               super.resolveVersionRead(foundVersion);
-               this.restoredVersion = foundVersion;
-       }
-
        @Override
-       public boolean isCompatibleVersion(int version) {
+       public int[] getCompatibleVersions() {
                // we are compatible with version 3 (Flink 1.3.x) and version 1 
& 2 (Flink 1.2.x)
-               return super.isCompatibleVersion(version) || version == 2 || 
version == 1;
+               return new int[] {VERSION, 2, 1};
        }
 
        @Override
@@ -119,7 +105,7 @@ public class KeyedBackendSerializationProxy extends 
VersionedIOReadableWritable
                for (int i = 0; i < numKvStates; i++) {
                        stateMetaInfoSnapshots.add(
                                KeyedBackendStateMetaInfoSnapshotReaderWriters
-                                       .getReaderForVersion(restoredVersion, 
userCodeClassLoader)
+                                       .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
                                        .readStateMetaInfo(in));
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1ac5f842/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
index 91d7aab..074d84e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.core.io.VersionMismatchException;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -39,8 +38,6 @@ public class OperatorBackendSerializationProxy extends 
VersionedIOReadableWritab
        private List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> 
stateMetaInfoSnapshots;
        private ClassLoader userCodeClassLoader;
 
-       private int restoredVersion;
-
        public OperatorBackendSerializationProxy(ClassLoader 
userCodeClassLoader) {
                this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
        }
@@ -50,8 +47,6 @@ public class OperatorBackendSerializationProxy extends 
VersionedIOReadableWritab
 
                this.stateMetaInfoSnapshots = 
Preconditions.checkNotNull(stateMetaInfoSnapshots);
                Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= 
Short.MAX_VALUE);
-
-               this.restoredVersion = VERSION;
        }
 
        @Override
@@ -60,15 +55,9 @@ public class OperatorBackendSerializationProxy extends 
VersionedIOReadableWritab
        }
 
        @Override
-       protected void resolveVersionRead(int foundVersion) throws 
VersionMismatchException {
-               super.resolveVersionRead(foundVersion);
-               this.restoredVersion = foundVersion;
-       }
-
-       @Override
-       public boolean isCompatibleVersion(int version) {
+       public int[] getCompatibleVersions() {
                // we are compatible with version 2 (Flink 1.3.x) and version 1 
(Flink 1.2.x)
-               return super.isCompatibleVersion(version) || version == 1;
+               return new int[] {VERSION, 1};
        }
 
        @Override
@@ -92,7 +81,7 @@ public class OperatorBackendSerializationProxy extends 
VersionedIOReadableWritab
                for (int i = 0; i < numKvStates; i++) {
                        stateMetaInfoSnapshots.add(
                                
OperatorBackendStateMetaInfoSnapshotReaderWriters
-                                       .getReaderForVersion(restoredVersion, 
userCodeClassLoader)
+                                       .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
                                        .readStateMetaInfo(in));
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1ac5f842/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index bc314df..11e7760 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -448,7 +448,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                StateTableByKeyGroupReader 
keyGroupReader =
                                                                
StateTableByKeyGroupReaders.readerForVersion(
                                                                                
stateTable,
-                                                                               
serializationProxy.getRestoredVersion());
+                                                                               
serializationProxy.getReadVersion());
 
                                                
keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
                                        }

Reply via email to