Repository: spark
Updated Branches:
  refs/heads/branch-1.5 80dac0b07 -> f28399e1a


[SPARK-11328][SQL] Improve error message when hitting this issue

The issue is that the output committer is not idempotent: retry attempts fail
because the output file written by the first attempt already exists. It is not
safe to clean up the file, since this output committer is by design not
retryable. Currently the job fails with a confusing "file already exists" error
that hides the original failure. This patch is a stopgap that tells the user to
look at the top of the error log for the actual error message.

This is difficult to test locally because Spark is hardcoded not to retry tasks
in local mode. Manually verified by increasing the number of retry attempts.
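
For reference, a rough sketch of one way to reproduce this on a cluster is
below (local mode never retries tasks). The app name, output path and failure
scenario are illustrative assumptions, not part of the patch; the two config
keys are the existing spark.task.maxFailures and
spark.sql.parquet.output.committer.class settings.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  // Allow task retries so a second attempt can hit the file left by the first.
  val conf = new SparkConf()
    .setAppName("spark-11328-repro")            // hypothetical app name
    .set("spark.task.maxFailures", "4")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  // Opt in to the non-idempotent direct committer.
  sqlContext.setConf("spark.sql.parquet.output.committer.class",
    "org.apache.spark.sql.execution.datasources.parquet.DirectParquetOutputCommitter")

  // If the first attempt of a write task dies after creating its output file
  // (e.g. the executor is killed), the retry now fails with the augmented
  // SparkException instead of a bare FileAlreadyExistsException.
  sqlContext.range(0, 1000000).write.parquet("/tmp/spark-11328-repro")  // hypothetical path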

Author: Nong Li <n...@databricks.com>
Author: Nong Li <non...@gmail.com>

Closes #10080 from nongli/spark-11328.

(cherry picked from commit 47a0abc343550c855e679de12983f43e6fcc0171)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f28399e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f28399e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f28399e1

Branch: refs/heads/branch-1.5
Commit: f28399e1ab6fd3a829d05ffbc637aa2c86cffdf2
Parents: 80dac0b
Author: Nong Li <n...@databricks.com>
Authored: Tue Dec 1 15:30:21 2015 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Dec 1 15:59:09 2015 -0800

----------------------------------------------------------------------
 .../execution/datasources/WriterContainer.scala | 22 ++++++++++++++++++--
 .../parquet/DirectParquetOutputCommitter.scala  |  3 ++-
 2 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f28399e1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index c1599f1..6c39dba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -122,6 +122,24 @@ private[sql] abstract class BaseWriterContainer(
     }
   }
 
+  protected def newOutputWriter(path: String): OutputWriter = {
+    try {
+      outputWriterFactory.newInstance(path, dataSchema, taskAttemptContext)
+    } catch {
+      case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
+        if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
+          // Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
+          // attempts, the task will fail because the output file is created from a prior attempt.
+          // This often means the most visible error to the user is misleading. Augment the error
+          // to tell the user to look for the actual error.
+          throw new SparkException("The output file already exists but this could be due to a " +
+            "failure from an earlier attempt. Look through the earlier logs or stage page for " +
+            "the first error.\n  File exists error: " + e)
+        }
+        throw e
+    }
+  }
+
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
     val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
 
@@ -230,7 +248,7 @@ private[sql] class DefaultWriterContainer(
     executorSideSetup(taskContext)
     val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
     configuration.set("spark.sql.sources.output.path", outputPath)
-    val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
+    val writer = newOutputWriter(getWorkPath)
     writer.initConverter(dataSchema)
 
     var writerClosed = false
@@ -400,7 +418,7 @@ private[sql] class DynamicPartitionWriterContainer(
       val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(taskAttemptContext)
       configuration.set(
         "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
-      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
+      val newWriter = super.newOutputWriter(path.toString)
       newWriter.initConverter(dataSchema)
       newWriter
     }

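The new newOutputWriter helper above boils down to a wrap-and-augment pattern.
A minimal, self-contained sketch of the same idea outside Spark internals is
below; the names createWriterSafely and isNonIdempotentCommitter are
hypothetical.

  import org.apache.hadoop.fs.FileAlreadyExistsException

  // Wrap the factory call; when the committer cannot safely retry, rethrow the
  // "file exists" failure with a pointer to the real root cause in earlier logs.
  def createWriterSafely[T](isNonIdempotentCommitter: Boolean)(createWriter: => T): T = {
    try createWriter
    catch {
      case e: FileAlreadyExistsException if isNonIdempotentCommitter =>
        throw new RuntimeException(
          "Output file already exists, likely left behind by an earlier failed attempt; " +
          "check the earlier logs or the stage page for the first error.", e)
    }
  }
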
http://git-wip-us.apache.org/repos/asf/spark/blob/f28399e1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
index 2c6b914..cf61a2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetOutputCommitter.scala
@@ -41,7 +41,8 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetO
  *   no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
  *   left * empty).
  */
-private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+private[datasources] class DirectParquetOutputCommitter(
+    outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
   val LOG = Log.getLog(classOf[ParquetOutputCommitter])
 


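A note on the one-line visibility change in DirectParquetOutputCommitter.scala:
the modifier is widened from private[parquet] to private[datasources] so that
BaseWriterContainer, which lives in the parent datasources package, can perform
the isInstanceOf check added above. A minimal sketch of Scala's
package-qualified private (package and class names here are hypothetical):

  package org.example.datasources.parquet {
    // Visible anywhere under org.example.datasources, not only under ...datasources.parquet.
    private[datasources] class DirectCommitter
  }

  package org.example.datasources {
    object Writers {
      // Compiles because DirectCommitter is private[datasources], not private[parquet].
      def isDirect(c: AnyRef): Boolean = c.isInstanceOf[parquet.DirectCommitter]
    }
  }
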
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
