[ https://issues.apache.org/jira/browse/SPARK-8513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15041742#comment-15041742 ]
Jiri Syrovy edited comment on SPARK-8513 at 12/4/15 4:45 PM:
-------------------------------------------------------------
The same thing happens when writing partitioned files with coalesce(1) and spark.speculation set to "false":
{code:java}
DataFrameWriter writer = df.coalesce(1).write().format("parquet");
writer = perTemplate ? writer.partitionBy("templateId", "definitionId") : writer.partitionBy("definitionId");
writer.mode(SaveMode.Append).save(ConfConsts.STORAGE_PREFIX + location);
{code}
{noformat}
[2015-12-04 16:14:59,821] WARN .apache.hadoop.fs.FileUtil [] [akka://JobServer/user/context-supervisor/CSCONTEXT] - Failed to delete file or dir [/data/build/result_12349.parquet.stats/_temporary/0/task_201512041614_5493_m_000000/templateId=2918/definitionId=4/.part-r-00000-cee32e28-fa7c-43ec-bbe1-63b639deb395.snappy.parquet.crc]: it still exists.
[2015-12-04 16:14:59,821] ERROR InsertIntoHadoopFsRelation [] [akka://JobServer/user/context-supervisor/CSCONTEXT] - Aborting job.
java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/data/build/result_12349.parquet.stats/_temporary/0/task_201512041614_5493_m_000000/templateId=2918; isDirectory=true; modification_time=1449245693000; access_time=0; owner=; group=; permission=rwxrwxrwx; isSymlink=false} to file:/data/build/result_12349.parquet.stats/templateId=2918
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.renameOrMerge(FileOutputCommitter.java:397)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:388)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
{noformat}
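For what it's worth, the sketch below is a minimal, JDK-only illustration (hypothetical paths, not the code from the job above) of one way such a rename can fail on a local filesystem. {{RawLocalFileSystem}} ultimately delegates renames to {{java.io.File.renameTo()}}, and on POSIX systems {{rename(2)}} fails with ENOTEMPTY when the target is an existing non-empty directory, which is plausible for a partition directory written with {{SaveMode.Append}}:
{code:java}
import java.io.File;

// JDK-only sketch (no Spark/Hadoop involved) of a rename failing the way the
// stack trace above does. All paths are hypothetical.
public class RenameFailureSketch {
    public static void main(String[] args) {
        // The destination partition directory already exists and is non-empty,
        // which is plausible under SaveMode.Append:
        File dest = new File("/tmp/demo/result.parquet/templateId=2918");
        new File(dest, "definitionId=4").mkdirs();

        // The committer's temporary output for the same partition:
        File src = new File("/tmp/demo/_temporary/0/task_000000/templateId=2918");
        src.mkdirs();

        // On POSIX, rename(2) fails with ENOTEMPTY when the target is an existing
        // non-empty directory, so renameTo() returns false and the committer
        // turns that into "java.io.IOException: Failed to rename ...".
        System.out.println("renamed? " + src.renameTo(dest)); // prints "renamed? false"
    }
}
{code}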
> _temporary may be left undeleted when a write job committed with
> FileOutputCommitter fails due to a race condition
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8513
>                 URL: https://issues.apache.org/jira/browse/SPARK-8513
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 1.2.2, 1.3.1, 1.4.0
>            Reporter: Cheng Lian
>
> To reproduce this issue, we need a node with a relatively large number of cores, say 32 (e.g., the Spark Jenkins builder is a good candidate). On such a node, the issue should be relatively easy to reproduce with the following code:
> {code}
> sqlContext.range(0, 10).repartition(32).select('id / 0).write.mode("overwrite").parquet("file:///tmp/foo")
> {code}
> You may observe log lines similar to the one below:
> {noformat}
> 01:58:27.682 pool-1-thread-1-ScalaTest-running-CommitFailureTestRelationSuite WARN FileUtil: Failed to delete file or dir [/home/jenkins/workspace/SparkPullRequestBuilder/target/tmp/spark-a918b285-fa59-4a29-857e-a95e38fa355a/_temporary/0/_temporary]: it still exists.
> {noformat}
> The reason is that, for a Spark job with multiple tasks, when a task fails after multiple retries, the job gets canceled on the driver side. At the same time, all child tasks of this job also get canceled. However, task cancellation is asynchronous. This means some tasks may still be running when the job has already been killed on the driver side.
> With this in mind, the following execution order can produce the log line mentioned above:
> # Job {{A}} spawns 32 tasks to write the Parquet file. Since {{ParquetOutputCommitter}} is a subclass of {{FileOutputCommitter}}, a temporary directory {{D1}} is created to hold the output files of the different task attempts.
> # Task {{a1}} is the first to fail, after several retries, because of the division-by-zero error
> # Task {{a1}} aborts the Parquet write task and tries to remove its task attempt output directory {{d1}} (a sub-directory of {{D1}})
> # Job {{A}} gets canceled on the driver side; all the other 31 tasks also get canceled *asynchronously*
> # {{ParquetOutputCommitter.abortJob()}} tries to remove {{D1}} by first removing all of its child files/directories
> Note that when testing with a local directory, {{RawLocalFileSystem}} simply calls {{java.io.File.delete()}} to perform the deletion, and only empty directories can be deleted.
> # Because tasks are canceled asynchronously, some other task, say {{a2}}, may just get scheduled and create its own task attempt directory {{d2}} under {{D1}}
> # Now {{ParquetOutputCommitter.abortJob()}} finally tries to remove {{D1}} itself, but fails because {{d2}} has made {{D1}} non-empty again
> Notice that this bug affects all Spark jobs that write files with {{FileOutputCommitter}} and its subclasses, which create and delete temporary directories.
> One possible way to fix this issue is to make task cancellation synchronous, but this would also increase latency.
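To make the interleaving in the steps above concrete, here is a minimal, JDK-only sketch (hypothetical paths; no Spark or Hadoop code) that replays the race deterministically instead of relying on thread timing. It shows why the final delete of {{D1}} fails once a lagging task has created {{d2}}:
{code:java}
import java.io.File;

// Deterministic replay of the abortJob() race described above. The paths are
// hypothetical; on a local filesystem, directory removal bottoms out in
// java.io.File.delete(), which refuses to remove a non-empty directory.
public class AbortJobRaceSketch {
    public static void main(String[] args) {
        File D1 = new File("/tmp/demo/_temporary/0"); // job-level temporary directory
        File d1 = new File(D1, "task_000000");        // attempt directory of the failed task a1
        d1.mkdirs();

        // abortJob() first removes D1's children...
        System.out.println("d1 deleted? " + d1.delete()); // true: d1 is empty

        // ...but before D1 itself is removed, a lagging task a2 gets scheduled
        // (cancellation is asynchronous) and creates its attempt directory d2:
        File d2 = new File(D1, "task_000001");
        d2.mkdirs();

        // The final delete of D1 now fails, leaving _temporary behind:
        System.out.println("D1 deleted? " + D1.delete()); // false: d2 made D1 non-empty again
    }
}
{code}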