[
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940376#comment-15940376
]
Greg Hogan commented on FLINK-6115:
-----------------------------------
[~lukehutch] and [~fhueske], I'm not sure that versioning is necessary nor that
the current behavior in regards to null values should be changed.
Allowing null values in tuples breaks the behavior of all user functions
depending on the old, safe behavior. The creator of {{null}} has called it his
[billion dollar
mistake|https://en.wikipedia.org/wiki/Tony_Hoare#Apologies_and_retractions].
Just because we can serialize {{null}} does not mean that we should. I see
Flink has an {{Either}} type but I don't see an {{Option}} type. Would this not
be a cleaner way to process values explicitly?
As for versioning, schemas should generally be versioned but a tuple is
literally a concatenation of values. Different representations (e.g. integer vs
variable-length integer) should often be different types.
Throwing NPE in tuple constructors misses that fields may be set between object
instantiation and serialization. Flink was designed for object reuse which
improves performance when billions to trillions of elements are serialized in
short order.
> Need more helpful error message when trying to serialize a tuple with a null
> field
> ----------------------------------------------------------------------------------
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.2.0
> Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the
> following, which has no information about where in the program the problem
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
> at
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
> at
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
> at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
> at
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> at
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
> at
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at
> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
> at
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a
> flatMap (but I have dozens of them in my code). Surely there's a way to pull
> out the source file name and line number from the program DAG node when
> errors like this occur?
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)