Repository: spark
Updated Branches:
  refs/heads/master 877f82cb3 -> ec506bd30


[SPARK-26283][CORE] Enable reading from open frames of zstd, when reading zstd compressed eventLog

## What changes were proposed in this pull request?
Root cause: Prior to Spark 2.4, when zstd was enabled for eventLog compression,
opening the UI of an in-progress application from the history server always
threw an exception. Since 2.4 the UI is rendered from the completed frames of
the zstd-compressed eventLog, but the incomplete (open) frames of an
in-progress application are still not read.
This PR calls `setContinuous(true)` on the input stream used to read the
eventLog, so that open frames can be read as well. (By default
`isContinuous = false` for the zstd input stream, and trying to read an open
frame throws a truncated-stream error.)
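The fix flips zstd-jni's `isContinuous` flag so the reader returns whatever complete data the stream holds instead of failing on the open frame. The same closed-vs-continuous distinction can be sketched with Python's stdlib `zlib` incremental decompressor (a stand-in for illustration only; zstd itself is not used here):

```python
import zlib

# Simulate an in-progress event log: compress fully, then truncate the
# compressed bytes mid-stream, as if the writer were still appending.
data = b"SparkListenerTaskEnd\n" * 1000
compressed = zlib.compress(data)
truncated = compressed[: len(compressed) // 2]

# One-shot decompression of a truncated stream fails -- analogous to
# ZstdInputStream with isContinuous=false raising a truncated-stream error.
try:
    zlib.decompress(truncated)
    one_shot_failed = False
except zlib.error:
    one_shot_failed = True

# An incremental decompressor returns the data recoverable from the
# complete portion -- analogous to reading open frames with
# setContinuous(true).
partial = zlib.decompressobj().decompress(truncated)
```

Here `zlib` only illustrates the general pattern; the actual commit relies on the `setContinuous` method of `ZstdInputStream` from the zstd-jni library.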

## How was this patch tested?
Test steps:
1) Add the following configurations to spark-defaults.conf:
   (i) spark.eventLog.compress true
   (ii) spark.io.compression.codec zstd
2) Restart the history server
3) bin/spark-shell
4) sc.parallelize(1 to 1000, 1000).count
5) Open the application UI from the history server UI

**Before fix**
![screenshot from 2018-12-06 00-01-38](https://user-images.githubusercontent.com/23054875/49537340-bfe28b00-f8ee-11e8-9fca-6d42fdc89e1a.png)

**After fix:**
![screenshot from 2018-12-06 00-34-39](https://user-images.githubusercontent.com/23054875/49537353-ca9d2000-f8ee-11e8-803d-645897b9153b.png)

Closes #23241 from shahidki31/zstdEventLog.

Authored-by: Shahid <shahidk...@gmail.com>
Signed-off-by: Sean Owen <sean.o...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec506bd3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec506bd3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec506bd3

Branch: refs/heads/master
Commit: ec506bd30c2ca324c12c9ec811764081c2eb8c42
Parents: 877f82c
Author: Shahid <shahidk...@gmail.com>
Authored: Sun Dec 9 11:44:16 2018 -0600
Committer: Sean Owen <sean.o...@databricks.com>
Committed: Sun Dec 9 11:44:16 2018 -0600

----------------------------------------------------------------------
 .../scala/org/apache/spark/io/CompressionCodec.scala    | 12 ++++++++++++
 .../apache/spark/scheduler/EventLoggingListener.scala   |  2 +-
 .../org/apache/spark/scheduler/ReplayListenerBus.scala  |  2 --
 3 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ec506bd3/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0664c5a..c4f4b18 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -43,6 +43,10 @@ trait CompressionCodec {
   def compressedOutputStream(s: OutputStream): OutputStream
 
   def compressedInputStream(s: InputStream): InputStream
+
+  private[spark] def compressedContinuousInputStream(s: InputStream): InputStream = {
+    compressedInputStream(s)
+  }
 }
 
 private[spark] object CompressionCodec {
@@ -197,4 +201,12 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def compressedContinuousInputStream(s: InputStream): InputStream = {
+    // SPARK-26283: Enable reading from open frames of zstd (for eg: zstd compressed eventLog
+    // Reading). By default `isContinuous` is false, and when we try to read from open frames,
+    // `compressedInputStream` method above throws truncated error exception. This method set
+    // `isContinuous` true to allow reading from open frames.
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ec506bd3/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 5f697fe..069a91f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -402,7 +402,7 @@ private[spark] object EventLoggingListener extends Logging {
       val codec = codecName(log).map { c =>
         codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
       }
-      codec.map(_.compressedInputStream(in)).getOrElse(in)
+      codec.map(_.compressedContinuousInputStream(in)).getOrElse(in)
     } catch {
       case e: Throwable =>
         in.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/ec506bd3/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 4c6b0c1..226c237 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -118,8 +118,6 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException =>
         // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
-        logWarning(s"Failed to read Spark event log: $sourceName")
       case ioe: IOException =>
         throw ioe
       case e: Exception =>

