[ https://issues.apache.org/jira/browse/SPARK-25318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reza Safi updated SPARK-25318:
------------------------------
    Description: 
SPARK-4105 provided a solution to the block corruption issue by retrying 
the fetch or the stage. That solution includes a step that wraps the input 
stream with compression and/or encryption. This step is prone to exceptions, 
but the current code has no exception handling around it, which has caused 
confusion for users. In fact, customers running with SPARK-4105 have 
reported exceptions like the following:
{noformat}
2018-08-28 22:35:54,361 ERROR [Driver] org.apache.spark.deploy.yarn.ApplicationMaster:95 User class threw exception: java.lang.RuntimeException: org.apache.spark.SparkException: Job aborted due to stage failure: Task 452 in stage 209.0 failed 4 times, most recent failure: Lost task 452.3 in stage y.0 (TID z, xxxxx, executor xx): java.io.IOException: FAILED_TO_UNCOMPRESS(5)
        at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
        at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
        at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:395)
        at org.xerial.snappy.Snappy.uncompress(Snappy.java:431)
        at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:127)
        at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
        at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
        at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:159)
        at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1219)
        at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:48)
        at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:47)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:328)
        at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:55)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        a
{noformat}
In this customer's version of Spark, line 328 of 
ShuffleBlockFetcherIterator.scala is the line where the following occurs:
{noformat}
input = streamWrapper(blockId, in)
{noformat}
It would be nice to add exception handling around this line to avoid such 
confusion.
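
For illustration, the handling could look roughly like the sketch below. 
This is only a sketch, not an actual patch: it assumes the code runs inside 
ShuffleBlockFetcherIterator.next, and the helper throwFetchFailedException 
and the local values blockId, address and in are taken from the surrounding 
code of that method (exact names vary by Spark version).
{noformat}
// Sketch only: guard the wrapping step so that a codec or crypto failure
// surfaces as a fetch failure instead of an unhandled IOException.
// throwFetchFailedException, blockId, address and in are assumed from the
// surrounding ShuffleBlockFetcherIterator code.
var input: java.io.InputStream = null
try {
  input = streamWrapper(blockId, in)
} catch {
  case e: java.io.IOException =>
    // The block is likely corrupt: release the raw stream and report a
    // fetch failure so the fetch (or the stage) can be retried.
    in.close()
    throwFetchFailedException(blockId, address, e)
}
{noformat}
Whether to additionally re-fetch the block once before failing, as 
SPARK-4105 already does for corruption it detects, is a separate design 
choice.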


> Add exception handling when wrapping the input stream during the fetch or 
> stage retry in response to a corrupted block
> --------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25318
>                 URL: https://issues.apache.org/jira/browse/SPARK-25318
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.1.3, 2.2.2, 2.3.1, 2.4.0
>            Reporter: Reza Safi
>            Priority: Minor
>


