Hi,

Flink's String parser does not support escaped quotes. Your data contains a doubled double quote (""), which the parser interprets as the end of the string field. As a workaround, you can read the file as a regular text file, line by line, and do the parsing in a MapFunction.
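Something along these lines could work (just a rough, untested sketch assuming the usual DataSet API imports; the naive split would still have to be replaced by quote-aware parsing that understands the "" escape, and the header line would need to be skipped or filtered out):

DataSet<Tuple2<String, String>> parsed = env.readTextFile("hdfs:///shared/file.csv")
    .map(new MapFunction<String, Tuple2<String, String>>() {
        @Override
        public Tuple2<String, String> map(String line) throws Exception {
            // Naive split on the field delimiter. For your data this must be
            // replaced with quote-aware parsing that treats "" inside a quoted
            // field as an escaped quote rather than as the end of the field.
            String[] fields = line.split(",");
            // first column (timestamp) and last column (id)
            return new Tuple2<>(fields[0], fields[fields.length - 1]);
        }
    });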
Best, Fabian

2016-10-11 13:37 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Forgot to add parseQuotedStrings('"'). After adding it I'm getting the same exception with the second code too.

2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Hi Fabian,

I tried to debug the code, and it turns out a line in my CSV data is causing the ArrayIndexOutOfBoundsException. Here is the exception stack trace:

java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
    at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
    at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
    at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
    at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

timestamp,url,id
2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000

Using my code, I get the previous exception when parsing the sample CSV. If I use the following code instead, I get an incorrect result: (2016-08-31 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31 12:08:11.223, 0000000):

DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
    .ignoreFirstLine()
    .fieldDelimiter(",")
    .includeFields("101")
    .ignoreInvalidLines()
    .types(String.class, String.class);
withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

Is it a bug in Flink, or is my data not compliant with the CSV standard?

Thanks,
Yassine

2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

Hi Yassine,

I ran your code without problems and got the correct result. Can you provide the stack trace of the exception?

Thanks, Fabian

2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Thank you Fabian and Stephan for the suggestions. I couldn't override readLine() because it is final, so I went with Fabian's solution, but I'm struggling with the CSV field masks. Any help is appreciated.

I created an input format which is basically TupleCsvInputFormat, for which I overrode the nextRecord() method to catch the exceptions. But I'm getting a java.lang.ArrayIndexOutOfBoundsException when I add a boolean[]{true, false, true} field mask. If I add an int[]{1, 0, 1} field mask, the job succeeds but outputs the first and second columns.
Here is my code:

TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
Path histPath = new Path("hdfs:///shared/file.csv");

CsvInputFormat<Tuple2<String, String>> myInputFormat = new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
myInputFormat.enableQuotedStringParsing('"');
myInputFormat.setSkipFirstLineAsHeader(true);
myInputFormat.setLenient(true);

DataSet<Tuple2<String, String>> test = env.createInput(myInputFormat, typeInfo).withParameters(parameters);
test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);

and here is the custom input format:

public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
    private static final long serialVersionUID = 1L;
    private TupleSerializerBase<OUT> tupleSerializer;

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        super(filePath);
        boolean[] mask = (includedFieldsMask == null)
                ? createDefaultMask(tupleTypeInfo.getArity())
                : toBooleanMask(includedFieldsMask);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        super(filePath);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
    }

    private void configure(String lineDelimiter, String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        if (tupleTypeInfo.getArity() == 0) {
            throw new IllegalArgumentException("Tuple size must be greater than 0.");
        }
        if (includedFieldsMask == null) {
            includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
        }
        tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
        setDelimiter(lineDelimiter);
        setFieldDelimiter(fieldDelimiter);
        Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
        for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
            classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
        }
        setFieldsGeneric(includedFieldsMask, classes);
    }

    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
    }

    @Override
    public OUT nextRecord(OUT record) throws IOException {
        OUT returnRecord = null;
        do {
            try {
                returnRecord = super.nextRecord(record);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } while (returnRecord == null && !reachedEnd());
        return returnRecord;
    }
}

Thanks,
Yassine

2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:

How about just overriding the "readLine()" method to call "super.readLine()" and catching EOF exceptions?

On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files. You could try to implement a FileInputFormat that wraps the CsvInputFormat and forwards all calls to the wrapped CsvIF. The wrapper would also catch and ignore the EOFException.

If you do that, you would not be able to use the env.readCsvFile() shortcut but would need to create an instance of your own InputFormat and add it with env.readFile(yourIF).
Hope this helps,
Fabian

2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Hi all,

I am reading a large number of GZip-compressed CSV files, nested in an HDFS directory:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
    .ignoreFirstLine()
    .fieldDelimiter("|")
    .includeFields("011000")
    .types(String.class, Long.class)
    .withParameters(parameters);

My job is failing with the following exception:

2016-10-04 17:19:59,933 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
java.io.EOFException: Unexpected end of ZLIB input stream
    at java.util.zip.InflaterInputStream.fill(Unknown Source)
    at java.util.zip.InflaterInputStream.read(Unknown Source)
    at java.util.zip.GZIPInputStream.read(Unknown Source)
    at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
    at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
    at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
    at java.lang.Thread.run(Unknown Source)

I think it is due to some improperly compressed files. How can I handle and ignore such exceptions? Thanks.

Best,
Yassine