Repository: spark
Updated Branches:
  refs/heads/master 9d48bd0b3 -> 4943ea598


[SPARK-22058][CORE] the BufferedInputStream will not be closed if an exception 
occurs.

## What changes were proposed in this pull request?

EventLoggingListener use `val in = new BufferedInputStream(fs.open(log))` and 
will close it if `codec.map(_.compressedInputStream(in)).getOrElse(in)`  occurs 
an exception .
But, if `CompressionCodec.createCodec(new SparkConf, c)` throws an exception, 
the BufferedInputStream `in` will not be closed anymore.

## How was this patch tested?

exist tests

Author: zuotingbing <zuo.tingbi...@zte.com.cn>

Closes #19277 from zuotingbing/SPARK-22058.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4943ea59
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4943ea59
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4943ea59

Branch: refs/heads/master
Commit: 4943ea59840a894ca47d241fe68d520f1e97fa56
Parents: 9d48bd0
Author: zuotingbing <zuo.tingbi...@zte.com.cn>
Authored: Sun Sep 24 09:38:46 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Sep 24 09:38:46 2017 +0100

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/EventLoggingListener.scala    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4943ea59/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 00ab2a3..9dafa0b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -351,14 +351,14 @@ private[spark] object EventLoggingListener extends 
Logging {
     // Since we sanitize the app ID to not include periods, it is safe to 
split on it
     val logName = log.getName.stripSuffix(IN_PROGRESS)
     val codecName: Option[String] = logName.split("\\.").tail.lastOption
-    val codec = codecName.map { c =>
-      codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, 
c))
-    }
 
     try {
+      val codec = codecName.map { c =>
+        codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new 
SparkConf, c))
+      }
       codec.map(_.compressedInputStream(in)).getOrElse(in)
     } catch {
-      case e: Exception =>
+      case e: Throwable =>
         in.close()
         throw e
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to