If you can't run a patched Spark version, then you could also consider using LZF compression instead, since that codec isn't affected by this bug.
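The codec switch is a one-line configuration change. A minimal sketch, assuming a Spark 1.3.x driver (the app name is illustrative; on 1.3.x the full codec class name is the safe form for `spark.io.compression.codec`, while later versions also accept the short name "lzf"):

```scala
// Sketch: side-step the Snappy buffer-reuse corruption (SPARK-4105) by
// switching Spark's internal (shuffle/broadcast/block) compression to LZF.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("lzf-workaround") // hypothetical app name
  .set("spark.io.compression.codec",
       "org.apache.spark.io.LZFCompressionCodec")

val sc = new SparkContext(conf)
```

Note this only changes Spark's internal compression; it does not affect the Gzip codec used by the input sequence files.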
On Mon, Jun 1, 2015 at 3:32 PM, Andrew Or <and...@databricks.com> wrote:

> Hi Deepak,
>
> This is a notorious bug that is being tracked at
> https://issues.apache.org/jira/browse/SPARK-4105. We have fixed one
> source of this bug (it turns out Snappy had a bug in buffer reuse that
> caused data corruption). There are other known sources that are being
> addressed in outstanding patches currently.
>
> Since you're using 1.3.1, my guess is that you don't have this patch:
> https://github.com/apache/spark/pull/6176, which I believe should fix
> the issue in your case. It's merged for 1.3.2 (not yet released) but not
> in time for 1.3.1, so feel free to patch it yourself and see if it works.
>
> -Andrew
>
>
> 2015-06-01 8:00 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>
>> Any suggestions?
>>
>> I am using Spark 1.3.1 to read a sequence file stored in SequenceFile
>> format
>> (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v?)
>>
>> with this code and these settings:
>>
>> sc.sequenceFile(dwTable, classOf[Text], classOf[Text])
>>   .partitionBy(new org.apache.spark.HashPartitioner(2053))
>>
>> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>> .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>> .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>> .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>> .set("spark.yarn.maxAppAttempts", "0")
>> //.set("spark.akka.askTimeout", arguments.get("askTimeout").get)
>> //.set("spark.akka.timeout", arguments.get("akkaTimeout").get)
>> //.set("spark.worker.timeout", arguments.get("workerTimeout").get)
>> .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>>
>> and the values are:
>> buffersize=128 maxbuffersize=1068 maxResultSize=200G
>>
>> And I see this exception in each executor task:
>>
>> FetchFailed(BlockManagerId(278, phxaishdc9dn1830.stratus.phx.ebay.com, 54757),
>>   shuffleId=6, mapId=2810, reduceId=1117, message=
>>
>> org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
>>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>   at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>   at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>   at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>>   at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
>>   at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>   at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
>>   at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
>>   at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
>>   at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
>>   at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
>>   at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
>>   at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1165)
>>   at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301)
>>   at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
>>   at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
>>   at scala.util.Try$.apply(Try.scala:161)
>>   at scala.util.Success.map(Try.scala:206)
>>   at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
>>   at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
>>   ... 18 more
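For what it's worth, the snippet in the quoted message chains `.set(...)` calls after `sc.sequenceFile(...)`, but those setters belong on a `SparkConf` before the context is created; they cannot be applied to an RDD. A minimal untangling sketch, assuming the reported values (buffersize=128, maxbuffersize=1068, maxResultSize=200G) and a `dwTable` path defined elsewhere (shown here as a placeholder):

```scala
// Sketch: configuration first, then the SequenceFile read.
import org.apache.hadoop.io.Text
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "128")        // reported buffersize
  .set("spark.kryoserializer.buffer.max.mb", "1068")   // reported maxbuffersize
  .set("spark.driver.maxResultSize", "200G")           // reported maxResultSize
  .set("spark.yarn.maxAppAttempts", "0")
  .registerKryoClasses(Array(
    classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))

val sc = new SparkContext(conf)

val dwTable = "hdfs:///path/to/dw/table" // placeholder; defined elsewhere in the job
val rdd = sc.sequenceFile(dwTable, classOf[Text], classOf[Text])
  .partitionBy(new HashPartitioner(2053))
```

This does not fix the Snappy corruption itself, but it rules out configuration that was silently never applied.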