This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7aa802a4b2e0490b57afb2266cb8368158398380
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Tue Jan 22 17:52:56 2019 +0100

    [FLINK-11406] [core] Return INCOMPATIBLE when nested serializer arity doesn't match in CompositeTypeSerializerSnapshot
    
    This closes #7557.
---
 .../typeutils/CompositeTypeSerializerSnapshot.java | 10 ++++--
 .../CompositeTypeSerializerSnapshotTest.java       | 37 ++++++++++++++++++++--
 2 files changed, 41 insertions(+), 6 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
index 07c243d..e4666e0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshot.java
@@ -169,9 +169,13 @@ public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerialize
                        return TypeSerializerSchemaCompatibility.incompatible();
                }
 
-               return constructFinalSchemaCompatibilityResult(
-                       getNestedSerializers(castedNewSerializer),
-                       snapshots);
+               final TypeSerializer<?>[] newNestedSerializers = getNestedSerializers(castedNewSerializer);
+               // check that nested serializer arity remains identical; if not, short circuit result
+               if (newNestedSerializers.length != snapshots.length) {
+                       return TypeSerializerSchemaCompatibility.incompatible();
+               }
+
+               return constructFinalSchemaCompatibilityResult(newNestedSerializers, snapshots);
        }
 
        @Internal
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
index 12601c9..1ffbfc2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotTest.java
@@ -52,6 +52,7 @@ public class CompositeTypeSerializerSnapshotTest {
                TypeSerializerSchemaCompatibility<String> compatibility =
                        snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
                                testNestedSerializers,
+                               testNestedSerializers,
                                OUTER_CONFIG,
                                OUTER_CONFIG);
 
@@ -71,6 +72,7 @@ public class CompositeTypeSerializerSnapshotTest {
                TypeSerializerSchemaCompatibility<String> compatibility =
                        snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
                                testNestedSerializers,
+                               testNestedSerializers,
                                OUTER_CONFIG,
                                OUTER_CONFIG);
 
@@ -89,6 +91,7 @@ public class CompositeTypeSerializerSnapshotTest {
                TypeSerializerSchemaCompatibility<String> compatibility =
                        snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
                                testNestedSerializers,
+                               testNestedSerializers,
                                OUTER_CONFIG,
                                OUTER_CONFIG);
 
@@ -114,6 +117,7 @@ public class CompositeTypeSerializerSnapshotTest {
                TypeSerializerSchemaCompatibility<String> compatibility =
                        snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
                                testNestedSerializers,
+                               testNestedSerializers,
                                OUTER_CONFIG,
                                OUTER_CONFIG);
 
@@ -131,6 +135,7 @@ public class CompositeTypeSerializerSnapshotTest {
                TypeSerializerSchemaCompatibility<String> compatibility =
                        snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
                                testNestedSerializers,
+                               testNestedSerializers,
                                INIT_OUTER_CONFIG,
                                INCOMPAT_OUTER_CONFIG);
 
@@ -139,12 +144,38 @@ public class CompositeTypeSerializerSnapshotTest {
                Assert.assertTrue(compatibility.isIncompatible());
        }
 
+       @Test
+       public void testNestedFieldSerializerArityMismatchPrecedence() throws IOException {
+               final String OUTER_CONFIG = "outer-config";
+
+               final TypeSerializer<?>[] initialNestedSerializers = {
+                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+               };
+
+               final TypeSerializer<?>[] newNestedSerializers = {
+                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+                       new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
+               };
+
+               TypeSerializerSchemaCompatibility<String> compatibility =
+                       snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
+                               initialNestedSerializers,
+                               newNestedSerializers,
+                               OUTER_CONFIG,
+                               OUTER_CONFIG);
+
+               // arity mismatch in the nested serializers should return incompatible as the result
+               Assert.assertTrue(compatibility.isIncompatible());
+       }
+
        private TypeSerializerSchemaCompatibility<String> snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
-                       TypeSerializer<?>[] testNestedSerializers,
+                       TypeSerializer<?>[] initialNestedSerializers,
+                       TypeSerializer<?>[] newNestedSerializer,
                        String initialOuterConfiguration,
                        String newOuterConfiguration) throws IOException {
                TestCompositeTypeSerializer testSerializer =
-                       new TestCompositeTypeSerializer(initialOuterConfiguration, testNestedSerializers);
+                       new TestCompositeTypeSerializer(initialOuterConfiguration, initialNestedSerializers);
 
                TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();
 
@@ -156,7 +187,7 @@ public class CompositeTypeSerializerSnapshotTest {
                        in, Thread.currentThread().getContextClassLoader());
 
                TestCompositeTypeSerializer newTestSerializer =
-                       new TestCompositeTypeSerializer(newOuterConfiguration, testNestedSerializers);
+                       new TestCompositeTypeSerializer(newOuterConfiguration, newNestedSerializer);
                return testSerializerSnapshot.resolveSchemaCompatibility(newTestSerializer);
        }
 

Reply via email to