[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/23241
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239618167

--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

Updated the code.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239614561

--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

Thanks @vanzin for the review. It seems we need both methods in the ZStdCompressionCodec class, as in the previous change.
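To make the "both methods" idea concrete, here is a rough sketch of the two readers side by side. The surrounding pieces (SparkConf, the CompressionCodec trait, the exact buffer-size config key) are Spark internals assumed from the diff context, so treat this as an illustration of the shape rather than the merged code:

```scala
import java.io.{BufferedInputStream, InputStream}

import com.github.luben.zstd.ZstdInputStream

// Sketch only: CompressionCodec and SparkConf are Spark core types, and the
// config key below is an assumption for illustration.
class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {

  private val bufferSize =
    conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt

  // Strict reader: a partial (still-open) frame is an error, which is the
  // safe behavior for shuffle data.
  override def compressedInputStream(s: InputStream): InputStream = {
    new BufferedInputStream(new ZstdInputStream(s), bufferSize)
  }

  // Lenient reader for in-progress event logs: setContinuous(true) lets
  // ZstdInputStream hand back the bytes of a frame that is not yet closed.
  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
  }
}
```

Keeping the continuous flag out of compressedInputStream means performance-sensitive callers such as UnsafeShuffleWriter keep the fail-fast behavior, while only the event-log path opts into lenient reads.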
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239609592

--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

> Is it actually desirable to not fail on a partial frame?

If you're reading a shuffle file compressed with zstd, and the shuffle file is corrupted somehow, this change may be allowing Spark to read incomplete shuffle data...
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239590339

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,8 +118,6 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException => // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
--- End diff --

OK, this could be OK, if this was really added only to address what you are fixing here.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239534469

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,10 +118,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException => // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
-        logWarning(s"Failed to read Spark event log: $sourceName")
       case ioe: IOException =>
-        throw ioe
+        if (maybeTruncated) {
--- End diff --

Thanks, updated.
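For readability, the merged handler discussed in this thread would look roughly like the following. The enclosing try, HaltReplayException, logWarning, and sourceName all belong to Spark's ReplayListenerBus and its Logging trait and are assumed from the diff context:

```scala
import java.io.{EOFException, IOException}

// Sketch of the simplified match: the two old IOException cases collapse
// into one, with maybeTruncated deciding between warn-and-stop and rethrow.
try {
  // ... replay JSON event lines from the log ...
} catch {
  case _: HaltReplayException => // Just stop replay.
  case _: EOFException if maybeTruncated => // Expected for in-progress logs.
  case ioe: IOException =>
    if (maybeTruncated) {
      logWarning(s"Failed to read Spark event log: $sourceName")
    } else {
      throw ioe
    }
}
```

An IOException with maybeTruncated = false threw before the change and still throws after it, so observable behavior only differs for possibly-truncated logs.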
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239532748

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,10 +118,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException => // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
-        logWarning(s"Failed to read Spark event log: $sourceName")
       case ioe: IOException =>
-        throw ioe
+        if (maybeTruncated) {
--- End diff --

Oh I see. Actually, what about just removing the second case? It's simpler to just let it throw.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239530668

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,10 +118,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException => // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
-        logWarning(s"Failed to read Spark event log: $sourceName")
       case ioe: IOException =>
-        throw ioe
+        if (maybeTruncated) {
--- End diff --

Yes, I think so. I simplified it into one block.
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239525888

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,10 +118,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException => // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
-        logWarning(s"Failed to read Spark event log: $sourceName")
       case ioe: IOException =>
-        throw ioe
+        if (maybeTruncated) {
--- End diff --

I think this was already the behavior? If it doesn't match the 'if', it would just throw anyway.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239521593

--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

I have updated the code.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239516496

--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

Thanks @srowen.

> Is it actually desirable to not fail on a partial frame? I'm not sure. We shouldn't encounter it elsewhere.

Yes, ideally it shouldn't fail. Even for EventLoggingListener, once the application finishes, the frame is closed (that is why this only applies to running applications). After analyzing the zstd code again, the impact seems small: either throw an exception or read the open frame, and the latter seems better. I can update the code.
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239509724

--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

That's what I'm wondering about. Is it actually desirable to not fail on a partial frame? I'm not sure. We *shouldn't* encounter it elsewhere. This changes a developer API, but may not even be a breaking change as there is a default implementation. We can take breaking changes in Spark 3 though. I think I agree with your approach here in the end.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239496266

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,8 +118,6 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException => // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
--- End diff --

This was added for the zstd incomplete-frame reading issue, but after this change that issue no longer happens. Yes, we can keep it as it is.
Github user shahidki31 commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239494478

--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

Hi @srowen, thanks for the comment. Yes, this parameter (false by default) is intended for continuous stream reading, so if a class doesn't need to read the data continuously, do we need to set `isContinuous` to true? This method is called by other classes, like `UnsafeShuffleWriter`, which are performance-sensitive. If they tried to read from open frames, this issue (a read-error exception) could happen in those classes as well, but it has only been reported from `EventLoggingListener`. That is why I tried to limit the change to the EventLoggingListener call. Yes, if we set 'continuous' to true everywhere, this code would be much simpler.
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239476570

--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

BTW it seems like 'continuous' changes behavior very little: https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/ZstdInputStream.java#L147

I agree with your concern to keep the change minimal. I'm trying to think if this would break anything if everything were read as 'continuous'. It wouldn't fail fast in some cases?
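To illustrate the behavior difference the flag controls, here is a hedged sketch against zstd-jni's ZstdInputStream. The truncated-frame failure mode described in the comments is a reading of the linked library code, not a documented guarantee, and the input array is assumed to come from elsewhere (e.g. a ZstdOutputStream whose frame was never closed):

```scala
import java.io.ByteArrayInputStream

import com.github.luben.zstd.ZstdInputStream

// Reads whatever can be decoded from a possibly partial zstd frame.
def readAll(bytes: Array[Byte], continuous: Boolean): Array[Byte] = {
  val in = new ZstdInputStream(new ByteArrayInputStream(bytes))
    .setContinuous(continuous)
  try Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
  finally in.close()
}

// With continuous = false, running out of input in the middle of a frame is
// expected to surface as an IOException; with continuous = true, read()
// returns the bytes decoded so far and then signals end-of-stream instead of
// failing fast.
```

This is the "wouldn't fail fast" trade-off: lenient reads are exactly what an in-progress event log wants, but they would mask truncation if applied to shuffle files.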
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/23241#discussion_r239476672

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,8 +118,6 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException => // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
--- End diff --

Can this still happen for non-zstd compression though?