We never saw your exception when reading bzip2 files with Spark.

However, when we mistakenly compiled Spark against an older version of
Hadoop (the default in Spark at the time), bzip2 files were read
sequentially, without using block splits to work in parallel.
Once we compiled Spark with SPARK_HADOOP_VERSION=2.2.0, files were read
in parallel, as expected with a recent Hadoop.

http://spark.apache.org/docs/0.9.1/#a-note-about-hadoop-versions

Make sure Spark is compiled against Hadoop v2.
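
A quick way to see whether a given build splits bzip2 input is to look at the partition count of the RDD. The lines below are a minimal sketch meant for spark-shell, where `sc` is the predefined SparkContext; the path is the one from the original report and should be replaced with your own file. For a file smaller than one HDFS block the count may be low regardless of the build.

```scala
// Run inside spark-shell, where `sc` is the predefined SparkContext.
// The path is taken from the original report; substitute your own file.
val lines = sc.textFile("/user/aa/myfile.bz2")

// Each partition is processed by one task. A count of 1 means a single
// task decompresses the whole file; a larger count means the file was split.
val numPartitions = lines.partitions.length
println("partitions for bz2 input: " + numPartitions)
```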

André

On 2014-05-13 18:08, Xiangrui Meng wrote:
Which Hadoop version did you use? I'm not sure whether Hadoop v2 fixes
the problem you described, but it does contain several fixes to the bzip2
format. -Xiangrui

On Wed, May 7, 2014 at 9:19 PM, Andrew Ash <and...@andrewash.com> wrote:
Hi all,

Is anyone successfully reading and writing .bz2 files stored in HDFS from
Spark?


I'm finding the following results on a recent commit (756c96 from 24hr ago)
and CDH 4.4.0:

Works: val r = sc.textFile("/user/aa/myfile.bz2").count
Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) => s+"| " ).count

Specifically, I'm getting an exception coming out of the bzip2 libraries
(see the stack traces below), which is unusual because I'm able to read that
file without an issue using the same libraries via Pig.  The file was
originally created from Pig as well.

Digging a little deeper I found this line in the .bz2 decompressor's javadoc
for CBZip2InputStream:

"Instances of this class are not threadsafe." [source]


My current working theory is that Spark has a much higher level of
parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
exceptions much more frequently (as in can't finish a run over a little 2M
row file) vs hardly at all in other libraries.
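
One way to probe this theory, sketched here under assumptions rather than taken from a run: execute the failing job once with a single worker thread and once with several, and compare. The object name `Bz2ThreadProbe` and the master-URL argument are hypothetical, and running in local mode is an assumption, since the failures above were seen on a cluster.

```scala
import org.apache.spark.SparkContext

// Hypothetical harness: pass "local[1]" or "local[4]" as the first argument.
object Bz2ThreadProbe {
  def main(args: Array[String]): Unit = {
    val master = args(0)
    val sc = new SparkContext(master, "bz2-thread-probe")
    try {
      // Same job that fails in the report above.
      val n = sc.textFile("/user/aa/myfile.bz2")
        .map((s: String) => s + "| ")
        .count
      println("master=" + master + " rows=" + n)
    } finally {
      sc.stop()
    }
  }
}
```

With `local[1]` only one task runs at a time in the JVM, so no two bzip2 decoders are active concurrently. If the exceptions appear only with `local[4]`, that would be consistent with a thread-safety problem, though it would not prove it.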

The only other reference I could find to the issue was in presto-users, but
the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
the higher compression levels of .bz2.


Would love to hear if I have some kind of configuration issue or if there's
a bug in .bz2 that's fixed in later versions of CDH, or generally any other
thoughts on the issue.


Thanks!
Andrew



Below are examples of some exceptions I'm getting:

14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 65535
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
         at java.io.InputStream.read(InputStream.java:101)
         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)




java.lang.ArrayIndexOutOfBoundsException: 900000
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
         at java.io.InputStream.read(InputStream.java:101)
         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)



java.lang.ArrayIndexOutOfBoundsException: -921878509
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
         at java.io.InputStream.read(InputStream.java:101)
         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)



java.lang.ArrayIndexOutOfBoundsException: -1321104434
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
         at java.io.InputStream.read(InputStream.java:101)
         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)


--
André Bois-Crettez

Software Architect
Big Data Developer
http://www.kelkoo.com/


