[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691757#comment-17691757 ]

Yoel Benharrous commented on SPARK-31754:
-----------------------------------------

This seems related to [SPARK-38809] Implement option to skip null values in symmetric hash impl of stream-stream joins - ASF JIRA (apache.org)

> Spark Structured Streaming: NullPointerException in Stream Stream join
> ----------------------------------------------------------------------
>
>                 Key: SPARK-31754
>                 URL: https://issues.apache.org/jira/browse/SPARK-31754
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Spark Version : 2.4.0
>                      Hadoop Version : 3.0.0
>            Reporter: Puviarasu
>            Priority: Major
>              Labels: structured-streaming
>         Attachments: CodeGen.txt, Excpetion-3.0.0Preview2.txt, Logical-Plan.txt
>
>
> When joining 2 streams with watermarking and windowing we get a NullPointerException after the job has run for a few minutes.
> After the failure we analyzed the checkpoint offsets/sources and found the files for which the application failed. These files do not have any null values in the join columns.
> We even started the job with those files and the application ran. From this we concluded that the exception is not caused by the data from the streams.
>
> *Code:*
>
> {code:java}
> val optionsMap1 = Map[String, String]("Path" -> "/path/to/source1",
>   "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" -> "false",
>   "checkpointLocation" -> "/path/to/checkpoint1", "rowsPerSecond" -> "1")
> val optionsMap2 = Map[String, String]("Path" -> "/path/to/source2",
>   "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" -> "false",
>   "checkpointLocation" -> "/path/to/checkpoint2", "rowsPerSecond" -> "1")
>
> spark.readStream.format("parquet").options(optionsMap1).load().createTempView("source1")
> spark.readStream.format("parquet").options(optionsMap2).load().createTempView("source2")
> spark.sql("select * from source1 where eventTime1 is not null and col1 is not null").withWatermark("eventTime1", "30 minutes").createTempView("viewNotNull1")
> spark.sql("select * from source2 where eventTime2 is not null and col2 is not null").withWatermark("eventTime2", "30 minutes").createTempView("viewNotNull2")
> spark.sql("select * from viewNotNull1 a join viewNotNull2 b on a.col1 = b.col2 and a.eventTime1 >= b.eventTime2 and a.eventTime1 <= b.eventTime2 + interval 2 hours").createTempView("join")
> val optionsMap3 = Map[String, String]("compression" -> "snappy", "path" -> "/path/to/sink", "checkpointLocation" -> "/path/to/checkpoint3")
> spark.sql("select * from join").writeStream.outputMode("append").trigger(Trigger.ProcessingTime("5 seconds")).format("parquet").options(optionsMap3).start()
> {code}
>
> *Exception:*
>
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Aborting TaskSet 4.0 because task 0 (partition 0)
> cannot run anywhere due to node and executor blacklist.
> Most recent failure:
> Lost task 0.2 in stage 4.0 (TID 6, executor 3): java.lang.NullPointerException
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>     at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
>     at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
>     at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:197)
>     at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:221)
>     at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:157)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
>     at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply$mcV$spala:338)
>     at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
>     at
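For readers who want to try the option that SPARK-38809 refers to, here is a minimal sketch. It assumes the setting is exposed under the key `spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled` (to my knowledge an internal option added in Spark 3.3.0 and off by default; please verify the key against your Spark version). The application name and the local master are placeholders. Going by the ticket title, the option skips null values found in the join state, so it would work around the symptom rather than explain how the nulls got there.

```scala
import org.apache.spark.sql.SparkSession

object SkipNullsConfigSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder session settings; adjust for your cluster.
    val spark = SparkSession.builder()
      .appName("skip-nulls-config-sketch")
      .master("local[2]")
      .getOrCreate()

    // Assumed key (see the note above): ask the symmetric hash join
    // to skip null values it finds in state instead of failing on them.
    spark.conf.set(
      "spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled",
      "true")

    // Set this before the streaming query is started, then define and
    // start the stream-stream join as usual.
    println(spark.conf.get(
      "spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled"))

    spark.stop()
  }
}
```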
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17544670#comment-17544670 ]

Ankit Verma commented on SPARK-31754:
-------------------------------------

While analysing a production issue we had recently, I came across this bug, which is similar to the issue in one of our Spark streaming jobs (one of its queries does a streaming join on data coming from two different Kafka topics).

Spark Version - 3.1.1

*Stack trace:*

{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o2504.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 28.0 failed 4 times, most recent failure: Lost task 19.3 in stage 28.0 (TID 653) (ip-10-98-69-15.ap-southeast-2.compute.internal executor 6): java.lang.NullPointerException
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:236)
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:260)
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:196)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$21(StreamingSymmetricHashJoinExec.scala:408)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:597)
    at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:111)
    at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:111)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:127)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.onOutputCompletion$1(StreamingSymmetricHashJoinExec.scala:385)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:431)
    at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:51)
    at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$2(KafkaWriter.scala:72)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$1(KafkaWriter.scala:73)
    at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$1$adapted(KafkaWriter.scala:70)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1020)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1020)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2278)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
    at
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309833#comment-17309833 ]

Jungtaek Lim commented on SPARK-31754:
--------------------------------------

Did anyone make a backup of the problematic state together with the corresponding query, and would you mind sharing it? As I commented, it's pretty hard to track this down without some material to play with.
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309012#comment-17309012 ]

Chenxi Zhao commented on SPARK-31754:
-------------------------------------

I got the same issue. I was using Spark 2.4.4 and doing a left outer join from a Kafka source, loading about 288 GB of data. Immediately after the joining state begins, I start to see the following exception:

21/03/24 04:56:51 ERROR Utils: Aborting task
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.And_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$10.apply(StreamingSymmetricHashJoinExec.scala:228)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$10.apply(StreamingSymmetricHashJoinExec.scala:228)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
    at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:217)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:409)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:415)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237961#comment-17237961 ]

fanxin commented on SPARK-31754:
--------------------------------

I encountered the same NPE in Spark 2.4.4. I found that the root cause of the error is that the state store saved corrupted snapshot files (the delta files were corrupted too, although a corrupted delta file does not result in an NPE). This has already been fixed in Spark 3. The related issue is https://issues.apache.org/jira/browse/SPARK-29438.
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17198348#comment-17198348 ]

Mark Kegel (DSS) commented on SPARK-31754:
------------------------------------------

We are seeing this same problem in our data pipeline. As an experiment we tried swapping out the default state store for the RocksDB one. We still get an exception, but it's a very different one. Hopefully this points folks towards what the issue is. Here is a sample stack trace:

{code:java}
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2582)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:170)
    ... 56 more
Caused by: java.lang.IllegalStateException: RocksDB instance could not be acquired by [ThreadId: 15124, task: 185.3 in stage 19413, TID 1537636] as it was not released by [ThreadId: 12648, task: 185.1 in stage 19413, TID 1535907] after 10002 ms StateStoreId(opId=2,partId=185,name=left-keyToNumValues)
    at com.databricks.sql.streaming.state.RocksDB.acquire(RocksDB.scala:332)
    at com.databricks.sql.streaming.state.RocksDB.load(RocksDB.scala:103)
    at com.databricks.sql.streaming.state.RocksDBStateStoreProvider.getStore(RocksDBStateStoreProvider.scala:161)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:372)
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$StateStoreHandler.getStateStore(SymmetricHashJoinStateManager.scala:321)
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyToNumValuesStore.<init>(SymmetricHashJoinStateManager.scala:347)
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.<init>(SymmetricHashJoinStateManager.scala:294)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner.<init>(StreamingSymmetricHashJoinExec.scala:397)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$processPartitions(StreamingSymmetricHashJoinExec.scala:229)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$doExecute$1.apply(StreamingSymmetricHashJoinExec.scala:205)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$doExecute$1.apply(StreamingSymmetricHashJoinExec.scala:205)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:101)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:353)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:317)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:353)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:317)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:353)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:317)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
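For anyone wanting to repeat the state store experiment described above: the backend is selected through the `spark.sql.streaming.stateStore.providerClass` setting. The sketch below is an illustration, not the commenter's actual configuration. It uses the open-source provider class that ships with Spark 3.2 and later; on Databricks the class visible in the stack trace above, `com.databricks.sql.streaming.state.RocksDBStateStoreProvider`, would be used instead. The application name and the local master are placeholders.

```scala
import org.apache.spark.sql.SparkSession

object RocksDbStateStoreConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("state-store-provider-sketch") // placeholder name
      .master("local[2]")                     // placeholder master
      // Replace the default HDFS-backed state store with RocksDB.
      // Open-source Spark 3.2+ class; Databricks uses its own class name.
      .config(
        "spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      .getOrCreate()

    // Confirm which provider the session will hand to streaming queries.
    println(spark.conf.get("spark.sql.streaming.stateStore.providerClass"))

    spark.stop()
  }
}
```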
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17170558#comment-17170558 ]

Jungtaek Lim commented on SPARK-31754:
--------------------------------------

I'd really like to help, but without a reproducer it's really hard to see what's happening. A small reproducer that consistently reproduces the issue would be really helpful. It doesn't need to be meaningful data; redacted data should be OK.
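As a possible starting point for such a reproducer, here is a skeleton that mirrors the join from the issue description but swaps the parquet sources for Spark's built-in `rate` source, so it needs no input files. This is only a sketch: it is not known to trigger the NullPointerException, and the object name, the local master, the console sink and the checkpoint path are all placeholders chosen for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.Trigger

object StreamStreamJoinReproSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark-31754-repro-sketch") // placeholder name
      .master("local[2]")                  // placeholder master
      .getOrCreate()

    // The rate source emits (timestamp, value) rows; rename them to match
    // the column names used in the issue description.
    val left = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
      .selectExpr("value AS col1", "timestamp AS eventTime1")
      .withWatermark("eventTime1", "30 minutes")

    val right = spark.readStream.format("rate").option("rowsPerSecond", "1").load()
      .selectExpr("value AS col2", "timestamp AS eventTime2")
      .withWatermark("eventTime2", "30 minutes")

    // Same join condition as the reported query: key equality plus a
    // two-hour event-time range.
    val joined = left.join(
      right,
      expr("col1 = col2 AND eventTime1 >= eventTime2 " +
        "AND eventTime1 <= eventTime2 + interval 2 hours"))

    val query = joined.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .format("console")
      .option("checkpointLocation", "/tmp/spark-31754-repro-checkpoint") // placeholder path
      .start()

    query.awaitTermination()
  }
}
```

If a run does fail, keeping a copy of the checkpoint directory together with this query gives the kind of state backup asked for elsewhere in the thread.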
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169766#comment-17169766 ]

fritz commented on SPARK-31754:
-------------------------------

Hi [~puviarasu],

In our case, re-running the job does not throw the NPE, since the streaming source is Kafka reading from the latest offset.
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169746#comment-17169746 ]

Puviarasu commented on SPARK-31754:
-----------------------------------

Hello [~fritzwijaya],

Restarting (after clearing the checkpoint directories) does not solve the issue; the job still fails with a NullPointerException after running for a few minutes. We are running on Spark 2.4.0 and were able to reproduce the issue on Spark 2.4.5 as well. Our environment is Cloudera.

Thank you.
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169736#comment-17169736 ]

fritz commented on SPARK-31754:
-------------------------------

Thanks for your response, [~puviarasu].

Yes, batch processing does not have the NPE issue and works fine. Totally agreed, batch has higher latency. We already have a batch pipeline, so changing the streaming pipeline to batch is not an option for us.

What we are doing right now is simply re-running the job, and it works again, but the issue reappears: whenever the NPE occurs, the job fails and is terminated.

I am not sure if this is useful, but we are running Spark 2.4.5 on EMR.
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169727#comment-17169727 ]

Puviarasu commented on SPARK-31754:
-----------------------------------

Hello [~fritzwijaya],

As of today (2020-08-03) we still have the issue with the stream-stream join in Spark Structured Streaming.

*Our Workaround:* Batch Processing. We have moved our business logic from the problematic stream-stream join in Spark Structured Streaming to an equivalent Spark batch job. The batch workaround has higher latency than Spark Structured Streaming, but it runs stably.

Once the issue is fixed by the Spark community, we will replace our batch workaround with the desired Spark Structured Streaming stream-stream join.

Thank you.

CC: [~kabhwan]
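The thread does not show the batch job that was used as a workaround. As a rough sketch only, assuming the same placeholder paths and column names as in the reported streaming query, a batch equivalent of the join could look like the following. The object name `BatchJoinWorkaround` is hypothetical, and how often such a job is scheduled is left out.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}

// Hypothetical batch counterpart of the reported stream-stream join.
// Paths and column names are the placeholders used in the issue description.
object BatchJoinWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("batch-join-workaround")
      .getOrCreate()

    // Same null filters as the streaming views viewNotNull1 / viewNotNull2.
    val left = spark.read.parquet("/path/to/source1")
      .filter(col("eventTime1").isNotNull && col("col1").isNotNull)
      .alias("a")
    val right = spark.read.parquet("/path/to/source2")
      .filter(col("eventTime2").isNotNull && col("col2").isNotNull)
      .alias("b")

    // Same join condition as the streaming query; no watermark is needed
    // because a batch join keeps no streaming state.
    val joined = left.join(
      right,
      expr("a.col1 = b.col2 AND a.eventTime1 >= b.eventTime2 " +
        "AND a.eventTime1 <= b.eventTime2 + interval 2 hours"))

    // Assumes the two inputs share no column names; otherwise the
    // Parquet write would fail on duplicate columns.
    joined.write
      .mode("append")
      .option("compression", "snappy")
      .parquet("/path/to/sink")

    spark.stop()
  }
}
```

Because each run reads the inputs in full, a real job would likely also need some way to restrict every run to new files; that part depends on the data layout and is not covered here.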
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17169677#comment-17169677 ]

fritz commented on SPARK-31754:
-------------------------------

Hi [~puviarasu] [~kabhwan],

We recently faced a similar NPE when doing a stream-stream join. It throws the same exception as in the log that [~puviarasu] shared above. The only difference in my case is that the source is Kafka; everything else is the same. I have checked and ensured that the join key is not null.

Any advice? Thanks
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17117569#comment-17117569 ]

Puviarasu commented on SPARK-31754:
-----------------------------------

Hello [~kabhwan],

Please find below our updates from testing with Spark 2.4.5 and Spark 3.0.0-preview2.
 * *Spark 2.4.5:* The application fails with the same java.lang.NullPointerException that we were getting in Spark 2.4.0.
 * *Spark 3.0.0-preview2:* The application fails with the exception below, which appears somewhat related to https://issues.apache.org/jira/browse/SPARK-27780. Full exception stack along with the logical plan: [^Excpetion-3.0.0Preview2.txt]

{code:java}
org.apache.spark.shuffle.FetchFailedException: java.lang.IllegalArgumentException: Unknown message type: 10
{code}

Regarding the input and checkpoint: it is actually production data, so sharing it as-is is very difficult. We are looking at options to anonymize the data before sharing, and even then we would require approvals from stakeholders.

Thank you.
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112816#comment-17112816 ]

Jungtaek Lim commented on SPARK-31754:
--------------------------------------

I can also take a look if the input and checkpoint are sharable (say, no production inputs), but I imagine it's unlikely.
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17112359#comment-17112359 ]

Puviarasu commented on SPARK-31754:
-----------------------------------

Hello [~kabhwan],

Please find my answers below in *bold*.

# Is it always reproducible under the same input & checkpoint (start from initial checkpoint or specific checkpoint)?
*With the same checkpoint and input, the application fails with the same exception at the same offset. We also tested the same input after clearing the checkpoint. In that case the exception didn't happen for that particular input.*
# Could you share the query plan (logical/physical)? Query plan from previous batch would be OK.
*Sure. Please find the attachment [^Logical-Plan.txt]*
# Could you try it out with recent version like 2.4.5 or 3.0.0-preview2 so that we can avoid investigating issue which might be already resolved?
*For this we might need some more time, as it requires some changes to our cluster settings. Kindly bear with us for the delay.*

Thank you.
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17111696#comment-17111696 ]

Jungtaek Lim commented on SPARK-31754:
--------------------------------------

Looks like the row itself is null, which shouldn't happen.

Could you try it out with a recent version like 2.4.5 or 3.0.0-preview2, so that we can avoid investigating an issue which might already be resolved?
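
The distinction in the comment above, between a null row and a null column value, can be illustrated with a toy model. This is only a sketch: `Row` and `predicate` here are hypothetical stand-ins, not Spark classes and not the generated `SpecificPredicate`. It assumes, for illustration, that a predicate dereferences its input row without a null check, which would explain why the `is not null` filters on the join columns in the reported query do not help if the state store hands back a null row.

```scala
// Toy model only: hypothetical stand-ins, not Spark's InternalRow or generated code.
final case class Row(col: Option[String], eventTime: Long)

object NullRowSketch {
  // Mimics a predicate that assumes its input rows are non-null.
  def predicate(left: Row, right: Row): Boolean =
    left.col.isDefined && left.col == right.col &&
      left.eventTime >= right.eventTime

  def main(args: Array[String]): Unit = {
    val probe = Row(Some("k"), 10L)

    // A null *column* is handled: the predicate simply evaluates to false.
    println(predicate(probe, Row(None, 5L)))

    // A null *row* is not: dereferencing it throws NullPointerException.
    val stored: Row = null
    try predicate(probe, stored)
    catch { case _: NullPointerException => println("NPE on null row") }
  }
}
```

In this model, filtering the input on column values never reaches the failing case, because the failure is in reading the row at all.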
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17111221#comment-17111221 ]

Puviarasu commented on SPARK-31754:
-----------------------------------

Hello [~kabhwan],

Thank you for your response. Please find the generated code attached: [^CodeGen.txt].
[jira] [Commented] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17110873#comment-17110873 ]

Jungtaek Lim commented on SPARK-31754:
--------------------------------------

[~puviarasu] Given that the error comes from generated code, you may want to turn on DEBUG logging for the class below, retrieve the generated code, and paste it here as well.

org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator

Btw, priorities higher than Major should go through a committer's decision. I'll lower the priority and leave the decision to the committers. (Personally, it looks like an edge case, not a blocker.)
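
One way to follow the suggestion above about DEBUG logging for `CodeGenerator` is sketched here. It assumes the log4j 1.x API bundled with Spark 2.4; the object name `EnableCodegenDebug` is made up for illustration. The call changes the level only in the JVM where it runs (typically the driver), so generated code compiled on executors would likely need an equivalent entry in the executors' log4j configuration instead.

```scala
import org.apache.log4j.{Level, Logger}

// Hypothetical helper: raises the log level of the code generator's logger
// so that generated source is written to the log of this JVM.
object EnableCodegenDebug {
  def main(args: Array[String]): Unit = {
    Logger
      .getLogger("org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator")
      .setLevel(Level.DEBUG)
  }
}
```

The same two-line call can be pasted into the streaming application before the query is started, rather than run as a separate program.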