[ https://issues.apache.org/jira/browse/SPARK-10063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Henrique dos Santos Goulart updated SPARK-10063:
------------------------------------------------
    Comment: was deleted

(was: Is there any alternative right now that works with Parquet and partitionBy? Setting version=2 works very well when I do not use partitionBy, but if I use dataset.partitionBy(..).option(..algorithm.version", "2").parquet(...) it still creates temporary folders =(

Reference question: [https://stackoverflow.com/questions/48573643/spark-dataset-parquet-partition-on-s3-creating-temporary-folder]

[~rxin] [~yhuai] [~ste...@apache.org] [~chiragvaya])

> Remove DirectParquetOutputCommitter
> -----------------------------------
>
>                 Key: SPARK-10063
>                 URL: https://issues.apache.org/jira/browse/SPARK-10063
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>            Reporter: Yin Huai
>            Assignee: Reynold Xin
>            Priority: Critical
>             Fix For: 2.0.0
>
>
> When we use DirectParquetOutputCommitter on S3 and speculation is enabled, there is a chance that we can lose data.
> Here is the code to reproduce the problem.
> {code}
> // Assumes a spark-shell session, which provides sc and sqlContext
> // and the implicits needed for toDF and $"...".
> import org.apache.spark.sql.functions._
> val failSpeculativeTask = sqlContext.udf.register("failSpeculativeTask", (i: Int, partitionId: Int, attemptNumber: Int) => {
>   if (partitionId == 0 && i == 5) {
>     if (attemptNumber > 0) {
>       // Later attempt (e.g. the speculative copy): outlast the original, then fail.
>       Thread.sleep(15000)
>       throw new Exception("new exception")
>     } else {
>       // First attempt: slow enough to trigger speculation, but it succeeds.
>       Thread.sleep(10000)
>     }
>   }
>
>   i
> })
> val df = sc.parallelize((1 to 100), 20).mapPartitions { iter =>
>   val context = org.apache.spark.TaskContext.get()
>   val partitionId = context.partitionId
>   val attemptNumber = context.attemptNumber
>   iter.map(i => (i, partitionId, attemptNumber))
> }.toDF("i", "partitionId", "attemptNumber")
> df
>   .select(failSpeculativeTask($"i", $"partitionId", $"attemptNumber").as("i"), $"partitionId", $"attemptNumber")
>   .write.mode("overwrite").format("parquet").save("/home/yin/outputCommitter")
> sqlContext.read.load("/home/yin/outputCommitter").count
> // The result is 99 and 5 is missing from the output.
> {code}
> What happened is that the original task finishes first and uploads its output file to S3, then the speculative task somehow fails. Because we have to call the output stream's close method, which uploads data to S3, we actually upload the partial result generated by the failed speculative task to S3, and this file overwrites the correct file generated by the original task.
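
The overwrite described in the issue can be illustrated without Spark or S3. The following is only a toy sketch under stated assumptions: `ToyStore`, `directAttempt`, `stagedAttempt`, the `part-00000` key, and the `_temporary/attempt_N` layout are hypothetical names made up for this illustration, not Spark or Hadoop APIs. It models a direct write, where closing the stream always publishes to the final key, next to an attempt-scoped write that publishes only after the attempt succeeds.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for an object store:
// a put on an existing key silently replaces the old object.
final class ToyStore {
  private val objects = mutable.Map.empty[String, Seq[Int]]
  def put(key: String, rows: Seq[Int]): Unit = objects(key) = rows
  def get(key: String): Option[Seq[Int]] = objects.get(key)
}

object CommitterSketch {
  val finalKey = "part-00000" // hypothetical output file name

  // Rows buffered before the attempt stops; failAfter = Some(n) means
  // the attempt throws after producing n rows.
  private def buffered(rows: Seq[Int], failAfter: Option[Int]): Seq[Int] =
    failAfter.fold(rows)(n => rows.take(n))

  // Direct write: the stream is closed even on failure, and close()
  // uploads whatever was buffered straight to the final key.
  def directAttempt(store: ToyStore, rows: Seq[Int], failAfter: Option[Int]): Unit =
    store.put(finalKey, buffered(rows, failAfter))

  // Attempt-scoped write: data lands under a per-attempt key and is
  // published to the final key only if the attempt succeeded.
  def stagedAttempt(store: ToyStore, attempt: Int, rows: Seq[Int], failAfter: Option[Int]): Unit = {
    val data = buffered(rows, failAfter)
    store.put(s"_temporary/attempt_$attempt/$finalKey", data)
    if (failAfter.isEmpty) store.put(finalKey, data)
  }

  // Replays the scenario from the issue: the original attempt succeeds
  // first, then the speculative attempt fails before writing row 5.
  def run(): (Seq[Int], Seq[Int]) = {
    val rows = Seq(1, 2, 3, 4, 5)

    val direct = new ToyStore
    directAttempt(direct, rows, failAfter = None)
    directAttempt(direct, rows, failAfter = Some(4))

    val staged = new ToyStore
    stagedAttempt(staged, 0, rows, failAfter = None)
    stagedAttempt(staged, 1, rows, failAfter = Some(4))

    (direct.get(finalKey).get, staged.get(finalKey).get)
  }
}
```

Calling `CommitterSketch.run()` returns the contents of the final key under each strategy. In the direct case the failed attempt's partial data replaces the complete file, which is the loss reported above. In the staged case the partial data stays under the attempt's own key. A real committer would also need to clean up the leftover attempt keys and arbitrate between two attempts that both succeed, neither of which this sketch models.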