Hi Deepak,

This is a notorious bug that is being tracked at
https://issues.apache.org/jira/browse/SPARK-4105. We have fixed one source
of this bug (it turns out Snappy had a bug in buffer reuse that caused data
corruption). There are other known sources that are being addressed in
outstanding patches currently.

Since you're using 1.3.1 my guess is that you don't have this patch:
https://github.com/apache/spark/pull/6176, which I believe should fix the
issue in your case. It's merged for 1.3.2 (not yet released) but not in
time for 1.3.1, so feel free to patch it yourself and see if it works.

-Andrew


2015-06-01 8:00 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:

> Any suggestions ?
>
> I using Spark 1.3.1 to read   sequence file stored in Sequence File format
> (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v?
> )
>
> with this code and settings
> sc.sequenceFile(dwTable, classOf[Text], classOf[Text]).partitionBy(new
> org.apache.spark.HashPartitioner(2053))
> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryoserializer.buffer.mb",
> arguments.get("buffersize").get)
>       .set("spark.kryoserializer.buffer.max.mb",
> arguments.get("maxbuffersize").get)
>       .set("spark.driver.maxResultSize",
> arguments.get("maxResultSize").get)
>       .set("spark.yarn.maxAppAttempts", "0")
>       //.set("spark.akka.askTimeout", arguments.get("askTimeout").get)
>       //.set("spark.akka.timeout", arguments.get("akkaTimeout").get)
>       //.set("spark.worker.timeout", arguments.get("workerTimeout").get)
>
> .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>
>
> and values are
> buffersize=128 maxbuffersize=1068 maxResultSize=200G
>
>
> And i see this exception in each executor task
>
> FetchFailed(BlockManagerId(278, phxaishdc9dn1830.stratus.phx.ebay.com,
> 54757), shuffleId=6, mapId=2810, reduceId=1117, message=
>
> org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
>
> at
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>
> at
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>
> at
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
> *Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)*
>
> at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
>
> at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>
> at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
>
> at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
>
> at
> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
>
> at
> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
>
> at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
>
> at
> org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
>
> at
> org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1165)
>
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301)
>
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
>
> at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
>
> at scala.util.Try$.apply(Try.scala:161)
>
> at scala.util.Success.map(Try.scala:206)
>
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
>
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
>
> ... 18 more
>

Reply via email to