Repository: spark
Updated Branches:
  refs/heads/branch-2.2 0bd918f67 -> 82ae1f0ac


[SPARK-20716][SS] StateStore.abort() should not throw exceptions

## What changes were proposed in this pull request?

StateStore.abort() should make a best-effort attempt to clean up temporary 
resources. It should not throw errors, especially because it is called from a 
TaskCompletionListener, where an exception thrown during cleanup could hide 
the real error that failed the task.
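
For illustration only (this sketch is not part of the commit, and the names 
below are hypothetical stand-ins rather than Spark APIs), the change boils 
down to wrapping the cleanup in a try/catch that logs instead of rethrowing, 
so a failure during abort cannot mask the task's original error:

```scala
import java.io.OutputStream

import scala.util.control.NonFatal

object AbortSketch {
  // `tempStream`, `deleteTempFile`, and `logWarning` are hypothetical stand-ins
  // for the store's temp delta file stream, its deletion, and Spark's logging.
  def bestEffortAbort(
      tempStream: OutputStream,
      deleteTempFile: () => Unit,
      logWarning: (String, Throwable) => Unit): Unit = {
    try {
      if (tempStream != null) tempStream.close() // may throw if the channel is already closed
      deleteTempFile()                           // may throw on I/O errors
    } catch {
      case NonFatal(e) =>
        // Log and swallow: rethrowing from a TaskCompletionListener would
        // replace the task's original failure with this cleanup error.
        logWarning("Error while aborting state store update", e)
    }
  }
}
```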

## How was this patch tested?
No unit test.

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #17958 from tdas/SPARK-20716.

(cherry picked from commit 271175e2bd0f7887a068db92de73eff60f5ef2b2)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82ae1f0a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82ae1f0a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82ae1f0a

Branch: refs/heads/branch-2.2
Commit: 82ae1f0aca9c00fddba130c144adfe0777172cc8
Parents: 0bd918f
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon May 15 10:46:38 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon May 15 10:46:45 2017 -0700

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala    | 22 ++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/82ae1f0a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 1426728..fb2bf47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming.state
 
 import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
+import java.nio.channels.ClosedChannelException
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(
     /** Abort all the updates made on this store. This store will not be usable any more. */
     override def abort(): Unit = {
       verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
+      try {
+        state = ABORTED
+        if (tempDeltaFileStream != null) {
+          tempDeltaFileStream.close()
+        }
+        if (tempDeltaFile != null) {
+          fs.delete(tempDeltaFile, true)
+        }
+      } catch {
+        case c: ClosedChannelException =>
+          // This can happen when underlying file output stream has been closed before the
+          // compression stream.
+          logDebug(s"Error aborting version $newVersion into $this", c)
 
-      state = ABORTED
-      if (tempDeltaFileStream != null) {
-        tempDeltaFileStream.close()
-      }
-      if (tempDeltaFile != null) {
-        fs.delete(tempDeltaFile, true)
+        case e: Exception =>
+          logWarning(s"Error aborting version $newVersion into $this", e)
       }
       logInfo(s"Aborted version $newVersion for $this")
     }

