Sebastian Neef created FLINK-3568:
-------------------------------------

             Summary: Hadoop's Bzip decompression is not thread safe
                 Key: FLINK-3568
                 URL: https://issues.apache.org/jira/browse/FLINK-3568
             Project: Flink
          Issue Type: Bug
          Components: Hadoop Compatibility
    Affects Versions: 0.10.1
            Reporter: Sebastian Neef


Hi,
first of all, this is my first time filing a bug report for Apache Flink. If 
you need any additional information, please let me know. 

h1. Background
I was trying to process [Wikipedia's XML 
dumps|https://dumps.wikimedia.org/enwiki/20160204/] with Apache Flink. To save 
disk space I decided to use the bzip2-compressed versions. 

Apache Flink is compatible with [Hadoop's InputFormats|https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html], and Hadoop's TextInputFormat [supports compressed files|https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.html#line.5]. [Bzip2 files are splittable|http://comphadoop.weebly.com/index.html] and thus perfect for parallel processing.

h1. Problem
I started to test the decompression with a simple Job based on the Apache Flink 
Quickstart code:

{code}
public static void main(String[] args) throws Exception {

    if (args.length != 2) {
        System.err.println("USAGE: Job <wikipediadump.xml.bz2> <output.txt>");
        return;
    }

    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<LongWritable, Text>> input =
            env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, args[0]);

    input.writeAsText(args[1]);

    // execute program
    env.execute("Bzip compression test");
}
{code}
When starting the job, I get the following exception:
{noformat}
02/29/2016 11:59:50 CHAIN DataSource (at createInput(ExecutionEnvironment.java:508) (org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat)) -> Map (Map at main(Job.java:67))(1/2) switched to FAILED 
java.lang.ArrayIndexOutOfBoundsException: 18002
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:730)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:801)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:399)
    at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:483)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:159)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
    at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.fetchNext(HadoopInputFormatBase.java:185)
    at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.reachedEnd(HadoopInputFormatBase.java:179)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:166)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
{noformat}
This does not happen with "-p 1", but it does with any parallelism greater than 1.
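Since the failure only shows up with parallelism greater than 1, a possible (untested) workaround sketch is to restrict just the source to a single parallel instance, so that only one CBZip2InputStream is active per JVM while the rest of the job keeps its configured parallelism. This assumes the crash really is caused by concurrent source subtasks in one TaskManager JVM; it also gives up parallel decompression:

```java
// Workaround sketch (assumption: concurrent source subtasks in one JVM
// trigger the bug). Only the bzip2 source runs with parallelism 1;
// downstream operators can still run in parallel.
DataSet<Tuple2<LongWritable, Text>> input =
        env.readHadoopFile(new TextInputFormat(), LongWritable.class,
                           Text.class, args[0])
           .setParallelism(1);
```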

h1. Research
Googling the error message leads to some Spark/Hadoop mailing list threads, and it 
looks like the "org.apache.hadoop.io.compress.bzip2.CBZip2InputStream" class used here is not thread-safe:

- [Link one|https://issues.apache.org/jira/browse/HADOOP-10614]
- [Link two|http://stackoverflow.com/questions/5159602/processing-a-bzip-string-file-in-scala]
- [Link three|http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ArrayIndexOutOfBoundsException-using-sc-textFile-on-BZ2-compressed-files-td22905.html]
- [Link four|https://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c1402318634.7682.yahoomail...@web190401.mail.sg3.yahoo.com%3E]

Link one is especially interesting, because the Hadoop project resolved the 
issue there:

{quote}
Hadoop uses CBZip2InputStream to decode bzip2 files. However, the 
implementation is not threadsafe. This is not a really problem for Hadoop 
MapReduce because Hadoop runs each task in a separate JVM. But for other 
libraries that utilize multithreading and use Hadoop's InputFormat, e.g., 
Spark, it will cause exceptions like the following:
{quote}

My guess is that Apache Flink needs to update or patch the CBZip2InputStream class 
to resolve the problem. 
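To illustrate the failure mode, here is a minimal, hypothetical sketch (not Hadoop's actual code) of why shared mutable decoder state breaks under concurrency: one reader caches an index computed against the shared decoding table, another reader reinitializes that table for its own block, and the cached index lands out of bounds. The real bug involves nondeterministic thread interleaving; this sketch serializes the interleaving to make it deterministic:

```java
// Hypothetical model of a decoder whose decoding table is shared
// mutable state (as with a shared CBZip2InputStream).
class SharedTableDecoder {
    int[] table = new int[256];

    // Reader B reinitializes the shared table for its own block.
    void reinit(int size) {
        table = new int[size];
    }

    // Reader A looks up an index it computed against the *old* table.
    int lookup(int i) {
        return table[i];
    }
}

public class RaceSketch {
    public static void main(String[] args) {
        SharedTableDecoder d = new SharedTableDecoder();
        int staleIndex = 200;   // valid for the original 256-entry table
        d.reinit(16);           // "concurrent" reinitialization by reader B
        try {
            d.lookup(staleIndex);
        } catch (ArrayIndexOutOfBoundsException e) {
            // Same exception type as in the stack trace above.
            System.out.println("caught: " + e);
        }
    }
}
```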

All the best,
Sebastian



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
