Hi Fabian,

I tried to debug the code, and it turns out a line in my CSV data is causing the ArrayIndexOutOfBoundsException. Here is the exception stacktrace:
java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
    at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
    at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
    at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
    at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

timestamp,url,id
2016-08-31 12:08:11.223," https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000

Using my code, I get the previous exception when parsing the sample CSV. If I use the following code, I get an incorrect result: (2016-08-31 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31 12:08:11.223, 0000000).

DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .includeFields("101")
        .ignoreInvalidLines()
        .types(String.class, String.class);
withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
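For reference, here is the same read with quoted-string parsing enabled on the reader (just a sketch; I have not verified whether it actually handles the doubled quote inside that field or simply runs into the same exception):

// Sketch: identical to the read above, plus parseQuotedStrings, so that a quoted
// field containing the delimiter is kept as a single value.
DataSet<Tuple2<String, String>> withQuotedParsing = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .parseQuotedStrings('"')
        .includeFields("101")
        .ignoreInvalidLines()
        .types(String.class, String.class);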
Is it a bug in Flink, or is my data not compliant with the CSV standard?

Thanks,
Yassine

2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Yassine,
>
> I ran your code without problems and got the correct result.
> Can you provide the stacktrace of the exception?
>
> Thanks, Fabian
>
> 2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Thank you Fabian and Stephan for the suggestions.
>> I couldn't override "readLine()" because it's final, so I went with
>> Fabian's solution, but I'm struggling with CSV field masks. Any help is
>> appreciated.
>> I created an input format which is basically a TupleCsvInputFormat for
>> which I overrode the nextRecord() method to catch the exceptions. But I'm
>> getting a *java.lang.ArrayIndexOutOfBoundsException* when I add a
>> boolean[]{true, false, true} field mask. If I add an int[]{1,0,1} field
>> mask, the job succeeds but outputs the first and second columns.
>> Here is my code:
>>
>> TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
>> Path histPath = new Path("hdfs:///shared/file.csv");
>>
>> CsvInputFormat<Tuple2<String, String>> myInputFormat = new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
>> myInputFormat.enableQuotedStringParsing('"');
>> myInputFormat.setSkipFirstLineAsHeader(true);
>> myInputFormat.setLenient(true);
>>
>> DataSet<Tuple2<String, String>> test = env.createInput(myInputFormat, typeInfo).withParameters(parameters);
>> test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);
>>
>> and here is the custom input format:
>>
>> public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
>>     private static final long serialVersionUID = 1L;
>>     private TupleSerializerBase<OUT> tupleSerializer;
>>
>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
>>     }
>>
>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
>>         this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
>>     }
>>
>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
>>     }
>>
>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
>>         super(filePath);
>>         boolean[] mask = (includedFieldsMask == null)
>>                 ? createDefaultMask(tupleTypeInfo.getArity())
>>                 : toBooleanMask(includedFieldsMask);
>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
>>     }
>>
>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
>>     }
>>
>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
>>         super(filePath);
>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
>>     }
>>
>>     private void configure(String lineDelimiter, String fieldDelimiter,
>>                            TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
>>         if (tupleTypeInfo.getArity() == 0) {
>>             throw new IllegalArgumentException("Tuple size must be greater than 0.");
>>         }
>>         if (includedFieldsMask == null) {
>>             includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
>>         }
>>         tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
>>         setDelimiter(lineDelimiter);
>>         setFieldDelimiter(fieldDelimiter);
>>         Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
>>         for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
>>             classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
>>         }
>>         setFieldsGeneric(includedFieldsMask, classes);
>>     }
>>
>>     @Override
>>     public OUT fillRecord(OUT reuse, Object[] parsedValues) {
>>         return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
>>     }
>>
>>     @Override
>>     public OUT nextRecord(OUT record) {
>>         OUT returnRecord = null;
>>         do {
>>             try {
>>                 returnRecord = super.nextRecord(record);
>>             } catch (IOException e) {
>>                 // log the parse failure and keep reading the next record
>>                 e.printStackTrace();
>>             }
>>         } while (returnRecord == null && !reachedEnd());
>>         return returnRecord;
>>     }
>> }
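>> (Side note: if I read the inherited toBooleanMask() correctly, the int[] variant
>> is treated as a list of included column indices rather than as a 0/1 mask, which
>> might explain why {1,0,1} gives me the first and second columns. A rough sketch
>> of that reading, in case I got it wrong:)
>>
>> // Sketch of how I understand the int[] mask is converted (not the actual Flink code):
>> // every value is the index of a column to include, so {1, 0, 1} -> {true, true},
>> // i.e. columns 0 and 1.
>> static boolean[] toBooleanMaskSketch(int[] sourceFieldIndices) {
>>     int max = 0;
>>     for (int i : sourceFieldIndices) {
>>         max = Math.max(max, i);
>>     }
>>     boolean[] mask = new boolean[max + 1];
>>     for (int i : sourceFieldIndices) {
>>         mask[i] = true;
>>     }
>>     return mask;
>> }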
>> Thanks,
>> Yassine
>>
>> 2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>
>>> How about just overriding the "readLine()" method to call
>>> "super.readLine()" and catching EOF exceptions?
>>>
>>> On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Yassine,
>>>>
>>>> AFAIK, there is no built-in way to ignore corrupted compressed files.
>>>> You could try to implement a FileInputFormat that wraps the
>>>> CsvInputFormat and forwards all calls to the wrapped CsvIF.
>>>> The wrapper would also catch and ignore the EOFException.
>>>>
>>>> If you do that, you would not be able to use the env.readCsvFile()
>>>> shortcut, but would need to create an instance of your own InputFormat
>>>> and add it with env.readFile(yourIF).
>>>>
>>>> Hope this helps,
>>>> Fabian
>>>>
>>>> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am reading a large number of GZip-compressed CSV files, nested in an
>>>>> HDFS directory:
>>>>>
>>>>> Configuration parameters = new Configuration();
>>>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>>>>>         .ignoreFirstLine()
>>>>>         .fieldDelimiter("|")
>>>>>         .includeFields("011000")
>>>>>         .types(String.class, Long.class)
>>>>>         .withParameters(parameters);
>>>>>
>>>>> My job is failing with the following exception:
>>>>>
>>>>> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>>>>
>>>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>>>     at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>>>>     at java.util.zip.InflaterInputStream.read(Unknown Source)
>>>>>     at java.util.zip.GZIPInputStream.read(Unknown Source)
>>>>>     at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>>>>     at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>>>     at java.lang.Thread.run(Unknown Source)
>>>>>
>>>>> I think it is caused by some improperly compressed files. How can I
>>>>> handle and ignore such exceptions? Thanks.
>>>>>
>>>>> Best,
>>>>> Yassine