[ https://issues.apache.org/jira/browse/SPARK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15041742#comment-15041742 ]

Jiri Syrovy edited comment on SPARK-8513 at 12/4/15 4:47 PM:
-------------------------------------------------------------

The same thing happens when writing partitioned files with coalesce(1) and 
spark.speculation set to "false". I've tried multiple Hadoop versions 
(2.6 -> 2.7.1) and multiple write modes (Overwrite, Append, ...), but the result 
is almost always the same.
{code:java}
DataFrameWriter writer = df.coalesce(1).write().format("parquet");
writer = perTemplate
        ? writer.partitionBy("templateId", "definitionId")
        : writer.partitionBy("definitionId");
writer.mode(SaveMode.Append).save(ConfConsts.STORAGE_PREFIX + location);
{code}
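
For context, here is a self-contained Scala sketch of the same write pattern against a local path; the input DataFrame, the perTemplate flag, and the output location below are placeholders standing in for the values used above, not the actual job.
{code}
// Minimal sketch (Spark 1.x API, local mode); all inputs are placeholders.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object PartitionedWriteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("partitioned-write").setMaster("local[4]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Placeholder input standing in for the real DataFrame `df`.
    val df = Seq((2918L, 4L, 1.0), (2918L, 5L, 2.0)).toDF("templateId", "definitionId", "value")
    val perTemplate = true                        // placeholder for the real flag
    val location = "file:///tmp/result.parquet"   // placeholder for STORAGE_PREFIX + location

    val base = df.coalesce(1).write.format("parquet")
    val writer =
      if (perTemplate) base.partitionBy("templateId", "definitionId")
      else base.partitionBy("definitionId")
    writer.mode(SaveMode.Append).save(location)
    sc.stop()
  }
}
{code}
On a local filesystem this write ends in the same FileOutputCommitter commit/rename path that shows up in the stack trace below.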

{noformat}
[2015-12-04 16:14:59,821] WARN  .apache.hadoop.fs.FileUtil [] [akka://JobServer/user/context-supervisor/CSCONTEXT] - Failed to delete file or dir [/data/build/result_12349.parquet.stats/_temporary/0/task_201512041614_5493_m_000000/templateId=2918/definitionId=4/.part-r-00000-cee32e28-fa7c-43ec-bbe1-63b639deb395.snappy.parquet.crc]: it still exists.
[2015-12-04 16:14:59,821] ERROR InsertIntoHadoopFsRelation [] [akka://JobServer/user/context-supervisor/CSCONTEXT] - Aborting job.
java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/data/build/result_12349.parquet.stats/_temporary/0/task_201512041614_5493_m_000000/templateId=2918; isDirectory=true; modification_time=1449245693000; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false} to file:/data/build/result_12349.parquet.stats/templateId=2918
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.renameOrMerge(FileOutputCommitter.java:397)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:388)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384)
        at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326)
        at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
{noformat}


> _temporary may be left undeleted when a write job committed with 
> FileOutputCommitter fails due to a race condition
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8513
>                 URL: https://issues.apache.org/jira/browse/SPARK-8513
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 1.2.2, 1.3.1, 1.4.0
>            Reporter: Cheng Lian
>
> To reproduce this issue, we need a node with relatively many cores, say 32 
> (e.g., the Spark Jenkins builder is a good candidate).  On such a node, the 
> following code should reproduce the issue fairly easily:
> {code}
> sqlContext.range(0, 10).repartition(32).select('id / 0).write.mode("overwrite").parquet("file:///tmp/foo")
> {code}
> You may observe log lines similar to the following:
> {noformat}
> 01:58:27.682 pool-1-thread-1-ScalaTest-running-CommitFailureTestRelationSuite WARN FileUtil: Failed to delete file or dir [/home/jenkins/workspace/SparkPullRequestBuilder/target/tmp/spark-a918b285-fa59-4a29-857e-a95e38fa355a/_temporary/0/_temporary]: it still exists.
> {noformat}
> The reason is that, for a Spark job with multiple tasks, when a task fails 
> after multiple retries, the job gets canceled on the driver side.  At the same 
> time, all child tasks of this job also get canceled.  However, task 
> cancellation is asynchronous, which means some tasks may still be running 
> when the job has already been killed on the driver side.
> With this in mind, the following execution order may cause the log line 
> mentioned above:
> # Job {{A}} spawns 32 tasks to write the Parquet file
>   Since {{ParquetOutputCommitter}} is a subclass of {{FileOutputCommitter}}, a 
> temporary directory {{D1}} is created to hold the output files of the different 
> task attempts.
> # Task {{a1}} fails first, after several retries, because of the division-by-zero 
> error
> # Task {{a1}} aborts the Parquet write task and tries to remove its task 
> attempt output directory {{d1}} (a sub-directory of {{D1}})
> # Job {{A}} gets canceled on the driver side, and all the other 31 tasks also get 
> canceled *asynchronously*
> # {{ParquetOutputCommitter.abortJob()}} tries to remove {{D1}} by first 
> removing all of its child files/directories
>   Note that when testing with a local directory, {{RawLocalFileSystem}} simply 
> calls {{java.io.File.delete()}} to perform the deletion, and only empty 
> directories can be deleted.
> # Because tasks are canceled asynchronously, some other task, say {{a2}}, may 
> just get scheduled and create its own task attempt directory {{d2}} under 
> {{D1}}
> # Now {{ParquetOutputCommitter.abortJob()}} finally tries to remove {{D1}} 
> itself, but fails because {{d2}} has made {{D1}} non-empty again (a minimal 
> sketch of this interleaving follows the list)
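> To make the race concrete, here is a minimal, illustrative sketch in plain JVM code (Scala); it is not Spark's actual code and all names in it are made up. One thread plays {{abortJob()}}, emptying and then deleting {{D1}}, while a second thread plays a task that is canceled asynchronously but still creates {{d2}} underneath it, so the final {{java.io.File.delete()}} fails because only empty directories can be deleted.
> {code}
> import java.io.File
> import java.nio.file.Files
>
> object AbortRaceSketch {
>   def main(args: Array[String]): Unit = {
>     val d1 = Files.createTempDirectory("job_temporary").toFile // plays the role of D1
>     new File(d1, "attempt_a1").mkdir()                         // d1: attempt dir of the failed task a1
>
>     // A task that is canceled asynchronously but still runs long enough to create d2 under D1.
>     val lateTask = new Thread(new Runnable {
>       def run(): Unit = {
>         Thread.sleep(5)
>         new File(d1, "attempt_a2").mkdir()                     // d2 appears while abortJob() is cleaning up
>       }
>     })
>     lateTask.start()
>
>     // abortJob(): remove all children of D1 first, then D1 itself.
>     d1.listFiles().foreach(_.delete())
>     Thread.sleep(20)                                           // widen the race window for the demo
>     println(s"delete(D1) succeeded: ${d1.delete()}")           // typically false: d2 made D1 non-empty
>     lateTask.join()
>   }
> }
> {code}
> On a real cluster the timing window is narrower, but the same interleaving can happen whenever a canceled task is still running while {{abortJob()}} cleans up.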
> Notice that this bug affects all Spark jobs that write files with 
> {{FileOutputCommitter}} and its subclasses, which create and delete temporary 
> directories.
> One possible way to fix this issue would be to make task cancellation 
> synchronous, but that would also increase latency.


