Repository: spark
Updated Branches:
  refs/heads/branch-1.6 baf29854e -> c12db0d33


[SPARK-14454] [1.6] Better exception handling while marking tasks as failed

Backports https://github.com/apache/spark/pull/12234 to 1.6. Original 
description below:

## What changes were proposed in this pull request?

This patch improves exception handling for the case where code inside a catch 
block itself throws an exception. For instance, here is the code in a catch 
block in `WriterContainer.scala` before this change:

```scala
logError("Aborting task.", cause)
// call failure callbacks first, so we could have a chance to cleanup the writer.
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
if (currentWriter != null) {
  currentWriter.close()
}
abortTask()
throw new SparkException("Task failed while writing rows.", cause)
```

If `markTaskFailed` or `currentWriter.close` throws an exception, we currently 
lose the original cause. This PR fixes this problem by implementing a utility 
function `Utils.tryWithSafeCatch` that suppresses (`Throwable.addSuppressed`) 
the exceptions that are thrown within the catch block and rethrows the 
original exception.

## How was this patch tested?

No new functionality added

Author: Sameer Agarwal <sam...@databricks.com>

Closes #12272 from sameeragarwal/fix-exception-1.6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c12db0d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c12db0d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c12db0d3

Branch: refs/heads/branch-1.6
Commit: c12db0d3361e152698521ad077e7a16f2188a4b8
Parents: baf2985
Author: Sameer Agarwal <sam...@databricks.com>
Authored: Mon Apr 11 10:20:22 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Mon Apr 11 10:20:22 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   8 +-
 .../scala/org/apache/spark/scheduler/Task.scala |  14 +-
 .../scala/org/apache/spark/util/Utils.scala     |  29 ++--
 .../execution/datasources/WriterContainer.scala | 147 +++++++++----------
 4 files changed, 103 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c12db0d3/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 853dbc2..2506019 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1116,9 +1116,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, 
recordsWritten)
           recordsWritten += 1
         }
-      } {
-        writer.close(hadoopContext)
-      }
+      }(finallyBlock = writer.close(hadoopContext))
       committer.commitTask(hadoopContext)
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) 
}
       outputMetrics.setRecordsWritten(recordsWritten)
@@ -1202,9 +1200,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, 
recordsWritten)
           recordsWritten += 1
         }
-      } {
-        writer.close()
-      }
+      }(finallyBlock = writer.close())
       writer.commit()
       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) 
}
       outputMetrics.setRecordsWritten(recordsWritten)

