This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7aa802a4b2e0490b57afb2266cb8368158398380 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> AuthorDate: Tue Jan 22 17:52:56 2019 +0100 [FLINK-11406] [core] Return INCOMPATIBLE when nested serializers arity don't match in CompositeTypeSerializerSnapshot This closes #7557. --- .../typeutils/CompositeTypeSerializerSnapshot.java | 10 ++++-- .../CompositeTypeSerializerSnapshotTest.java | 37 ++++++++++++++++++++-- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java index 07c243d..e4666e0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java @@ -169,9 +169,13 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize return TypeSerializerSchemaCompatibility.incompatible(); } - return constructFinalSchemaCompatibilityResult( - getNestedSerializers(castedNewSerializer), - snapshots); + final TypeSerializer<?>[] newNestedSerializers = getNestedSerializers(castedNewSerializer); + // check that nested serializer arity remains identical; if not, short circuit result + if (newNestedSerializers.length != snapshots.length) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + + return constructFinalSchemaCompatibilityResult(newNestedSerializers, snapshots); } @Internal diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java index 12601c9..1ffbfc2 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java @@ -52,6 +52,7 @@ public class CompositeTypeSerializerSnapshotTest { TypeSerializerSchemaCompatibility<String> compatibility = snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore( testNestedSerializers, + testNestedSerializers, OUTER_CONFIG, OUTER_CONFIG); @@ -71,6 +72,7 @@ public class CompositeTypeSerializerSnapshotTest { TypeSerializerSchemaCompatibility<String> compatibility = snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore( testNestedSerializers, + testNestedSerializers, OUTER_CONFIG, OUTER_CONFIG); @@ -89,6 +91,7 @@ public class CompositeTypeSerializerSnapshotTest { TypeSerializerSchemaCompatibility<String> compatibility = snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore( testNestedSerializers, + testNestedSerializers, OUTER_CONFIG, OUTER_CONFIG); @@ -114,6 +117,7 @@ public class CompositeTypeSerializerSnapshotTest { TypeSerializerSchemaCompatibility<String> compatibility = snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore( testNestedSerializers, + testNestedSerializers, OUTER_CONFIG, OUTER_CONFIG); @@ -131,6 +135,7 @@ public class CompositeTypeSerializerSnapshotTest { TypeSerializerSchemaCompatibility<String> compatibility = snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore( testNestedSerializers, + testNestedSerializers, INIT_OUTER_CONFIG, INCOMPAT_OUTER_CONFIG); @@ -139,12 +144,38 @@ public class CompositeTypeSerializerSnapshotTest { Assert.assertTrue(compatibility.isIncompatible()); } + @Test + public void testNestedFieldSerializerArityMismatchPrecedence() throws IOException { + final String OUTER_CONFIG = "outer-config"; + + final TypeSerializer<?>[] initialNestedSerializers = { + new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS), + }; + + final TypeSerializer<?>[] newNestedSerializers = { + new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS), + new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS), + new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS), + }; + + TypeSerializerSchemaCompatibility<String> compatibility = + snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore( + initialNestedSerializers, + newNestedSerializers, + OUTER_CONFIG, + OUTER_CONFIG); + + // arity mismatch in the nested serializers should return incompatible as the result + Assert.assertTrue(compatibility.isIncompatible()); + } + private TypeSerializerSchemaCompatibility<String> snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore( - TypeSerializer<?>[] testNestedSerializers, + TypeSerializer<?>[] initialNestedSerializers, + TypeSerializer<?>[] newNestedSerializer, String initialOuterConfiguration, String newOuterConfiguration) throws IOException { TestCompositeTypeSerializer testSerializer = - new TestCompositeTypeSerializer(initialOuterConfiguration, testNestedSerializers); + new TestCompositeTypeSerializer(initialOuterConfiguration, initialNestedSerializers); TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration(); @@ -156,7 +187,7 @@ public class CompositeTypeSerializerSnapshotTest { in, Thread.currentThread().getContextClassLoader()); TestCompositeTypeSerializer newTestSerializer = - new TestCompositeTypeSerializer(newOuterConfiguration, testNestedSerializers); + new TestCompositeTypeSerializer(newOuterConfiguration, newNestedSerializer); return testSerializerSnapshot.resolveSchemaCompatibility(newTestSerializer); }