This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6fb65a0f9dd1fe4a985a994424f6cd2026d40504
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Tue Jan 24 10:43:22 2023 +0800

    [FLINK-30613][serializer] Migrate state test to implement new method of 
resolving schema compatibility
---
 .../state/InternalPriorityQueueTestBase.java       |  8 +--
 .../state/StateBackendMigrationTestBase.java       | 10 +--
 .../runtime/state/StateSerializerProviderTest.java |  2 +-
 .../runtime/testutils/statemigration/TestType.java | 80 ++++++++++++++++++++--
 .../V1TestTypeSerializerSnapshot.java              | 22 +++---
 .../V2TestTypeSerializerSnapshot.java              | 23 ++++---
 6 files changed, 111 insertions(+), 34 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
index 1315dee59dd..6856f02d07c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
@@ -529,13 +529,13 @@ public abstract class InternalPriorityQueueTestBase {
 
             @Override
             public TypeSerializerSchemaCompatibility<TestElement> 
resolveSchemaCompatibility(
-                    TypeSerializer<TestElement> newSerializer) {
-                if (!(newSerializer instanceof TestElementSerializer)) {
+                    TypeSerializerSnapshot<TestElement> oldSerializerSnapshot) 
{
+                if (!(oldSerializerSnapshot instanceof Snapshot)) {
                     return TypeSerializerSchemaCompatibility.incompatible();
                 }
 
-                TestElementSerializer testElementSerializer = 
(TestElementSerializer) newSerializer;
-                return (revision <= testElementSerializer.getRevision())
+                Snapshot snapshot = (Snapshot) oldSerializerSnapshot;
+                return (snapshot.getRevision() <= revision)
                         ? TypeSerializerSchemaCompatibility.compatibleAsIs()
                         : TypeSerializerSchemaCompatibility.incompatible();
             }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 56b8f9b03f3..731961b8289 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -1224,8 +1224,8 @@ public abstract class StateBackendMigrationTestBase<B 
extends StateBackend> {
                                 testKeyedValueStateUpgrade(
                                         initialAccessDescriptor, 
newAccessDescriptorAfterRestore))
                 .satisfiesAnyOf(
-                        e -> 
assertThat(e).isInstanceOf(StateMigrationException.class),
-                        e -> 
assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
+                        e -> 
assertThat(e).isInstanceOf(IllegalStateException.class),
+                        e -> 
assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
     }
 
     @TestTemplate
@@ -1245,8 +1245,8 @@ public abstract class StateBackendMigrationTestBase<B 
extends StateBackend> {
                                 testKeyedValueStateUpgrade(
                                         initialAccessDescriptor, 
newAccessDescriptorAfterRestore))
                 .satisfiesAnyOf(
-                        e -> 
assertThat(e).isInstanceOf(IllegalStateException.class),
-                        e -> 
assertThat(e).hasCauseInstanceOf(IllegalStateException.class));
+                        e -> 
assertThat(e).isInstanceOf(StateMigrationException.class),
+                        e -> 
assertThat(e).hasCauseInstanceOf(StateMigrationException.class));
     }
 
     // 
