[ https://issues.apache.org/jira/browse/SPARK-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14589592#comment-14589592 ]

Cheng Lian edited comment on SPARK-8406 at 6/17/15 6:07 PM:
------------------------------------------------------------

An example task execution order that causes overwriting:

# Writing a DataFrame with 4 RDD partitions to an empty directory.
# Task 1 and task 2 get scheduled, while task 3 and task 4 are queued. Both task 1 and task 2 find the current max part number to be 0 (because the destination directory is empty).
# Task 1 finishes and generates {{part-r-00001.gz.parquet}}. The current max part number becomes 1.
# Task 4 gets scheduled and decides to write to {{part-r-00005.gz.parquet}} (5 = current max part number + task ID), but hasn't started writing the file yet.
# Task 2 finishes and generates {{part-r-00002.gz.parquet}}. The current max part number becomes 2.
# Task 3 gets scheduled and also decides to write to {{part-r-00005.gz.parquet}}, since task 4 hasn't started writing its output file and task 3 finds the current max part number is still 2.
# Task 4 finishes writing {{part-r-00005.gz.parquet}}.
# Task 3 finishes writing {{part-r-00005.gz.parquet}}.
# The output of task 4 is overwritten (see the sketch after this list).
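
For illustration, here is a minimal Scala sketch of the naming scheme described above, replaying the schedule. The names ({{PartFileNaming}}, {{currentMaxPartNumber}}, {{outputFileName}}) are made up for this example and are not Spark's actual code:

{code}
// Illustrative sketch only -- not Spark's actual implementation.
object PartFileNaming {
  // Scan the destination directory listing and return the largest <id>
  // found in names like "part-r-<id>.gz.parquet" (0 if the dir is empty).
  def currentMaxPartNumber(existingFiles: Seq[String]): Int = {
    val PartFile = """part-r-(\d+)\.gz\.parquet""".r
    val ids = existingFiles.collect { case PartFile(id) => id.toInt }
    if (ids.isEmpty) 0 else ids.max
  }

  // Each task picks its output name only when it starts running.
  def outputFileName(existingFiles: Seq[String], taskId: Int): String =
    f"part-r-${currentMaxPartNumber(existingFiles) + taskId}%05d.gz.parquet"
}

// Task 4 starts when only part 1 exists: 1 + 4 = 5
PartFileNaming.outputFileName(Seq("part-r-00001.gz.parquet"), taskId = 4)
// Task 3 starts after part 2 appears:   2 + 3 = 5 -- same name, collision
PartFileNaming.outputFileName(
  Seq("part-r-00001.gz.parquet", "part-r-00002.gz.parquet"), taskId = 3)
{code}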



> Race condition when writing Parquet files
> -----------------------------------------
>
>                 Key: SPARK-8406
>                 URL: https://issues.apache.org/jira/browse/SPARK-8406
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.4.0
>            Reporter: Cheng Lian
>            Assignee: Cheng Lian
>            Priority: Blocker
>
> To support appending, the Parquet data source tries to find the max part 
> number among the part-files in the destination directory (the <id> in the 
> output file name "part-r-<id>.gz.parquet") at the beginning of the write job. 
> In 1.3.0, this step happened on the driver side before any files were 
> written. In 1.4.0, however, it was moved to the task side. As a result, tasks 
> scheduled later may see a wrong max part number, inflated by files newly 
> written by other tasks that have already finished within the same job. This 
> causes a race condition. In most cases it only produces nonconsecutive IDs in 
> output file names. But when the DataFrame contains thousands of RDD 
> partitions, it becomes likely that two tasks choose the same part number, so 
> one task's output is overwritten by the other's.
> The data loss itself is not easy to reproduce, but the following Spark shell 
> snippet reproduces the nonconsecutive output file IDs:
> {code}
> sqlContext.range(0, 128).repartition(16).write.mode("overwrite").parquet("foo")
> {code}
> "16" can be replaced with any integer that is greater than the default 
> parallelism on your machine (usually it means core number, on my machine it's 
> 8).
> {noformat}
> -rw-r--r--   3 lian supergroup          0 2015-06-17 00:06 
> /user/lian/foo/_SUCCESS
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00001.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00002.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00003.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00004.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00005.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00006.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00007.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00008.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00017.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00018.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00019.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00020.gz.parquet
> -rw-r--r--   3 lian supergroup        352 2015-06-17 00:06 
> /user/lian/foo/part-r-00021.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00022.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00023.gz.parquet
> -rw-r--r--   3 lian supergroup        353 2015-06-17 00:06 
> /user/lian/foo/part-r-00024.gz.parquet
> {noformat}
> Notice that the newly added ORC data source doesn't suffer from this issue, 
> because it uses both the part number and {{System.currentTimeMillis()}} to 
> generate the output file name.


