Hi all,

Is anyone successfully reading from and writing to .bz2 files stored in HDFS
from Spark?


I'm finding the following results on a recent commit (756c96 from 24hr ago)
and CDH 4.4.0:

Works:        val r = sc.textFile("/user/aa/myfile.bz2").count
Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) => s + "| ").count

Specifically, I'm getting an exception coming out of the bzip2 libraries
(see the stack traces below), which is unusual because I'm able to read
that file without an issue using the same libraries via Pig.  The file was
originally created from Pig as well.

Digging a little deeper I found this line in the .bz2 decompressor's
javadoc for CBZip2InputStream:

"Instances of this class are not threadsafe."
[source <http://hadoop.apache.org/docs/r1.1.1/api/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.html>]


My current working theory is that Spark has a much higher level of
parallelism than Pig/Hadoop does, so I hit these wild
ArrayIndexOutOfBoundsException errors much more frequently (as in, I can't
finish a run over a small 2M-row file), versus hardly at all with other
libraries.
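
One way to probe this theory, sketched here under some assumptions: run the
failing job in local mode once with a single task thread and once with
several. Spark's local[N] master runs N task threads inside one JVM, so if
only the multi-threaded run throws, that would point toward concurrent use of
the decompressor rather than a corrupt file. The names Bz2ThreadCheck and
countWithThreads are made up for illustration, the default path is the one
from the example above, and local mode will only reach HDFS if the Hadoop
configuration is on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical diagnostic, not part of Spark: reruns the failing job with a
// chosen number of task threads in a single JVM.
object Bz2ThreadCheck {

  def countWithThreads(path: String, threads: Int): Long = {
    val conf = new SparkConf()
      .setMaster(s"local[$threads]")        // N task threads, one JVM
      .setAppName(s"bz2-thread-check-$threads")
    val sc = new SparkContext(conf)
    try {
      // Ask for at least `threads` splits so several tasks can read the
      // file at the same time, then run the same map + count that fails.
      sc.textFile(path, threads).map((s: String) => s + "| ").count()
    } finally {
      sc.stop()                             // allow the next context to start
    }
  }

  def main(args: Array[String]): Unit = {
    val path = args.headOption.getOrElse("/user/aa/myfile.bz2")
    println("1 thread:  " + countWithThreads(path, 1))
    println("8 threads: " + countWithThreads(path, 8))
  }
}
```

If both runs succeed and report the same count, the threading theory becomes
less likely for this file; a failure only in the second run would support it,
though neither outcome is conclusive on its own.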

The only other reference I could find to the issue was in presto-users
<https://groups.google.com/forum/#!topic/presto-users/6Wos0W2Sh9M>, but the
recommendation to switch from .bz2 to .lzo doesn't help if I actually do
want the higher compression levels of .bz2.


Would love to hear if I have some kind of configuration issue or if there's
a bug in .bz2 that's fixed in later versions of CDH, or generally any other
thoughts on the issue.


Thanks!
Andrew



Below are examples of some of the exceptions I'm getting:

14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 65535
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
        at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
        at java.io.InputStream.read(InputStream.java:101)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)




java.lang.ArrayIndexOutOfBoundsException: 900000
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
        at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
        at java.io.InputStream.read(InputStream.java:101)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)



java.lang.ArrayIndexOutOfBoundsException: -921878509
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
        at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
        at java.io.InputStream.read(InputStream.java:101)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
        at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
        at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
        at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
        at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)



java.lang.ArrayIndexOutOfBoundsException: -1321104434
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
        at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
        at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
        at java.io.InputStream.read(InputStream.java:101)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
