Hello Josh,

Are you suggesting that I store the source data with LZF compression and use the same Spark code as is? Currently it is stored in SequenceFile format and compressed with GZIP.
First line of the data:
(SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text' org.apache.hadoop.io.compress.GzipCodec?v? )

Regards,
Deepak

On Tue, Jun 2, 2015 at 4:16 AM, Josh Rosen <rosenvi...@gmail.com> wrote:

> If you can't run a patched Spark version, then you could also consider
> using LZF compression instead, since that codec isn't affected by this bug.
>
> On Mon, Jun 1, 2015 at 3:32 PM, Andrew Or <and...@databricks.com> wrote:
>
>> Hi Deepak,
>>
>> This is a notorious bug that is being tracked at
>> https://issues.apache.org/jira/browse/SPARK-4105. We have fixed one
>> source of this bug (it turns out Snappy had a bug in buffer reuse that
>> caused data corruption). There are other known sources that are being
>> addressed in outstanding patches currently.
>>
>> Since you're using 1.3.1 my guess is that you don't have this patch:
>> https://github.com/apache/spark/pull/6176, which I believe should fix
>> the issue in your case. It's merged for 1.3.2 (not yet released) but not in
>> time for 1.3.1, so feel free to patch it yourself and see if it works.
>>
>> -Andrew
>>
>>
>> 2015-06-01 8:00 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>
>>> Any suggestions?
>>>
>>> I am using Spark 1.3.1 to read a file stored in SequenceFile format
>>> (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v? )
>>>
>>> with this code and settings
>>>
>>> sc.sequenceFile(dwTable, classOf[Text], classOf[Text]).partitionBy(new org.apache.spark.HashPartitioner(2053))
>>> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>> .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>> .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>> .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>> .set("spark.yarn.maxAppAttempts", "0")
>>> //.set("spark.akka.askTimeout", arguments.get("askTimeout").get)
>>> //.set("spark.akka.timeout", arguments.get("akkaTimeout").get)
>>> //.set("spark.worker.timeout", arguments.get("workerTimeout").get)
>>> .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>>>
>>> and values are
>>> buffersize=128 maxbuffersize=1068 maxResultSize=200G
>>>
>>> And I see this exception in each executor task:
>>>
>>> FetchFailed(BlockManagerId(278, phxaishdc9dn1830.stratus.phx.ebay.com, 54757), shuffleId=6, mapId=2810, reduceId=1117, message=
>>> org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>> at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>> *Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)*
>>> at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
>>> at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>> at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
>>> at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
>>> at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
>>> at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
>>> at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
>>> at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
>>> at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1165)
>>> at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301)
>>> at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
>>> at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
>>> at scala.util.Try$.apply(Try.scala:161)
>>> at scala.util.Success.map(Try.scala:206)
>>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
>>> at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
>>> ... 18 more
>>>
>>
>>
>

--
Deepak
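
A note on the LZF suggestion quoted in this thread: the stack trace fails inside org.apache.spark.io.SnappyCompressionCodec, called from BlockManager.wrapForCompression while fetching shuffle blocks. That is the codec Spark uses for its own internal data, not the GZIP codec of the input SequenceFile. So the suggestion most likely refers to the spark.io.compression.codec setting, in which case the source data could stay as it is. Below is a minimal sketch under that assumption; the object name LzfCodecSketch and the application name are hypothetical, and the master URL is assumed to come from spark-submit.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver skeleton; only the codec setting matters here.
object LzfCodecSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("lzf-codec-sketch") // placeholder name
      // Codec for Spark's internal data (shuffle outputs, cached RDD
      // partitions, broadcast variables). Spark 1.3.x defaults to snappy;
      // "lzf" switches these blocks to LZF instead.
      .set("spark.io.compression.codec", "lzf")

    val sc = new SparkContext(conf)
    try {
      // The existing job (sc.sequenceFile(...).partitionBy(...)) would run
      // here unchanged; the GZIP-compressed input is still read as before.
    } finally {
      sc.stop()
    }
  }
}
```

The same setting can also be passed at launch time with `--conf spark.io.compression.codec=lzf` on the spark-submit command line, which avoids a code change.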