Hi all, Is anyone reading and writing to .bz2 files stored in HDFS from Spark with success?
I'm finding the following results on a recent commit (756c96 from 24hr ago) and CDH 4.4.0:

Works:

    val r = sc.textFile("/user/aa/myfile.bz2").count

Doesn't work:

    val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) => s+"| " ).count

Specifically, I'm getting exceptions coming out of the bzip2 libraries (see the stack traces below), which is unusual because I'm able to read from that file without an issue using the same libraries via Pig. The file was originally created from Pig as well.

Digging a little deeper, I found this line in the .bz2 decompressor's javadoc for CBZip2InputStream:

"Instances of this class are not threadsafe." [source<http://hadoop.apache.org/docs/r1.1.1/api/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.html> ]

My current working theory is that Spark has a much higher level of parallelism than Pig/Hadoop does, and thus I get these wild ArrayIndexOutOfBounds exceptions much more frequently (as in, I can't finish a run over a little 2M row file) vs. hardly at all in other libraries.

The only other reference I could find to the issue was in presto-users<https://groups.google.com/forum/#!topic/presto-users/6Wos0W2Sh9M>, but the recommendation to leave .bz2 for .lzo doesn't help if I actually do want the higher compression levels of .bz2.

Would love to hear if I have some kind of configuration issue, if there's a bug in .bz2 that's fixed in later versions of CDH, or generally any other thoughts on the issue.

Thanks!
Andrew

Below are examples of some exceptions I'm getting:

14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 65535
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
    at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)

java.lang.ArrayIndexOutOfBoundsException: 900000
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
    at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)

java.lang.ArrayIndexOutOfBoundsException: -921878509
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
    at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
    at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
    at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
    at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)

java.lang.ArrayIndexOutOfBoundsException: -1321104434
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
    at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
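
P.S. To make the thread-safety theory a bit more concrete, here is a minimal sketch of the failure mode I have in mind. It is only an illustration under assumptions: ToyDecoder and DecoderPerThread are hypothetical names, not Hadoop or Spark classes, and I have not confirmed that Spark actually shares a decompressor instance across threads. The toy decoder keeps mutable scratch state between calls, the way a not-threadsafe stream might, and the runner shows the safe pattern of giving every task its own instance.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical stand-in for a stateful, non-thread-safe decoder.
// NOT Hadoop's CBZip2InputStream; it only mimics "mutable scratch
// state that is reused by every call without synchronization".
final class ToyDecoder {
  private val scratch = new Array[Int](16) // buffer reused by every call
  private var pos = 0                      // unsynchronized cursor into scratch

  // Sums the values by staging them through the scratch buffer in chunks.
  def decode(values: Array[Int]): Long = {
    var total = 0L
    var i = 0
    while (i < values.length) {
      pos = 0
      while (pos < scratch.length && i < values.length) {
        scratch(pos) = values(i)
        pos += 1
        i += 1
      }
      var j = 0
      while (j < pos) { total += scratch(j); j += 1 }
    }
    total
  }
}

object DecoderPerThread {
  // Runs `tasks` decode jobs on a pool of `threads` threads.
  // makeDecoder is invoked once per task to obtain the decoder to use.
  def run(threads: Int, tasks: Int, makeDecoder: () => ToyDecoder): Seq[Long] = {
    val input = Array.tabulate(1000)(identity) // 0, 1, ..., 999
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (1 to tasks).map { _ =>
        pool.submit(new Callable[Long] {
          def call(): Long = makeDecoder().decode(input)
        })
      }
      futures.map(_.get())
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    // Safe pattern: a fresh decoder per task, so no state is shared.
    val results = run(8, 64, () => new ToyDecoder)
    println(results.forall(_ == 499500L)) // prints true

    // Unsafe pattern (left commented out): one decoder shared by all tasks.
    // Concurrent calls race on `pos` and `scratch`, so this may return wrong
    // sums or fail with ArrayIndexOutOfBoundsException, nondeterministically.
    // val shared = new ToyDecoder
    // run(8, 64, () => shared)
  }
}
```

If something like the commented-out shared case is what happens to the bzip2 stream in my job, it would explain why the array indices in the traces above look like garbage. A cheap way to check on the real data would be to rerun the failing map with a single worker thread and see whether the exceptions disappear.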