[ https://issues.apache.org/jira/browse/SPARK-7837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698949#comment-14698949 ]

Yin Huai commented on SPARK-7837:
---------------------------------

Ah, I see the reason for the NPE: we actually call close twice. In 
DefaultWriterContainer's writeRows, we write out the rows and at the end 
call commitTask. In commitTask, we first call writer.close and then 
super.commitTask(). writer.close triggers ParquetRecordWriter's close, 
which sets columnStore to null. Then, because the speculative task's commit is 
rejected (i.e. super.commitTask() is denied by OutputCommitCoordinator), we 
call abortTask, which triggers writer.close again. That second close invokes 
ParquetRecordWriter's close once more and throws the NPE, because columnStore 
has already been set to null.
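
As a minimal, self-contained sketch of the sequence described above (this is not Spark's actual implementation; RecordWriter and WriterContainer here are illustrative stand-ins for ParquetRecordWriter and DefaultWriterContainer), the snippet below reproduces the double close and shows a simple "close once" guard that would keep abortTask from reaching the Parquet writer a second time:

{code}
// Illustrative only: stand-ins for the classes named in the stack trace.
object DoubleCloseSketch {

  // Stand-in for ParquetRecordWriter: close() nulls its column store,
  // so a second close() would dereference null and throw an NPE.
  class RecordWriter {
    private var columnStore: StringBuilder = new StringBuilder
    def close(): Unit = {
      columnStore.append("flush")   // NPE here if close() is called twice
      columnStore = null
    }
  }

  // Stand-in for DefaultWriterContainer, with a guard so the writer is
  // closed at most once even if both commitTask and abortTask run.
  class WriterContainer(writer: RecordWriter) {
    private var writerClosed = false

    private def closeWriterOnce(): Unit =
      if (!writerClosed) { writer.close(); writerClosed = true }

    def commitTask(): Unit = {
      closeWriterOnce()
      // Speculative task: the coordinator denies the commit.
      throw new RuntimeException("commit denied by OutputCommitCoordinator")
    }

    def abortTask(): Unit = closeWriterOnce()  // no longer re-closes the writer

    def writeRows(): Unit =
      try commitTask()
      catch { case _: Throwable => abortTask() }
  }

  def main(args: Array[String]): Unit = {
    new WriterContainer(new RecordWriter).writeRows()
    println("abortTask completed without NPE")
  }
}
{code}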

> NPE when save as parquet in speculative tasks
> ---------------------------------------------
>
>                 Key: SPARK-7837
>                 URL: https://issues.apache.org/jira/browse/SPARK-7837
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0
>            Reporter: Yin Huai
>            Assignee: Cheng Lian
>            Priority: Critical
>
> The query is like {{df.orderBy(...).saveAsTable(...)}}.
> When there are no partitioning columns and there is a skewed key, I found the 
> following exception in speculative tasks. After these failures, it seems we 
> cannot call {{SparkHadoopMapRedUtil.commitTask}} correctly.
> {code}
> java.lang.NullPointerException
>       at parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:146)
>       at parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:112)
>       at parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73)
>       at org.apache.spark.sql.parquet.ParquetOutputWriter.close(newParquet.scala:115)
>       at org.apache.spark.sql.sources.DefaultWriterContainer.abortTask(commands.scala:385)
>       at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:150)
>       at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:122)
>       at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:122)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>       at org.apache.spark.scheduler.Task.run(Task.scala:70)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> {code}


