Repository: spark Updated Branches: refs/heads/master 9d48bd0b3 -> 4943ea598
[SPARK-22058][CORE] the BufferedInputStream will not be closed if an exception occurs. ## What changes were proposed in this pull request? EventLoggingListener use `val in = new BufferedInputStream(fs.open(log))` and will close it if `codec.map(_.compressedInputStream(in)).getOrElse(in)` occurs an exception . But, if `CompressionCodec.createCodec(new SparkConf, c)` throws an exception, the BufferedInputStream `in` will not be closed anymore. ## How was this patch tested? exist tests Author: zuotingbing <zuo.tingbi...@zte.com.cn> Closes #19277 from zuotingbing/SPARK-22058. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4943ea59 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4943ea59 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4943ea59 Branch: refs/heads/master Commit: 4943ea59840a894ca47d241fe68d520f1e97fa56 Parents: 9d48bd0 Author: zuotingbing <zuo.tingbi...@zte.com.cn> Authored: Sun Sep 24 09:38:46 2017 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Sun Sep 24 09:38:46 2017 +0100 ---------------------------------------------------------------------- .../org/apache/spark/scheduler/EventLoggingListener.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4943ea59/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 00ab2a3..9dafa0b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -351,14 +351,14 @@ private[spark] object EventLoggingListener extends Logging { // Since we sanitize the app ID to not include periods, it is safe to split on it val logName = log.getName.stripSuffix(IN_PROGRESS) val codecName: Option[String] = logName.split("\\.").tail.lastOption - val codec = codecName.map { c => - codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) - } try { + val codec = codecName.map { c => + codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) + } codec.map(_.compressedInputStream(in)).getOrElse(in) } catch { - case e: Exception => + case e: Throwable => in.close() throw e } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org