Hi Andrew,

Could you try varying the minPartitions parameter (the second argument to
sc.textFile)? For example:

val r = sc.textFile("/user/aa/myfile.bz2", 4).count
val r = sc.textFile("/user/aa/myfile.bz2", 8).count
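
To compare several values in one go, a rough sketch along these lines might
help. PartitionSweep and its sweep method are hypothetical names made up for
this illustration, not part of Spark; the only Spark calls assumed are the
sc.textFile / map / count ones already used in this thread. Each run is
wrapped in Try so that one failing partition count does not abort the rest.

```scala
import scala.util.Try

// Hypothetical helper (not a Spark API): runs `countWith` once per candidate
// minPartitions value and records either the resulting count or the failure.
object PartitionSweep {
  def sweep(candidates: Seq[Int])(countWith: Int => Long): Seq[(Int, Try[Long])] =
    candidates.map(n => n -> Try(countWith(n)))
}

// In spark-shell, where `sc` is already defined, it could be used like this:
//
//   val results = PartitionSweep.sweep(Seq(1, 2, 4, 8)) { n =>
//     sc.textFile("/user/aa/myfile.bz2", n).map((s: String) => s + "| ").count()
//   }
//   results.foreach { case (n, r) => println(s"minPartitions=$n -> $r") }
```

If the failures only show up above some partition count, that would be a
useful data point for narrowing this down.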

Best,
Xiangrui

On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng <men...@gmail.com> wrote:
> Which Hadoop version did you use? I'm not sure whether Hadoop v2 fixes
> the problem you described, but it does contain several fixes related to
> the bzip2 format. -Xiangrui
>
> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash <and...@andrewash.com> wrote:
>> Hi all,
>>
>> Is anyone reading from and writing to .bz2 files stored in HDFS from Spark
>> with success?
>>
>>
>> I'm finding the following results on a recent commit (756c96 from 24hr ago)
>> and CDH 4.4.0:
>>
>> Works: val r = sc.textFile("/user/aa/myfile.bz2").count
>> Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) => s + "| ").count
>>
>> Specifically, I'm getting an exception coming out of the bzip2 libraries
>> (see the stack traces below), which is unusual because I'm able to read from
>> that file without an issue using the same libraries via Pig. The file was
>> originally created from Pig as well.
>>
>> Digging a little deeper I found this line in the .bz2 decompressor's javadoc
>> for CBZip2InputStream:
>>
>> "Instances of this class are not threadsafe." [source]
>>
>>
>> My current working theory is that Spark has a much higher level of
>> parallelism than Pig/Hadoop does, and thus I hit these wild
>> ArrayIndexOutOfBoundsExceptions much more frequently (as in, I can't finish
>> a run over a small 2M-row file), versus hardly at all in other libraries.
>>
>> The only other reference I could find to the issue was in presto-users, but
>> the recommendation to switch from .bz2 to .lzo doesn't help if I actually do
>> want the higher compression levels of .bz2.
>>
>>
>> Would love to hear if I have some kind of configuration issue or if there's
>> a bug in .bz2 that's fixed in later versions of CDH, or generally any other
>> thoughts on the issue.
>>
>>
>> Thanks!
>> Andrew
>>
>>
>>
>> Below are examples of some exceptions I'm getting:
>>
>> 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
>> java.lang.ArrayIndexOutOfBoundsException
>> java.lang.ArrayIndexOutOfBoundsException: 65535
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>         at
>> org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>         at java.io.InputStream.read(InputStream.java:101)
>>         at
>> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>         at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>         at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>
>>
>>
>>
>> java.lang.ArrayIndexOutOfBoundsException: 900000
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>         at
>> org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>         at java.io.InputStream.read(InputStream.java:101)
>>         at
>> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>         at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>         at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>         at
>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>         at
>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>         at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>         at
>> org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>
>>
>>
>> java.lang.ArrayIndexOutOfBoundsException: -921878509
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>         at
>> org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
>>         at java.io.InputStream.read(InputStream.java:101)
>>         at
>> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>         at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>         at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>         at
>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>         at
>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>         at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>         at
>> org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>
>>
>>
>> java.lang.ArrayIndexOutOfBoundsException: -1321104434
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>         at
>> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>         at
>> org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>         at java.io.InputStream.read(InputStream.java:101)
>>         at
>> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>         at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>         at
>> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>         at
>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>         at
>> org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>         at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>         at
>> org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
