Re: Reading from .bz2 files with Spark

2014-05-19 Thread Andrew Ash
Hi Xiangrui, many thanks to you and Sandy for fixing this issue!


On Fri, May 16, 2014 at 10:23 PM, Xiangrui Meng men...@gmail.com wrote:

 Hi Andrew,

 I submitted a patch and verified it solves the problem. You can
 download the patch from
 https://issues.apache.org/jira/browse/HADOOP-10614 .

 Best,
 Xiangrui

 On Fri, May 16, 2014 at 6:48 PM, Xiangrui Meng men...@gmail.com wrote:
  Hi Andrew,
 
  This is the JIRA I created:
  https://issues.apache.org/jira/browse/MAPREDUCE-5893 . Hopefully
  someone wants to work on it.
 
  Best,
  Xiangrui
 
  On Fri, May 16, 2014 at 6:47 PM, Xiangrui Meng men...@gmail.com wrote:
  Hi Andre,
 
  I could reproduce the bug with Hadoop 2.2.0. Some older versions of
  Hadoop do not support splittable compression, so you ended up with
  sequential reads. It is easy to reproduce the bug with the following
  setup:
 
  1) Workers are configured with multiple cores.
  2) BZip2 files are big enough or minPartitions is large enough when
  you load the file via sc.textFile(), so that one worker has more than
  one task.
 
  Best,
  Xiangrui
 
  On Fri, May 16, 2014 at 4:06 PM, Andrew Ash and...@andrewash.com
 wrote:
  Hi Xiangrui,
 
  // FYI I'm getting your emails late due to the Apache mailing list
 outage
 
  I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The
 .jars are
  named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar
 
  I'm also glad you were able to reproduce!  Please paste a link to the
 Hadoop
  bug you file so I can follow along.
 
  Thanks!
  Andrew
 
 
  On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com
 wrote:
 
  Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
  the problem you described, but it does contain several fixes to bzip2
  format. -Xiangrui
 
  On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com
 wrote:
   Hi all,
  
   Is anyone reading and writing to .bz2 files stored in HDFS from
 Spark
   with
   success?
  
  
   I'm finding the following results on a recent commit (756c96 from 24hr ago)
   and CDH 4.4.0:
  
   Works: val r = sc.textFile("/user/aa/myfile.bz2").count
   Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
   s+"|  ").count
  
   Specifically, I'm getting an exception coming out of the bzip2
 libraries
   (see below stacktraces), which is unusual because I'm able to read
 from
   that
   file without an issue using the same libraries via Pig.  It was
   originally
   created from Pig as well.
  
   Digging a little deeper I found this line in the .bz2 decompressor's
   javadoc
   for CBZip2InputStream:
  
   Instances of this class are not threadsafe. [source]
  
  
   My current working theory is that Spark has a much higher level of
   parallelism than Pig/Hadoop does and thus I get these wild
   IndexOutOfBounds
   exceptions much more frequently (as in can't finish a run over a
 little
   2M
   row file) vs hardly at all in other libraries.
  
   The only other reference I could find to the issue was in
 presto-users,
   but
   the recommendation to leave .bz2 for .lzo doesn't help if I
 actually do
   want
   the higher compression levels of .bz2.
  
  
   Would love to hear if I have some kind of configuration issue or if
   there's
   a bug in .bz2 that's fixed in later versions of CDH, or generally
 any
   other
   thoughts on the issue.
  
  
   Thanks!
   Andrew
  
  
  
   Below are examples of some exceptions I'm getting:
  
   14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
   java.lang.ArrayIndexOutOfBoundsException
   java.lang.ArrayIndexOutOfBoundsException: 65535
   at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
   at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
   at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
   at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
   at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
   at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
   at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
   at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
   at java.io.InputStream.read(InputStream.java:101)
   at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
   at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
   at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
   at

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

Could you try varying the minPartitions parameter? For example:

val r = sc.textFile("/user/aa/myfile.bz2", 4).count
val r = sc.textFile("/user/aa/myfile.bz2", 8).count

Best,
Xiangrui
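
For reference, that experiment amounts to something like the following in a
spark-shell (a sketch, assuming the same hypothetical HDFS path; only the
minPartitions argument of sc.textFile varies):

  // Count the file with several minPartitions values and report how many
  // partitions (and therefore tasks) each run actually used.
  for (minPartitions <- Seq(1, 4, 8)) {
    val r = sc.textFile("/user/aa/myfile.bz2", minPartitions)
    println(s"minPartitions=$minPartitions partitions=${r.partitions.size} count=${r.count()}")
  }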

On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote:
 Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
 the problem you described, but it does contain several fixes to bzip2
 format. -Xiangrui

 On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:
 Hi all,

 Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
 success?


 I'm finding the following results on a recent commit (756c96 from 24hr ago)
 and CDH 4.4.0:

 Works: val r = sc.textFile("/user/aa/myfile.bz2").count
 Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
 s+"|  ").count

 Specifically, I'm getting an exception coming out of the bzip2 libraries
 (see below stacktraces), which is unusual because I'm able to read from that
 file without an issue using the same libraries via Pig.  It was originally
 created from Pig as well.

 Digging a little deeper I found this line in the .bz2 decompressor's javadoc
 for CBZip2InputStream:

 Instances of this class are not threadsafe. [source]


 My current working theory is that Spark has a much higher level of
 parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
 exceptions much more frequently (as in can't finish a run over a little 2M
 row file) vs hardly at all in other libraries.

 The only other reference I could find to the issue was in presto-users, but
 the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
 the higher compression levels of .bz2.


 Would love to hear if I have some kind of configuration issue or if there's
 a bug in .bz2 that's fixed in later versions of CDH, or generally any other
 thoughts on the issue.


 Thanks!
 Andrew



 Below are examples of some exceptions I'm getting:

 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.ArrayIndexOutOfBoundsException
 java.lang.ArrayIndexOutOfBoundsException: 65535
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)




 java.lang.ArrayIndexOutOfBoundsException: 90
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at 

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Andre Bois-Crettez

We never saw your exception when reading bzip2 files with Spark.

But when we mistakenly compiled Spark against an older version of Hadoop (the
default in Spark), we ended up with sequential reading of the bzip2 file,
not taking advantage of block splits to work in parallel.
Once we compiled Spark with SPARK_HADOOP_VERSION=2.2.0, files were read
in parallel, as expected with a recent Hadoop.

http://spark.apache.org/docs/0.9.1/#a-note-about-hadoop-versions

Make sure Spark is compiled against Hadoop v2
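
One quick way to check which Hadoop version a Spark build actually bundles is
to query Hadoop's own VersionInfo utility from a spark-shell (a sketch, not
from the thread; it prints whatever Hadoop is on the driver's classpath):

  // Prints e.g. "2.0.0-cdh4.4.0" or "2.2.0"; per the advice above, parallel
  // (splittable) bzip2 reads want a Spark assembly built against a recent Hadoop 2.x.
  println(org.apache.hadoop.util.VersionInfo.getVersion)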

André

On 2014-05-13 18:08, Xiangrui Meng wrote:

Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
the problem you described, but it does contain several fixes to bzip2
format. -Xiangrui

On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:

Hi all,

Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
success?


I'm finding the following results on a recent commit (756c96 from 24hr ago)
and CDH 4.4.0:

Works: val r = sc.textFile("/user/aa/myfile.bz2").count
Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
s+"|  ").count

Specifically, I'm getting an exception coming out of the bzip2 libraries
(see below stacktraces), which is unusual because I'm able to read from that
file without an issue using the same libraries via Pig.  It was originally
created from Pig as well.

Digging a little deeper I found this line in the .bz2 decompressor's javadoc
for CBZip2InputStream:

Instances of this class are not threadsafe. [source]


My current working theory is that Spark has a much higher level of
parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
exceptions much more frequently (as in can't finish a run over a little 2M
row file) vs hardly at all in other libraries.

The only other reference I could find to the issue was in presto-users, but
the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
the higher compression levels of .bz2.


Would love to hear if I have some kind of configuration issue or if there's
a bug in .bz2 that's fixed in later versions of CDH, or generally any other
thoughts on the issue.


Thanks!
Andrew



Below are examples of some exceptions I'm getting:

14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
java.lang.ArrayIndexOutOfBoundsException
java.lang.ArrayIndexOutOfBoundsException: 65535
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)




java.lang.ArrayIndexOutOfBoundsException: 90
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 at
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
 at
org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
 at 

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Andrew Ash
Hi Xiangrui,

// FYI I'm getting your emails late due to the Apache mailing list outage

I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The .jars are
named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar

I'm also glad you were able to reproduce!  Please paste a link to the
Hadoop bug you file so I can follow along.

Thanks!
Andrew


On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote:

 Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
 the problem you described, but it does contain several fixes to bzip2
 format. -Xiangrui

 On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:
  Hi all,
 
  Is anyone reading and writing to .bz2 files stored in HDFS from Spark
 with
  success?
 
 
  I'm finding the following results on a recent commit (756c96 from 24hr
 ago)
  and CDH 4.4.0:
 
  Works: val r = sc.textFile("/user/aa/myfile.bz2").count
  Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
  s+"|  ").count
 
  Specifically, I'm getting an exception coming out of the bzip2 libraries
  (see below stacktraces), which is unusual because I'm able to read from
 that
  file without an issue using the same libraries via Pig.  It was
 originally
  created from Pig as well.
 
  Digging a little deeper I found this line in the .bz2 decompressor's
 javadoc
  for CBZip2InputStream:
 
  Instances of this class are not threadsafe. [source]
 
 
  My current working theory is that Spark has a much higher level of
  parallelism than Pig/Hadoop does and thus I get these wild
 IndexOutOfBounds
  exceptions much more frequently (as in can't finish a run over a little
 2M
  row file) vs hardly at all in other libraries.
 
  The only other reference I could find to the issue was in presto-users,
 but
  the recommendation to leave .bz2 for .lzo doesn't help if I actually do
 want
  the higher compression levels of .bz2.
 
 
  Would love to hear if I have some kind of configuration issue or if
 there's
  a bug in .bz2 that's fixed in later versions of CDH, or generally any
 other
  thoughts on the issue.
 
 
  Thanks!
  Andrew
 
 
 
  Below are examples of some exceptions I'm getting:
 
  14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
  java.lang.ArrayIndexOutOfBoundsException
  java.lang.ArrayIndexOutOfBoundsException: 65535
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
  at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
  at java.io.InputStream.read(InputStream.java:101)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
  at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
  at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 
 
 
  java.lang.ArrayIndexOutOfBoundsException: 90
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
  at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
  at java.io.InputStream.read(InputStream.java:101)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
  at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
  at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
  at

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

I verified that this is due to thread safety. I changed
SPARK_WORKER_CORES to 1 in spark-env.sh, so there is only one thread per
worker. With that change I can load the file without any problem, for
different values of minPartitions. I will submit a JIRA to both Spark and
Hadoop.
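
For reference, that workaround is just a one-line change in the standalone
deployment's conf/spark-env.sh (a sketch; it trades away per-worker
parallelism, so it is a diagnostic rather than a fix):

  # Limit each standalone worker to a single core, so only one task (and hence
  # only one CBZip2InputStream) runs per worker at a time.
  export SPARK_WORKER_CORES=1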

Best,
Xiangrui

On Thu, May 15, 2014 at 3:48 PM, Xiangrui Meng men...@gmail.com wrote:
 Hi Andrew,

 Could you try varying the minPartitions parameter? For example:

 val r = sc.textFile("/user/aa/myfile.bz2", 4).count
 val r = sc.textFile("/user/aa/myfile.bz2", 8).count

 Best,
 Xiangrui

 On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote:
 Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
 the problem you described, but it does contain several fixes to bzip2
 format. -Xiangrui

 On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:
 Hi all,

 Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
 success?


 I'm finding the following results on a recent commit (756c96 from 24hr ago)
 and CDH 4.4.0:

 Works: val r = sc.textFile("/user/aa/myfile.bz2").count
 Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
 s+"|  ").count

 Specifically, I'm getting an exception coming out of the bzip2 libraries
 (see below stacktraces), which is unusual because I'm able to read from that
 file without an issue using the same libraries via Pig.  It was originally
 created from Pig as well.

 Digging a little deeper I found this line in the .bz2 decompressor's javadoc
 for CBZip2InputStream:

 Instances of this class are not threadsafe. [source]


 My current working theory is that Spark has a much higher level of
 parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
 exceptions much more frequently (as in can't finish a run over a little 2M
 row file) vs hardly at all in other libraries.

 The only other reference I could find to the issue was in presto-users, but
 the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
 the higher compression levels of .bz2.


 Would love to hear if I have some kind of configuration issue or if there's
 a bug in .bz2 that's fixed in later versions of CDH, or generally any other
 thoughts on the issue.


 Thanks!
 Andrew



 Below are examples of some exceptions I'm getting:

 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.ArrayIndexOutOfBoundsException
 java.lang.ArrayIndexOutOfBoundsException: 65535
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)




 java.lang.ArrayIndexOutOfBoundsException: 90
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
 at 

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andrew,

This is the JIRA I created:
https://issues.apache.org/jira/browse/MAPREDUCE-5893 . Hopefully
someone wants to work on it.

Best,
Xiangrui

On Fri, May 16, 2014 at 6:47 PM, Xiangrui Meng men...@gmail.com wrote:
 Hi Andre,

 I could reproduce the bug with Hadoop 2.2.0. Some older versions of
 Hadoop do not support splittable compression, so you ended up with
 sequential reads. It is easy to reproduce the bug with the following
 setup:

 1) Workers are configured with multiple cores.
 2) BZip2 files are big enough or minPartitions is large enough when
 you load the file via sc.textFile(), so that one worker has more than
 one task.

 Best,
 Xiangrui

 On Fri, May 16, 2014 at 4:06 PM, Andrew Ash and...@andrewash.com wrote:
 Hi Xiangrui,

 // FYI I'm getting your emails late due to the Apache mailing list outage

 I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The .jars are
 named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar

 I'm also glad you were able to reproduce!  Please paste a link to the Hadoop
 bug you file so I can follow along.

 Thanks!
 Andrew


 On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote:

 Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
 the problem you described, but it does contain several fixes to bzip2
 format. -Xiangrui

 On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:
  Hi all,
 
  Is anyone reading and writing to .bz2 files stored in HDFS from Spark
  with
  success?
 
 
  I'm finding the following results on a recent commit (756c96 from 24hr
  ago)
  and CDH 4.4.0:
 
  Works: val r = sc.textFile("/user/aa/myfile.bz2").count
  Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
  s+"|  ").count
 
  Specifically, I'm getting an exception coming out of the bzip2 libraries
  (see below stacktraces), which is unusual because I'm able to read from
  that
  file without an issue using the same libraries via Pig.  It was
  originally
  created from Pig as well.
 
  Digging a little deeper I found this line in the .bz2 decompressor's
  javadoc
  for CBZip2InputStream:
 
  Instances of this class are not threadsafe. [source]
 
 
  My current working theory is that Spark has a much higher level of
  parallelism than Pig/Hadoop does and thus I get these wild
  IndexOutOfBounds
  exceptions much more frequently (as in can't finish a run over a little
  2M
  row file) vs hardly at all in other libraries.
 
  The only other reference I could find to the issue was in presto-users,
  but
  the recommendation to leave .bz2 for .lzo doesn't help if I actually do
  want
  the higher compression levels of .bz2.
 
 
  Would love to hear if I have some kind of configuration issue or if
  there's
  a bug in .bz2 that's fixed in later versions of CDH, or generally any
  other
  thoughts on the issue.
 
 
  Thanks!
  Andrew
 
 
 
  Below are examples of some exceptions I'm getting:
 
  14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
  java.lang.ArrayIndexOutOfBoundsException
  java.lang.ArrayIndexOutOfBoundsException: 65535
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
  at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
  at java.io.InputStream.read(InputStream.java:101)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
  at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
  at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 
 
 
  java.lang.ArrayIndexOutOfBoundsException: 90
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
  at

Re: Reading from .bz2 files with Spark

2014-05-16 Thread Xiangrui Meng
Hi Andre,

I could reproduce the bug with Hadoop 2.2.0. Some older versions of
Hadoop do not support splittable compression, so you ended up with
sequential reads. It is easy to reproduce the bug with the following
setup:

1) Workers are configured with multiple cores.
2) BZip2 files are big enough or minPartitions is large enough when
you load the file via sc.textFile(), so that one worker has more than
one task.

Best,
Xiangrui
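
Put together, the reproduction described above boils down to something like
this (a sketch, assuming a standalone cluster whose workers have multiple
cores and the same hypothetical bzip2 path; the minPartitions value of 16 is
only illustrative):

  // A large enough minPartitions forces several concurrent tasks onto the same
  // multi-core worker; those tasks then trip over the non-thread-safe bzip2
  // decompressor and the job fails with ArrayIndexOutOfBoundsException.
  val r = sc.textFile("/user/aa/myfile.bz2", 16)
  r.map(s => s + "|").count()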

On Fri, May 16, 2014 at 4:06 PM, Andrew Ash and...@andrewash.com wrote:
 Hi Xiangrui,

 // FYI I'm getting your emails late due to the Apache mailing list outage

 I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The .jars are
 named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar

 I'm also glad you were able to reproduce!  Please paste a link to the Hadoop
 bug you file so I can follow along.

 Thanks!
 Andrew


 On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng men...@gmail.com wrote:

 Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
 the problem you described, but it does contain several fixes to bzip2
 format. -Xiangrui

 On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:
  Hi all,
 
  Is anyone reading and writing to .bz2 files stored in HDFS from Spark
  with
  success?
 
 
  I'm finding the following results on a recent commit (756c96 from 24hr
  ago)
  and CDH 4.4.0:
 
  Works: val r = sc.textFile("/user/aa/myfile.bz2").count
  Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
  s+"|  ").count
 
  Specifically, I'm getting an exception coming out of the bzip2 libraries
  (see below stacktraces), which is unusual because I'm able to read from
  that
  file without an issue using the same libraries via Pig.  It was
  originally
  created from Pig as well.
 
  Digging a little deeper I found this line in the .bz2 decompressor's
  javadoc
  for CBZip2InputStream:
 
  Instances of this class are not threadsafe. [source]
 
 
  My current working theory is that Spark has a much higher level of
  parallelism than Pig/Hadoop does and thus I get these wild
  IndexOutOfBounds
  exceptions much more frequently (as in can't finish a run over a little
  2M
  row file) vs hardly at all in other libraries.
 
  The only other reference I could find to the issue was in presto-users,
  but
  the recommendation to leave .bz2 for .lzo doesn't help if I actually do
  want
  the higher compression levels of .bz2.
 
 
  Would love to hear if I have some kind of configuration issue or if
  there's
  a bug in .bz2 that's fixed in later versions of CDH, or generally any
  other
  thoughts on the issue.
 
 
  Thanks!
  Andrew
 
 
 
  Below are examples of some exceptions I'm getting:
 
  14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
  java.lang.ArrayIndexOutOfBoundsException
  java.lang.ArrayIndexOutOfBoundsException: 65535
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
  at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
  at java.io.InputStream.read(InputStream.java:101)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
  at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
  at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 
 
 
  java.lang.ArrayIndexOutOfBoundsException: 90
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
  at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
  at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
  at java.io.InputStream.read(InputStream.java:101)
  at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 

Re: Reading from .bz2 files with Spark

2014-05-13 Thread Xiangrui Meng
Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
the problem you described, but it does contain several fixes to bzip2
format. -Xiangrui

On Wed, May 7, 2014 at 9:19 PM, Andrew Ash and...@andrewash.com wrote:
 Hi all,

 Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
 success?


 I'm finding the following results on a recent commit (756c96 from 24hr ago)
 and CDH 4.4.0:

 Works: val r = sc.textFile("/user/aa/myfile.bz2").count
 Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) =>
 s+"|  ").count

 Specifically, I'm getting an exception coming out of the bzip2 libraries
 (see below stacktraces), which is unusual because I'm able to read from that
 file without an issue using the same libraries via Pig.  It was originally
 created from Pig as well.

 Digging a little deeper I found this line in the .bz2 decompressor's javadoc
 for CBZip2InputStream:

 Instances of this class are not threadsafe. [source]


 My current working theory is that Spark has a much higher level of
 parallelism than Pig/Hadoop does and thus I get these wild IndexOutOfBounds
 exceptions much more frequently (as in can't finish a run over a little 2M
 row file) vs hardly at all in other libraries.

 The only other reference I could find to the issue was in presto-users, but
 the recommendation to leave .bz2 for .lzo doesn't help if I actually do want
 the higher compression levels of .bz2.


 Would love to hear if I have some kind of configuration issue or if there's
 a bug in .bz2 that's fixed in later versions of CDH, or generally any other
 thoughts on the issue.


 Thanks!
 Andrew



 Below are examples of some exceptions I'm getting:

 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.ArrayIndexOutOfBoundsException
 java.lang.ArrayIndexOutOfBoundsException: 65535
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)




 java.lang.ArrayIndexOutOfBoundsException: 90
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
 at
 org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
 at
 org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
 at java.io.InputStream.read(InputStream.java:101)
 at
 org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
 at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
 at
 org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
 at
 org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
 at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
 at
 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at
 org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)