Hi Fabian,

I debugged the code, and it turns out that one line in my CSV data is
causing the ArrayIndexOutOfBoundsException. Here is the exception
stack trace:

java.lang.ArrayIndexOutOfBoundsException: -1
  at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
  at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
  at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
  at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
  at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
  at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
  at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
  at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
  at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
  at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

timestamp,url,id
2016-08-31 12:08:11.223,"
https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00
/""=/-3h00 %=) 1",0000000

With my custom input format, I get the exception above when parsing this
sample. If I use the following code instead, I get an incorrect result:
(2016-08-31 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of
(2016-08-31 12:08:11.223, 0000000).

DataSet<Tuple2<String, String>> withReadCSV =
        env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
                .ignoreFirstLine()
                .fieldDelimiter(",")
                .includeFields("101")
                .ignoreInvalidLines()
                .types(String.class, String.class);
withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt",
        FileSystem.WriteMode.OVERWRITE).setParallelism(1);


Is it a bug in Flink or is my data not compliant with the csv standards?
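
On the data side, a quick check outside Flink may help. The sketch below is
plain JDK code, not Flink's parser; the class and method names are made up
for illustration, and it assumes the sample record sits on a single line (the
line breaks in the sample being email wrapping). It splits on commas while
honoring double quotes and treats a doubled quote inside a quoted field as an
escaped quote, as RFC 4180 describes. Under these assumptions the record has
three fields, whereas a plain split on commas produces four, which would be
consistent with the wrong second value shown above.

```java
import java.util.ArrayList;
import java.util.List;

class QuotedCsvCheck {

    /** Splits one CSV record on commas, honoring double-quoted fields ("" is an escaped quote). */
    static List<String> splitRecord(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"'); // doubled quote inside a quoted field: literal quote
                        i++;
                    } else {
                        inQuotes = false;    // closing quote
                    }
                } else {
                    current.append(c);       // delimiters inside quotes are ordinary characters
                }
            } else if (c == '"') {
                inQuotes = true;             // opening quote
            } else if (c == ',') {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    public static void main(String[] args) {
        // Sample record from this mail, assumed to be on one line.
        String record = "2016-08-31 12:08:11.223,\"https://www.toyota.fr/hybrid-innovation/"
                + "infographie.jsontcgcc, ce)_13h00 /\"\"=/-3h00 %=) 1\",0000000";

        System.out.println("naive split:       " + record.split(",").length + " fields");
        List<String> fields = splitRecord(record);
        System.out.println("quote-aware split: " + fields.size() + " fields");
        System.out.println("third field:       " + fields.get(2));
    }
}
```

If the quote-aware count is three, the record itself looks well-formed, and the
question becomes whether quoted-string parsing is actually enabled on the
reader being used.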

Thanks,
Yassine


2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi Yassine,
>
> I ran your code without problems and got the correct result.
> Can you provide the Stacktrace of the Exception?
>
> Thanks, Fabian
>
> 2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Thank you Fabian and Stephan for the suggestions.
>> I couldn't override "readLine()" because it's final, so I went with
>> Fabian's solution, but I'm struggling with CSV field masks. Any help is
>> appreciated.
>> I created an input format that is basically TupleCsvInputFormat, in
>> which I overrode the nextRecord() method to catch the exceptions. But I
>> get a *java.lang.ArrayIndexOutOfBoundsException* when I pass a
>> boolean[]{true, false, true} field mask. If I pass an int[]{1,0,1} field
>> mask, the job succeeds but outputs the first and second columns. Here is my
>> code:
>>
>> TupleTypeInfo<Tuple2<String, String>> typeInfo =
>> TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
>> Path histPath = new Path("hdfs:///shared/file.csv");
>>
>> CsvInputFormat<Tuple2<String, String>> myInputFormt =
>>         new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
>> myInputFormt.enableQuotedStringParsing('"');
>> myInputFormt.setSkipFirstLineAsHeader(true);
>> myInputFormt.setLenient(true);
>>
>> DataSet<Tuple2<String, String>> test =
>>         env.createInput(myInputFormt, typeInfo).withParameters(parameters);
>> test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);
>>
>> And here is the custom input format:
>>
>> public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
>>     private static final long serialVersionUID = 1L;
>>     private TupleSerializerBase<OUT> tupleSerializer;
>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>> tupleTypeInfo) {
>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>> tupleTypeInfo);
>>     }
>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String
>> fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
>>         this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
>> createDefaultMask(tupleTypeInfo.getArity()));
>>     }
>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>> tupleTypeInfo, int[] includedFieldsMask) {
>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>> tupleTypeInfo, includedFieldsMask);
>>     }
>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String
>> fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[]
>> includedFieldsMask) {
>>         super(filePath);
>>         boolean[] mask = (includedFieldsMask == null)
>>                 ? createDefaultMask(tupleTypeInfo.getArity())
>>                 : toBooleanMask(includedFieldsMask);
>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
>>     }
>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>> tupleTypeInfo, boolean[] includedFieldsMask) {
>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>> tupleTypeInfo, includedFieldsMask);
>>     }
>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String
>> fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[]
>> includedFieldsMask) {
>>         super(filePath);
>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo,
>> includedFieldsMask);
>>     }
>>     private void configure(String lineDelimiter, String fieldDelimiter,
>>                            TupleTypeInfoBase<OUT> tupleTypeInfo,
>> boolean[] includedFieldsMask) {
>>         if (tupleTypeInfo.getArity() == 0) {
>>             throw new IllegalArgumentException(
>>                     "Tuple size must be greater than 0.");
>>         }
>>         if (includedFieldsMask == null) {
>>             includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
>>         }
>>         tupleSerializer = (TupleSerializerBase<OUT>)
>> tupleTypeInfo.createSerializer(new ExecutionConfig());
>>         setDelimiter(lineDelimiter);
>>         setFieldDelimiter(fieldDelimiter);
>>         Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
>>         for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
>>             classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
>>         }
>>         setFieldsGeneric(includedFieldsMask, classes);
>>     }
>>     @Override
>>     public OUT fillRecord(OUT reuse, Object[] parsedValues) {
>>         return tupleSerializer.createOrReuseInstance(parsedValues,
>> reuse);
>>     }
>>
>>     @Override
>>     public OUT nextRecord(OUT record) {
>>         OUT returnRecord = null;
>>         do {
>>             try {
>>                 returnRecord = super.nextRecord(record);
>>             } catch (IOException e) {
>>                 e.printStackTrace();
>>             }
>>         } while (returnRecord == null && !reachedEnd());
>>         return returnRecord;
>>     }
>> }
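
One possible reading of the int[]{1,0,1} result, not confirmed anywhere in
this thread: if the int[] constructor treats its argument as a list of column
positions to keep rather than as 0/1 flags, then {1,0,1} names positions 1, 0
and 1 again, which selects exactly the first and second columns. The sketch
below is plain Java with hypothetical helper names, not Flink code; it only
contrasts the two interpretations.

```java
import java.util.Arrays;

class FieldMaskSketch {

    /** Interprets the input as positions of the columns to keep (hypothetical helper). */
    static boolean[] maskFromIndices(int[] indices) {
        int max = 0;
        for (int index : indices) {
            if (index < 0) {
                throw new IllegalArgumentException("Field indices must not be negative.");
            }
            max = Math.max(max, index);
        }
        boolean[] mask = new boolean[max + 1];
        for (int index : indices) {
            mask[index] = true;
        }
        return mask;
    }

    /** Interprets the input as 0/1 flags, one per column (hypothetical helper). */
    static boolean[] maskFromFlags(int[] flags) {
        boolean[] mask = new boolean[flags.length];
        for (int i = 0; i < flags.length; i++) {
            mask[i] = flags[i] != 0;
        }
        return mask;
    }

    public static void main(String[] args) {
        int[] input = {1, 0, 1};
        // Read as positions: columns 0 and 1 are kept.
        System.out.println(Arrays.toString(maskFromIndices(input)));
        // Read as flags: columns 0 and 2 are kept.
        System.out.println(Arrays.toString(maskFromFlags(input)));
        // Positions that express "first and third column".
        System.out.println(Arrays.toString(maskFromIndices(new int[]{0, 2})));
    }
}
```

If that reading holds, passing new int[]{0, 2} would be the way to ask for the
first and third columns through the int[] constructor.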
>>
>> Thanks,
>> Yassine
>>
>>
>>
>>
>>
>> 2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>
>>> How about just overriding the "readLine()" method to call
>>> "super.readLine()" and catching EOF exceptions?
>>>
>>> On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Yassine,
>>>>
>>>> AFAIK, there is no built-in way to ignore corrupted compressed files.
>>>> You could try to implement a FileInputFormat that wraps the
>>>> CsvInputFormat and forwards all calls to the wrapped CsvIF.
>>>> The wrapper would also catch and ignore the EOFException.
>>>>
>>>> If you do that, you would not be able to use the env.readCsvFile()
>>>> shortcut but would need to create an instance of your own InputFormat and
>>>> add it with
>>>> env.readFile(yourIF).
>>>>
>>>> Hope this helps,
>>>> Fabian
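
The catch-and-ignore idea can be tried outside Flink first. The following is
a minimal JDK-only sketch, assuming the corrupt files are simply truncated;
the class and method names are hypothetical, and this is not a Flink
InputFormat. It reads raw bytes from a GZIPInputStream and, when the stream
ends early with an EOFException, keeps whatever was decompressed up to that
point.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class LenientGzipSketch {

    /** Decompresses as much as possible; a truncated stream ends the read instead of failing it. */
    static String readLenient(byte[] gzipBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipBytes))) {
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        } catch (EOFException e) {
            // Truncated input: keep what was decompressed before the stream ended.
            System.err.println("Ignoring truncated gzip stream: " + e.getMessage());
        } catch (IOException e) {
            // Any other I/O problem is still treated as a real failure.
            throw new UncheckedIOException(e);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    /** Test helper: compresses text so the demo needs no files on disk. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5000; i++) {
            sb.append("row-").append(i).append('\n');
        }
        byte[] full = gzip(sb.toString());
        byte[] truncated = Arrays.copyOf(full, full.length / 2); // simulate a cut-off file

        System.out.println("complete file:  " + readLenient(full).length() + " chars");
        System.out.println("truncated file: " + readLenient(truncated).length() + " chars");
    }
}
```

Note that the last recovered line of a truncated file may be incomplete, so a
real wrapper would probably still want lenient record parsing on top of this.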
>>>>
>>>> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I am reading a large number of GZip compressed csv files, nested in a
>>>>> HDFS directory:
>>>>>
>>>>> Configuration parameters = new Configuration();
>>>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>>>>>                 .ignoreFirstLine()
>>>>>                 .fieldDelimiter("|")
>>>>>                 .includeFields("011000")
>>>>>                 .types(String.class, Long.class)
>>>>>                 .withParameters(parameters);
>>>>>
>>>>> My job is failing with the following exception:
>>>>>
>>>>> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>>>>
>>>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>>>   at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>>>>   at java.util.zip.InflaterInputStream.read(Unknown Source)
>>>>>   at java.util.zip.GZIPInputStream.read(Unknown Source)
>>>>>   at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>>>>   at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>>>>   at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>>>>   at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>>>>   at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>>>   at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>>>   at java.lang.Thread.run(Unknown Source)
>>>>>
>>>>> I think this is caused by some improperly compressed files. How can I
>>>>> handle and ignore such exceptions? Thanks.
>>>>>
>>>>>
>>>>> Best,
>>>>> Yassine
>>>>>
>>>>>
>>>>
>>>
>>
>
