Repository: flink Updated Branches: refs/heads/master 85b2f2706 -> 90be5774e
[FLINK-7484] Perform proper deep copy in CaseClassSerializer.duplicate() This also adds a test that verifies the deep copy. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90be5774 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90be5774 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90be5774 Branch: refs/heads/master Commit: 90be5774e481af87355b9f475562180923039a93 Parents: 85b2f27 Author: Aljoscha Krettek <[email protected]> Authored: Fri Oct 13 13:11:42 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 13 15:01:38 2017 +0200 ---------------------------------------------------------------------- .../typeutils/runtime/TupleSerializerBase.java | 8 ++++- .../scala/typeutils/CaseClassSerializer.scala | 2 +- .../api/scala/runtime/TupleSerializerTest.scala | 33 ++++++++++++++++---- 3 files changed, 35 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java index f12dcd9..3fb7def 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.typeutils.runtime; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; @@ -43,7 +44,7 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { protected final Class<T> tupleClass; - protected final TypeSerializer<Object>[] fieldSerializers; + protected TypeSerializer<Object>[] fieldSerializers; protected final int arity; @@ -183,4 +184,9 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> { } protected abstract TupleSerializerBase<T> createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers); + + @VisibleForTesting + public TypeSerializer<Object>[] getFieldSerializers() { + return fieldSerializers; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala index c8222d6..c059913 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala @@ -48,7 +48,7 @@ abstract class CaseClassSerializer[T <: Product]( val result = super.clone().asInstanceOf[CaseClassSerializer[T]] // achieve a deep copy by duplicating the field serializers - result.fieldSerializers.transform(_.duplicate()) + result.fieldSerializers = result.fieldSerializers.map(_.duplicate()) result.fields = null result.instanceCreationFailed = false http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala index b210c99..01acc7c 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala @@ -20,23 +20,44 @@ package org.apache.flink.api.scala.runtime import java.util import java.util.Random -import org.apache.flink.api.scala._ import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.TupleTypeInfoBase import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._ -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.typeutils.CaseClassSerializer import org.apache.flink.util.StringUtils - import org.joda.time.LocalDate - -import org.junit.Assert -import org.junit.Test +import org.junit.Assert._ +import org.junit.{Assert, Test} import scala.collection.JavaConverters._ class TupleSerializerTest { @Test + def testProperDeepCopy(): Unit = { + val tpe = createTypeInformation[((String, Int), (Int, String))] + + val originalSerializer = + tpe.createSerializer(new ExecutionConfig) + .asInstanceOf[CaseClassSerializer[((String, Int), (Int, String))]] + val duplicateSerializer = originalSerializer.duplicate() + + duplicateSerializer.getFieldSerializers + + // the list of child serializers must be duplicated + assertTrue(duplicateSerializer.getFieldSerializers ne originalSerializer.getFieldSerializers) + + // each of the child serializers (which are themselves CaseClassSerializers) must be duplicated + assertTrue( + duplicateSerializer.getFieldSerializers()(0) ne originalSerializer.getFieldSerializers()(0)) + + assertTrue( + duplicateSerializer.getFieldSerializers()(1) ne originalSerializer.getFieldSerializers()(1)) + } + + @Test def testTuple1Int(): Unit = { val testTuples = Array(Tuple1(42), Tuple1(1), Tuple1(0), Tuple1(-1), Tuple1(Int.MaxValue), Tuple1(Int.MinValue))
