My suggestion is to change the Spark setting that controls the codec used to
compress internal data transfers: set
spark.io.compression.codec
to lzf in your SparkConf.
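
For example, a minimal sketch (this assumes you build the SparkConf yourself
before creating the SparkContext; the app name below is just a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-job")                      // placeholder app name
  .set("spark.io.compression.codec", "lzf")  // switch shuffle compression from the default snappy to lzf
val sc = new SparkContext(conf)

The same setting can also be passed at submit time with
--conf spark.io.compression.codec=lzf, so no code change is strictly needed.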

On Mon, Jun 1, 2015 at 8:46 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> Hello Josh,
> Are you suggesting that we store the source data with LZF compression and
> use the same Spark code as is?
> Currently it's stored in SequenceFile format and compressed with GZIP.
>
> First line of the data:
>
> (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'
> org.apache.hadoop.io.compress.GzipCodec?v?
> )
>
> Regards,
> Deepak
>
> On Tue, Jun 2, 2015 at 4:16 AM, Josh Rosen <rosenvi...@gmail.com> wrote:
>
>> If you can't run a patched Spark version, then you could also consider
>> using LZF compression instead, since that codec isn't affected by this bug.
>>
>> On Mon, Jun 1, 2015 at 3:32 PM, Andrew Or <and...@databricks.com> wrote:
>>
>>> Hi Deepak,
>>>
>>> This is a notorious bug that is being tracked at
>>> https://issues.apache.org/jira/browse/SPARK-4105. We have fixed one
>>> source of this bug (it turns out Snappy had a bug in buffer reuse that
>>> caused data corruption). There are other known sources that are being
>>> addressed in outstanding patches currently.
>>>
>>> Since you're using 1.3.1 my guess is that you don't have this patch:
>>> https://github.com/apache/spark/pull/6176, which I believe should fix
>>> the issue in your case. It's merged for 1.3.2 (not yet released) but not in
>>> time for 1.3.1, so feel free to patch it yourself and see if it works.
>>>
>>> -Andrew
>>>
>>>
>>> 2015-06-01 8:00 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>>
>>>> Any suggestions?
>>>>
>>>> I am using Spark 1.3.1 to read a sequence file stored in SequenceFile
>>>> format
>>>> (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v?
>>>> )
>>>>
>>>> with this code and these settings:
>>>>
>>>> val conf = new SparkConf()
>>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>>>   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>>>   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>>>   .set("spark.yarn.maxAppAttempts", "0")
>>>>   //.set("spark.akka.askTimeout", arguments.get("askTimeout").get)
>>>>   //.set("spark.akka.timeout", arguments.get("akkaTimeout").get)
>>>>   //.set("spark.worker.timeout", arguments.get("workerTimeout").get)
>>>>   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>>>>
>>>> sc.sequenceFile(dwTable, classOf[Text], classOf[Text])
>>>>   .partitionBy(new org.apache.spark.HashPartitioner(2053))
>>>>
>>>>
>>>> and the values are:
>>>> buffersize=128 maxbuffersize=1068 maxResultSize=200G
>>>>
>>>>
>>>> And I see this exception in each executor task:
>>>>
>>>> FetchFailed(BlockManagerId(278, phxaishdc9dn1830.stratus.phx.ebay.com,
>>>> 54757), shuffleId=6, mapId=2810, reduceId=1117, message=
>>>>
>>>> org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
>>>>
>>>> at
>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>>
>>>> at
>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>
>>>> at
>>>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>>
>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>
>>>> at
>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>
>>>> at
>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>
>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>
>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>
>>>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>>
>>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>
>>>> at
>>>> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>>
>>>> at
>>>> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>>
>>>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>>
>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>>
>>>> at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>
>>>> at
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> *Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)*
>>>>
>>>> at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
>>>>
>>>> at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>>>
>>>> at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
>>>>
>>>> at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
>>>>
>>>> at
>>>> org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
>>>>
>>>> at
>>>> org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
>>>>
>>>> at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
>>>>
>>>> at
>>>> org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
>>>>
>>>> at
>>>> org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1165)
>>>>
>>>> at
>>>> org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301)
>>>>
>>>> at
>>>> org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
>>>>
>>>> at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
>>>>
>>>> at scala.util.Try$.apply(Try.scala:161)
>>>>
>>>> at scala.util.Success.map(Try.scala:206)
>>>>
>>>> at
>>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
>>>>
>>>> at
>>>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
>>>>
>>>> ... 18 more
>>>>
>>>
>>>
>>
>
>
> --
> Deepak
>
>
