Hi Andrew,

This is the JIRA I created: https://issues.apache.org/jira/browse/MAPREDUCE-5893. Hopefully someone wants to work on it.

Best,
Xiangrui
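For reference, a minimal Scala sketch of the reproduction setup Xiangrui describes in the quoted message below. The file path, partition count, and object name are illustrative, and the failure assumes a Hadoop build with splittable bzip2 (such as 2.2.0) and workers with multiple cores:

    import org.apache.spark.{SparkConf, SparkContext}

    object Bzip2Repro {
      def main(args: Array[String]): Unit = {
        // Assumes workers configured with multiple cores,
        // e.g. local[4] for a quick local test.
        val sc = new SparkContext(new SparkConf().setAppName("bzip2-repro"))

        // minPartitions large enough that one worker runs several tasks
        // concurrently against the same .bz2 file (path is illustrative).
        val lines = sc.textFile("hdfs:///user/aa/myfile.bz2", minPartitions = 16)

        // Any transformation that forces a full read; on affected Hadoop
        // versions this intermittently throws ArrayIndexOutOfBoundsException
        // from CBZip2InputStream.
        println(lines.map(_ + "| ").count())

        sc.stop()
      }
    }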
On Fri, May 16, 2014 at 6:47 PM, Xiangrui Meng <men...@gmail.com> wrote:
> Hi Andrew,
>
> I could reproduce the bug with Hadoop 2.2.0. Some older versions of
> Hadoop do not support splittable compression, so you ended up with
> sequential reads. It is easy to reproduce the bug with the following
> setup:
>
> 1) Workers are configured with multiple cores.
> 2) The BZip2 files are big enough, or minPartitions is large enough
> when you load the file via sc.textFile(), so that one worker has more
> than one task.
>
> Best,
> Xiangrui
>
> On Fri, May 16, 2014 at 4:06 PM, Andrew Ash <and...@andrewash.com> wrote:
>> Hi Xiangrui,
>>
>> // FYI I'm getting your emails late due to the Apache mailing list outage
>>
>> I'm using CDH 4.4.0, which I think uses the MapReduce v2 API. The .jars
>> are named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar
>>
>> I'm also glad you were able to reproduce! Please paste a link to the
>> Hadoop bug you file so I can follow along.
>>
>> Thanks!
>> Andrew
>>
>> On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng <men...@gmail.com> wrote:
>>>
>>> Which Hadoop version did you use? I'm not sure whether Hadoop v2 fixes
>>> the problem you described, but it does contain several fixes to the
>>> bzip2 format. -Xiangrui
>>>
>>> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash <and...@andrewash.com> wrote:
>>> > Hi all,
>>> >
>>> > Is anyone reading and writing .bz2 files stored in HDFS from Spark
>>> > with success?
>>> >
>>> > I'm finding the following results on a recent commit (756c96, from
>>> > 24 hours ago) and CDH 4.4.0:
>>> >
>>> > Works:        val r = sc.textFile("/user/aa/myfile.bz2").count
>>> > Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) => s + "| ").count
>>> >
>>> > Specifically, I'm getting an exception coming out of the bzip2
>>> > libraries (see stacktraces below), which is unusual because I'm able
>>> > to read that file without issue using the same libraries via Pig. It
>>> > was originally created from Pig as well.
>>> >
>>> > Digging a little deeper, I found this line in the .bz2 decompressor's
>>> > javadoc for CBZip2InputStream:
>>> >
>>> > "Instances of this class are not threadsafe." [source]
>>> >
>>> > My current working theory is that Spark has a much higher level of
>>> > parallelism than Pig/Hadoop does, and thus I get these wild
>>> > IndexOutOfBounds exceptions much more frequently (as in, I can't
>>> > finish a run over a small 2M-row file) versus hardly at all in other
>>> > libraries.
>>> >
>>> > The only other reference I could find to the issue was in
>>> > presto-users, but the recommendation to leave .bz2 for .lzo doesn't
>>> > help if I actually do want the higher compression levels of .bz2.
>>> >
>>> > Would love to hear whether I have some kind of configuration issue,
>>> > whether there's a bug in the .bz2 handling that's fixed in later
>>> > versions of CDH, or generally any other thoughts on the issue.
>>> >
>>> > Thanks!
>>> > Andrew
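To make the working theory above concrete, here is a hedged sketch of the access pattern it implies: several threads in one JVM, each with its own Hadoop bzip2 decompressor over the same file, as concurrent Spark tasks on a multi-core worker would have. The path and thread count are illustrative, and whether this actually corrupts reads depends on the Hadoop version's CBZip2InputStream, so treat it as an illustration of the pattern rather than a confirmed reproducer:

    import java.util.concurrent.{Executors, TimeUnit}
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.compress.BZip2Codec
    import org.apache.hadoop.util.ReflectionUtils

    object ConcurrentBzip2Read {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        val fs   = FileSystem.get(conf)
        val path = new Path("/user/aa/myfile.bz2") // illustrative path
        val pool = Executors.newFixedThreadPool(4)

        // Each thread gets its own stream, just as each Spark task would,
        // but all of them run in one JVM -- the condition the working
        // theory points at.
        for (_ <- 1 to 4) {
          pool.submit(new Runnable {
            def run(): Unit = {
              val codec = ReflectionUtils.newInstance(classOf[BZip2Codec], conf)
              val in    = codec.createInputStream(fs.open(path))
              val buf   = new Array[Byte](64 * 1024)
              try {
                while (in.read(buf) != -1) {} // drain the stream
              } finally in.close()
            }
          })
        }
        pool.shutdown()
        pool.awaitTermination(10, TimeUnit.MINUTES)
      }
    }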
>>> >
>>> > Below are examples of some exceptions I'm getting:
>>> >
>>> > 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
>>> > java.lang.ArrayIndexOutOfBoundsException
>>> > java.lang.ArrayIndexOutOfBoundsException: 65535
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>> >     at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>> >     at java.io.InputStream.read(InputStream.java:101)
>>> >     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>> >     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>> >
>>> > java.lang.ArrayIndexOutOfBoundsException: 900000
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>> >     at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>> >     at java.io.InputStream.read(InputStream.java:101)
>>> >     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>> >     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>> >     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> >     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >     at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>> >
>>> > java.lang.ArrayIndexOutOfBoundsException: -921878509
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>> >     at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
>>> >     at java.io.InputStream.read(InputStream.java:101)
>>> >     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>> >     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>> >     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> >     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >     at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>> >     at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>> >     at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>> >     at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>> >     at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>> >
>>> > java.lang.ArrayIndexOutOfBoundsException: -1321104434
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>> >     at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>> >     at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>> >     at java.io.InputStream.read(InputStream.java:101)
>>> >     at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>> >     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>> >     at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>> >     at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>> >     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>> >     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> >     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> >     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> >     at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
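Until a Hadoop release with the fix is available, one possible mitigation, assuming the failures really are driven by concurrent tasks decompressing in the same worker JVM, is to keep each executor to a single running task at a time by setting spark.task.cpus to the worker's core count. This trades parallelism for stability; the core count and path below are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    object SingleTaskPerWorker {
      def main(args: Array[String]): Unit = {
        // With 4-core workers, spark.task.cpus = 4 means at most one task
        // runs per worker at a time, so no two tasks share a JVM while
        // decompressing the same .bz2 file.
        val conf = new SparkConf()
          .setAppName("bzip2-workaround")
          .set("spark.task.cpus", "4")
        val sc = new SparkContext(conf)

        // The map that previously failed intermittently.
        println(sc.textFile("/user/aa/myfile.bz2").map(_ + "| ").count())
        sc.stop()
      }
    }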