Repository: flink Updated Branches: refs/heads/release-0.8 ffc86f668 -> d9afefb0e
[FLINK-1978] Fix POJO deserialization for reuse objects with NULL fields Backported fix. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9afefb0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9afefb0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9afefb0 Branch: refs/heads/release-0.8 Commit: d9afefb0e6b6e5947276837fc2d6d55a6a185956 Parents: ffc86f6 Author: Fabian Hueske <fhue...@apache.org> Authored: Wed May 6 17:47:59 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu May 7 12:20:51 2015 +0200 ---------------------------------------------------------------------- .../java/typeutils/runtime/PojoSerializer.java | 19 +++++++++++++++++-- .../typeutils/runtime/PojoSerializerTest.java | 15 ++++++++++----- 2 files changed, 27 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d9afefb0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 15e8537..e205a9d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -167,7 +167,14 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { for (int i = 0; i < numFields; i++) { Object value = fields[i].get(from); if (value != null) { - Object copy = fieldSerializers[i].copy(fields[i].get(from), fields[i].get(reuse)); + Object reuseValue = fields[i].get(reuse); + Object copy; + if(reuseValue != null) { + copy = fieldSerializers[i].copy(value, reuseValue); + } + else { + copy = fieldSerializers[i].copy(value); + } fields[i].set(reuse, copy); } else { @@ -256,7 +263,15 @@ public final class PojoSerializer<T> extends TypeSerializer<T> { if(isNull) { fields[i].set(reuse, null); } else { - Object field = fieldSerializers[i].deserialize(fields[i].get(reuse), source); + Object field; + Object reuseField = fields[i].get(reuse); + if(reuseField != null) { + field = fieldSerializers[i].deserialize(reuseField, source); + } + else { + field = fieldSerializers[i].deserialize(source); + } + fields[i].set(reuse, field); } } http://git-wip-us.apache.org/repos/asf/flink/blob/d9afefb0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java index 641a902..a499241 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.api.java.typeutils.runtime; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.Random; @@ -67,11 +68,13 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te Random rnd = new Random(874597969123412341L); return new TestUserClass[]{ - new TestUserClass(rnd.nextInt(), "foo", rnd.nextDouble(), new int[]{1, 2, 3}, + new TestUserClass(rnd.nextInt(), "foo", rnd.nextDouble(), new int[]{1, 2, 3}, new Date(), new NestedTestUserClass(rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[]{10, 11, 12})), - new TestUserClass(rnd.nextInt(), "bar", rnd.nextDouble(), new int[]{4, 5, 6}, + new TestUserClass(rnd.nextInt(), "bar", rnd.nextDouble(), new int[]{4, 5, 6}, null, new NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 21, 22})), - new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, null) + new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, null, null), + new TestUserClass(rnd.nextInt(), "bar", rnd.nextDouble(), new int[]{4, 5, 6}, new Date(), + new NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 21, 22})) }; } @@ -82,17 +85,19 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te public String dumm2; public double dumm3; public int[] dumm4; + public Date dumm5; public NestedTestUserClass nestedClass; public TestUserClass() { } - public TestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4, NestedTestUserClass nestedClass) { + public TestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4, Date dumm5, NestedTestUserClass nestedClass) { this.dumm1 = dumm1; this.dumm2 = dumm2; this.dumm3 = dumm3; this.dumm4 = dumm4; + this.dumm5 = dumm5; this.nestedClass = nestedClass; } @@ -201,7 +206,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te fields[0] = result.get(0).getPosition(); TypeComparator<TestUserClass> pojoComp = pType.createComparator( fields, new boolean[]{true}, 0); - TestUserClass pojoTestRecord = new TestUserClass(0, "abc", 3d, new int[] {1,2,3}, new NestedTestUserClass(1, "haha", 4d, new int[] {5,4,3})); + TestUserClass pojoTestRecord = new TestUserClass(0, "abc", 3d, new int[] {1,2,3}, new Date(), new NestedTestUserClass(1, "haha", 4d, new int[] {5,4,3})); int pHash = pojoComp.hash(pojoTestRecord); Tuple1<String> tupleTest = new Tuple1<String>("haha");