-------------------------------------------------------------------------------
@@ -1344,7 +1344,7 @@ public abstract class StateBackendMigrationTestBase<B 
extends StateBackend> {
 
         @Override
         public TypeSerializerSchemaCompatibility<VoidNamespace> 
resolveSchemaCompatibility(
-                TypeSerializer<VoidNamespace> newSerializer) {
+                TypeSerializerSnapshot<VoidNamespace> oldSerializerSnapshot) {
             return TypeSerializerSchemaCompatibility.compatibleAsIs();
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
index 100dd694bc8..65908573fac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java
@@ -336,7 +336,7 @@ class StateSerializerProviderTest {
 
         @Override
         public TypeSerializerSchemaCompatibility<String> 
resolveSchemaCompatibility(
-                TypeSerializer<String> newSerializer) {
+                TypeSerializerSnapshot<String> oldSerializerSnapshot) {
             throw new UnsupportedOperationException();
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
index da70fb42b52..aa2a4148c6a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.testutils.statemigration;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -138,7 +139,7 @@ public class TestType extends 
AbstractHeapPriorityQueueElement
 
         @Override
         public TypeSerializerSnapshot<TestType> snapshotConfiguration() {
-            return new V1TestTypeSerializerSnapshot();
+            return new V2TestTypeSerializerSnapshot();
         }
     }
 
@@ -164,8 +165,47 @@ public class TestType extends 
AbstractHeapPriorityQueueElement
 
         @Override
         public TypeSerializerSnapshot<TestType> snapshotConfiguration() {
-            throw new UnsupportedOperationException(
-                    "The serializer should have been reconfigured as a new 
instance; shouldn't be used.");
+            return new ReconfigurationRequiringTestTypeSerializerSnapshot();
+        }
+
+        public static class ReconfigurationRequiringTestTypeSerializerSnapshot
+                implements TypeSerializerSnapshot<TestType> {
+
+            @Override
+            public int getCurrentVersion() {
+                return 0;
+            }
+
+            @Override
+            public void writeSnapshot(DataOutputView out) {
+                // do nothing
+            }
+
+            @Override
+            public void readSnapshot(
+                    int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) {
+                // do nothing
+            }
+
+            @Override
+            public TypeSerializer<TestType> restoreSerializer() {
+                return new ReconfigurationRequiringTestTypeSerializer();
+            }
+
+            @Override
+            public TypeSerializerSchemaCompatibility<TestType> 
resolveSchemaCompatibility(
+                    TypeSerializerSnapshot<TestType> oldSerializerSnapshot) {
+                // mimic the reconfiguration by just re-instantiating the 
correct serializer
+                if (oldSerializerSnapshot instanceof 
V1TestTypeSerializerSnapshot) {
+                    return 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
+                            new TestType.V1TestTypeSerializer());
+                } else if (oldSerializerSnapshot instanceof 
V2TestTypeSerializerSnapshot) {
+                    return 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
+                            new TestType.V2TestTypeSerializer());
+                } else {
+                    return TypeSerializerSchemaCompatibility.incompatible();
+                }
+            }
         }
     }
 
@@ -188,8 +228,38 @@ public class TestType extends 
AbstractHeapPriorityQueueElement
 
         @Override
         public TypeSerializerSnapshot<TestType> snapshotConfiguration() {
-            throw new UnsupportedOperationException(
-                    "This is an incompatible serializer; shouldn't be used.");
+            return new IncompatibleTestTypeSerializerSnapshot();
+        }
+
+        public static class IncompatibleTestTypeSerializerSnapshot
+                implements TypeSerializerSnapshot<TestType> {
+
+            @Override
+            public int getCurrentVersion() {
+                return 0;
+            }
+
+            @Override
+            public void writeSnapshot(DataOutputView out) {
+                // do nothing
+            }
+
+            @Override
+            public void readSnapshot(
+                    int readVersion, DataInputView in, ClassLoader 
userCodeClassLoader) {
+                // do nothing
+            }
+
+            @Override
+            public TypeSerializer<TestType> restoreSerializer() {
+                return new IncompatibleTestTypeSerializer();
+            }
+
+            @Override
+            public TypeSerializerSchemaCompatibility<TestType> 
resolveSchemaCompatibility(
+                    TypeSerializerSnapshot<TestType> oldSerializerSnapshot) {
+                return TypeSerializerSchemaCompatibility.incompatible();
+            }
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
index b129d0455cc..22742b245c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import 
org.apache.flink.runtime.testutils.statemigration.TestType.IncompatibleTestTypeSerializer.IncompatibleTestTypeSerializerSnapshot;
+import 
org.apache.flink.runtime.testutils.statemigration.TestType.ReconfigurationRequiringTestTypeSerializer.ReconfigurationRequiringTestTypeSerializerSnapshot;
 
 import java.io.IOException;
 
@@ -36,16 +38,18 @@ public class V1TestTypeSerializerSnapshot implements 
TypeSerializerSnapshot<Test
 
     @Override
     public TypeSerializerSchemaCompatibility<TestType> 
resolveSchemaCompatibility(
-            TypeSerializer<TestType> newSerializer) {
-        if (newSerializer instanceof TestType.V1TestTypeSerializer) {
+            TypeSerializerSnapshot<TestType> oldSerializerSnapshot) {
+        if (oldSerializerSnapshot instanceof V1TestTypeSerializerSnapshot) {
             return TypeSerializerSchemaCompatibility.compatibleAsIs();
-        } else if (newSerializer instanceof TestType.V2TestTypeSerializer) {
-            return 
TypeSerializerSchemaCompatibility.compatibleAfterMigration();
-        } else if (newSerializer instanceof 
TestType.ReconfigurationRequiringTestTypeSerializer) {
-            // we mimic the reconfiguration by just re-instantiating the 
correct serializer
-            return 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
-                    new TestType.V1TestTypeSerializer());
-        } else if (newSerializer instanceof 
TestType.IncompatibleTestTypeSerializer) {
+        } else if (
+        // old ReconfigurationRequiringTestTypeSerializerSnapshot cannot be 
compatible with
+        // any new  TypeSerializerSnapshots
+        oldSerializerSnapshot instanceof 
ReconfigurationRequiringTestTypeSerializerSnapshot
+                // migrating from V2 -> V1 is not supported
+                || oldSerializerSnapshot instanceof 
V2TestTypeSerializerSnapshot
+                // IncompatibleTestTypeSerializerSnapshot cannot be compatible 
with any
+                // TypeSerializerSnapshots
+                || oldSerializerSnapshot instanceof 
IncompatibleTestTypeSerializerSnapshot) {
             return TypeSerializerSchemaCompatibility.incompatible();
         } else {
             throw new IllegalStateException("Unknown serializer class for 
TestType.");
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
index 9349c6aa679..5cf819efcf0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import 
org.apache.flink.runtime.testutils.statemigration.TestType.IncompatibleTestTypeSerializer.IncompatibleTestTypeSerializerSnapshot;
+import 
org.apache.flink.runtime.testutils.statemigration.TestType.ReconfigurationRequiringTestTypeSerializer.ReconfigurationRequiringTestTypeSerializerSnapshot;
 
 import java.io.IOException;
 
@@ -36,18 +38,19 @@ public class V2TestTypeSerializerSnapshot implements 
TypeSerializerSnapshot<Test
 
     @Override
     public TypeSerializerSchemaCompatibility<TestType> 
resolveSchemaCompatibility(
-            TypeSerializer<TestType> newSerializer) {
-        if (newSerializer instanceof TestType.V2TestTypeSerializer) {
+            TypeSerializerSnapshot<TestType> oldSerializerSnapshot) {
+        if (oldSerializerSnapshot instanceof V2TestTypeSerializerSnapshot) {
             return TypeSerializerSchemaCompatibility.compatibleAsIs();
-        } else if (newSerializer instanceof 
TestType.ReconfigurationRequiringTestTypeSerializer) {
-            // we mimic the reconfiguration by just re-instantiating the 
correct serializer
-            return 
TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(
-                    new TestType.V2TestTypeSerializer());
+        } else if (oldSerializerSnapshot instanceof 
V1TestTypeSerializerSnapshot) {
+            // Migrate from V1 to V2 is supported
+            return 
TypeSerializerSchemaCompatibility.compatibleAfterMigration();
         } else if (
-        // migrating from V2 -> V1 is not supported
-        newSerializer instanceof TestType.V1TestTypeSerializer
-                || newSerializer instanceof 
TestType.IncompatibleTestTypeSerializer) {
-
+        // old ReconfigurationRequiringTestTypeSerializerSnapshot cannot be 
compatible with
+        // any new  TypeSerializerSnapshots
+        oldSerializerSnapshot instanceof 
ReconfigurationRequiringTestTypeSerializerSnapshot
+                // IncompatibleTestTypeSerializerSnapshot cannot be compatible 
with any
+                // TypeSerializerSnapshots
+                || oldSerializerSnapshot instanceof 
IncompatibleTestTypeSerializerSnapshot) {
             return TypeSerializerSchemaCompatibility.incompatible();
         } else {
             throw new IllegalStateException("Unknown serializer class for 
TestType.");

Reply via email to