Hey Austin,

I think you are right. The problematic row contains an odd number of delimiters, in which case `skipFields` returns -1, which in turn leads to the exception.
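To make the failure mode concrete, here is a minimal, self-contained sketch of the kind of lenient check being discussed. It is not Flink's code: `LenientCsvSketch`, its `skipField` and `parseRecord` methods, and the nested `ParseException` are hypothetical stand-ins. It also assumes that the "odd number of delimiters" means an unbalanced quote character, as the example row suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; none of these names are Flink APIs.
final class LenientCsvSketch {

    /** Hypothetical stand-in for Flink's ParseException. */
    static class ParseException extends RuntimeException {
        ParseException(String message) {
            super(message);
        }
    }

    /**
     * Returns the index of the delimiter that ends the field starting at
     * {@code start} (or {@code line.length()} for the last field), or -1
     * if the field opens a quote that is never closed.
     */
    static int skipField(String line, int start, char delimiter, char quote) {
        int i = start;
        if (i < line.length() && line.charAt(i) == quote) {
            i = line.indexOf(quote, i + 1); // look for the closing quote
            if (i < 0) {
                return -1; // unbalanced quote: signal a bad parser position
            }
            i++; // step past the closing quote
        }
        while (i < line.length() && line.charAt(i) != delimiter) {
            i++;
        }
        return i;
    }

    /**
     * Splits a line into raw fields (quotes are kept). In lenient mode a
     * malformed row yields null so the caller can skip it; otherwise a
     * ParseException is thrown.
     */
    static List<String> parseRecord(String line, boolean lenient) {
        List<String> fields = new ArrayList<>();
        int pos = 0;
        while (true) {
            int end = skipField(line, pos, ',', '"');
            if (end < 0) {
                if (lenient) {
                    return null; // the check the original question asks about
                }
                throw new ParseException("Unexpected parser position for column "
                        + fields.size() + " of row '" + line + "'");
            }
            fields.add(line.substring(pos, end));
            if (end >= line.length()) {
                return fields;
            }
            pos = end + 1; // continue after the delimiter
        }
    }

    public static void main(String[] args) {
        System.out.println(parseRecord("a,b,c", false));
        System.out.println(parseRecord("a,\"broken,b", true));
    }
}
```

With the lenient flag set, the malformed row comes back as null instead of failing the whole job. That is roughly the behaviour one would expect from `ignoreParseErrors()`.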
I opened a bug ticket https://issues.apache.org/jira/browse/FLINK-19711 to fix it.

Regards,
Roman

On Fri, Oct 16, 2020 at 8:32 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

> Hey all,
>
> I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV Format[1].
>
> Even with the `ignoreParseErrors()` set, the job fails when it encounters
> some types of malformed rows. The root cause is indeed a `ParseException`,
> so I'm wondering if there's anything more I need to do to ignore these
> rows. Each field in the schema is a STRING.
>
> I've configured the CSV format and table like so:
>
> tableEnv.connect(
>         new FileSystem()
>             .path(path)
>     )
>     .withFormat(
>         new Csv()
>             .quoteCharacter('"')
>             .ignoreParseErrors()
>     )
>     .withSchema(schema)
>     .inAppendMode()
>
> Shot in the dark, but should `RowCsvInputFormat#parseRecord` have a check
> to `isLenient()` if there is an unexpected parser position?[2]
>
> Example error:
>
> 2020-10-16 12:50:18
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
> exception when processing split: null
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
> Caused by: org.apache.flink.api.common.io.ParseException: Unexpected
> parser position for column 1 of row '",
> https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,
> "",,,,company,'
>     at org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
>     at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
>     at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)
>
> Thanks,
> Austin
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csv-format
> [2]: https://github.com/apache/flink/blob/c09e959cf55c549ca4a3673f72deeb12a34e12f5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L203-L206