My suggestion is that you change the Spark setting that controls the
compression codec Spark uses for internal data transfers
(spark.io.compression.codec). Set it to lzf in your SparkConf.
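For reference, a minimal sketch of that change in Scala, assuming the job builds its own SparkConf as in the quoted code. spark.io.compression.codec (snappy by default in Spark 1.3.x) governs Spark's internal data such as shuffle outputs and broadcast variables, not the format of your input files, so the GZIP-compressed SequenceFile can stay as it is.

```scala
import org.apache.spark.SparkConf

// Use LZF instead of the default Snappy for Spark's internal compression
// (shuffle outputs, broadcast variables, compressed RDD partitions).
// Input files are still read with whatever codec they were written with.
val conf = new SparkConf()
  .set("spark.io.compression.codec", "lzf")

// Chain the job's other settings (serializer, Kryo buffers, ...) onto
// this same conf before passing it to new SparkContext(conf).
```

The same property can also be passed at submit time with spark-submit --conf spark.io.compression.codec=lzf, which avoids a code change.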

> Hello Josh,
> Are you suggesting that I store the source data with LZF compression and use
> the same Spark code as is?
> Currently it's stored in SequenceFile format and compressed with GZIP.
> First line of the data:
> ('
> )
>> If you can't run a patched Spark version, then you could also consider
>> using LZF compression instead, since that codec isn't affected by this bug.
>>> This is a notorious bug that is being tracked at
>>> We have fixed one
>>> source of this bug (it turns out Snappy had a bug in buffer reuse that
>>> caused data corruption). Other known sources are currently being
>>> addressed in outstanding patches.
>>> Since you're using 1.3.1, my guess is that you don't have this patch:
>>>, which I believe should fix
>>> the issue in your case. It was merged for 1.3.2 (not yet released) but not in
>>> time for 1.3.1, so feel free to patch it in yourself and see if it works.
>>>> Any suggestions?
>>>> I am using Spark 1.3.1 to read data stored in SequenceFile
>>>> format
>>>> ('
>>>> )
>>>> with this code and these settings:
>>>> import org.apache.hadoop.io.Text
>>>> import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
>>>>
>>>> // Assumed: arguments is a Map[String, String] of job options, dwTable is the input path
>>>> val conf = new SparkConf()
>>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>>>   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>>>   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>>>   .set("spark.yarn.maxAppAttempts", "0")
>>>>   //.set("spark.akka.askTimeout", arguments.get("askTimeout").get)
>>>>   //.set("spark.akka.timeout", arguments.get("akkaTimeout").get)
>>>>   //.set("spark.worker.timeout", arguments.get("workerTimeout").get)
>>>>   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>>>> val sc = new SparkContext(conf)
>>>>
>>>> // Read the (Text, Text) SequenceFile and repartition it into 2053 hash partitions
>>>> sc.sequenceFile(dwTable, classOf[Text], classOf[Text])
>>>>   .partitionBy(new HashPartitioner(2053))
>>>> and the values are
>>>> buffersize=128, maxbuffersize=1068, maxResultSize=200G.
>>>> And I see this exception in every executor task:
>>>> FetchFailed(BlockManagerId(278,, 54757), shuffleId=6, mapId=2810, reduceId=1117, message=
>>>> org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>> at org.apache.spark.executor.Executor$
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>> at java.util.concurrent.ThreadPoolExecutor$
>>>> *Caused by: FAILED_TO_UNCOMPRESS(5)*
>>>> at org.xerial.snappy.SnappyNative.throw_error(
>>>> at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>>> at org.xerial.snappy.Snappy.rawUncompress(
>>>> at org.xerial.snappy.Snappy.uncompress(
>>>> at org.xerial.snappy.SnappyInputStream.readFully(
>>>> at org.xerial.snappy.SnappyInputStream.readHeader(
>>>> at org.xerial.snappy.SnappyInputStream.<init>(
>>>> at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
>>>> at scala.util.Try$.apply(Try.scala:161)
>>>> ... 18 more
