[ https://issues.apache.org/jira/browse/SPARK-10063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Henrique dos Santos Goulart updated SPARK-10063:
------------------------------------------------
    Comment: was deleted

(was: Is there any alternative right now that works with Parquet and 
partitionBy? It works very well if I set version=2 and write Parquet without 
partitionBy, but if I use 
dataset.write.partitionBy(..).option("..algorithm.version", "2").parquet(...) it will 
create temporary folders =(

Reference question: 
[https://stackoverflow.com/questions/48573643/spark-dataset-parquet-partition-on-s3-creating-temporary-folder]

 

[~rxin] [~yhuai] [~ste...@apache.org] [~chiragvaya])
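
For reference, a minimal sketch of the configuration the comment appears to describe, assuming the elided option is Hadoop's mapreduce.fileoutputcommitter.algorithm.version and that the bucket name and s3a credentials (both hypothetical here) are configured elsewhere; the commenter reports that even with this setting a partitioned write still goes through temporary folders:

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("partitioned-parquet-v2-committer")
  // spark.hadoop.* entries are copied into the Hadoop Configuration used for writes.
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()

import spark.implicits._

val dataset = Seq((1, "a"), (2, "b"), (3, "a")).toDF("id", "key")

dataset.write
  .mode("overwrite")
  .partitionBy("key")
  .parquet("s3a://some-bucket/output/")  // hypothetical bucket/path
{code}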

> Remove DirectParquetOutputCommitter
> -----------------------------------
>
>                 Key: SPARK-10063
>                 URL: https://issues.apache.org/jira/browse/SPARK-10063
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Yin Huai
>            Assignee: Reynold Xin
>            Priority: Critical
>             Fix For: 2.0.0
>
>
> When we use DirectParquetOutputCommitter on S3 and speculation is enabled, 
> there is a chance that we can lose data. 
> Here is the code to reproduce the problem.
> {code}
> import org.apache.spark.sql.functions._
> val failSpeculativeTask = sqlContext.udf.register("failSpeculativeTask", (i: Int, partitionId: Int, attemptNumber: Int) => {
>   if (partitionId == 0 && i == 5) {
>     if (attemptNumber > 0) {
>       Thread.sleep(15000)
>       throw new Exception("new exception")
>     } else {
>       Thread.sleep(10000)
>     }
>   }
>   
>   i
> })
> val df = sc.parallelize((1 to 100), 20).mapPartitions { iter =>
>   val context = org.apache.spark.TaskContext.get()
>   val partitionId = context.partitionId
>   val attemptNumber = context.attemptNumber
>   iter.map(i => (i, partitionId, attemptNumber))
> }.toDF("i", "partitionId", "attemptNumber")
> df
>   .select(failSpeculativeTask($"i", $"partitionId", $"attemptNumber").as("i"), $"partitionId", $"attemptNumber")
>   .write.mode("overwrite").format("parquet").save("/home/yin/outputCommitter")
> sqlContext.read.load("/home/yin/outputCommitter").count
> // The result is 99 and 5 is missing from the output.
> {code}
> What happened is that the original task finishes first and uploads its output 
> file to S3, then the speculative task somehow fails. Because we have to call the 
> output stream's close method, which uploads data to S3, we actually upload 
> the partial result generated by the failed speculative task to S3, and this 
> file overwrites the correct file generated by the original task.
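>
> A minimal local-filesystem illustration of the mechanism described above (not Spark's actual committer code; paths are made up): a direct-style committer has every attempt write straight to the final location, so a failed speculative attempt's partial bytes replace the good file, whereas a FileOutputCommitter-style write only renames the committed attempt's temporary file into place.
> {code}
> import java.nio.file.{Files, Paths, StandardCopyOption}
>
> val finalPath = Paths.get("/tmp/outputCommitter/part-00000.parquet")
> Files.createDirectories(finalPath.getParent)
>
> // Direct-style write: both attempts stream to the final path. The original
> // attempt's complete output is clobbered by the failed speculative attempt,
> // whose close() still flushed partial data.
> Files.write(finalPath, "complete output from the original attempt".getBytes)
> Files.write(finalPath, "partial output from the failed speculative attempt".getBytes)
>
> // Committer-style write: each attempt writes under _temporary and only a
> // successful commit renames the file into the final location.
> val attempt0 = Paths.get("/tmp/outputCommitter/_temporary/attempt_0/part-00000.parquet")
> Files.createDirectories(attempt0.getParent)
> Files.write(attempt0, "complete output from the original attempt".getBytes)
> Files.move(attempt0, finalPath, StandardCopyOption.REPLACE_EXISTING)  // commit
>
> val attempt1 = Paths.get("/tmp/outputCommitter/_temporary/attempt_1/part-00000.parquet")
> Files.createDirectories(attempt1.getParent)
> Files.write(attempt1, "partial output from the failed speculative attempt".getBytes)
> // The speculative attempt fails before commit, so its file is never promoted
> // and the committed output stays intact.
> {code}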


