[
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15934071#comment-15934071
]
Luke Hutchison commented on FLINK-6115:
---------------------------------------
It's quite easy to end up with null values in tuples though -- it's currently
entirely valid to store null values in tuples, they just can't be serialized --
and you can't always predict when tuples will be serialized. For example, I
suspect code may work fine for a while in RAM on a single machine, but then
break suddenly when you scale it up to run on a cluster, or even when the
runtime decides to spill to disk. This is very poor behavior.
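To illustrate the failure mode described above, here is a minimal sketch in
plain Java. It does not use Flink: {{Pair}} and {{serialize}} are hypothetical
stand-ins for a tuple and its serializer, assumed only to behave the way the
stack trace suggests (reject a null field at serialization time).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class NullFieldDemo {
    // Hypothetical stand-in for a two-field tuple with public, mutable fields.
    static final class Pair {
        public String f0;
        public String f1;

        Pair(String f0, String f1) {
            this.f0 = f0;
            this.f1 = f1;
        }
    }

    // Hypothetical serializer that rejects null fields, mimicking the
    // exception message seen in the stack trace.
    static void serialize(Pair p, DataOutputStream out) throws IOException {
        if (p.f0 == null || p.f1 == null) {
            throw new IllegalArgumentException("The record must not be null.");
        }
        out.writeUTF(p.f0);
        out.writeUTF(p.f1);
    }

    // Purely in-memory use never touches the serializer, so a null field
    // goes unnoticed on this path.
    static int inMemoryLength(Pair p) {
        int len0 = (p.f0 == null) ? 0 : p.f0.length();
        int len1 = (p.f1 == null) ? 0 : p.f1.length();
        return len0 + len1;
    }

    // Returns true if serializing the pair is rejected.
    static boolean failsOnSerialize(Pair p) {
        try {
            serialize(p, new DataOutputStream(new ByteArrayOutputStream()));
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Pair p = new Pair("key", null);
        System.out.println("in-memory length: " + inMemoryLength(p));
        System.out.println("fails on serialize: " + failsOnSerialize(p));
    }
}
```

The same object is accepted on the in-memory path and rejected on the
serialization path, so whether the program fails depends on whether the record
happens to be serialized.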
Even worse though is that it's exceedingly hard to tell where the problem is
caused, as shown in the stack trace I posted. Not only is the location where
the null value is set separated from the location where the problem is
triggered on serialization, but the serialization trace doesn't tell you
anything about where in the program the serializer was running, other than what
operation type it was contained within.
Another common scenario in which null values get set in tuples is doing an
outer join. Basically if the Flink policy is "we won't support nulls in tuples
as valid, ever", then you should not be able to produce a tuple as a result of
an outer join. More generally, you should simply throw an exception when the
constructor of a tuple is called with a null parameter, so that the user is
notified immediately of the invalid behavior, with the exception tied directly
to where the null value setting happened. This would not be a perfect fix
though, since the fields of a tuple are not final, so it is possible to simply
set the field values to null directly.
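A minimal sketch of the fail-fast constructor check, and of why it is
incomplete. {{StrictPair}} is a hypothetical class written for this example,
not Flink's tuple type; its fields are public and non-final to mirror the
situation described above.

```java
import java.util.Objects;

class StrictPairDemo {
    // Hypothetical tuple-like class whose constructor rejects nulls.
    static final class StrictPair<A, B> {
        public A f0; // public and non-final, as described above
        public B f1;

        StrictPair(A f0, B f1) {
            // Fails at the call site that passes the null, so the stack
            // trace points into user code.
            this.f0 = Objects.requireNonNull(f0, "f0 must not be null");
            this.f1 = Objects.requireNonNull(f1, "f1 must not be null");
        }
    }

    // Returns true if the constructor throws when given a null argument.
    static boolean constructorRejectsNull() {
        try {
            new StrictPair<String, String>("key", null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Returns true if a null can still be stored by assigning the field
    // directly, bypassing the constructor check.
    static boolean fieldAssignmentBypassesCheck() {
        StrictPair<String, String> p = new StrictPair<>("key", "value");
        p.f1 = null;
        return p.f1 == null;
    }

    public static void main(String[] args) {
        System.out.println("constructor rejects null: " + constructorRejectsNull());
        System.out.println("field assignment bypasses check: " + fieldAssignmentBypassesCheck());
    }
}
```

The constructor check ties the exception to the offending call, but direct
field assignment still lets a null through unnoticed.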
I don't see any of these as good solutions to this issue. Really the best thing
to do is find a way to efficiently serialize null values in tuples. Why exactly
is it slower to support serializing null values in tuples than it is for a POJO
or a {{Row}} object?
In theory, if you simply ran tuples through the POJO serializer, it should be
able to serialize them fine, with the same efficiency that it can serialize
regular POJOs (which are allowed to contain null values) -- so I don't see how
or why this would incur a performance penalty.
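For reference, one generic way to make a field nullable on the wire is to
write a one-byte marker before each value. The sketch below is plain Java and
is not a description of how Flink's tuple, POJO, or {{Row}} serializers are
implemented; {{NullableFieldCodec}} and its methods are hypothetical names. In
this sketch the cost of null support is one extra byte and one branch per
field.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class NullableFieldCodec {
    // Writes a boolean null marker, then the value only if it is present.
    static void writeNullable(String value, DataOutputStream out) throws IOException {
        out.writeBoolean(value == null);
        if (value != null) {
            out.writeUTF(value);
        }
    }

    // Reads the marker first; a set marker means the field was null.
    static String readNullable(DataInputStream in) throws IOException {
        return in.readBoolean() ? null : in.readUTF();
    }

    // Encodes a two-field record, allowing either field to be null.
    static byte[] encode(String f0, String f1) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeNullable(f0, out);
            writeNullable(f1, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decodes a record produced by encode().
    static String[] decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String f0 = readNullable(in);
            String f1 = readNullable(in);
            return new String[] {f0, f1};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = encode("key", null);
        String[] fields = decode(data);
        System.out.println(fields[0] + ", " + fields[1] + " (" + data.length + " bytes)");
    }
}
```

A bitmask covering all fields of a record would reduce the per-field overhead
further; the per-field marker is used here only because it is the simplest
form to read.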
> Need more helpful error message when trying to serialize a tuple with a null
> field
> ----------------------------------------------------------------------------------
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.2.0
> Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the
> following, which has no information about where in the program the problem
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
> at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
> at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
> at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
> at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a
> flatMap (but I have dozens of them in my code). Surely there's a way to pull
> out the source file name and line number from the program DAG node when
> errors like this occur?