[ https://issues.apache.org/jira/browse/FLINK-3568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179712#comment-15179712 ]
Moritz Schubotz commented on FLINK-3568:
----------------------------------------

What exactly do you mean by "removing the 'shaded-hadoop' and adding a newer org.apache.hadoop dependency"?

> Hadoop's Bzip decompression is not thread safe
> ----------------------------------------------
>
>                 Key: FLINK-3568
>                 URL: https://issues.apache.org/jira/browse/FLINK-3568
>             Project: Flink
>          Issue Type: Bug
>          Components: Hadoop Compatibility
>    Affects Versions: 0.10.1
>            Reporter: Sebastian Neef
>            Priority: Critical
>
> Hi,
> first of all, this is my first time filing a bug report for Apache Flink. If you need any additional information or something else, please let me know.
> h1. Background
> I was trying to process [Wikipedia's XML dumps|https://dumps.wikimedia.org/enwiki/20160204/] with Apache Flink. To save disk space I decided to use the bzip2-compressed versions.
> Apache Flink is compatible with [Hadoop's InputFormats|https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html], and Hadoop's TextInputFormat [supports compressed files|https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.html#line.5]. [Bzip2 files are splittable|http://comphadoop.weebly.com/index.html] and thus perfect for parallel processing.
> h1. Problem
> I started to test the decompression with a simple job based on the Apache Flink Quickstart code:
> {code}
> public static void main(String[] args) throws Exception {
>     if (args.length != 2) {
>         System.err.println("USAGE: Job <wikipediadump.xml.bz2> <output.txt>");
>         return;
>     }
>
>     // set up the execution environment
>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>     DataSet<Tuple2<LongWritable, Text>> input =
>             env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, args[0]);
>
>     input.writeAsText(args[1]);
>
>     // execute program
>     env.execute("Bzip compression test");
> }
> {code}
> When starting the job, I get the following exception:
> {noformat}
> 02/29/2016 11:59:50	CHAIN DataSource (at createInput(ExecutionEnvironment.java:508) (org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat)) -> Map (Map at main(Job.java:67))(1/2) switched to FAILED
> java.lang.ArrayIndexOutOfBoundsException: 18002
> 	at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:730)
> 	at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:801)
> 	at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
> 	at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
> 	at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:399)
> 	at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:483)
> 	at java.io.InputStream.read(InputStream.java:101)
> 	at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
> 	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
> 	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
> 	at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:159)
> 	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
> 	at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
> 	at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.fetchNext(HadoopInputFormatBase.java:185)
> 	at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.reachedEnd(HadoopInputFormatBase.java:179)
> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:166)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This does not happen with "-p 1", but it does with any parallelism greater than 1.
> h1. Research
> Googling the error message leads to some Spark/Hadoop mailing lists, and it looks like the "compress.bzip2.CBZip2InputStream" class that is used is not thread-safe:
> - [Link one|https://issues.apache.org/jira/browse/HADOOP-10614]
> - [Link two|http://stackoverflow.com/questions/5159602/processing-a-bzip-string-file-in-scala]
> - [Link three|http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ArrayIndexOutOfBoundsException-using-sc-textFile-on-BZ2-compressed-files-td22905.html]
> - [Link four|https://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c1402318634.7682.yahoomail...@web190401.mail.sg3.yahoo.com%3E]
> Link one is the most interesting, because the Hadoop project resolved the issue:
> {quote}
> Hadoop uses CBZip2InputStream to decode bzip2 files. However, the implementation is not thread-safe. This is not really a problem for Hadoop MapReduce because Hadoop runs each task in a separate JVM. But for other libraries that utilize multithreading and use Hadoop's InputFormat, e.g., Spark, it will cause exceptions like the following:
> {quote}
> My guess is that Apache Flink needs to update/patch the CBZip2InputStream class to resolve the problem?
> All the best,
> Sebastian



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
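

Since the report notes that the job succeeds with "-p 1", one possible stopgap until the thread-safety fix from HADOOP-10614 reaches Flink's bundled Hadoop is to pin only the data source to a single parallel instance, so that just one CBZip2InputStream decompresses per job while the rest of the pipeline keeps its parallelism. Below is a minimal sketch, assuming the same DataSet API and mapred TextInputFormat as the snippet in the report; the class and job names are made up, and this trades away parallel reads of the splittable bzip2 file:

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

public class Bzip2SingleSourceJob {

	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("USAGE: Bzip2SingleSourceJob <wikipediadump.xml.bz2> <output.txt>");
			return;
		}

		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Read the bzip2-compressed file with exactly one parallel source instance,
		// so only a single CBZip2InputStream is decompressing at a time in the job.
		DataSet<Tuple2<LongWritable, Text>> input = env
				.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, args[0])
				.setParallelism(1);

		// Downstream operators still run with the job's configured parallelism.
		input.writeAsText(args[1]);

		env.execute("Bzip2 single-source workaround");
	}
}
{code}

Whether pinning the source is sufficient depends on the failure really being confined to concurrent CBZip2InputStream instances within one JVM, so this should be seen as an interim measure rather than a substitute for upgrading or patching the Hadoop dependency.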