This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a3e0650562f9117381cad87f57d6b4102d7b1421
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Tue Jan 24 00:56:54 2023 +0800

    [FLINK-30613][serializer] Migrate GenericArraySerializerSnapshot to implement new method of resolving schema compatibility
---
 .../base/GenericArraySerializerConfigSnapshot.java | 23 +++++----------
 .../base/GenericArraySerializerSnapshot.java       | 33 ++++++++++++++++++++--
 .../CompositeTypeSerializerUpgradeTest.java        | 16 +++++++++++
 3 files changed, 54 insertions(+), 18 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index 4b8af37a3d9..cb9369cc3e7 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -19,10 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -108,19 +105,13 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
                 componentClass, nestedSnapshot.getRestoredNestedSerializer(0));
     }
 
-    @Override
-    public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(
-            TypeSerializer<C[]> newSerializer) {
-        checkState(nestedSnapshot != null);
-
-        if (!(newSerializer instanceof GenericArraySerializer)) {
-            return TypeSerializerSchemaCompatibility.incompatible();
-        }
+    @Nullable
+    public TypeSerializerSnapshot<?>[] getNestedSerializerSnapshots() {
+        return nestedSnapshot == null ? null : nestedSnapshot.getNestedSerializerSnapshots();
+    }
 
-        // delegate to the new snapshot class
-        return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
-                newSerializer,
-                new GenericArraySerializerSnapshot<>(componentClass),
-                nestedSnapshot.getNestedSerializerSnapshots());
+    @Nullable
+    public Class<C> getComponentClass() {
+        return componentClass;
     }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
index 15eb10eac42..bdb6cef922c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -19,12 +19,16 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
+import java.util.Objects;
 
 /**
  * Point-in-time configuration of a {@link GenericArraySerializer}.
@@ -76,10 +80,35 @@ public final class GenericArraySerializerSnapshot<C>
         this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
     }
 
+    @Override
+    public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(
+            TypeSerializerSnapshot<C[]> oldSerializerSnapshot) {
+        if (oldSerializerSnapshot instanceof GenericArraySerializerConfigSnapshot) {
+            return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+                    oldSerializerSnapshot,
+                    this,
+                    Objects.requireNonNull(
+                            ((GenericArraySerializerConfigSnapshot<C>) oldSerializerSnapshot)
+                                    .getNestedSerializerSnapshots()));
+        }
+        return super.resolveSchemaCompatibility(oldSerializerSnapshot);
+    }
+
     @Override
     protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(
-            GenericArraySerializer<C> newSerializer) {
-        return (this.componentClass == newSerializer.getComponentClass())
+            TypeSerializerSnapshot<C[]> oldSerializerSnapshot) {
+        Class<C> componentClass;
+        if (oldSerializerSnapshot instanceof GenericArraySerializerSnapshot) {
+            componentClass =
+                    ((GenericArraySerializerSnapshot<C>) oldSerializerSnapshot).componentClass;
+        } else if (oldSerializerSnapshot instanceof GenericArraySerializerConfigSnapshot) {
+            componentClass =
+                    ((GenericArraySerializerConfigSnapshot<C>) oldSerializerSnapshot)
+                            .getComponentClass();
+        } else {
+            return OuterSchemaCompatibility.INCOMPATIBLE;
+        }
+        return (this.componentClass == componentClass)
                 ? OuterSchemaCompatibility.COMPATIBLE_AS_IS
                 : OuterSchemaCompatibility.INCOMPATIBLE;
     }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java
index 2b0d40e1753..6248a9d49aa 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerUpgradeTest.java
@@ -20,16 +20,19 @@ package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
 import org.apache.flink.types.Either;
 
 import org.hamcrest.Matcher;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
 
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 import static org.hamcrest.Matchers.is;
 
 /** A {@link TypeSerializerUpgradeTestBase} for {@link GenericArraySerializer}. */
@@ -143,4 +146,17 @@ class CompositeTypeSerializerUpgradeTest extends TypeSerializerUpgradeTestBase<O
             return TypeSerializerMatchers.isCompatibleAsIs();
         }
     }
+
+    @Test
+    public void testUpgradeFromDeprecatedSnapshot() {
+        GenericArraySerializer<String> genericArraySerializer =
+                new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE);
+        GenericArraySerializerConfigSnapshot<String> oldSnapshot =
+                new GenericArraySerializerConfigSnapshot<>(genericArraySerializer);
+        TypeSerializerSchemaCompatibility<String[]> schemaCompatibility =
+                genericArraySerializer
+                        .snapshotConfiguration()
+                        .resolveSchemaCompatibility(oldSnapshot);
+        assertThat(schemaCompatibility.isCompatibleAsIs()).isTrue();
+    }
 }

Reply via email to