[FLINK-3617] [scala apis] Added null value check.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdce1f31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdce1f31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdce1f31

Branch: refs/heads/master
Commit: fdce1f319c512fc845b64cbb7cbfb10f9d899021
Parents: c4626cb
Author: Aleksandr Chermenin <aleksandr_cherme...@epam.com>
Authored: Fri Dec 16 14:42:50 2016 +0300
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 16 20:18:48 2017 +0100

----------------------------------------------------------------------
 .../flink/api/scala/typeutils/CaseClassSerializer.scala | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fdce1f31/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index 625ee80..29b4952 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -20,7 +20,8 @@ package org.apache.flink.api.scala.typeutils
 import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+import org.apache.flink.types.NullFieldException
 
 /**
  * Serializer for Case Classes. Creation and access is different from
@@ -97,7 +98,13 @@ abstract class CaseClassSerializer[T <: Product](
     var i = 0
     while (i < arity) {
       val serializer = fieldSerializers(i).asInstanceOf[TypeSerializer[Any]]
-      serializer.serialize(value.productElement(i), target)
+      val o = value.productElement(i)
+      try
+        serializer.serialize(o, target)
+      catch {
+        case e: NullPointerException =>
+          throw new NullFieldException(i, e)
+      }
       i += 1
     }
   }