Hello Josh,
Are you suggesting that we store the source data with LZF compression and use the
same Spark code as is?
Currently it's stored in SequenceFile format and compressed with GZIP.

First line of the data:

(SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'
org.apache.hadoop.io.compress.GzipCodec?v?
)
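
If you mean switching Spark's internal (shuffle) compression codec rather than the
on-disk format, I am guessing the change would look something like this on our end
(just a rough sketch of my understanding, not tested):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // switch Spark's internal shuffle/broadcast compression from the default Snappy to LZF
      .set("spark.io.compression.codec", "lzf")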

Regards,
Deepak

On Tue, Jun 2, 2015 at 4:16 AM, Josh Rosen <rosenvi...@gmail.com> wrote:

> If you can't run a patched Spark version, then you could also consider
> using LZF compression instead, since that codec isn't affected by this bug.
>
> On Mon, Jun 1, 2015 at 3:32 PM, Andrew Or <and...@databricks.com> wrote:
>
>> Hi Deepak,
>>
>> This is a notorious bug that is being tracked at
>> https://issues.apache.org/jira/browse/SPARK-4105. We have fixed one
>> source of this bug (it turns out Snappy had a bug in buffer reuse that
>> caused data corruption). There are other known sources that are being
>> addressed in outstanding patches currently.
>>
>> Since you're using 1.3.1 my guess is that you don't have this patch:
>> https://github.com/apache/spark/pull/6176, which I believe should fix
>> the issue in your case. It's merged for 1.3.2 (not yet released) but not in
>> time for 1.3.1, so feel free to patch it yourself and see if it works.
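>>
>> Roughly: check out the v1.3.1 tag of the Spark source, pull the change in with
>> git fetch origin pull/6176/head:pr-6176 and git cherry-pick pr-6176, then rebuild
>> your distribution the way you normally do. Treat that as a sketch and adapt it to
>> your own build setup.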
>>
>> -Andrew
>>
>>
>> 2015-06-01 8:00 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com>:
>>
>>> Any suggestions ?
>>>
>>> I am using Spark 1.3.1 to read a sequence file stored in SequenceFile format
>>> (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v?
>>> )
>>>
>>> with this code
>>>
>>> sc.sequenceFile(dwTable, classOf[Text], classOf[Text])
>>>   .partitionBy(new org.apache.spark.HashPartitioner(2053))
>>>
>>> and these settings on the SparkConf
>>>
>>> new SparkConf()
>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>   .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get)
>>>   .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get)
>>>   .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get)
>>>   .set("spark.yarn.maxAppAttempts", "0")
>>>   //.set("spark.akka.askTimeout", arguments.get("askTimeout").get)
>>>   //.set("spark.akka.timeout", arguments.get("akkaTimeout").get)
>>>   //.set("spark.worker.timeout", arguments.get("workerTimeout").get)
>>>   .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum]))
>>>
>>>
>>> and the values are buffersize=128, maxbuffersize=1068, maxResultSize=200G
>>>
>>>
>>> And I see this exception in each executor task:
>>>
>>> FetchFailed(BlockManagerId(278, phxaishdc9dn1830.stratus.phx.ebay.com, 54757), shuffleId=6, mapId=2810, reduceId=1117, message=
>>> org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5)
>>>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>     at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
>>>     at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
>>>     at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
>>>     at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
>>>     at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444)
>>>     at org.xerial.snappy.Snappy.uncompress(Snappy.java:480)
>>>     at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135)
>>>     at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:92)
>>>     at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:58)
>>>     at org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:160)
>>>     at org.apache.spark.storage.BlockManager.wrapForCompression(BlockManager.scala:1165)
>>>     at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:301)
>>>     at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$4.apply(ShuffleBlockFetcherIterator.scala:300)
>>>     at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
>>>     at scala.util.Try$.apply(Try.scala:161)
>>>     at scala.util.Success.map(Try.scala:206)
>>>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
>>>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
>>>     ... 18 more
>>>
>>
>>
>


-- 
Deepak
