Repository: flink
Updated Branches:
  refs/heads/release-0.8 ffc86f668 -> d9afefb0e


[FLINK-1978] Fix POJO deserialization for reuse objects with NULL fields

Backported fix.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d9afefb0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d9afefb0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d9afefb0

Branch: refs/heads/release-0.8
Commit: d9afefb0e6b6e5947276837fc2d6d55a6a185956
Parents: ffc86f6
Author: Fabian Hueske <fhue...@apache.org>
Authored: Wed May 6 17:47:59 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu May 7 12:20:51 2015 +0200

----------------------------------------------------------------------
 .../java/typeutils/runtime/PojoSerializer.java   | 19 +++++++++++++++++--
 .../typeutils/runtime/PojoSerializerTest.java    | 15 ++++++++++-----
 2 files changed, 27 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d9afefb0/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 15e8537..e205a9d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -167,7 +167,14 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
                        for (int i = 0; i < numFields; i++) {
                                Object value = fields[i].get(from);
                                if (value != null) {
-                                       Object copy = fieldSerializers[i].copy(fields[i].get(from), fields[i].get(reuse));
+                                       Object reuseValue = fields[i].get(reuse);
+                                       Object copy;
+                                       if(reuseValue != null) {
+                                               copy = fieldSerializers[i].copy(value, reuseValue);
+                                       }
+                                       else {
+                                               copy = fieldSerializers[i].copy(value);
+                                       }
                                        fields[i].set(reuse, copy);
                                }
                                else {
@@ -256,7 +263,15 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
                                if(isNull) {
                                        fields[i].set(reuse, null);
                                } else {
-                                       Object field = fieldSerializers[i].deserialize(fields[i].get(reuse), source);
+                                       Object field;
+                                       Object reuseField = fields[i].get(reuse);
+                                       if(reuseField != null) {
+                                               field = fieldSerializers[i].deserialize(reuseField, source);
+                                       }
+                                       else {
+                                               field = fieldSerializers[i].deserialize(source);
+                                       }
+
                                        fields[i].set(reuse, field);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d9afefb0/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
index 641a902..a499241 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.typeutils.runtime;
 
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.Random;
 
@@ -67,11 +68,13 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
                Random rnd = new Random(874597969123412341L);
 
                return new TestUserClass[]{
-                               new TestUserClass(rnd.nextInt(), "foo", rnd.nextDouble(), new int[]{1, 2, 3},
+                               new TestUserClass(rnd.nextInt(), "foo", rnd.nextDouble(), new int[]{1, 2, 3}, new Date(),
                                                new NestedTestUserClass(rnd.nextInt(), "foo@boo", rnd.nextDouble(), new int[]{10, 11, 12})),
-                               new TestUserClass(rnd.nextInt(), "bar", rnd.nextDouble(), new int[]{4, 5, 6},
+                               new TestUserClass(rnd.nextInt(), "bar", rnd.nextDouble(), new int[]{4, 5, 6}, null,
                                                new NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 21, 22})),
-                               new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, null)
+                               new TestUserClass(rnd.nextInt(), null, rnd.nextDouble(), null, null, null),
+                               new TestUserClass(rnd.nextInt(), "bar", rnd.nextDouble(), new int[]{4, 5, 6}, new Date(),
+                                               new NestedTestUserClass(rnd.nextInt(), "bar@bas", rnd.nextDouble(), new int[]{20, 21, 22}))
                };
 
        }
@@ -82,17 +85,19 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
                public String dumm2;
                public double dumm3;
                public int[] dumm4;
+               public Date dumm5;
 
                public NestedTestUserClass nestedClass;
 
                public TestUserClass() {
                }
 
-               public TestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4, NestedTestUserClass nestedClass) {
+               public TestUserClass(int dumm1, String dumm2, double dumm3, int[] dumm4, Date dumm5, NestedTestUserClass nestedClass) {
                        this.dumm1 = dumm1;
                        this.dumm2 = dumm2;
                        this.dumm3 = dumm3;
                        this.dumm4 = dumm4;
+                       this.dumm5 = dumm5;
                        this.nestedClass = nestedClass;
                }
 
@@ -201,7 +206,7 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
                fields[0] = result.get(0).getPosition();
                TypeComparator<TestUserClass> pojoComp = pType.createComparator( fields, new boolean[]{true}, 0);
                
-               TestUserClass pojoTestRecord = new TestUserClass(0, "abc", 3d, new int[] {1,2,3}, new NestedTestUserClass(1, "haha", 4d, new int[] {5,4,3}));
+               TestUserClass pojoTestRecord = new TestUserClass(0, "abc", 3d, new int[] {1,2,3}, new Date(), new NestedTestUserClass(1, "haha", 4d, new int[] {5,4,3}));
                int pHash = pojoComp.hash(pojoTestRecord);
                
                Tuple1<String> tupleTest = new Tuple1<String>("haha");

Reply via email to