This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e9ec5d6729f5eb2770cc48a23cfafce9b3eb7018
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Wed Feb 27 16:52:06 2019 +0800

    [FLINK-11771] [core] Fix TypeSerializerSnapshot#readVersionedSnapshot for 
TypeSerializerSnapshots directly upgraded from TypeSerializerConfigSnapshot
---
 .../api/common/typeutils/TypeSerializerConfigSnapshot.java  |  2 +-
 .../flink/api/common/typeutils/TypeSerializerSnapshot.java  | 13 +++++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
index f09b455..5d315ba 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 public abstract class TypeSerializerConfigSnapshot<T> extends 
VersionedIOReadableWritable implements TypeSerializerSnapshot<T> {
 
        /** Version / Magic number for the format that bridges between the old 
and new interface. */
-       private static final int ADAPTER_VERSION = 0x7a53c4f0;
+       static final int ADAPTER_VERSION = 0x7a53c4f0;
 
        /** The user code class loader; only relevant if this configuration 
instance was deserialized from binary form. */
        private ClassLoader userCodeClassLoader;
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
index e004fa1..20fcbb7 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java
@@ -24,6 +24,8 @@ import org.apache.flink.core.memory.DataOutputView;
 
 import java.io.IOException;
 
+import static 
org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.ADAPTER_VERSION;
+
 /**
  * A {@code TypeSerializerSnapshot} is a point-in-time view of a {@link 
TypeSerializer}'s configuration.
  * The configuration snapshot of a serializer is persisted within checkpoints
@@ -147,7 +149,6 @@ public interface TypeSerializerSnapshot<T> {
                snapshot.writeSnapshot(out);
        }
 
-
        /**
         * Reads a snapshot from the stream, performing resolving
         *
@@ -157,7 +158,15 @@ public interface TypeSerializerSnapshot<T> {
                final TypeSerializerSnapshot<T> snapshot =
                                
TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(in, cl);
 
-               final int version = in.readInt();
+               int version = in.readInt();
+
+               if (version == ADAPTER_VERSION && !(snapshot instanceof 
TypeSerializerConfigSnapshot)) {
+                       // the snapshot was upgraded directly in-place from a 
TypeSerializerConfigSnapshot;
+                       // read and drop the previously Java-serialized 
serializer, and get the actual correct read version.
+                       // NOTE: this implicitly assumes that the version was 
properly written before the actual snapshot content.
+                       TypeSerializerSerializationUtil.tryReadSerializer(in, 
cl, true);
+                       version = in.readInt();
+               }
                snapshot.readSnapshot(version, in, cl);
 
                return snapshot;

Reply via email to