This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new f04d2cb  [FLINK-11420][datastream] Fix duplicate and createInstance 
methods of CoGroupedStreams.UnionSerializer
f04d2cb is described below

commit f04d2cb87775b42aa54161ffb3bdaeb1f9d4af3c
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Mon Mar 11 12:21:29 2019 +0100

    [FLINK-11420][datastream] Fix duplicate and createInstance methods of 
CoGroupedStreams.UnionSerializer
    
    UnionSerializer did not perform a proper duplication of inner serializers. 
It also violated the assumption that createInstance never produces null.
---
 .../streaming/api/datastream/CoGroupedStreams.java | 18 ++++-
 .../api/datastream/UnionSerializerTest.java        | 81 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 3 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 19d9783..047d0b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -504,7 +504,9 @@ public class CoGroupedStreams<T1, T2> {
                }
        }
 
-       private static class UnionSerializer<T1, T2> extends 
TypeSerializer<TaggedUnion<T1, T2>> {
+       @VisibleForTesting
+       @Internal
+       static class UnionSerializer<T1, T2> extends 
TypeSerializer<TaggedUnion<T1, T2>> {
                private static final long serialVersionUID = 1L;
 
                private final TypeSerializer<T1> oneSerializer;
@@ -522,12 +524,22 @@ public class CoGroupedStreams<T1, T2> {
 
                @Override
                public TypeSerializer<TaggedUnion<T1, T2>> duplicate() {
-                       return this;
+                       TypeSerializer<T1> duplicateOne = 
oneSerializer.duplicate();
+                       TypeSerializer<T2> duplicateTwo = 
twoSerializer.duplicate();
+
+                       // compare reference of nested serializers, if same 
instances returned, we can reuse
+                       // this instance as well
+                       if (duplicateOne != oneSerializer || duplicateTwo != 
twoSerializer) {
+                               return new UnionSerializer<>(duplicateOne, 
duplicateTwo);
+                       } else {
+                               return this;
+                       }
                }
 
                @Override
                public TaggedUnion<T1, T2> createInstance() {
-                       return null;
+                       //we arbitrarily always create instance of one
+                       return TaggedUnion.one(oneSerializer.createInstance());
                }
 
                @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerTest.java
new file mode 100644
index 0000000..29304bc
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
+import 
org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializer;
+import org.apache.flink.testutils.DeeplyEqualsChecker;
+
+/**
+ * Serializer tests for {@link UnionSerializer}.
+ */
+public class UnionSerializerTest extends 
SerializerTestBase<TaggedUnion<Object, Object>> {
+
+       public UnionSerializerTest() {
+               super(new DeeplyEqualsChecker()
+                       .withCustomCheck(
+                               (o1, o2) -> o1 instanceof TaggedUnion && o2 
instanceof TaggedUnion,
+                               (o1, o2, checker) -> {
+                                       TaggedUnion union1 = (TaggedUnion) o1;
+                                       TaggedUnion union2 = (TaggedUnion) o2;
+
+                                       if (union1.isOne() && union2.isOne()) {
+                                               return 
checker.deepEquals(union1.getOne(), union2.getOne());
+                                       } else if (union1.isTwo() && 
union2.isTwo()) {
+                                               return 
checker.deepEquals(union1.getTwo(), union2.getTwo());
+                                       } else {
+                                               return false;
+                                       }
+                               }
+                       ));
+       }
+
+       @Override
+       protected TypeSerializer<TaggedUnion<Object, Object>> 
createSerializer() {
+               return new UnionSerializer<>(
+                       new KryoSerializer<>(Object.class, new 
ExecutionConfig()),
+                       new KryoSerializer<>(Object.class, new 
ExecutionConfig())
+               );
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected Class<TaggedUnion<Object, Object>> getTypeClass() {
+               return (Class<TaggedUnion<Object, Object>>) (Class<?>) 
TaggedUnion.class;
+       }
+
+       @Override
+       @SuppressWarnings("unchecked")
+       protected TaggedUnion<Object, Object>[] getTestData() {
+               return new TaggedUnion[]{
+                       TaggedUnion.one(1),
+                       TaggedUnion.two("A"),
+                       TaggedUnion.one("C")
+               };
+       }
+}

Reply via email to