[ https://issues.apache.org/jira/browse/FLINK-14642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Victor Wong updated FLINK-14642:
--------------------------------
Description:
Currently, TupleSerializer and CaseClassSerializer do not support serializing NULL values, which I think is acceptable. However, they also do not support copying NULL values, so the following code throws an exception. I think this does not match users' expectations and is error-prone.

*code:*
{code:scala}
stream
  .map(xxx)          // the return type of the map function is Tuple, and it may return null
  .filter(_ != null)
  .xxx
{code}

*exception info:*
{code:java}
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
{code}

*suggestion:*
Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle NULL values? For example:
{code:scala}
// org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
def copy(from: T): T = {
  // handle NULL values: a null record is copied as null
  if (from == null) {
    return from
  }
  initArray()
  var i = 0
  while (i < arity) {
    fields(i) = fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
    i += 1
  }
  createInstance(fields)
}
{code}

was:
Currently, TupleSerializer and CaseClassSerializer do not support serializing NULL values, which I think is acceptable. However, they also do not support copying NULL values, so the following code throws an exception, which I think does not match users' expectations.
*code:*
{code:scala}
stream
  .map(xxx)          // the return type of the map function is Tuple, and it may return null
  .filter(_ != null)
  .xxx
{code}

*exception info:*
{code:java}
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:92)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
{code}

*suggestion:*
Can we make the `copy` method of TupleSerializer/CaseClassSerializer handle NULL values? For example:
{code:scala}
// org.apache.flink.api.scala.typeutils.CaseClassSerializer#copy
def copy(from: T): T = {
  // handle NULL values: a null record is copied as null
  if (from == null) {
    return from
  }
  initArray()
  var i = 0
  while (i < arity) {
    fields(i) = fieldSerializers(i).copy(from.productElement(i).asInstanceOf[AnyRef])
    i += 1
  }
  createInstance(fields)
}
{code}

> Flink TupleSerializer and CaseClassSerializer should support copying NULL values
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-14642
>                 URL: https://issues.apache.org/jira/browse/FLINK-14642
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Type Serialization System
>    Affects Versions: 1.9.1
>            Reporter: Victor Wong
>            Priority: Major
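The sketch below shows why `filter(_ != null)` in the description never gets a chance to drop the null record. It is a small, self-contained Scala model that does not use Flink. Based only on the stack trace, it assumes that the chained output copies every record emitted by `map` before pushing it to the next operator, which is the role `OperatorChain$CopyingChainingOutput.pushToOperator` plays in the trace. `Copier`, `ProductCopier`, `runChain`, `Event`, `createEvent` and the `nullSafe` flag are hypothetical names for this illustration and are not Flink APIs. When `nullSafe = true`, the copier applies the same NULL check proposed in the suggestion.

{code:scala}
object NullSafeCopySketch {
  // Hypothetical record type standing in for the Tuple returned by map().
  final case class Event(id: Int, name: String)

  // Hypothetical stand-in for a serializer's copy(); not Flink's TypeSerializer API.
  trait Copier[T] {
    def copy(from: T): T
  }

  // Copies a Product (case class or Scala tuple) by rebuilding it from its elements.
  // Field values are copied by reference, which is only safe for immutable fields;
  // Flink's CaseClassSerializer instead delegates to one serializer per field.
  final class ProductCopier[T <: Product](create: Array[AnyRef] => T, nullSafe: Boolean)
      extends Copier[T] {
    def copy(from: T): T =
      if (nullSafe && from == null) {
        from // the proposed NULL check: a null record is copied as null
      } else {
        // Without the check, from.productArity throws NullPointerException for a null record.
        val fields = new Array[AnyRef](from.productArity)
        var i = 0
        while (i < fields.length) {
          fields(i) = from.productElement(i).asInstanceOf[AnyRef]
          i += 1
        }
        create(fields)
      }
  }

  // Rebuilds an Event from copied field values (the Int field arrives boxed).
  val createEvent: Array[AnyRef] => Event =
    fs => Event(fs(0).asInstanceOf[Int], fs(1).asInstanceOf[String])

  // Simplified model of a copying operator chain: every record emitted by map()
  // is copied before it reaches the next operator, so filter() only sees copies.
  def runChain[T](input: Seq[T], mapFn: T => T, copier: Copier[T], keep: T => Boolean): Seq[T] =
    input.map(mapFn).map(r => copier.copy(r)).filter(keep)

  def main(args: Array[String]): Unit = {
    val input = Seq(Event(1, "a"), Event(2, "b"), Event(3, "c"))
    val mapFn: Event => Event = e => if (e.id == 2) null else e // map may return null
    val safe = new ProductCopier[Event](createEvent, nullSafe = true)
    println(runChain(input, mapFn, safe, (e: Event) => e != null))
  }
}
{code}

With `nullSafe = false`, `runChain` fails with a `NullPointerException` inside `copy`, before `filter` runs, which matches the reported failure. With `nullSafe = true`, the null record passes through `copy` unchanged and `filter` then removes it. The non-null records still come out as fresh instances.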