This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 30a8d2c9423d0ed9da4b80d41889e64de7ac1576
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Mon Feb 6 23:44:13 2023 +0800

    [FLINK-30613][serializer] Migrate GenericTypeSerializerSnapshot to implement new method of resolving schema compatibility
---
 .../api/common/typeutils/GenericTypeSerializerSnapshot.java | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerSnapshot.java
index ac7e2f06f16..9cc6cfe60e4 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/GenericTypeSerializerSnapshot.java
@@ -81,13 +81,16 @@ public abstract class GenericTypeSerializerSnapshot<T, S extends TypeSerializer>
 
     @Override
     public final TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
-            TypeSerializer<T> newSerializer) {
-        if (!serializerClass().isInstance(newSerializer)) {
+            TypeSerializerSnapshot<T> oldSerializerSnapshot) {
+        if (!(oldSerializerSnapshot instanceof GenericTypeSerializerSnapshot)) {
             return TypeSerializerSchemaCompatibility.incompatible();
         }
-        @SuppressWarnings("unchecked")
-        S casted = (S) newSerializer;
-        if (typeClass == getTypeClass(casted)) {
+        GenericTypeSerializerSnapshot<T, S> previousGenericTypeSerializerSnapshot =
+                (GenericTypeSerializerSnapshot<T, S>) oldSerializerSnapshot;
+        if (serializerClass() != previousGenericTypeSerializerSnapshot.serializerClass()) {
+            return TypeSerializerSchemaCompatibility.incompatible();
+        }
+        if (typeClass == previousGenericTypeSerializerSnapshot.typeClass) {
             return TypeSerializerSchemaCompatibility.compatibleAsIs();
         } else {
             return TypeSerializerSchemaCompatibility.incompatible();