[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17110873#comment-17110873 ]
Jungtaek Lim commented on SPARK-31754:
--------------------------------------

[~puviarasu] Given that the error comes from "generated code", you may want to turn on DEBUG logging for the class below, retrieve the generated code, and paste it here as well.

org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator

Btw, priorities higher than Major should go through a committer's decision. I'll lower the priority and wait for a committer to decide. (Personally it looks like an edge case, not meant to be a blocker.)

> Spark Structured Streaming: NullPointerException in Stream Stream join
> ----------------------------------------------------------------------
>
>                 Key: SPARK-31754
>                 URL: https://issues.apache.org/jira/browse/SPARK-31754
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Spark Version: 2.4.0
>                      Hadoop Version: 3.0.0
>            Reporter: Puviarasu
>            Priority: Major
>              Labels: structured-streaming
>
> When joining 2 streams with watermarking and windowing, we get a NullPointerException after the query has run for a few minutes.
>
> After the failure we analyzed the checkpoint offsets/sources and identified the files on which the application failed. These files do not contain any null values in the join columns.
>
> We even started the job with just these files and the application ran. From this we concluded that the exception is not caused by the data in the streams.
>
> *Code:*
> {code:java}
> import org.apache.spark.sql.streaming.Trigger
>
> val optionsMap1 = Map[String, String](
>   "Path" -> "/path/to/source1", "maxFilesPerTrigger" -> "1",
>   "latestFirst" -> "false", "fileNameOnly" -> "false",
>   "checkpointLocation" -> "/path/to/checkpoint1", "rowsPerSecond" -> "1")
> val optionsMap2 = Map[String, String](
>   "Path" -> "/path/to/source2", "maxFilesPerTrigger" -> "1",
>   "latestFirst" -> "false", "fileNameOnly" -> "false",
>   "checkpointLocation" -> "/path/to/checkpoint2", "rowsPerSecond" -> "1")
>
> spark.readStream.format("parquet").options(optionsMap1).load().createTempView("source1")
> spark.readStream.format("parquet").options(optionsMap2).load().createTempView("source2")
>
> spark.sql("select * from source1 where eventTime1 is not null and col1 is not null").withWatermark("eventTime1", "30 minutes").createTempView("viewNotNull1")
> spark.sql("select * from source2 where eventTime2 is not null and col2 is not null").withWatermark("eventTime2", "30 minutes").createTempView("viewNotNull2")
>
> spark.sql("select * from viewNotNull1 a join viewNotNull2 b on a.col1 = b.col2 and a.eventTime1 >= b.eventTime2 and a.eventTime1 <= b.eventTime2 + interval 2 hours").createTempView("join")
>
> val optionsMap3 = Map[String, String](
>   "compression" -> "snappy", "path" -> "/path/to/sink",
>   "checkpointLocation" -> "/path/to/checkpoint3")
> spark.sql("select * from join").writeStream.outputMode("append").trigger(Trigger.ProcessingTime("5 seconds")).format("parquet").options(optionsMap3).start()
> {code}
>
> *Exception:*
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Aborting TaskSet 4.0 because task 0 (partition 0) cannot run anywhere due to node and executor blacklist.
> Most recent failure: Lost task 0.2 in stage 4.0 (TID 6, executor 3): java.lang.NullPointerException
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
>   at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:197)
>   at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:221)
>   at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:157)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply$mcV$sp(StreamingSymmetricHashJoinExec.scala:338)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(StreamingSymmetricHashJoinExec.scala)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(StreamingSymmetricHashJoinExec.scala)
>   at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:583)
>   at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:108)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:126)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1(StreamingSymmetricHashJoinExec.scala)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$processPartitions$1.apply$mcV$sp(StreamingSymmetricHashJoinExec.scala:361)
>   at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
>   at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
>   at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:121)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Blacklisting behavior can be configured via spark.blacklist.*.
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
>   ... 19 more
> Exception in thread "main" org.apache.spark.SparkException: Application application_2345 finished with failed status
>   at org.apache.spark.deploy.yarn.Client.run(Client.scala:1158)
>   at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1606)
>   at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
>   at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
>   at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
>   at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>   at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
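As a side note on the DEBUG-logging suggestion in the comment above: with the log4j 1.x backend that Spark 2.4 bundles, CodeGenerator writes the generated classes to the log at DEBUG level. A minimal sketch of one way to enable that follows; the logger name comes from the comment, while the rest assumes a stock log4j 1.x setup and is illustrative rather than the only approach:

{code:java}
// Minimal sketch, assuming the log4j 1.x backend bundled with Spark 2.4.
// Raising CodeGenerator's level to DEBUG makes the generated code appear in the log.
import org.apache.log4j.{Level, Logger}

Logger
  .getLogger("org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator")
  .setLevel(Level.DEBUG)
{code}

This call only affects the current JVM. Since the failing predicate here is evaluated inside executor tasks, the same level likely needs to be enabled on the executors as well, e.g. by shipping a log4j configuration containing the equivalent line log4j.logger.org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator=DEBUG to each executor.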