This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6fb65a0f9dd1fe4a985a994424f6cd2026d40504 Author: Hangxiang Yu <master...@gmail.com> AuthorDate: Tue Jan 24 10:43:22 2023 +0800 [FLINK-30613][serializer] Migrate state test to implement new method of resolving schema compatibility --- .../state/InternalPriorityQueueTestBase.java | 8 +-- .../state/StateBackendMigrationTestBase.java | 10 +-- .../runtime/state/StateSerializerProviderTest.java | 2 +- .../runtime/testutils/statemigration/TestType.java | 80 ++++++++++++++++++++-- .../V1TestTypeSerializerSnapshot.java | 22 +++--- .../V2TestTypeSerializerSnapshot.java | 23 ++++--- 6 files changed, 111 insertions(+), 34 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java index 1315dee59dd..6856f02d07c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java @@ -529,13 +529,13 @@ public abstract class InternalPriorityQueueTestBase { @Override public TypeSerializerSchemaCompatibility<TestElement> resolveSchemaCompatibility( - TypeSerializer<TestElement> newSerializer) { - if (!(newSerializer instanceof TestElementSerializer)) { + TypeSerializerSnapshot<TestElement> oldSerializerSnapshot) { + if (!(oldSerializerSnapshot instanceof Snapshot)) { return TypeSerializerSchemaCompatibility.incompatible(); } - TestElementSerializer testElementSerializer = (TestElementSerializer) newSerializer; - return (revision <= testElementSerializer.getRevision()) + Snapshot snapshot = (Snapshot) oldSerializerSnapshot; + return (snapshot.getRevision() <= revision) ? TypeSerializerSchemaCompatibility.compatibleAsIs() : TypeSerializerSchemaCompatibility.incompatible(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java index 56b8f9b03f3..731961b8289 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java @@ -1224,8 +1224,8 @@ public abstract class StateBackendMigrationTestBase<B extends StateBackend> { testKeyedValueStateUpgrade( initialAccessDescriptor, newAccessDescriptorAfterRestore)) .satisfiesAnyOf( - e -> assertThat(e).isInstanceOf(StateMigrationException.class), - e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class)); + e -> assertThat(e).isInstanceOf(IllegalStateException.class), + e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class)); } @TestTemplate @@ -1245,8 +1245,8 @@ public abstract class StateBackendMigrationTestBase<B extends StateBackend> { testKeyedValueStateUpgrade( initialAccessDescriptor, newAccessDescriptorAfterRestore)) .satisfiesAnyOf( - e -> assertThat(e).isInstanceOf(IllegalStateException.class), - e -> assertThat(e).hasCauseInstanceOf(IllegalStateException.class)); + e -> assertThat(e).isInstanceOf(StateMigrationException.class), + e -> assertThat(e).hasCauseInstanceOf(StateMigrationException.class)); } // ------------------------------------------------------------------------------- @@ -1344,7 +1344,7 @@ public abstract class StateBackendMigrationTestBase<B extends StateBackend> { @Override public TypeSerializerSchemaCompatibility<VoidNamespace> resolveSchemaCompatibility( - TypeSerializer<VoidNamespace> newSerializer) { + TypeSerializerSnapshot<VoidNamespace> oldSerializerSnapshot) { return TypeSerializerSchemaCompatibility.compatibleAsIs(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java index 100dd694bc8..65908573fac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSerializerProviderTest.java @@ -336,7 +336,7 @@ class StateSerializerProviderTest { @Override public TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility( - TypeSerializer<String> newSerializer) { + TypeSerializerSnapshot<String> oldSerializerSnapshot) { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java index da70fb42b52..aa2a4148c6a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/TestType.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.testutils.statemigration; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -138,7 +139,7 @@ public class TestType extends AbstractHeapPriorityQueueElement @Override public TypeSerializerSnapshot<TestType> snapshotConfiguration() { - return new V1TestTypeSerializerSnapshot(); + return new V2TestTypeSerializerSnapshot(); } } @@ -164,8 +165,47 @@ public class TestType extends AbstractHeapPriorityQueueElement @Override public TypeSerializerSnapshot<TestType> snapshotConfiguration() { - throw new UnsupportedOperationException( - "The serializer should have been reconfigured as a new instance; shouldn't be used."); + return new ReconfigurationRequiringTestTypeSerializerSnapshot(); + } + + public static class ReconfigurationRequiringTestTypeSerializerSnapshot + implements TypeSerializerSnapshot<TestType> { + + @Override + public int getCurrentVersion() { + return 0; + } + + @Override + public void writeSnapshot(DataOutputView out) { + // do nothing + } + + @Override + public void readSnapshot( + int readVersion, DataInputView in, ClassLoader userCodeClassLoader) { + // do nothing + } + + @Override + public TypeSerializer<TestType> restoreSerializer() { + return new ReconfigurationRequiringTestTypeSerializer(); + } + + @Override + public TypeSerializerSchemaCompatibility<TestType> resolveSchemaCompatibility( + TypeSerializerSnapshot<TestType> oldSerializerSnapshot) { + // mimic the reconfiguration by just re-instantiating the correct serializer + if (oldSerializerSnapshot instanceof V1TestTypeSerializerSnapshot) { + return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer( + new TestType.V1TestTypeSerializer()); + } else if (oldSerializerSnapshot instanceof V2TestTypeSerializerSnapshot) { + return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer( + new TestType.V2TestTypeSerializer()); + } else { + return TypeSerializerSchemaCompatibility.incompatible(); + } + } } } @@ -188,8 +228,38 @@ public class TestType extends AbstractHeapPriorityQueueElement @Override public TypeSerializerSnapshot<TestType> snapshotConfiguration() { - throw new UnsupportedOperationException( - "This is an incompatible serializer; shouldn't be used."); + return new IncompatibleTestTypeSerializerSnapshot(); + } + + public static class IncompatibleTestTypeSerializerSnapshot + implements TypeSerializerSnapshot<TestType> { + + @Override + public int getCurrentVersion() { + return 0; + } + + @Override + public void writeSnapshot(DataOutputView out) { + // do nothing + } + + @Override + public void readSnapshot( + int readVersion, DataInputView in, ClassLoader userCodeClassLoader) { + // do nothing + } + + @Override + public TypeSerializer<TestType> restoreSerializer() { + return new IncompatibleTestTypeSerializer(); + } + + @Override + public TypeSerializerSchemaCompatibility<TestType> resolveSchemaCompatibility( + TypeSerializerSnapshot<TestType> oldSerializerSnapshot) { + return TypeSerializerSchemaCompatibility.incompatible(); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java index b129d0455cc..22742b245c3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V1TestTypeSerializerSnapshot.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.testutils.statemigration.TestType.IncompatibleTestTypeSerializer.IncompatibleTestTypeSerializerSnapshot; +import org.apache.flink.runtime.testutils.statemigration.TestType.ReconfigurationRequiringTestTypeSerializer.ReconfigurationRequiringTestTypeSerializerSnapshot; import java.io.IOException; @@ -36,16 +38,18 @@ public class V1TestTypeSerializerSnapshot implements TypeSerializerSnapshot<Test @Override public TypeSerializerSchemaCompatibility<TestType> resolveSchemaCompatibility( - TypeSerializer<TestType> newSerializer) { - if (newSerializer instanceof TestType.V1TestTypeSerializer) { + TypeSerializerSnapshot<TestType> oldSerializerSnapshot) { + if (oldSerializerSnapshot instanceof V1TestTypeSerializerSnapshot) { return TypeSerializerSchemaCompatibility.compatibleAsIs(); - } else if (newSerializer instanceof TestType.V2TestTypeSerializer) { - return TypeSerializerSchemaCompatibility.compatibleAfterMigration(); - } else if (newSerializer instanceof TestType.ReconfigurationRequiringTestTypeSerializer) { - // we mimic the reconfiguration by just re-instantiating the correct serializer - return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer( - new TestType.V1TestTypeSerializer()); - } else if (newSerializer instanceof TestType.IncompatibleTestTypeSerializer) { + } else if ( + // old ReconfigurationRequiringTestTypeSerializerSnapshot cannot be compatible with + // any new TypeSerializerSnapshots + oldSerializerSnapshot instanceof ReconfigurationRequiringTestTypeSerializerSnapshot + // migrating from V2 -> V1 is not supported + || oldSerializerSnapshot instanceof V2TestTypeSerializerSnapshot + // IncompatibleTestTypeSerializerSnapshot cannot be compatible with any + // TypeSerializerSnapshots + || oldSerializerSnapshot instanceof IncompatibleTestTypeSerializerSnapshot) { return TypeSerializerSchemaCompatibility.incompatible(); } else { throw new IllegalStateException("Unknown serializer class for TestType."); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java index 9349c6aa679..5cf819efcf0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/statemigration/V2TestTypeSerializerSnapshot.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.testutils.statemigration.TestType.IncompatibleTestTypeSerializer.IncompatibleTestTypeSerializerSnapshot; +import org.apache.flink.runtime.testutils.statemigration.TestType.ReconfigurationRequiringTestTypeSerializer.ReconfigurationRequiringTestTypeSerializerSnapshot; import java.io.IOException; @@ -36,18 +38,19 @@ public class V2TestTypeSerializerSnapshot implements TypeSerializerSnapshot<Test @Override public TypeSerializerSchemaCompatibility<TestType> resolveSchemaCompatibility( - TypeSerializer<TestType> newSerializer) { - if (newSerializer instanceof TestType.V2TestTypeSerializer) { + TypeSerializerSnapshot<TestType> oldSerializerSnapshot) { + if (oldSerializerSnapshot instanceof V2TestTypeSerializerSnapshot) { return TypeSerializerSchemaCompatibility.compatibleAsIs(); - } else if (newSerializer instanceof TestType.ReconfigurationRequiringTestTypeSerializer) { - // we mimic the reconfiguration by just re-instantiating the correct serializer - return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer( - new TestType.V2TestTypeSerializer()); + } else if (oldSerializerSnapshot instanceof V1TestTypeSerializerSnapshot) { + // Migrate from V1 to V2 is supported + return TypeSerializerSchemaCompatibility.compatibleAfterMigration(); } else if ( - // migrating from V2 -> V1 is not supported - newSerializer instanceof TestType.V1TestTypeSerializer - || newSerializer instanceof TestType.IncompatibleTestTypeSerializer) { - + // old ReconfigurationRequiringTestTypeSerializerSnapshot cannot be compatible with + // any new TypeSerializerSnapshots + oldSerializerSnapshot instanceof ReconfigurationRequiringTestTypeSerializerSnapshot + // IncompatibleTestTypeSerializerSnapshot cannot be compatible with any + // TypeSerializerSnapshots + || oldSerializerSnapshot instanceof IncompatibleTestTypeSerializerSnapshot) { return TypeSerializerSchemaCompatibility.incompatible(); } else { throw new IllegalStateException("Unknown serializer class for TestType.");