Repository: spark Updated Branches: refs/heads/master 877f82cb3 -> ec506bd30
[SPARK-26283][CORE] Enable reading from open frames of zstd, when reading zstd compressed eventLog ## What changes were proposed in this pull request? Root cause: Prior to Spark2.4, When we enable zst for eventLog compression, for inprogress application, It always throws exception in the Application UI, when we open from the history server. But after 2.4 it will display the UI information based on the completed frames in the zstd compressed eventLog. But doesn't read incomplete frames for inprogress application. In this PR, we have added 'setContinous(true)' for reading input stream from eventLog, so that it can read from open frames also. (By default 'isContinous=false' for zstd inputStream and when we try to read an open frame, it throws truncated error) ## How was this patch tested? Test steps: 1) Add the configurations in the spark-defaults.conf (i) spark.eventLog.compress true (ii) spark.io.compression.codec zstd 2) Restart history server 3) bin/spark-shell 4) sc.parallelize(1 to 1000, 1000).count 5) Open app UI from the history server UI **Before fix** ![screenshot from 2018-12-06 00-01-38](https://user-images.githubusercontent.com/23054875/49537340-bfe28b00-f8ee-11e8-9fca-6d42fdc89e1a.png) **After fix:** ![screenshot from 2018-12-06 00-34-39](https://user-images.githubusercontent.com/23054875/49537353-ca9d2000-f8ee-11e8-803d-645897b9153b.png) Closes #23241 from shahidki31/zstdEventLog. Authored-by: Shahid <shahidk...@gmail.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec506bd3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec506bd3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec506bd3 Branch: refs/heads/master Commit: ec506bd30c2ca324c12c9ec811764081c2eb8c42 Parents: 877f82c Author: Shahid <shahidk...@gmail.com> Authored: Sun Dec 9 11:44:16 2018 -0600 Committer: Sean Owen <sean.o...@databricks.com> Committed: Sun Dec 9 11:44:16 2018 -0600 ---------------------------------------------------------------------- .../scala/org/apache/spark/io/CompressionCodec.scala | 12 ++++++++++++ .../apache/spark/scheduler/EventLoggingListener.scala | 2 +- .../org/apache/spark/scheduler/ReplayListenerBus.scala | 2 -- 3 files changed, 13 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ec506bd3/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 0664c5a..c4f4b18 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -43,6 +43,10 @@ trait CompressionCodec { def compressedOutputStream(s: OutputStream): OutputStream def compressedInputStream(s: InputStream): InputStream + + private[spark] def compressedContinuousInputStream(s: InputStream): InputStream = { + compressedInputStream(s) + } } private[spark] object CompressionCodec { @@ -197,4 +201,12 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec { // avoid overhead excessive of JNI call while trying to uncompress small amount of data. new BufferedInputStream(new ZstdInputStream(s), bufferSize) } + + override def compressedContinuousInputStream(s: InputStream): InputStream = { + // SPARK-26283: Enable reading from open frames of zstd (for eg: zstd compressed eventLog + // Reading). By default `isContinuous` is false, and when we try to read from open frames, + // `compressedInputStream` method above throws truncated error exception. This method set + // `isContinuous` true to allow reading from open frames. + new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/ec506bd3/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 5f697fe..069a91f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -402,7 +402,7 @@ private[spark] object EventLoggingListener extends Logging { val codec = codecName(log).map { c => codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c)) } - codec.map(_.compressedInputStream(in)).getOrElse(in) + codec.map(_.compressedContinuousInputStream(in)).getOrElse(in) } catch { case e: Throwable => in.close() http://git-wip-us.apache.org/repos/asf/spark/blob/ec506bd3/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 4c6b0c1..226c237 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -118,8 +118,6 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { case e: HaltReplayException => // Just stop replay. case _: EOFException if maybeTruncated => - case _: IOException if maybeTruncated => - logWarning(s"Failed to read Spark event log: $sourceName") case ioe: IOException => throw ioe case e: Exception => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org