[
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941882#comment-15941882
]
Luke Hutchison commented on FLINK-6115:
---------------------------------------
[~greghogan] versioning of durable binary formats is not only standard
practice, it is important.
Yes, allowing null values in Java was a big mistake, but option types didn't
exist back when nullable references were added to Java. And an {{Option}} or
{{Maybe}} type was the first thing I went looking for in Flink to address the
non-nullable tuple field issue. Flink definitely needs an {{Option}} type until
Java supports its own (just as having its own {{Tuple}} types is critical to
the usefulness of Flink). (For that matter, {{flatMap}} should work over
{{Option}} types, treating them as collections of length 0 or 1; I went
looking for that too...)
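A minimal sketch of the "option as a list of length 0 or 1" idea follows. The {{Option}} class, {{parse}} and {{flatMapParse}} are hypothetical names made up for illustration; none of them are existing Flink types or methods, and a plain loop stands in for a real {{flatMap}} operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class OptionSketch {
    /** Hypothetical minimal option type; not an existing Flink class. */
    public static final class Option<T> {
        private final T value; // null only for the "none" case

        private Option(T value) {
            this.value = value;
        }

        public static <T> Option<T> some(T value) {
            if (value == null) {
                throw new NullPointerException("some() requires a non-null value");
            }
            return new Option<>(value);
        }

        public static <T> Option<T> none() {
            return new Option<>(null);
        }

        /** View the option as a list holding zero or one element. */
        public List<T> asList() {
            return value == null
                    ? Collections.<T>emptyList()
                    : Collections.singletonList(value);
        }
    }

    /** Returns some(n) if the string parses as an int, otherwise none(). */
    public static Option<Integer> parse(String s) {
        try {
            return Option.some(Integer.parseInt(s.trim()));
        } catch (NumberFormatException e) {
            return Option.none();
        }
    }

    /** Flat-maps over the options: empty ones simply contribute nothing. */
    public static List<Integer> flatMapParse(List<String> inputs) {
        List<Integer> out = new ArrayList<>();
        for (String s : inputs) {
            out.addAll(parse(s).asList());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(flatMapParse(Arrays.asList("1", "x", "3"))); // prints [1, 3]
    }
}
```

With this shape, a missing value never has to be represented as a null field; it just disappears from the flattened output.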
However, the fact that null pointers (and the general lack of strong nullability
analysis) have caused no end of pain to users of Java doesn't mean that they
are not crucial to the way that the language works today, or to how programmers
tend to use it. Even Flink uses null values the way they're generally used in
Java in place of {{Option}} types -- e.g. giving you null values on an outer
join when there is no corresponding key in one dataset.
Yes, throwing an NPE in tuple constructors misses the manual setting of field
values; I mentioned that in an earlier comment. However, I was actually quite
surprised to notice that the tuple fields were non-final, given one
of the first things I read in the Flink documentation: "Flink has the special
classes {{DataSet}} and {{DataStream}} to represent data in a program. You can
think of them as immutable collections of data that can contain duplicates." If
the collections and streams that contain tuples are supposed to be thought of
as immutable, why should the individual elements of those collections and
streams be mutable? Perhaps tuple field values should be made final (which
would of course be a breaking change for some users, and would probably
especially require changes internally in the aggregation operator code).
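To make the suggestion concrete, here is a rough sketch of a tuple with final fields and a null check in the constructor. {{Pair}} and {{withF1}} are hypothetical and exist only for this example; Flink's own {{Tuple2}} has public, non-final fields. The point is that, with final fields, the constructor is the only place a null could enter, so the exception is raised in user code at the line that built the tuple.

```java
import java.util.Objects;

public class ImmutablePairSketch {
    /** Hypothetical immutable two-field tuple; not Flink's Tuple2. */
    public static final class Pair<A, B> {
        public final A f0;
        public final B f1;

        public Pair(A f0, B f1) {
            // Fail fast, naming the offending field, at the construction site.
            this.f0 = Objects.requireNonNull(f0, "f0 must not be null");
            this.f1 = Objects.requireNonNull(f1, "f1 must not be null");
        }

        /** "Modifying" a field means building a new tuple. */
        public Pair<A, B> withF1(B newF1) {
            return new Pair<>(f0, newF1);
        }
    }

    public static void main(String[] args) {
        Pair<String, Integer> p = new Pair<>("a", 1);
        Pair<String, Integer> q = p.withF1(2);
        System.out.println(p.f1 + " " + q.f1); // p is unchanged

        try {
            new Pair<String, String[]>("k", null);
        } catch (NullPointerException e) {
            // The stack trace of e starts in this method, not in a serializer.
            System.out.println(e.getMessage());
        }
    }
}
```

The cost is an extra allocation per update, which is presumably why the aggregation operators would need rework.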
Setting aside the issue of null fields in tuples, this will surely not be the
last time that the serialization format will need to change! What if, for
example, Flink needs to add support for some future new Java type, or needs to
support another JVM language that requires some extra metadata of some form to
be stored along with its serialized objects? I strongly recommend versioning
the checkpoint files.
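As an illustration of what versioning buys, the sketch below writes a magic number and a format version ahead of the payload and dispatches on the version when reading. Everything here is an assumption made for the example: the magic number, the two version numbers, and the idea that version 2 adds a presence flag so a field may be null. It is not Flink's actual checkpoint layout.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class VersionedFormatSketch {
    // Hypothetical constants, chosen only for this example.
    static final int MAGIC = 0x464C4B31;
    static final int V1_NO_NULLS = 1;  // field is always present
    static final int V2_NULLABLE = 2;  // field is preceded by a presence flag

    /** Serializes one string field behind a (magic, version) header. */
    public static byte[] write(int version, String field) {
        if (version != V1_NO_NULLS && version != V2_NULLABLE) {
            throw new IllegalArgumentException("unknown format version " + version);
        }
        if (version == V1_NO_NULLS && field == null) {
            throw new IllegalArgumentException("format version 1 cannot encode a null field");
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC);
            out.writeInt(version);
            if (version == V2_NULLABLE) {
                out.writeBoolean(field != null);
            }
            if (field != null) {
                out.writeUTF(field);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the header first, then picks the decoder for that version. */
    public static String read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            if (in.readInt() != MAGIC) {
                throw new IOException("bad magic number");
            }
            int version = in.readInt();
            switch (version) {
                case V1_NO_NULLS:
                    return in.readUTF();
                case V2_NULLABLE:
                    return in.readBoolean() ? in.readUTF() : null;
                default:
                    throw new IOException("unsupported format version " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(write(V1_NO_NULLS, "old data")));
        System.out.println(read(write(V2_NULLABLE, null)));
    }
}
```

Old files remain readable because the reader never has to guess the layout, and a file written by a newer release fails with a clear message instead of being misparsed.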
> Need more helpful error message when trying to serialize a tuple with a null
> field
> ----------------------------------------------------------------------------------
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.2.0
> Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the
> following, which has no information about where in the program the problem
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
> at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
> at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
> at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
> at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
> at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
> at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
> at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a
> flatMap (but I have dozens of them in my code). Surely there's a way to pull
> out the source file name and line number from the program DAG node when
> errors like this occur?