http://git-wip-us.apache.org/repos/asf/spark/blob/c12db0d3/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 17304ea..c7b1199 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -87,10 +87,16 @@ private[spark] abstract class Task[T](
     }
     try {
       (runTask(context), context.collectAccumulators())
-    } catch { case e: Throwable =>
-      // Catch all errors; run task failure callbacks, and rethrow the 
exception.
-      context.markTaskFailed(e)
-      throw e
+    } catch {
+      case e: Throwable =>
+        // Catch all errors; run task failure callbacks, and rethrow the 
exception.
+        try {
+          context.markTaskFailed(e)
+        } catch {
+          case t: Throwable =>
+            e.addSuppressed(t)
+        }
+        throw e
     } finally {
       // Call the task completion callbacks.
       context.markTaskCompleted()

http://git-wip-us.apache.org/repos/asf/spark/blob/c12db0d3/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c102f43..0bcbf26 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1259,26 +1259,35 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Execute a block of code, call the failure callbacks before finally block 
if there is any
-   * exceptions happen. But if exceptions happen in the finally block, do not 
suppress the original
-   * exception.
+   * Execute a block of code and call the failure callbacks in the catch 
block. If exceptions occur
+   * in either the catch or the finally block, they are appended to the list 
of suppressed
+   * exceptions in original exception which is then rethrown.
    *
-   * This is primarily an issue with `finally { out.close() }` blocks, where
-   * close needs to be called to clean up `out`, but if an exception happened
-   * in `out.write`, it's likely `out` may be corrupted and `out.close` will
+   * This is primarily an issue with `catch { abort() }` or `finally { 
out.close() }` blocks,
+   * where the abort/close needs to be called to clean up `out`, but if an 
exception happened
+   * in `out.write`, it's likely `out` may be corrupted and `abort` or 
`out.close` will
    * fail as well. This would then suppress the original/likely more meaningful
    * exception from the original `out.write` call.
    */
-  def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)(finallyBlock: => 
Unit): T = {
+  def tryWithSafeFinallyAndFailureCallbacks[T](block: => T)
+      (catchBlock: => Unit = (), finallyBlock: => Unit = ()): T = {
     var originalThrowable: Throwable = null
     try {
       block
     } catch {
-      case t: Throwable =>
+      case cause: Throwable =>
         // Purposefully not using NonFatal, because even fatal exceptions
         // we don't want to have our finallyBlock suppress
-        originalThrowable = t
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(t)
+        originalThrowable = cause
+        try {
+          logError("Aborting task", originalThrowable)
+          
TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(originalThrowable)
+          catchBlock
+        } catch {
+          case t: Throwable =>
+            originalThrowable.addSuppressed(t)
+            logWarning(s"Suppressing exception in catch: " + t.getMessage, t)
+        }
         throw originalThrowable
     } finally {
       try {

http://git-wip-us.apache.org/repos/asf/spark/blob/c12db0d3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 83b4eaf..f413185 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.UnsafeKVExternalSorter
 import org.apache.spark.sql.sources.{HadoopFsRelation, OutputWriter, 
OutputWriterFactory}
 import org.apache.spark.sql.types.{StructType, StringType}
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 
 private[sql] abstract class BaseWriterContainer(
@@ -257,19 +257,16 @@ private[sql] class DefaultWriterContainer(
 
     // If anything below fails, we should abort the task.
     try {
-      while (iterator.hasNext) {
-        val internalRow = iterator.next()
-        writer.writeInternal(internalRow)
-      }
-
-      commitTask()
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
+        while (iterator.hasNext) {
+          val internalRow = iterator.next()
+          writer.writeInternal(internalRow)
+        }
+        commitTask()
+      }(catchBlock = abortTask())
     } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup 
the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
-        abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
     }
 
     def commitTask(): Unit = {
@@ -343,81 +340,81 @@ private[sql] class DynamicPartitionWriterContainer(
     // If anything below fails, we should abort the task.
     var currentWriter: OutputWriter = null
     try {
-      // This will be filled in if we have to fall back on sorting.
-      var sorter: UnsafeKVExternalSorter = null
-      while (iterator.hasNext && sorter == null) {
-        val inputRow = iterator.next()
-        val currentKey = getPartitionKey(inputRow)
-        currentWriter = outputWriters.get(currentKey)
-
-        if (currentWriter == null) {
-          if (outputWriters.size < maxOpenFiles) {
-            currentWriter = newOutputWriter(currentKey)
-            outputWriters.put(currentKey.copy(), currentWriter)
-            currentWriter.writeInternal(getOutputRow(inputRow))
+      Utils.tryWithSafeFinallyAndFailureCallbacks {
+        // This will be filled in if we have to fall back on sorting.
+        var sorter: UnsafeKVExternalSorter = null
+        while (iterator.hasNext && sorter == null) {
+          val inputRow = iterator.next()
+          val currentKey = getPartitionKey(inputRow)
+          currentWriter = outputWriters.get(currentKey)
+
+          if (currentWriter == null) {
+            if (outputWriters.size < maxOpenFiles) {
+              currentWriter = newOutputWriter(currentKey)
+              outputWriters.put(currentKey.copy(), currentWriter)
+              currentWriter.writeInternal(getOutputRow(inputRow))
+            } else {
+              logInfo(s"Maximum partitions reached, falling back on sorting.")
+              sorter = new UnsafeKVExternalSorter(
+                StructType.fromAttributes(partitionColumns),
+                StructType.fromAttributes(dataColumns),
+                SparkEnv.get.blockManager,
+                TaskContext.get().taskMemoryManager().pageSizeBytes)
+              sorter.insertKV(currentKey, getOutputRow(inputRow))
+            }
           } else {
-            logInfo(s"Maximum partitions reached, falling back on sorting.")
-            sorter = new UnsafeKVExternalSorter(
-              StructType.fromAttributes(partitionColumns),
-              StructType.fromAttributes(dataColumns),
-              SparkEnv.get.blockManager,
-              TaskContext.get().taskMemoryManager().pageSizeBytes)
-            sorter.insertKV(currentKey, getOutputRow(inputRow))
+            currentWriter.writeInternal(getOutputRow(inputRow))
           }
-        } else {
-          currentWriter.writeInternal(getOutputRow(inputRow))
-        }
-      }
-      // current writer is included in outputWriters
-      currentWriter = null
-
-      // If the sorter is not null that means that we reached the maxFiles 
above and need to finish
-      // using external sort.
-      if (sorter != null) {
-        while (iterator.hasNext) {
-          val currentRow = iterator.next()
-          sorter.insertKV(getPartitionKey(currentRow), 
getOutputRow(currentRow))
         }
+        // current writer is included in outputWriters
+        currentWriter = null
+
+        // If the sorter is not null that means that we reached the maxFiles 
above and need to
+        // finish using external sort.
+        if (sorter != null) {
+          while (iterator.hasNext) {
+            val currentRow = iterator.next()
+            sorter.insertKV(getPartitionKey(currentRow), 
getOutputRow(currentRow))
+          }
 
-        logInfo(s"Sorting complete. Writing out partition files one at a 
time.")
-
-        val sortedIterator = sorter.sortedIterator()
-        var currentKey: InternalRow = null
-        while (sortedIterator.next()) {
-          if (currentKey != sortedIterator.getKey) {
-            if (currentWriter != null) {
-              currentWriter.close()
-              currentWriter = null
+          logInfo(s"Sorting complete. Writing out partition files one at a 
time.")
+
+          val sortedIterator = sorter.sortedIterator()
+          var currentKey: InternalRow = null
+          while (sortedIterator.next()) {
+            if (currentKey != sortedIterator.getKey) {
+              if (currentWriter != null) {
+                currentWriter.close()
+                currentWriter = null
+              }
+              currentKey = sortedIterator.getKey.copy()
+              logDebug(s"Writing partition: $currentKey")
+
+              // Either use an existing file from before, or open a new one.
+              currentWriter = outputWriters.remove(currentKey)
+              if (currentWriter == null) {
+                currentWriter = newOutputWriter(currentKey)
+              }
             }
-            currentKey = sortedIterator.getKey.copy()
-            logDebug(s"Writing partition: $currentKey")
 
-            // Either use an existing file from before, or open a new one.
-            currentWriter = outputWriters.remove(currentKey)
-            if (currentWriter == null) {
-              currentWriter = newOutputWriter(currentKey)
-            }
+            currentWriter.writeInternal(sortedIterator.getValue)
+          }
+          if (currentWriter != null) {
+            currentWriter.close()
+            currentWriter = null
           }
-
-          currentWriter.writeInternal(sortedIterator.getValue)
-        }
-        if (currentWriter != null) {
-          currentWriter.close()
-          currentWriter = null
         }
-      }
 
-      commitTask()
-    } catch {
-      case cause: Throwable =>
-        logError("Aborting task.", cause)
-        // call failure callbacks first, so we could have a chance to cleanup 
the writer.
-        TaskContext.get().asInstanceOf[TaskContextImpl].markTaskFailed(cause)
+        commitTask()
+      }(catchBlock = {
         if (currentWriter != null) {
           currentWriter.close()
         }
         abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
+      })
+    } catch {
+      case t: Throwable =>
+        throw new SparkException("Task failed while writing rows", t)
     }
 
     /** Open and returns a new OutputWriter given a partition key. */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to