This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.7 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.7 by this push:
     new f04d2cb [FLINK-11420][datastream] Fix duplicate and createInstance methods of CoGroupedStreams.UnionSerializer
f04d2cb is described below

commit f04d2cb87775b42aa54161ffb3bdaeb1f9d4af3c
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Mon Mar 11 12:21:29 2019 +0100

    [FLINK-11420][datastream] Fix duplicate and createInstance methods of CoGroupedStreams.UnionSerializer

    UnionSerializer did not perform a proper duplication of inner serializers. It also violated the assumption that createInstance never produces null.
---
 .../streaming/api/datastream/CoGroupedStreams.java | 18 ++++-
 .../api/datastream/UnionSerializerTest.java | 81 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 19d9783..047d0b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -504,7 +504,9 @@ public class CoGroupedStreams<T1, T2> {
 		}
 	}
 
-	private static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
+	@VisibleForTesting
+	@Internal
+	static class UnionSerializer<T1, T2> extends TypeSerializer<TaggedUnion<T1, T2>> {
 		private static final long serialVersionUID = 1L;
 
 		private final TypeSerializer<T1> oneSerializer;
@@ -522,12 +524,22 @@ public class CoGroupedStreams<T1, T2> {
 
 		@Override
 		public TypeSerializer<TaggedUnion<T1, T2>> duplicate() {
-			return this;
+			TypeSerializer<T1> duplicateOne = oneSerializer.duplicate();
+			TypeSerializer<T2> duplicateTwo = twoSerializer.duplicate();
+
+			// compare reference of nested serializers, if same instances returned, we can reuse
+			// this instance as well
+			if (duplicateOne != oneSerializer || duplicateTwo != twoSerializer) {
+				return new UnionSerializer<>(duplicateOne, duplicateTwo);
+			} else {
+				return this;
+			}
 		}
 
 		@Override
 		public TaggedUnion<T1, T2> createInstance() {
-			return null;
+			//we arbitrarily always create instance of one
+			return TaggedUnion.one(oneSerializer.createInstance());
 		}
 
 		@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerTest.java
new file mode 100644
index 0000000..29304bc
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/UnionSerializerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
+import org.apache.flink.streaming.api.datastream.CoGroupedStreams.UnionSerializer;
+import org.apache.flink.testutils.DeeplyEqualsChecker;
+
+/**
+ * Serializer tests for {@link UnionSerializer}.
+ */
+public class UnionSerializerTest extends SerializerTestBase<TaggedUnion<Object, Object>> {
+
+	public UnionSerializerTest() {
+		super(new DeeplyEqualsChecker()
+			.withCustomCheck(
+				(o1, o2) -> o1 instanceof TaggedUnion && o2 instanceof TaggedUnion,
+				(o1, o2, checker) -> {
+					TaggedUnion union1 = (TaggedUnion) o1;
+					TaggedUnion union2 = (TaggedUnion) o2;
+
+					if (union1.isOne() && union2.isOne()) {
+						return checker.deepEquals(union1.getOne(), union2.getOne());
+					} else if (union1.isTwo() && union2.isTwo()) {
+						return checker.deepEquals(union1.getTwo(), union2.getTwo());
+					} else {
+						return false;
+					}
+				}
+			));
+	}
+
+	@Override
+	protected TypeSerializer<TaggedUnion<Object, Object>> createSerializer() {
+		return new UnionSerializer<>(
+			new KryoSerializer<>(Object.class, new ExecutionConfig()),
+			new KryoSerializer<>(Object.class, new ExecutionConfig())
+		);
+	}
+
+	@Override
+	protected int getLength() {
+		return -1;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected Class<TaggedUnion<Object, Object>> getTypeClass() {
+		return (Class<TaggedUnion<Object, Object>>) (Class<?>) TaggedUnion.class;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	protected TaggedUnion<Object, Object>[] getTestData() {
+		return new TaggedUnion[]{
+			TaggedUnion.one(1),
+			TaggedUnion.two("A"),
+			TaggedUnion.one("C")
+		};
+	}
+}