[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/23241





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239618167
  
--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

Updated the code





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239614561
  
--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

Thanks @vanzin for the review. 

It seems we need both methods in the ZstdCompressionCodec class, like in the previous change.





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239609592
  
--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

> Is it actually desirable to not fail on a partial frame?

If you're reading a shuffle file compressed with zstd, and the shuffle file is corrupted somehow, this change may allow Spark to read incomplete shuffle data...





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239590339
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,8 +118,6 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException =>
         // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
--- End diff --

OK, this could be fine, if it was really added only to address what you are fixing here.





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239534469
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,10 +118,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException =>
         // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
-        logWarning(s"Failed to read Spark event log: $sourceName")
       case ioe: IOException =>
-        throw ioe
+        if (maybeTruncated) {
--- End diff --

Thanks, updated.





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239532748
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,10 +118,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
      case e: HaltReplayException =>
         // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
-        logWarning(s"Failed to read Spark event log: $sourceName")
       case ioe: IOException =>
-        throw ioe
+        if (maybeTruncated) {
--- End diff --

Oh, I see. Actually, what about just removing the second case? It's simpler to just let it throw.
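
For illustration, a minimal standalone sketch of "just let it throw", with the handler lifted into a hypothetical helper (replayGuard and its wrapping are made up for the example; only the two cases come from the code under discussion):

import java.io.{EOFException, IOException}

// Hypothetical standalone form of the handler under discussion: keep only
// the EOF-on-truncated-log case and let any other IOException propagate,
// instead of branching on maybeTruncated a second time.
def replayGuard(maybeTruncated: Boolean)(body: => Unit): Unit = {
  try {
    body
  } catch {
    // An in-progress (possibly truncated) log may legitimately end early.
    case _: EOFException if maybeTruncated =>
    // Any other read error propagates to the caller.
    case ioe: IOException => throw ioe
  }
}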





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239530668
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,10 +118,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
      case e: HaltReplayException =>
         // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
-        logWarning(s"Failed to read Spark event log: $sourceName")
       case ioe: IOException =>
-        throw ioe
+        if (maybeTruncated) {
--- End diff --

Yes, I think so; I simplified it into one block.





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239525888
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,10 +118,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
      case e: HaltReplayException =>
         // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
-        logWarning(s"Failed to read Spark event log: $sourceName")
       case ioe: IOException =>
-        throw ioe
+        if (maybeTruncated) {
--- End diff --

I think this was already the behavior? If it doesn't match the 'if', it would just throw anyway.





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239521593
  
--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

I have updated the code.





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239516496
  
--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

Thanks @srowen.

> Is it actually desirable to not fail on a partial frame? I'm not sure. We shouldn't encounter it elsewhere.

Yes, ideally it shouldn't fail. Even for EventLoggingListener, the frame is closed once the application finishes (which is why this only applies to a running application). After analyzing the zstd code again, the impact seems small: the choice is between throwing an exception and reading the open frame, and the latter seems better.
I can update the code.





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239509724
  
--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

That's what I'm wondering about. Is it actually desirable to not fail on a 
partial frame? I'm not sure. We *shouldn't* encounter it elsewhere.

This changes a developer API, but may not even be a breaking change as 
there is a default implementation. We can take breaking changes in Spark 3 
though.

I think I agree with your approach here in the end.
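
As a sketch of why a default implementation keeps this non-breaking (the trait shape is simplified and the method name is taken from the diff above; the final API may differ):

import java.io.InputStream

// Simplified sketch of the trait-level hook: a default that falls back to
// the normal reader means existing codec implementations need no changes,
// and only ZStdCompressionCodec overrides it with a continuous-mode reader.
trait CompressionCodec {
  def compressedInputStream(s: InputStream): InputStream

  def zstdEventLogCompressedInputStream(s: InputStream): InputStream =
    compressedInputStream(s)
}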





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239496266
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,8 +118,6 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException =>
         // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
--- End diff --

This was added for the zstd incomplete-frame reading issue, but after this change that issue no longer happens. Yes, we can keep it as it is.





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread shahidki31
Github user shahidki31 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239494478
  
--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

Hi @srowen, thanks for the comment.
Yes. This parameter is false by default and is intended for continuous stream reading. So, if some callers don't need to read the data continuously, do we need to set `isContinuous` to true for them?

This method is called by other classes, like 'UnsafeShuffleWriter', which are performance sensitive. If they tried to read from open frames, this issue (a read error exception) would happen there as well. But other than from 'EventLoggingListener', this issue hasn't been reported; that is why I tried to limit it to the EventLoggingListener call.

Yes, if we set 'continuous' to true for all callers, this code would be much simpler.
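
For illustration, a hypothetical call-site sketch of that limiting (wrappedStream, isEventLog, and fileStream are invented names; only zstdEventLogCompressedInputStream comes from the diff above): the event-log replay path opts into the continuous reader, while shuffle and other paths keep the default.

import java.io.InputStream
import org.apache.spark.io.CompressionCodec

// Only event-log reading needs to tolerate open frames; performance-sensitive
// callers such as UnsafeShuffleWriter keep the plain reader.
def wrappedStream(codec: CompressionCodec, fileStream: InputStream,
    isEventLog: Boolean): InputStream = {
  if (isEventLog) codec.zstdEventLogCompressedInputStream(fileStream)
  else codec.compressedInputStream(fileStream)
}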






[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239476570
  
--- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
@@ -197,4 +201,8 @@ class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
     // avoid overhead excessive of JNI call while trying to uncompress small amount of data.
     new BufferedInputStream(new ZstdInputStream(s), bufferSize)
   }
+
+  override def zstdEventLogCompressedInputStream(s: InputStream): InputStream = {
+    new BufferedInputStream(new ZstdInputStream(s).setContinuous(true), bufferSize)
--- End diff --

BTW, it seems like 'continuous' changes behavior very little:
https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/ZstdInputStream.java#L147
I agree with your concern to keep the change minimal. I'm trying to think whether anything would break if everything were read as 'continuous'. It just wouldn't fail fast in some cases?
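
For illustration, a small sketch of that difference (assumptions: `truncated` holds a zstd frame cut off mid-stream, and the throw-vs-partial-read behavior is as described in the linked zstd-jni source, not verified output):

import java.io.ByteArrayInputStream
import com.github.luben.zstd.ZstdInputStream

// Drain a zstd stream whose final frame was never closed, as an in-progress
// event log's would be.
def drain(truncated: Array[Byte], continuous: Boolean): Int = {
  val in = new ZstdInputStream(new ByteArrayInputStream(truncated))
    .setContinuous(continuous)
  val buf = new Array[Byte](8192)
  var total = 0
  var n = in.read(buf)
  while (n > 0) { total += n; n = in.read(buf) }
  total
}

// continuous = false: read() is expected to throw IOException ("Read error or
// truncated source") when the source ends mid-frame, i.e. it fails fast.
// continuous = true: read() instead returns the bytes decoded so far, so a
// genuinely corrupt stream would not fail fast here.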





[GitHub] spark pull request #23241: [SPARK-26283][CORE] Enable reading from open fram...

2018-12-06 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/23241#discussion_r239476672
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala ---
@@ -118,8 +118,6 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       case e: HaltReplayException =>
         // Just stop replay.
       case _: EOFException if maybeTruncated =>
-      case _: IOException if maybeTruncated =>
--- End diff --

Can this still happen for non-zstd compression though?

