This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e9ec5d6729f5eb2770cc48a23cfafce9b3eb7018 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Wed Feb 27 16:52:06 2019 +0800 [FLINK-11771] [core] Fix TypeSerializerSnapshot#readVersionedSnapshot for TypeSerializerSnapshots directly upgraded from TypeSerializerConfigSnapshot --- .../api/common/typeutils/TypeSerializerConfigSnapshot.java | 2 +- .../flink/api/common/typeutils/TypeSerializerSnapshot.java | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java index f09b455..5d315ba 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java @@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkState; public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadableWritable implements TypeSerializerSnapshot<T> { /** Version / Magic number for the format that bridges between the old and new interface. */ - private static final int ADAPTER_VERSION = 0x7a53c4f0; + static final int ADAPTER_VERSION = 0x7a53c4f0; /** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */ private ClassLoader userCodeClassLoader; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java index e004fa1..20fcbb7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.java @@ -24,6 +24,8 @@ import org.apache.flink.core.memory.DataOutputView; import java.io.IOException; +import static org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.ADAPTER_VERSION; + /** * A {@code TypeSerializerSnapshot} is a point-in-time view of a {@link TypeSerializer}'s configuration. * The configuration snapshot of a serializer is persisted within checkpoints @@ -147,7 +149,6 @@ public interface TypeSerializerSnapshot<T> { snapshot.writeSnapshot(out); } - /** * Reads a snapshot from the stream, performing resolving * @@ -157,7 +158,15 @@ public interface TypeSerializerSnapshot<T> { final TypeSerializerSnapshot<T> snapshot = TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(in, cl); - final int version = in.readInt(); + int version = in.readInt(); + + if (version == ADAPTER_VERSION && !(snapshot instanceof TypeSerializerConfigSnapshot)) { + // the snapshot was upgraded directly in-place from a TypeSerializerConfigSnapshot; + // read and drop the previously Java-serialized serializer, and get the actual correct read version. + // NOTE: this implicitly assumes that the version was properly written before the actual snapshot content. + TypeSerializerSerializationUtil.tryReadSerializer(in, cl, true); + version = in.readInt(); + } snapshot.readSnapshot(version, in, cl); return snapshot;