This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 42210e0895a4dd5aee4339049ec504dce61f983b Author: Hangxiang Yu <master...@gmail.com> AuthorDate: Sat Jan 6 13:19:10 2024 +0800 [FLINK-30613][serializer] Migrate serializer tests to test new method of resolving schema compatibility --- .../flink/api/common/typeutils/SerializerTestBase.java | 2 +- .../common/typeutils/TypeSerializerUpgradeTestBase.java | 16 ++++++++++++---- .../table/runtime/typeutils/SerializerTestUtil.java | 5 ++++- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java index 6dfdde50e6f..30d10e94095 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java @@ -154,7 +154,7 @@ public abstract class SerializerTestBase<T> { } TypeSerializerSchemaCompatibility<T> strategy = - restoredConfig.resolveSchemaCompatibility(getSerializer()); + getSerializer().snapshotConfiguration().resolveSchemaCompatibility(restoredConfig); final TypeSerializer<T> restoreSerializer; if (strategy.isCompatibleAsIs()) { restoreSerializer = restoredConfig.restoreSerializer(); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java index afd328f660f..b033c888314 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerUpgradeTestBase.java @@ -326,7 +326,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl testSpecification.verifier.createUpgradedSerializer(); TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility = - restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer); + upgradedSerializer + .snapshotConfiguration() + .resolveSchemaCompatibility(restoredSerializerSnapshot); assertThat(upgradeCompatibility) .is( @@ -350,7 +352,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl testSpecification.verifier.createUpgradedSerializer(); TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility = - restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer); + upgradedSerializer + .snapshotConfiguration() + .resolveSchemaCompatibility(restoredSerializerSnapshot); assumeThat(upgradeCompatibility) .as( "This test only applies for test specifications that verify an upgraded serializer that requires migration to be compatible.") @@ -387,7 +391,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl testSpecification.verifier.createUpgradedSerializer(); TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility = - restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer); + upgradedSerializer + .snapshotConfiguration() + .resolveSchemaCompatibility(restoredSerializerSnapshot); assumeThat(upgradeCompatibility) .as( "This test only applies for test specifications that verify an upgraded serializer that requires reconfiguration to be compatible.") @@ -418,7 +424,9 @@ public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedEl testSpecification.verifier.createUpgradedSerializer(); TypeSerializerSchemaCompatibility<UpgradedElementT> upgradeCompatibility = - restoredSerializerSnapshot.resolveSchemaCompatibility(upgradedSerializer); + upgradedSerializer + .snapshotConfiguration() + .resolveSchemaCompatibility(restoredSerializerSnapshot); assumeThat(upgradeCompatibility) .as( "This test only applies for test specifications that verify an upgraded serializer that is compatible as is.") diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerTestUtil.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerTestUtil.java index e0017a1967e..00d48528c87 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerTestUtil.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/SerializerTestUtil.java @@ -61,7 +61,10 @@ public class SerializerTestUtil { } TypeSerializerSchemaCompatibility<T> strategy = - restoredConfig.resolveSchemaCompatibility(serializerGetter.getSerializer()); + serializerGetter + .getSerializer() + .snapshotConfiguration() + .resolveSchemaCompatibility(restoredConfig); final TypeSerializer<T> restoredSerializer; if (strategy.isCompatibleAsIs()) { restoredSerializer = restoredConfig.restoreSerializer();