[jira] [Comment Edited] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join

2020-05-19 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111696#comment-17111696
 ] 

Jungtaek Lim edited comment on SPARK-31754 at 5/20/20, 2:21 AM:


Looks like the row itself is null, which shouldn't happen.

1. Is it always reproducible under the same input & checkpoint (starting from
the initial checkpoint or a specific checkpoint)?
2. Could you try it out with a recent version like 2.4.5 or 3.0.0-preview2 so
that we can avoid investigating an issue which might already be resolved?
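
To illustrate the difference between a null row and a row that merely contains
null columns, here is a minimal Scala sketch. `LeftRow`, `RightRow`, and
`matches` are hypothetical stand-ins and not Spark classes; the real predicate
is generated code that works on Spark's internal rows, so this only mimics the
shape of the failure under that assumption.

```scala
// Hypothetical stand-ins for the two sides of the join; not Spark classes.
final case class LeftRow(col1: String, eventTime1: Long)
final case class RightRow(col2: String, eventTime2: Long)

object NullRowSketch {
  val TwoHoursMs: Long = 2L * 60 * 60 * 1000

  // Mirrors the reported join condition:
  // a.col1 = b.col2 and b.eventTime2 <= a.eventTime1 <= b.eventTime2 + 2 hours
  def matches(a: LeftRow, b: RightRow): Boolean =
    a.col1 == b.col2 &&
      a.eventTime1 >= b.eventTime2 &&
      a.eventTime1 <= b.eventTime2 + TwoHoursMs

  // Returns true if evaluating the predicate fails with a NullPointerException.
  def failsWithNpe(a: LeftRow, b: RightRow): Boolean =
    try { matches(a, b); false }
    catch { case _: NullPointerException => true }
}
```

A "is not null" filter on the columns cannot guard against this case, because
the failure happens when the row reference itself is dereferenced.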


was (Author: kabhwan):
Looks like the row itself is null which shouldn't happen. Could you try it out 
with recent version like 2.4.5 or 3.0.0-preview2 so that we can avoid 
investigating issue which might be already resolved?

> Spark Structured Streaming: NullPointerException in Stream Stream join
> --
>
> Key: SPARK-31754
> URL: https://issues.apache.org/jira/browse/SPARK-31754
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark Version : 2.4.0
> Hadoop Version : 3.0.0
>Reporter: Puviarasu
>Priority: Major
>  Labels: structured-streaming
> Attachments: CodeGen.txt
>
>
> When joining 2 streams with watermarking and windowing we are getting 
> NullPointer Exception after running for few minutes. 
> After failure we analyzed the checkpoint offsets/sources and found the files 
> for which the application failed. These files are not having any null values 
> in the join columns. 
> We even started the job with the files and the application ran. From this we 
> concluded that the exception is not because of the data from the streams.
> *Code:*
>  
> {code:java}
> import org.apache.spark.sql.streaming.Trigger
>
> // Options for the two Parquet file stream sources.
> val optionsMap1 = Map[String, String]("Path" -> "/path/to/source1",
>   "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" -> "false",
>   "checkpointLocation" -> "/path/to/checkpoint1", "rowsPerSecond" -> "1")
> val optionsMap2 = Map[String, String]("Path" -> "/path/to/source2",
>   "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" -> "false",
>   "checkpointLocation" -> "/path/to/checkpoint2", "rowsPerSecond" -> "1")
>
> spark.readStream.format("parquet").options(optionsMap1).load().createTempView("source1")
> spark.readStream.format("parquet").options(optionsMap2).load().createTempView("source2")
>
> // Drop rows with a null join key or event time, then watermark each side by 30 minutes.
> spark.sql("select * from source1 where eventTime1 is not null and col1 is not null")
>   .withWatermark("eventTime1", "30 minutes").createTempView("viewNotNull1")
> spark.sql("select * from source2 where eventTime2 is not null and col2 is not null")
>   .withWatermark("eventTime2", "30 minutes").createTempView("viewNotNull2")
>
> // Stream-stream join on the key, with a 2-hour event-time range condition.
> spark.sql("select * from viewNotNull1 a join viewNotNull2 b on a.col1 = b.col2 " +
>   "and a.eventTime1 >= b.eventTime2 and a.eventTime1 <= b.eventTime2 + interval 2 hours")
>   .createTempView("join")
>
> // Write the joined rows to a Parquet sink every 5 seconds.
> val optionsMap3 = Map[String, String]("compression" -> "snappy",
>   "path" -> "/path/to/sink", "checkpointLocation" -> "/path/to/checkpoint3")
> spark.sql("select * from join").writeStream.outputMode("append")
>   .trigger(Trigger.ProcessingTime("5 seconds")).format("parquet").options(optionsMap3).start()
> {code}
>  
> *Exception:*
>  
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
> Aborting TaskSet 4.0 because task 0 (partition 0)
> cannot run anywhere due to node and executor blacklist.
> Most recent failure:
> Lost task 0.2 in stage 4.0 (TID 6, executor 3): java.lang.NullPointerException
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
> at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
> at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
> at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:197)
> at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:221)
> at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:157)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
> at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution

[jira] [Comment Edited] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join

2020-05-19 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111696#comment-17111696
 ] 

Jungtaek Lim edited comment on SPARK-31754 at 5/20/20, 2:50 AM:


Looks like the row itself is null, which shouldn't happen.

1. Is it always reproducible under the same input & checkpoint (starting from
the initial checkpoint or a specific checkpoint)?
2. Could you share the query plan (logical/physical)? The query plan from a
previous batch would be OK.
3. Could you try it out with a recent version like 2.4.5 or 3.0.0-preview2 so
that we can avoid investigating an issue which might already be resolved?
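
One way to capture the requested plans is sketched below in Scala, assuming the
`StreamingQuery` handle returned by `start()` is kept in a variable (the
reported code discards it). `PlanDump` and `dumpPlans` are hypothetical names
introduced for this example; `explain` and `lastProgress` are the standard
`StreamingQuery` members. The sketch needs Spark on the classpath.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

object PlanDump {
  // `query` is assumed to be the value returned by writeStream...start().
  def dumpPlans(query: StreamingQuery): Unit = {
    // Prints the logical and physical plans of the most recently executed batch.
    query.explain(true)
    // lastProgress is null until at least one progress update exists.
    Option(query.lastProgress).foreach(p => println(p.prettyJson))
  }
}
```

Calling `PlanDump.dumpPlans(query)` after at least one batch has run should
print the plan to standard output, which can then be attached to the issue.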


was (Author: kabhwan):
Looks like the row itself is null which shouldn't happen. 

1. Is it always reproducible under the same input & checkpoint (start from 
initial checkpoint or specific checkpoint)?
2. Could you try it out with recent version like 2.4.5 or 3.0.0-preview2 so 
that we can avoid investigating issue which might be already resolved?


[jira] [Comment Edited] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join

2020-05-20 Thread Puviarasu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112359#comment-17112359
 ] 

Puviarasu edited comment on SPARK-31754 at 5/20/20, 3:36 PM:
-

Hello [~kabhwan] , 

Please find below the comments in *bold*.
 # Is it always reproducible under the same input & checkpoint (start from 
initial checkpoint or specific checkpoint)?
 *Same Checkpoint: With the same checkpoint and input, the application fails
with the same exception at the same offset.*
*New Checkpoint: We also tested with the same input after clearing the
checkpoint. In this case [after clearing the checkpoint], the exception didn't
happen for that particular input.*
 # Could you share the query plan (logical/physical)? Query plan from previous
batch would be OK.
 *Sure. Please find the attachment [^Logical-Plan.txt]*
 # Could you try it out with recent version like 2.4.5 or 3.0.0-preview2 so
that we can avoid investigating issue which might be already resolved?
 *For this we might need some more time, as we need to make some changes to
our cluster settings. Kindly bear with us for the delay.*

Thank you. 


was (Author: puviarasu):
Hello [~kabhwan] , 

Please find below the comments in *bold*.
 # Is it always reproducible under the same input & checkpoint (start from 
initial checkpoint or specific checkpoint)? 
*With the same checkpoint and input the application is failing with the same 
exception in the same offset. Also we tested clearing the checkpoint with the 
same input. In this case exception dint happen for that particular input.* 
 # Could you share the query plan (logical/physical)? Query plan from previous 
batch would be OK. 
*Sure. Please find the attachment [^Logical-Plan.txt]*
 # Could you try it out with recent version like 2.4.5 or 3.0.0-preview2 so 
that we can avoid investigating issue which might be already resolved? 
*For this we might need some more time as we need some changes to be done in 
our cluster settings. Kindly bear us with the delay.* 

Thank you. 

> Spark Structured Streaming: NullPointerException in Stream Stream join
> --
>
> Key: SPARK-31754
> URL: https://issues.apache.org/jira/browse/SPARK-31754
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark Version : 2.4.0
> Hadoop Version : 3.0.0
>Reporter: Puviarasu
>Priority: Major
>  Labels: structured-streaming
> Attachments: CodeGen.txt, Logical-Plan.txt
>
>

[jira] [Comment Edited] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join

2020-11-24 Thread fanxin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237961#comment-17237961
 ] 

fanxin edited comment on SPARK-31754 at 11/24/20, 8:50 AM:
---

I encountered the same NPE in Spark 2.4.4. I found that the root cause of
the error is that the state store saved corrupted snapshot files (the delta
files were corrupted too, although a corrupted delta file will not result in
an NPE). It has already been fixed in Spark 3. The issue related to this
problem is https://issues.apache.org/jira/browse/SPARK-29438.


was (Author: fanxiin):
I encountered the same NEP in Spark2.4.4. I have found that the root cause of 
the error is that the state store saved a corrupted snapshot files (and delta 
files were corrupted too, although corrupted delta file will not result in 
NEP). It has already been fixed in Spark3. Issue related to this problem is to 
https://issues.apache.org/jira/browse/SPARK-29438.

> Spark Structured Streaming: NullPointerException in Stream Stream join
> --
>
> Key: SPARK-31754
> URL: https://issues.apache.org/jira/browse/SPARK-31754
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.4.0
> Environment: Spark Version : 2.4.0
> Hadoop Version : 3.0.0
>Reporter: Puviarasu
>Priority: Major
>  Labels: structured-streaming
> Attachments: CodeGen.txt, Excpetion-3.0.0Preview2.txt, 
> Logical-Plan.txt
>
>

[jira] [Comment Edited] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join

2020-11-24 Thread fanxin (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237961#comment-17237961
 ] 

fanxin edited comment on SPARK-31754 at 11/24/20, 9:07 AM:
---

I encountered the same NPE in Spark 2.4.4. I found that the root cause of
the error is that the state store saved corrupted snapshot files (the delta
files were corrupted too, although a corrupted delta file will not result in
an NPE). It has already been fixed in Spark 3. The issue related to this
problem is https://issues.apache.org/jira/browse/SPARK-29438. By the way, in
my case, the use of `coalesce` may have led to the NPE.


was (Author: fanxiin):
I encountered the same NPE in Spark2.4.4. I have found that the root cause of 
the error is that the state store saved a corrupted snapshot files (and delta 
files were corrupted too, although corrupted delta file will not result in 
NPE). It has already been fixed in Spark3. Issue related to this problem is 
https://issues.apache.org/jira/browse/SPARK-29438.


[jira] [Comment Edited] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join

2021-03-25 Thread Chenxi Zhao (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17309012#comment-17309012
 ] 

Chenxi Zhao edited comment on SPARK-31754 at 3/26/21, 12:08 AM:


I got the same issue. I was using Spark 2.4.4 and doing a left outer join from
a Kafka source, loading about 288 GB of data. After the joining state begins, I
immediately start to see the following exception:

 

21/03/24 04:56:51 ERROR Utils: Aborting task
java.lang.NullPointerException
 at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.And_0$(Unknown Source)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
 at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$10.apply(StreamingSymmetricHashJoinExec.scala:228)
 at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$10.apply(StreamingSymmetricHashJoinExec.scala:228)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
 at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:217)
 at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
 at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
 at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
 at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
 at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:409)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:415)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

 

Edit: It might be related to the corrupted checkpoint. I deleted the checkpoint 
folder and this particular exception is gone.
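
For anyone trying the same workaround, a cautious variant is to rename the
checkpoint folder rather than delete it, so the suspect state files remain
available for inspection. The Scala sketch below assumes the checkpoint lives
on a local filesystem (a checkpoint on HDFS or an object store would need that
store's own API instead); `CheckpointReset` and `moveAside` are hypothetical
names. Note that the restarted query then begins without its saved offsets and
join state.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object CheckpointReset {
  // Renames the checkpoint directory to a timestamped sibling and returns the new path.
  // Assumes a local filesystem where a directory rename is possible.
  def moveAside(checkpoint: Path): Path = {
    val backupName = checkpoint.getFileName.toString + ".bak-" + System.currentTimeMillis()
    val backup = checkpoint.resolveSibling(backupName)
    Files.move(checkpoint, backup, StandardCopyOption.ATOMIC_MOVE)
    backup
  }
}
```

The query should be stopped before the folder is moved, and restarted with the
original `checkpointLocation` afterwards so that Spark creates a fresh one.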


was (Author: homezcx):
I got the same issue. I was using Spark 2.4.4 and doing leftouterjoin from 
Kafka source loading about 288GB data. After the joining state begins, I 
immediately start to see such exception:

 

21/03/24 04:56:51 ERROR Utils: Aborting task
java.lang.NullPointerException
 at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.And_0$(Unknown Source)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
 at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$10.apply(StreamingSymmetricHashJoinExec.scala:228)
 at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$10.apply(StreamingSymmetricHashJoinExec.scala:228)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
 at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:217)
 at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
 at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
 at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingS