Repository: flink
Updated Branches:
  refs/heads/master 85b2f2706 -> 90be5774e


[FLINK-7484] Perform proper deep copy in CaseClassSerializer.duplicate()

This also adds a test that verifies the deep copy.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90be5774
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90be5774
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90be5774

Branch: refs/heads/master
Commit: 90be5774e481af87355b9f475562180923039a93
Parents: 85b2f27
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Oct 13 13:11:42 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 13 15:01:38 2017 +0200

----------------------------------------------------------------------
 .../typeutils/runtime/TupleSerializerBase.java  |  8 ++++-
 .../scala/typeutils/CaseClassSerializer.scala   |  2 +-
 .../api/scala/runtime/TupleSerializerTest.scala | 33 ++++++++++++++++----
 3 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index f12dcd9..3fb7def 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
@@ -43,7 +44,7 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
 
        protected final Class<T> tupleClass;
 
-       protected final TypeSerializer<Object>[] fieldSerializers;
+       protected TypeSerializer<Object>[] fieldSerializers;
 
        protected final int arity;
 
@@ -183,4 +184,9 @@ public abstract class TupleSerializerBase<T> extends 
TypeSerializer<T> {
        }
 
        protected abstract TupleSerializerBase<T> 
createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] 
fieldSerializers);
+
+       @VisibleForTesting
+       public TypeSerializer<Object>[] getFieldSerializers() {
+               return fieldSerializers;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index c8222d6..c059913 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -48,7 +48,7 @@ abstract class CaseClassSerializer[T <: Product](
     val result = super.clone().asInstanceOf[CaseClassSerializer[T]]
 
     // achieve a deep copy by duplicating the field serializers
-    result.fieldSerializers.transform(_.duplicate())
+    result.fieldSerializers = result.fieldSerializers.map(_.duplicate())
     result.fields = null
     result.instanceCreationFailed = false
 

http://git-wip-us.apache.org/repos/asf/flink/blob/90be5774/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
index b210c99..01acc7c 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
@@ -20,23 +20,44 @@ package org.apache.flink.api.scala.runtime
 import java.util
 import java.util.Random
 
-import org.apache.flink.api.scala._
 import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import 
org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.util.StringUtils
-
 import org.joda.time.LocalDate
-
-import org.junit.Assert
-import org.junit.Test
+import org.junit.Assert._
+import org.junit.{Assert, Test}
 
 import scala.collection.JavaConverters._
 
 class TupleSerializerTest {
 
   @Test
+  def testProperDeepCopy(): Unit = {
+    val tpe = createTypeInformation[((String, Int), (Int, String))]
+
+    val originalSerializer =
+      tpe.createSerializer(new ExecutionConfig)
+        .asInstanceOf[CaseClassSerializer[((String, Int), (Int, String))]]
+    val duplicateSerializer = originalSerializer.duplicate()
+
+    duplicateSerializer.getFieldSerializers
+
+    // the list of child serializers must be duplicated
+    assertTrue(duplicateSerializer.getFieldSerializers ne 
originalSerializer.getFieldSerializers)
+
+    // each of the child serializers (which are themselves 
CaseClassSerializers) must be duplicated
+    assertTrue(
+      duplicateSerializer.getFieldSerializers()(0) ne 
originalSerializer.getFieldSerializers()(0))
+
+    assertTrue(
+      duplicateSerializer.getFieldSerializers()(1) ne 
originalSerializer.getFieldSerializers()(1))
+  }
+
+  @Test
   def testTuple1Int(): Unit = {
     val testTuples = Array(Tuple1(42), Tuple1(1), Tuple1(0), Tuple1(-1), 
Tuple1(Int.MaxValue),
       Tuple1(Int.MinValue))

Reply via email to