[FLINK-1318] [api-breaking] Simplified quoted string parsing, made it optional, and use a configurable quote character
This closes #265 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2665cf4e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2665cf4e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2665cf4e Branch: refs/heads/master Commit: 2665cf4e2a9e33e0e94ac7e0b7518a10445febbb Parents: 5e1cc9e Author: Fabian Hueske <fhue...@apache.org> Authored: Mon Oct 20 15:18:20 2014 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu Feb 5 11:17:38 2015 +0100 ---------------------------------------------------------------------- .../api/common/io/GenericCsvInputFormat.java | 91 +++++++------ .../apache/flink/types/parser/FieldParser.java | 4 +- .../apache/flink/types/parser/StringParser.java | 130 +++++++------------ .../flink/types/parser/StringValueParser.java | 102 +++++++-------- .../types/parser/QuotedStringParserTest.java | 61 +++++++++ .../parser/QuotedStringValueParserTest.java | 65 ++++++++++ .../flink/types/parser/StringParserTest.java | 64 --------- .../types/parser/StringValueParserTest.java | 66 ---------- .../types/parser/UnquotedStringParserTest.java | 56 ++++++++ .../parser/UnquotedStringValueParserTest.java | 56 ++++++++ .../types/parser/VarLengthStringParserTest.java | 88 ++++++------- .../org/apache/flink/api/java/io/CsvReader.java | 25 +++- .../flink/api/java/io/CsvInputFormatTest.java | 47 ++++++- .../flink/api/scala/ExecutionEnvironment.scala | 6 + .../flink/api/scala/io/CsvInputFormatTest.scala | 41 ++++++ 15 files changed, 538 insertions(+), 364 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java index 151b1e2..662cf1b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java @@ -25,6 +25,8 @@ import com.google.common.primitives.Ints; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.types.parser.StringParser; +import org.apache.flink.types.parser.StringValueParser; import org.apache.flink.util.InstantiationUtil; import java.io.IOException; @@ -41,15 +43,12 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> private static final byte[] DEFAULT_FIELD_DELIMITER = new byte[] {','}; - private static final char QUOTE_CHARACTER = '"'; - - // -------------------------------------------------------------------------------------------- // Variables for internal operation. 
// They are all transient, because we do not want them so be serialized // -------------------------------------------------------------------------------------------- - private transient FieldParser<Object>[] fieldParsers; + private transient FieldParser<?>[] fieldParsers; // -------------------------------------------------------------------------------------------- @@ -65,6 +64,10 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> private boolean lenient; private boolean skipFirstLineAsHeader; + + private boolean quotedStringParsing = false; + + private byte quoteCharacter; // -------------------------------------------------------------------------------------------- @@ -124,6 +127,11 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> public void setSkipFirstLineAsHeader(boolean skipFirstLine) { this.skipFirstLineAsHeader = skipFirstLine; } + + public void enableQuotedStringParsing(char quoteCharacter) { + quotedStringParsing = true; + this.quoteCharacter = (byte)quoteCharacter; + } // -------------------------------------------------------------------------------------------- @@ -254,7 +262,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> // instantiate the parsers @SuppressWarnings("unchecked") - FieldParser<Object>[] parsers = new FieldParser[fieldTypes.length]; + FieldParser<?>[] parsers = new FieldParser[fieldTypes.length]; for (int i = 0; i < fieldTypes.length; i++) { if (fieldTypes[i] != null) { @@ -265,7 +273,16 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> @SuppressWarnings("unchecked") - FieldParser<Object> p = (FieldParser<Object>) InstantiationUtil.instantiate(parserType, FieldParser.class); + FieldParser<?> p = (FieldParser<?>) InstantiationUtil.instantiate(parserType, FieldParser.class); + + if (this.quotedStringParsing) { + if (p instanceof StringParser) { + 
((StringParser)p).enableQuotedStringParsing(this.quoteCharacter); + } else if (p instanceof StringValueParser) { + ((StringValueParser)p).enableQuotedStringParsing(this.quoteCharacter); + } + } + parsers[i] = p; } } @@ -347,50 +364,42 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> protected int skipFields(byte[] bytes, int startPos, int limit, byte[] delim) { int i = startPos; - byte current; + final int delimLimit = limit - delim.length + 1; - - // skip over initial whitespace lines - while (i < limit && ((current = bytes[i]) == ' ' || current == '\t')) { - i++; - } - - // first none whitespace character - if (i < limit && bytes[i] == QUOTE_CHARACTER) { - // quoted string - i++; // the quote - - while (i < limit && bytes[i] != QUOTE_CHARACTER) { + + if(quotedStringParsing == true && bytes[i] == quoteCharacter) { + + // quoted string parsing enabled and field is quoted + // search for ending quote character + while(i < limit && bytes[i] != quoteCharacter) { i++; } - - if (i < limit) { - // end of the quoted field - i++; // the quote - - // skip trailing whitespace characters - while (i < delimLimit && !FieldParser.delimiterNext(bytes, i, delim)) { - current = bytes[i]; - if (current == ' ' || current == '\t') { - i++; - } else { - return -1; // illegal case of non-whitespace characters trailing - } - } - - return (i >= delimLimit ? limit : i + delim.length); + i++; + + if (i == limit) { + // we are at the end of the record + return limit; + } else if ( i < delimLimit && FieldParser.delimiterNext(bytes, i, delim)) { + // we are not at the end, check if delimiter comes next + return i + delim.length; } else { - // exited due to line end without quote termination + // delimiter did not follow end quote. Error... 
return -1; } - } - else { - // unquoted field - while (i < delimLimit && !FieldParser.delimiterNext(bytes, i, delim)) { + } else { + // field is not quoted + while(i < delimLimit && !FieldParser.delimiterNext(bytes, i, delim)) { i++; } - return (i >= delimLimit ? limit : i + delim.length); + + if (i >= delimLimit) { + // no delimiter found. We are at the end of the record + return limit; + } else { + // delimiter found. + return i + delim.length; + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 28f9a7a..33697fd 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -59,10 +59,10 @@ public abstract class FieldParser<T> { /** The field was not in a correct format for the numeric type. */ NUMERIC_VALUE_FORMAT_ERROR, - + /** A quoted string was not terminated until the line end. */ UNTERMINATED_QUOTED_STRING, - + /** The parser found characters between the end of the quoted string and the delimiter. 
*/ UNQUOTED_CHARS_AFTER_QUOTED_STRING } http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index bd2550e..0860d41 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -25,112 +25,70 @@ package org.apache.flink.types.parser; * strings, whitespaces (space and tab) leading and trailing before and after the quotes are removed. */ public class StringParser extends FieldParser<String> { - - private static final byte WHITESPACE_SPACE = (byte) ' '; - private static final byte WHITESPACE_TAB = (byte) '\t'; - private static final byte QUOTE_CHARACTER = (byte) '"'; + private boolean quotedStringParsing = false; + private byte quoteCharacter; - private static enum ParserStates { - NONE, IN_QUOTE, STOP + private String result; + + public void enableQuotedStringParsing(byte quoteCharacter) { + this.quotedStringParsing = true; + this.quoteCharacter = quoteCharacter; } - private String result; - @Override public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, String reusable) { - + int i = startPos; byte current; - boolean delimiterFound = false; final int delimLimit = limit-delimiter.length+1; - - // count initial whitespace lines - while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE || current == WHITESPACE_TAB)) { - i++; - } - // first determine the boundaries of the cell - ParserStates parserState = ParserStates.NONE; - - // the current position evaluated against the cell boundary - int endOfCellPosition = i - 1; + if(quotedStringParsing == true && bytes[i] == quoteCharacter) { + // quoted string parsing enabled and first 
character is a quote + i++; - while (parserState != ParserStates.STOP && endOfCellPosition < limit) { - endOfCellPosition++; - // make sure we don't step over the end of the buffer - if(endOfCellPosition == limit) { - break; + // search for ending quote character + while(i < limit && bytes[i] != quoteCharacter) { + i++; } - if(endOfCellPosition < delimLimit && delimiterNext(bytes, endOfCellPosition, delimiter)) { - // if we are in a quote do nothing, otherwise we reached the end - if (parserState != ParserStates.IN_QUOTE) { - parserState = ParserStates.STOP; - delimiterFound = true; - } - endOfCellPosition += delimiter.length - 1; - } else if(bytes[endOfCellPosition] == QUOTE_CHARACTER) { - // we entered a quote - if(parserState == ParserStates.IN_QUOTE) { - // we end the quote - parserState = ParserStates.NONE; + + if (i == limit) { + setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING); + return -1; + } else { + i++; + // check for proper termination + if (i == limit) { + // either by end of line + this.result = new String(bytes, startPos+1, i - startPos - 2); + return limit; + } else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) { + // or following field delimiter + this.result = new String(bytes, startPos+1, i - startPos - 2); + return i + delimiter.length; } else { - // we start a new quote - parserState = ParserStates.IN_QUOTE; + // no proper termination + setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING); + return -1; } } - } - int delimCorrection = delimiterFound ?
delimiter.length : 1; + } else { - if(parserState == ParserStates.IN_QUOTE) { - // exited due to line end without quote termination - setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING); - return -1; - } - - // boundary of the cell is now - // i --> endOfCellPosition - - // first none whitespace character - if (i < limit && bytes[i] == QUOTE_CHARACTER) { - - // check if there are characters at the end - current = bytes[endOfCellPosition - delimCorrection]; - - // if the character preceding the end of the cell is not a WHITESPACE or the end QUOTE_DOUBLE - // there are unquoted characters at the end - - if (!(current == WHITESPACE_SPACE || current == WHITESPACE_TAB || current == QUOTE_CHARACTER)) { - setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING); - return -1; // illegal case of non-whitespace characters trailing + // look for delimiter + while( i < delimLimit && !delimiterNext(bytes, i, delimiter)) { + i++; } - // skip trailing whitespace after quote .. by moving the cursor backwards - int skipAtEnd = 0; - while (bytes[endOfCellPosition - delimCorrection - skipAtEnd] == WHITESPACE_SPACE || - bytes[endOfCellPosition - delimCorrection - skipAtEnd] == WHITESPACE_TAB) { - skipAtEnd++; + if (i >= delimLimit) { + // no delimiter found. Take the full string + this.result = new String(bytes, startPos, limit - startPos); + return limit; + } else { + // delimiter found. + this.result = new String(bytes, startPos, i - startPos); + return i + delimiter.length; } - - // now unescape - boolean notEscaped = true; - int endOfContent = i + 1; - for(int counter = endOfContent; counter < endOfCellPosition - delimCorrection - skipAtEnd; counter++) { - notEscaped = bytes[counter] != QUOTE_CHARACTER || !notEscaped; - if (notEscaped) { - // realign - bytes[endOfContent++] = bytes[counter]; - } - } - this.result = new String(bytes, i + 1, endOfContent - i - 1); - return (endOfCellPosition == limit ? 
limit : endOfCellPosition + 1); - } - else { - // unquoted string - // set from the beginning. unquoted strings include the leading whitespaces - this.result = new String(bytes, i, endOfCellPosition - i - (delimCorrection - 1)); - return (endOfCellPosition == limit ? limit : endOfCellPosition + 1); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java index 7fd7b54..3cbb21e 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java @@ -30,13 +30,16 @@ import org.apache.flink.types.StringValue; */ public class StringValueParser extends FieldParser<StringValue> { - private static final byte WHITESPACE_SPACE = (byte) ' '; - private static final byte WHITESPACE_TAB = (byte) '\t'; - - private static final byte QUOTE_DOUBLE = (byte) '"'; - + private boolean quotedStringParsing = false; + private byte quoteCharacter; + private StringValue result; + public void enableQuotedStringParsing(byte quoteCharacter) { + this.quotedStringParsing = true; + this.quoteCharacter = quoteCharacter; + } + @Override public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, StringValue reusable) { @@ -46,64 +49,53 @@ public class StringValueParser extends FieldParser<StringValue> { final int delimLimit = limit-delimiter.length+1; - // count initial whitespace lines - while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE || current == WHITESPACE_TAB)) { + if(quotedStringParsing == true && bytes[i] == quoteCharacter) { + // quoted string parsing enabled and first character is a quote i++; - } - - // first none whitespace 
character - if (i < limit && bytes[i] == QUOTE_DOUBLE) { - // quoted string - i++; // the quote - - // we count only from after the quote - int quoteStart = i; - while (i < limit && bytes[i] != QUOTE_DOUBLE) { + + // search for ending quote character + while(i < limit && bytes[i] != quoteCharacter) { i++; } - - if (i < limit) { - // end of the string - reusable.setValueAscii(bytes, quoteStart, i-quoteStart); - - i++; // the quote - - // skip trailing whitespace characters - while (i < limit) { - - if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { - return i+delimiter.length; - } - current = bytes[i]; - if (current == WHITESPACE_SPACE || current == WHITESPACE_TAB) { - i++; - } - else { - setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING); - return -1; // illegal case of non-whitespace characters trailing - } - } - if( i > limit ){ - i--; - } - return (i == limit ? limit : i + delimiter.length); - } else { - // exited due to line end without quote termination + + if (i == limit) { setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING); return -1; - } - } - else { - // unquoted string -delim.length - while (i < limit) { - if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { - break; + } else { + i++; + // check for proper termination + if (i == limit) { + // either by end of line + reusable.setValueAscii(bytes, startPos+1, i - startPos - 2); + return limit; + } else if ( i < delimLimit && delimiterNext(bytes, i, delimiter)) { + // or following field delimiter + reusable.setValueAscii(bytes, startPos+1, i - startPos - 2); + return i + delimiter.length; + } else { + // no proper termination + setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING); + return -1; } + + } + + } else { + + // look for delimiter + while( i < delimLimit && !delimiterNext(bytes, i, delimiter)) { i++; } - // set from the beginning. 
unquoted strings include the leading whitespaces - reusable.setValueAscii(bytes, startPos, i-startPos); - return (i == limit ? limit : i + delimiter.length); + + if (i >= delimLimit) { + // no delimiter found. Take the full string + reusable.setValueAscii(bytes, startPos, limit - startPos); + return limit; + } else { + // delimiter found. + reusable.setValueAscii(bytes, startPos, i - startPos); + return i + delimiter.length; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java new file mode 100644 index 0000000..89d5ac8 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.types.parser; + + +public class QuotedStringParserTest extends ParserTestBase<String> { + + @Override + public String[] getValidTestValues() { + return new String[] { + "\"abcdefgh\"", "\"i\"", "\"jklmno\"", "\"abc|de|fgh\"", + "\"abc&&&&def&&&&ghij\"", "\"i\"", "\"Hello9\"", + "abcdefgh", "i", "jklmno", "Hello9" + }; + } + + @Override + public String[] getValidTestResults() { + return new String[] { + "abcdefgh", "i", "jklmno", "abc|de|fgh", + "abc&&&&def&&&&ghij", "i", "Hello9", + "abcdefgh", "i", "jklmno", "Hello9" + }; + } + + @Override + public String[] getInvalidTestValues() { + return new String[] { + "\"abcd\"ef", "\"abcdef" + }; + } + + @Override + public FieldParser<String> getParser() { + StringParser p = new StringParser(); + p.enableQuotedStringParsing((byte)'"'); + return p; + } + + @Override + public Class<String> getTypeClass() { + return String.class; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java new file mode 100644 index 0000000..66097ac --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + +import org.apache.flink.types.StringValue; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.types.parser.StringValueParser; + + +public class QuotedStringValueParserTest extends ParserTestBase<StringValue> { + + @Override + public String[] getValidTestValues() { + return new String[] { + "\"abcdefgh\"", "\"i\"", "\"jklmno\"", "\"abc|de|fgh\"", + "\"abc&&&&def&&&&ghij\"", "\"i\"", "\"Hello9\"", + "abcdefgh", "i", "jklmno", "Hello9" + }; + } + + @Override + public StringValue[] getValidTestResults() { + return new StringValue[] { + new StringValue("abcdefgh"), new StringValue("i"), new StringValue("jklmno"), new StringValue("abc|de|fgh"), + new StringValue("abc&&&&def&&&&ghij"), new StringValue("i"), new StringValue("Hello9"), + new StringValue("abcdefgh"), new StringValue("i"), new StringValue("jklmno"), new StringValue("Hello9"), + }; + } + + @Override + public String[] getInvalidTestValues() { + return new String[] { + "\"abcd\"ef", "\"abcdef" + }; + } + + @Override + public FieldParser<StringValue> getParser() { + StringValueParser p = new StringValueParser(); + p.enableQuotedStringParsing((byte)'"'); + return p; + } + + @Override + public Class<StringValue> getTypeClass() { + return StringValue.class; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java deleted file mode 100644 index 702f985..0000000 --- a/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.types.parser; - -import org.apache.flink.types.parser.StringParser; -import org.apache.flink.types.parser.FieldParser; - - -public class StringParserTest extends ParserTestBase<String> { - - @Override - public String[] getValidTestValues() { - return new String[] { - "abcdefgh", "i", "jklmno", "\"abcdefgh\"", "\"i\"", "\"jklmno\"", - "\"ab,cde|fg\"", "\"hij|m|n|op\"", "\"hij&&m&&n&&op\"", - " \"abcdefgh\"", " \"i\"\t\t\t", "\t \t\"jklmno\" ", - " \" abcd \" \t ", "Hello9" - }; - } - - @Override - public String[] getValidTestResults() { - return new String[] { - "abcdefgh", "i", "jklmno", "abcdefgh", "i", "jklmno", - "ab,cde|fg", "hij|m|n|op", "hij&&m&&n&&op", - "abcdefgh", "i", "jklmno", - " abcd ", "Hello9" - }; - } - - @Override - public String[] getInvalidTestValues() { - return new String[] { - " \"abcdefgh ", " \"ijklmno\" hj" - }; - } - - @Override - public FieldParser<String> getParser() { - return new StringParser(); - } - - @Override - public Class<String> getTypeClass() { - return String.class; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java deleted file mode 100644 index 66ad32d..0000000 --- a/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.types.parser; - -import org.apache.flink.types.StringValue; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.StringValueParser; - - -public class StringValueParserTest extends ParserTestBase<StringValue> { - - @Override - public String[] getValidTestValues() { - return new String[] { - "abcdefgh", "i", "jklmno", "\"abcdefgh\"", "\"i\"", "\"jklmno\"", - "\"ab,cde|fg\"", "\"hij|m|n|op\"", "\"hij&&m&&n&&op\"", - " \"abcdefgh\"", " \"i\"\t\t\t", "\t \t\"jklmno\" ", - " \" abcd \" \t ", "Hello9" - }; - } - - @Override - public StringValue[] getValidTestResults() { - return new StringValue[] { - new StringValue("abcdefgh"), new StringValue("i"), new StringValue("jklmno"), - new StringValue("abcdefgh"), new StringValue("i"), new StringValue("jklmno"), - new StringValue("ab,cde|fg"), new StringValue("hij|m|n|op"), new StringValue("hij&&m&&n&&op"), - new StringValue("abcdefgh"), new StringValue("i"), new StringValue("jklmno"), - new StringValue(" abcd "), new StringValue("Hello9") - }; - } - - @Override - public String[] getInvalidTestValues() { - return new String[] { - " \"abcdefgh ", " \"ijklmno\" hj" - }; - } - - @Override - public FieldParser<StringValue> getParser() { - return new StringValueParser(); - } - - @Override - public Class<StringValue> getTypeClass() { - return StringValue.class; - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java new file mode 100644 index 0000000..cadd021 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.types.parser; + +import org.apache.flink.types.parser.StringParser; +import org.apache.flink.types.parser.FieldParser; + + +public class UnquotedStringParserTest extends ParserTestBase<String> { + + @Override + public String[] getValidTestValues() { + return new String[] { + "abcdefgh", "i", "jklmno", "\"abc\"defgh\"", "\"i\"", "Hello9" + }; + } + + @Override + public String[] getValidTestResults() { + return new String[] { + "abcdefgh", "i", "jklmno", "\"abc\"defgh\"", "\"i\"", "Hello9" + }; + } + + @Override + public String[] getInvalidTestValues() { + return new String[] { }; + } + + @Override + public FieldParser<String> getParser() { + return new StringParser(); + } + + @Override + public Class<String> getTypeClass() { + return String.class; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java new file mode 100644 index 0000000..d66e852 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.types.parser; + +import org.apache.flink.types.StringValue; + + +public class UnquotedStringValueParserTest extends ParserTestBase<StringValue> { + + @Override + public String[] getValidTestValues() { + return new String[] { + "abcdefgh", "i", "jklmno", "\"abc\"defgh\"", "\"i\"", "Hello9" + }; + } + + @Override + public StringValue[] getValidTestResults() { + return new StringValue[] { + new StringValue("abcdefgh"), new StringValue("i"), new StringValue("jklmno"), + new StringValue("\"abc\"defgh\""), new StringValue("\"i\""), new StringValue("Hello9") + }; + } + + @Override + public String[] getInvalidTestValues() { + return new String[] { }; + } + + @Override + public FieldParser<StringValue> getParser() { + return new StringValueParser(); + } + + @Override + public Class<StringValue> getTypeClass() { + return StringValue.class; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java index 56b1dd0..4f9069e 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java @@ -38,6 +38,8 @@ public class 
VarLengthStringParserTest { @Test public void testParseValidUnquotedStrings() { + + this.parser = new StringValueParser(); // check valid strings with out whitespaces and trailing delimiter byte[] recBytes = "abcdefgh|i|jklmno|".getBytes(); @@ -77,8 +79,11 @@ public class VarLengthStringParserTest { } @Test - public void testParseValidQuotedStringsWithoutWhitespaces() { - + public void testParseValidQuotedStrings() { + + this.parser = new StringValueParser(); + this.parser.enableQuotedStringParsing((byte)'"'); + // check valid strings with out whitespaces and trailing delimiter byte[] recBytes = "\"abcdefgh\"|\"i\"|\"jklmno\"|".getBytes(); StringValue s = new StringValue(); @@ -137,68 +142,57 @@ public class VarLengthStringParserTest { assertTrue(startPos == 25); assertTrue(s.getValue().equals("hij|kl|mn|op")); } - + @Test - public void testParseValidQuotedStringsWithWhitespaces() { - + public void testParseValidMixedStrings() { + + this.parser = new StringValueParser(); + this.parser.enableQuotedStringParsing((byte)'@'); + // check valid strings with out whitespaces and trailing delimiter - byte[] recBytes = " \"abcdefgh\"| \"i\"\t\t\t|\t \t\"jklmno\" |".getBytes(); + byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes(); StringValue s = new StringValue(); - + int startPos = 0; startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); - assertTrue(startPos == 13); - assertTrue(s.getValue().equals("abcdefgh")); - - startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); - assertTrue(startPos == 25); - assertTrue(s.getValue().equals("i")); - - startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); - assertTrue(startPos == 39); - assertTrue(s.getValue().equals("jklmno")); - - // check valid strings with out whitespaces without trailing delimiter - recBytes = " \"abcdefgh\"| \"i\"\t\t\t|\t \t\"jklmno\" ".getBytes(); - s = new StringValue(); - - startPos = 0; - 
startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); - assertTrue(startPos == 13); - assertTrue(s.getValue().equals("abcdefgh")); - + assertTrue(startPos == 11); + assertTrue(s.getValue().equals("abcde|gh")); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); - assertTrue(startPos == 25); + assertTrue(startPos == 15); assertTrue(s.getValue().equals("i")); - + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); - assertTrue(startPos == 38); - assertTrue(s.getValue().equals("jklmno")); - - // check single field not terminated - recBytes = " \t\"abcde\"\t\t \t ".getBytes(); - startPos = 0; + assertTrue(startPos == 24); + assertTrue(s.getValue().equals("jklmnopq")); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); - assertTrue(startPos == 16); - assertTrue(s.getValue().equals("abcde")); - - // check single field terminated - recBytes = " \t\"abcde\"\t\t \t |".getBytes(); - startPos = 0; + assertTrue(startPos == 29); + assertTrue(s.getValue().equals("rs")); + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); - assertTrue(startPos == 17); - assertTrue(s.getValue().equals("abcde")); + assertTrue(startPos == 32); + assertTrue(s.getValue().equals("tuv")); + } - + + @Test public void testParseInvalidQuotedStrings() { - + + this.parser = new StringValueParser(); + this.parser.enableQuotedStringParsing((byte)'"'); + // check valid strings with out whitespaces and trailing delimiter - byte[] recBytes = " \"abcdefgh\" gh | \"i\"\t\t\t|\t \t\"jklmno\" |".getBytes(); + byte[] recBytes = "\"abcdefgh\"-|\"jklmno ".getBytes(); StringValue s = new StringValue(); - + int startPos = 0; startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, s); assertTrue(startPos < 0); + + startPos = 12; + startPos = parser.parseField(recBytes, startPos, recBytes.length, new byte[] {'|'}, 
s); + assertTrue(startPos < 0); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java index b31c9a4..ac879b7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java @@ -50,9 +50,13 @@ public class CsvReader { protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER; protected String fieldDelimiter = CsvInputFormat.DEFAULT_FIELD_DELIMITER; - + protected String commentPrefix = null; //default: no comments + protected boolean parseQuotedStrings = false; + + protected char quoteCharacter = '"'; + protected boolean skipFirstLineAsHeader = false; protected boolean ignoreInvalidLines = false; @@ -117,7 +121,21 @@ public class CsvReader { this.fieldDelimiter = delimiter; return this; } - + + /** + * Enables quoted String parsing. Field delimiters in quoted Strings are ignored. + * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise. + * Leading or tailing whitespaces are not allowed. + * + * @param quoteCharacter The character which is used as quoting character. + * @return The CSV reader instance itself, to allow for fluent function chaining. + */ + public CsvReader parseQuotedStrings(char quoteCharacter) { + this.parseQuotedStrings = true; + this.quoteCharacter = quoteCharacter; + return this; + } + /** * Configures the string that starts comments. * By default comments will be treated as invalid lines. 
@@ -297,6 +315,9 @@ public class CsvReader { format.setCommentPrefix(this.commentPrefix); format.setSkipFirstLineAsHeader(skipFirstLineAsHeader); format.setLenient(ignoreInvalidLines); + if (this.parseQuotedStrings) { + format.enableQuotedStringParsing(this.quoteCharacter); + } if (this.includedMask == null) { format.setFieldTypes(types); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java index 906e6d9..6306f6e 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java @@ -232,6 +232,49 @@ public class CsvInputFormatTest { fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); } } + + @Test + public void readMixedQuotedStringFields() { + try { + final String fileContent = "@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||"; + final FileInputSplit split = createTempFile(fileContent); + + final CsvInputFormat<Tuple3<String, String, String>> format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", String.class, String.class, String.class); + + final Configuration parameters = new Configuration(); + format.configure(parameters); + format.enableQuotedStringParsing('@'); + format.open(split); + + Tuple3<String, String, String> result = new Tuple3<String, String, String>(); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("a|b|c", result.f0); + assertEquals("def", result.f1); + assertEquals("ghijk", result.f2); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("abc", result.f0); + assertEquals("", result.f1); + assertEquals("|hhg", 
result.f2); + + result = format.nextRecord(result); + assertNotNull(result); + assertEquals("", result.f0); + assertEquals("", result.f1); + assertEquals("", result.f2); + + result = format.nextRecord(result); + assertNull(result); + assertTrue(format.reachedEnd()); + } + catch (Exception ex) { + ex.printStackTrace(); + fail("Test failed due to a " + ex.getClass().getName() + ": " + ex.getMessage()); + } + } @Test public void readStringFieldsWithTrailingDelimiters() { @@ -493,6 +536,7 @@ public class CsvInputFormatTest { @Test public void testParseStringErrors() throws Exception { StringParser stringParser = new StringParser(); + stringParser.enableQuotedStringParsing((byte)'"'); Object[][] failures = { {"\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING}, @@ -511,7 +555,8 @@ public class CsvInputFormatTest { } - @Test + // Test disabled becase we do not support double-quote escaped quotes right now. + // @Test public void testParserCorrectness() throws Exception { // RFC 4180 Compliance Test content // Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index 63e5ba1..6305619 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -197,6 +197,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { * "hdfs://host:port/file/path"). * @param lineDelimiter * @param lineDelimiter The string that separates lines, defaults to newline. 
* @param fieldDelimiter The string that separates individual fields, defaults to ",". + * @param quoteCharacter The character to use for quoted String parsing, disabled by default. * @param ignoreFirstLine Whether the first line in the file should be ignored. * @param ignoreComments Lines that start with the given String are ignored, disabled by default. * @param lenient Whether the parser should silently ignore malformed lines. @@ -207,6 +208,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { filePath: String, lineDelimiter: String = "\n", fieldDelimiter: String = ",", + quoteCharacter: Character = null, ignoreFirstLine: Boolean = false, ignoreComments: String = null, lenient: Boolean = false, @@ -221,6 +223,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { inputFormat.setLenient(lenient) inputFormat.setCommentPrefix(ignoreComments) + if (quoteCharacter != null) { + inputFormat.enableQuotedStringParsing(quoteCharacter); + } + val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity) for (i <- 0 until typeInfo.getArity) { classes(i) = typeInfo.getTypeAt(i).getTypeClass http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala index e6e1e21..9b60f3f 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala @@ -162,6 +162,47 @@ class CsvInputFormatTest { } @Test + def readMixedQuotedStringFields():Unit = { + try { + val fileContent = "abc|\"de|f\"|ghijk\n\"a|bc\"||hhg\n|||" + val split = createTempFile(fileContent) + val format = new ScalaCsvInputFormat[(String, String, String)]( + PATH, 
createTypeInformation[(String, String, String)]) + format.setDelimiter("\n") + format.enableQuotedStringParsing('"') + format.setFieldDelimiter("|") + val parameters = new Configuration + format.configure(parameters) + format.open(split) + var result: (String, String, String) = null + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("abc", result._1) + assertEquals("de|f", result._2) + assertEquals("ghijk", result._3) + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("a|bc", result._1) + assertEquals("", result._2) + assertEquals("hhg", result._3) + result = format.nextRecord(result) + assertNotNull(result) + assertEquals("", result._1) + assertEquals("", result._2) + assertEquals("", result._3) + result = format.nextRecord(result) + assertNull(result) + assertTrue(format.reachedEnd) + } + catch { + case ex: Exception => { + ex.printStackTrace() + fail("Test failed due to a " + ex.getClass.getName + ": " + ex.getMessage) + } + } + } + + @Test def readStringFieldsWithTrailingDelimiters(): Unit = { try { val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"