[FLINK-1318] [api-breaking] Simplified quoted string parsing, made it optional, 
and use a configurable quote character

This closes #265


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2665cf4e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2665cf4e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2665cf4e

Branch: refs/heads/master
Commit: 2665cf4e2a9e33e0e94ac7e0b7518a10445febbb
Parents: 5e1cc9e
Author: Fabian Hueske <fhue...@apache.org>
Authored: Mon Oct 20 15:18:20 2014 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Feb 5 11:17:38 2015 +0100

----------------------------------------------------------------------
 .../api/common/io/GenericCsvInputFormat.java    |  91 +++++++------
 .../apache/flink/types/parser/FieldParser.java  |   4 +-
 .../apache/flink/types/parser/StringParser.java | 130 +++++++------------
 .../flink/types/parser/StringValueParser.java   | 102 +++++++--------
 .../types/parser/QuotedStringParserTest.java    |  61 +++++++++
 .../parser/QuotedStringValueParserTest.java     |  65 ++++++++++
 .../flink/types/parser/StringParserTest.java    |  64 ---------
 .../types/parser/StringValueParserTest.java     |  66 ----------
 .../types/parser/UnquotedStringParserTest.java  |  56 ++++++++
 .../parser/UnquotedStringValueParserTest.java   |  56 ++++++++
 .../types/parser/VarLengthStringParserTest.java |  88 ++++++-------
 .../org/apache/flink/api/java/io/CsvReader.java |  25 +++-
 .../flink/api/java/io/CsvInputFormatTest.java   |  47 ++++++-
 .../flink/api/scala/ExecutionEnvironment.scala  |   6 +
 .../flink/api/scala/io/CsvInputFormatTest.scala |  41 ++++++
 15 files changed, 538 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 151b1e2..662cf1b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -25,6 +25,8 @@ import com.google.common.primitives.Ints;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.apache.flink.types.parser.StringValueParser;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
@@ -41,15 +43,12 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        
        private static final byte[] DEFAULT_FIELD_DELIMITER = new byte[] {','};
 
-       private static final char QUOTE_CHARACTER = '"';
-       
-       
        // 
--------------------------------------------------------------------------------------------
        //  Variables for internal operation.
        //  They are all transient, because we do not want them so be 
serialized 
        // 
--------------------------------------------------------------------------------------------
 
-       private transient FieldParser<Object>[] fieldParsers;
+       private transient FieldParser<?>[] fieldParsers;
        
        
        // 
--------------------------------------------------------------------------------------------
@@ -65,6 +64,10 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        private boolean lenient;
        
        private boolean skipFirstLineAsHeader;
+
+       private boolean quotedStringParsing = false;
+
+       private byte quoteCharacter;
        
        
        // 
--------------------------------------------------------------------------------------------
@@ -124,6 +127,11 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        public void setSkipFirstLineAsHeader(boolean skipFirstLine) {
                this.skipFirstLineAsHeader = skipFirstLine;
        }
+
+       public void enableQuotedStringParsing(char quoteCharacter) {
+               quotedStringParsing = true;
+               this.quoteCharacter = (byte)quoteCharacter;
+       }
        
        // 
--------------------------------------------------------------------------------------------
        
@@ -254,7 +262,7 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                
                // instantiate the parsers
                @SuppressWarnings("unchecked")
-               FieldParser<Object>[] parsers = new 
FieldParser[fieldTypes.length];
+               FieldParser<?>[] parsers = new FieldParser[fieldTypes.length];
                
                for (int i = 0; i < fieldTypes.length; i++) {
                        if (fieldTypes[i] != null) {
@@ -265,7 +273,16 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
                                
 
                                @SuppressWarnings("unchecked")
-                               FieldParser<Object> p = (FieldParser<Object>) 
InstantiationUtil.instantiate(parserType, FieldParser.class);
+                               FieldParser<?> p = (FieldParser<?>) 
InstantiationUtil.instantiate(parserType, FieldParser.class);
+
+                               if (this.quotedStringParsing) {
+                                       if (p instanceof StringParser) {
+                                               
((StringParser)p).enableQuotedStringParsing(this.quoteCharacter);
+                                       } else if (p instanceof 
StringValueParser) {
+                                               
((StringValueParser)p).enableQuotedStringParsing(this.quoteCharacter);
+                                       }
+                               }
+
                                parsers[i] = p;
                        }
                }
@@ -347,50 +364,42 @@ public abstract class GenericCsvInputFormat<OT> extends 
DelimitedInputFormat<OT>
        protected int skipFields(byte[] bytes, int startPos, int limit, byte[] 
delim) {
 
                int i = startPos;
-               
                byte current;
+
                final int delimLimit = limit - delim.length + 1;
-               
-               // skip over initial whitespace lines
-               while (i < limit && ((current = bytes[i]) == ' ' || current == 
'\t')) {
-                       i++;
-               }
-               
-               // first none whitespace character
-               if (i < limit && bytes[i] == QUOTE_CHARACTER) {
-                       // quoted string
-                       i++; // the quote
-                       
-                       while (i < limit && bytes[i] != QUOTE_CHARACTER) {
+
+               if(quotedStringParsing == true && bytes[i] == quoteCharacter) {
+
+                       // quoted string parsing enabled and field is quoted
+                       // search for ending quote character
+                       while(i < limit && bytes[i] != quoteCharacter) {
                                i++;
                        }
-                       
-                       if (i < limit) {
-                               // end of the quoted field
-                               i++; // the quote
-                               
-                               // skip trailing whitespace characters 
-                               while (i < delimLimit && 
!FieldParser.delimiterNext(bytes, i, delim)) {
-                                       current = bytes[i];
-                                       if (current == ' ' || current == '\t') {
-                                               i++;
-                                       } else {
-                                               return -1;      // illegal case 
of non-whitespace characters trailing
-                                       }
-                               }
-                               
-                               return (i >= delimLimit ? limit : i + 
delim.length);
+                       i++;
+
+                       if (i == limit) {
+                               // we are at the end of the record
+                               return limit;
+                       } else if ( i < delimLimit && 
FieldParser.delimiterNext(bytes, i, delim)) {
+                               // we are not at the end, check if delimiter 
comes next
+                               return i + delim.length;
                        } else {
-                               // exited due to line end without quote 
termination
+                               // delimiter did not follow end quote. Error...
                                return -1;
                        }
-               }
-               else {
-                       // unquoted field
-                       while (i < delimLimit && 
!FieldParser.delimiterNext(bytes, i, delim)) {
+               } else {
+                       // field is not quoted
+                       while(i < delimLimit && 
!FieldParser.delimiterNext(bytes, i, delim)) {
                                i++;
                        }
-                       return (i >= delimLimit ? limit : i + delim.length);
+
+                       if (i >= delimLimit) {
+                               // no delimiter found. We are at the end of the 
record
+                               return limit;
+                       } else {
+                               // delimiter found.
+                               return i + delim.length;
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 28f9a7a..33697fd 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -59,10 +59,10 @@ public abstract class FieldParser<T> {
                
                /** The field was not in a correct format for the numeric type. 
*/
                NUMERIC_VALUE_FORMAT_ERROR,
-               
+
                /** A quoted string was not terminated until the line end. */
                UNTERMINATED_QUOTED_STRING,
-               
+
                /** The parser found characters between the end of the quoted 
string and the delimiter. */
                UNQUOTED_CHARS_AFTER_QUOTED_STRING
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index bd2550e..0860d41 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -25,112 +25,70 @@ package org.apache.flink.types.parser;
  * strings, whitespaces (space and tab) leading and trailing before and after 
the quotes are removed.
  */
 public class StringParser extends FieldParser<String> {
-       
-       private static final byte WHITESPACE_SPACE = (byte) ' ';
-       private static final byte WHITESPACE_TAB = (byte) '\t';
 
-       private static final byte QUOTE_CHARACTER = (byte) '"';
+       private boolean quotedStringParsing = false;
+       private byte quoteCharacter;
 
-       private static enum ParserStates {
-               NONE, IN_QUOTE, STOP
+       private String result;
+
+       public void enableQuotedStringParsing(byte quoteCharacter) {
+               this.quotedStringParsing = true;
+               this.quoteCharacter = quoteCharacter;
        }
 
-       private String result;
-       
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, String reusable) {
-               
+
                int i = startPos;
                byte current;
-               boolean delimiterFound = false;
 
                final int delimLimit = limit-delimiter.length+1;
-               
-               // count initial whitespace lines
-               while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE 
|| current == WHITESPACE_TAB)) {
-                       i++;
-               }
 
-               // first determine the boundaries of the cell
-               ParserStates parserState = ParserStates.NONE;
-
-               // the current position evaluated against the cell boundary
-               int endOfCellPosition = i - 1;
+               if(quotedStringParsing == true && bytes[i] == quoteCharacter) {
+                       // quoted string parsing enabled and first character 
is a quote
+                       i++;
 
-               while (parserState != ParserStates.STOP && endOfCellPosition < 
limit) {
-                       endOfCellPosition++;
-                       // make sure we don't step over the end of the buffer
-                       if(endOfCellPosition == limit) {
-                               break;
+                       // search for ending quote character
+                       while(i < limit && bytes[i] != quoteCharacter) {
+                               i++;
                        }
-                       if(endOfCellPosition < delimLimit && 
delimiterNext(bytes, endOfCellPosition, delimiter)) {
-                               // if we are in a quote do nothing, otherwise 
we reached the end
-                               if (parserState != ParserStates.IN_QUOTE) {
-                                       parserState = ParserStates.STOP;
-                                       delimiterFound = true;
-                               }
-                               endOfCellPosition += delimiter.length - 1;
-                       } else if(bytes[endOfCellPosition] == QUOTE_CHARACTER) {
-                               // we entered a quote
-                               if(parserState == ParserStates.IN_QUOTE) {
-                                       // we end the quote
-                                       parserState = ParserStates.NONE;
+
+                       if (i == limit) {
+                               
setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
+                               return -1;
+                       } else {
+                               i++;
+                               // check for proper termination
+                               if (i == limit) {
+                                       // either by end of line
+                                       this.result = new String(bytes, 
startPos+1, i - startPos - 2);
+                                       return limit;
+                               } else if ( i < delimLimit && 
delimiterNext(bytes, i, delimiter)) {
+                                       // or following field delimiter
+                                       this.result = new String(bytes, 
startPos+1, i - startPos - 2);
+                                       return i + delimiter.length;
                                } else {
-                                       // we start a new quote
-                                       parserState = ParserStates.IN_QUOTE;
+                                       // no proper termination
+                                       
setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
+                                       return -1;
                                }
                        }
-               }
-               int delimCorrection = delimiterFound ? delimiter.length : 1;
+               } else {
 
-               if(parserState == ParserStates.IN_QUOTE) {
-                       // exited due to line end without quote termination
-                       
setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
-                       return -1;
-               }
-
-               // boundary of the cell is now
-               // i --> endOfCellPosition
-
-               // first none whitespace character
-               if (i < limit && bytes[i] == QUOTE_CHARACTER) {
-
-                       // check if there are characters at the end
-                       current = bytes[endOfCellPosition - delimCorrection];
-
-                       // if the character preceding the end of the cell is 
not a WHITESPACE or the end QUOTE_DOUBLE
-                       // there are unquoted characters at the end
-
-                       if (!(current == WHITESPACE_SPACE || current == 
WHITESPACE_TAB || current == QUOTE_CHARACTER)) {
-                               
setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
-                               return -1;      // illegal case of 
non-whitespace characters trailing
+                       // look for delimiter
+                       while( i < delimLimit && !delimiterNext(bytes, i, 
delimiter)) {
+                               i++;
                        }
 
-                       // skip trailing whitespace after quote .. by moving 
the cursor backwards
-                       int skipAtEnd = 0;
-                       while (bytes[endOfCellPosition - delimCorrection - 
skipAtEnd] == WHITESPACE_SPACE ||
-                                       bytes[endOfCellPosition - 
delimCorrection - skipAtEnd] == WHITESPACE_TAB) {
-                               skipAtEnd++;
+                       if (i >= delimLimit) {
+                               // no delimiter found. Take the full string
+                               this.result = new String(bytes, startPos, limit 
- startPos);
+                               return limit;
+                       } else {
+                               // delimiter found.
+                               this.result = new String(bytes, startPos, i - 
startPos);
+                               return i + delimiter.length;
                        }
-
-                       // now unescape
-                       boolean notEscaped = true;
-                       int endOfContent = i + 1;
-                       for(int counter = endOfContent; counter < 
endOfCellPosition - delimCorrection - skipAtEnd; counter++) {
-                               notEscaped = bytes[counter] != QUOTE_CHARACTER 
|| !notEscaped;
-                               if (notEscaped) {
-                                       // realign
-                                       bytes[endOfContent++] = bytes[counter];
-                               }
-                       }
-                       this.result = new String(bytes, i + 1, endOfContent - i 
- 1);
-                       return (endOfCellPosition == limit ? limit : 
endOfCellPosition + 1);
-               }
-               else {
-                       // unquoted string
-                       // set from the beginning. unquoted strings include the 
leading whitespaces
-                       this.result = new String(bytes, i, endOfCellPosition - 
i - (delimCorrection - 1));
-                       return (endOfCellPosition == limit ? limit : 
endOfCellPosition + 1);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
index 7fd7b54..3cbb21e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
@@ -30,13 +30,16 @@ import org.apache.flink.types.StringValue;
  */
 public class StringValueParser extends FieldParser<StringValue> {
 
-       private static final byte WHITESPACE_SPACE = (byte) ' ';
-       private static final byte WHITESPACE_TAB = (byte) '\t';
-       
-       private static final byte QUOTE_DOUBLE = (byte) '"';
-       
+       private boolean quotedStringParsing = false;
+       private byte quoteCharacter;
+
        private StringValue result;
 
+       public void enableQuotedStringParsing(byte quoteCharacter) {
+               this.quotedStringParsing = true;
+               this.quoteCharacter = quoteCharacter;
+       }
+       
        @Override
        public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, StringValue reusable) {
 
@@ -46,64 +49,53 @@ public class StringValueParser extends 
FieldParser<StringValue> {
 
                final int delimLimit = limit-delimiter.length+1;
 
-               // count initial whitespace lines
-               while (i < limit && ((current = bytes[i]) == WHITESPACE_SPACE 
|| current == WHITESPACE_TAB)) {
+               if(quotedStringParsing == true && bytes[i] == quoteCharacter) {
+                       // quoted string parsing enabled and first character is 
a quote
                        i++;
-               }
-               
-               // first none whitespace character
-               if (i < limit && bytes[i] == QUOTE_DOUBLE) {
-                       // quoted string
-                       i++; // the quote
-                       
-                       // we count only from after the quote
-                       int quoteStart = i;
-                       while (i < limit && bytes[i] != QUOTE_DOUBLE) {
+
+                       // search for ending quote character
+                       while(i < limit && bytes[i] != quoteCharacter) {
                                i++;
                        }
-                       
-                       if (i < limit) {
-                               // end of the string
-                               reusable.setValueAscii(bytes, quoteStart, 
i-quoteStart);
-                               
-                               i++; // the quote
-                               
-                               // skip trailing whitespace characters
-                               while (i < limit) {
-
-                                       if (i < delimLimit && 
delimiterNext(bytes, i, delimiter)) {
-                                               return i+delimiter.length;
-                                       }
-                                       current = bytes[i];
-                                       if (current == WHITESPACE_SPACE || 
current == WHITESPACE_TAB) {
-                                               i++;
-                                       }
-                                       else {
-                                               
setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
-                                               return -1;      // illegal case 
of non-whitespace characters trailing
-                                       }
-                               }
-                               if( i > limit ){
-                                       i--;
-                               }
-                               return (i == limit ? limit : i + 
delimiter.length);
-                       } else {
-                               // exited due to line end without quote 
termination
+
+                       if (i == limit) {
                                
setErrorState(ParseErrorState.UNTERMINATED_QUOTED_STRING);
                                return -1;
-                       }
-               }
-               else {
-                       // unquoted string -delim.length
-                       while (i < limit) {
-                               if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
-                                       break;
+                       } else {
+                               i++;
+                               // check for proper termination
+                               if (i == limit) {
+                                       // either by end of line
+                                       reusable.setValueAscii(bytes, 
startPos+1, i - startPos - 2);
+                                       return limit;
+                               } else if ( i < delimLimit && 
delimiterNext(bytes, i, delimiter)) {
+                                       // or following field delimiter
+                                       reusable.setValueAscii(bytes, 
startPos+1, i - startPos - 2);
+                                       return i + delimiter.length;
+                               } else {
+                                       // no proper termination
+                                       
setErrorState(ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING);
+                                       return -1;
                                }
+
+                       }
+
+               } else {
+
+                       // look for delimiter
+                       while( i < delimLimit && !delimiterNext(bytes, i, 
delimiter)) {
                                i++;
                        }
-                       // set from the beginning. unquoted strings include the 
leading whitespaces
-                       reusable.setValueAscii(bytes, startPos, i-startPos);
-                       return (i == limit ? limit : i + delimiter.length);
+
+                       if (i >= delimLimit) {
+                               // no delimiter found. Take the full string
+                               reusable.setValueAscii(bytes, startPos, limit - 
startPos);
+                               return limit;
+                       } else {
+                               // delimiter found.
+                               reusable.setValueAscii(bytes, startPos, i - 
startPos);
+                               return i + delimiter.length;
+                       }
                }
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
new file mode 100644
index 0000000..89d5ac8
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+public class QuotedStringParserTest extends ParserTestBase<String> {
+
+    @Override
+    public String[] getValidTestValues() {
+        return new String[] {
+                "\"abcdefgh\"", "\"i\"", "\"jklmno\"", "\"abc|de|fgh\"",
+                "\"abc&&&&def&&&&ghij\"", "\"i\"", "\"Hello9\"",
+                "abcdefgh", "i", "jklmno", "Hello9"
+        };
+    }
+
+    @Override
+    public String[] getValidTestResults() {
+        return new String[] {
+                "abcdefgh", "i", "jklmno", "abc|de|fgh",
+                "abc&&&&def&&&&ghij", "i", "Hello9",
+                "abcdefgh", "i", "jklmno", "Hello9"
+        };
+    }
+
+    @Override
+    public String[] getInvalidTestValues() {
+        return new String[] {
+                "\"abcd\"ef", "\"abcdef"
+        };
+    }
+
+    @Override
+    public FieldParser<String> getParser() {
+        StringParser p = new StringParser();
+        p.enableQuotedStringParsing((byte)'"');
+        return p;
+    }
+
+    @Override
+    public Class<String> getTypeClass() {
+        return String.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
new file mode 100644
index 0000000..66097ac
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import org.apache.flink.types.StringValue;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringValueParser;
+
+
+public class QuotedStringValueParserTest extends ParserTestBase<StringValue> {
+
+    @Override
+    public String[] getValidTestValues() {
+        return new String[] {
+                "\"abcdefgh\"", "\"i\"", "\"jklmno\"", "\"abc|de|fgh\"",
+                "\"abc&&&&def&&&&ghij\"", "\"i\"", "\"Hello9\"",
+                "abcdefgh", "i", "jklmno", "Hello9"
+        };
+    }
+
+    @Override
+    public StringValue[] getValidTestResults() {
+        return new StringValue[] {
+                new StringValue("abcdefgh"), new StringValue("i"), new 
StringValue("jklmno"), new StringValue("abc|de|fgh"),
+                new StringValue("abc&&&&def&&&&ghij"), new StringValue("i"), 
new StringValue("Hello9"),
+                new StringValue("abcdefgh"), new StringValue("i"), new 
StringValue("jklmno"), new StringValue("Hello9"),
+        };
+    }
+
+    @Override
+    public String[] getInvalidTestValues() {
+        return new String[] {
+                "\"abcd\"ef", "\"abcdef"
+        };
+    }
+
+    @Override
+    public FieldParser<StringValue> getParser() {
+        StringValueParser p = new StringValueParser();
+        p.enableQuotedStringParsing((byte)'"');
+        return p;
+    }
+
+    @Override
+    public Class<StringValue> getTypeClass() {
+        return StringValue.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java
deleted file mode 100644
index 702f985..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/StringParserTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.types.parser;
-
-import org.apache.flink.types.parser.StringParser;
-import org.apache.flink.types.parser.FieldParser;
-
-
-public class StringParserTest extends ParserTestBase<String> {
-
-       @Override
-       public String[] getValidTestValues() {
-               return new String[] {
-                       "abcdefgh", "i", "jklmno", "\"abcdefgh\"", "\"i\"", 
"\"jklmno\"", 
-                       "\"ab,cde|fg\"", "\"hij|m|n|op\"", "\"hij&&m&&n&&op\"",
-                       "  \"abcdefgh\"", "     \"i\"\t\t\t", "\t \t\"jklmno\"  
",
-                       "  \"     abcd    \" \t ", "Hello9"
-               };
-       }
-       
-       @Override
-       public String[] getValidTestResults() {
-               return new String[] {
-                       "abcdefgh", "i", "jklmno", "abcdefgh", "i", "jklmno",
-                       "ab,cde|fg", "hij|m|n|op", "hij&&m&&n&&op",
-                       "abcdefgh", "i", "jklmno",
-                       "     abcd    ", "Hello9"
-               };
-       }
-
-       @Override
-       public String[] getInvalidTestValues() {
-               return new String[] {
-                       "  \"abcdefgh ", "  \"ijklmno\" hj"
-               };
-       }
-
-       @Override
-       public FieldParser<String> getParser() {
-               return new StringParser();
-       }
-
-       @Override
-       public Class<String> getTypeClass() {
-               return String.class;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java
deleted file mode 100644
index 66ad32d..0000000
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/StringValueParserTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.types.parser;
-
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.StringValueParser;
-
-
-public class StringValueParserTest extends ParserTestBase<StringValue> {
-
-       @Override
-       public String[] getValidTestValues() {
-               return new String[] {
-                       "abcdefgh", "i", "jklmno", "\"abcdefgh\"", "\"i\"", 
"\"jklmno\"", 
-                       "\"ab,cde|fg\"", "\"hij|m|n|op\"", "\"hij&&m&&n&&op\"",
-                       "  \"abcdefgh\"", "     \"i\"\t\t\t", "\t \t\"jklmno\"  
",
-                       "  \"     abcd    \" \t ", "Hello9"
-               };
-       }
-       
-       @Override
-       public StringValue[] getValidTestResults() {
-               return new StringValue[] {
-                       new StringValue("abcdefgh"), new StringValue("i"), new 
StringValue("jklmno"),
-                       new StringValue("abcdefgh"), new StringValue("i"), new 
StringValue("jklmno"), 
-                       new StringValue("ab,cde|fg"), new 
StringValue("hij|m|n|op"), new StringValue("hij&&m&&n&&op"),
-                       new StringValue("abcdefgh"), new StringValue("i"), new 
StringValue("jklmno"),
-                       new StringValue("     abcd    "), new 
StringValue("Hello9")
-               };
-       }
-
-       @Override
-       public String[] getInvalidTestValues() {
-               return new String[] {
-                       "  \"abcdefgh ", "  \"ijklmno\" hj"
-               };
-       }
-
-       @Override
-       public FieldParser<StringValue> getParser() {
-               return new StringValueParser();
-       }
-
-       @Override
-       public Class<StringValue> getTypeClass() {
-               return StringValue.class;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
new file mode 100644
index 0000000..cadd021
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import org.apache.flink.types.parser.StringParser;
+import org.apache.flink.types.parser.FieldParser;
+
+
+public class UnquotedStringParserTest extends ParserTestBase<String> {
+
+    @Override
+    public String[] getValidTestValues() {
+        return new String[] {
+                "abcdefgh", "i", "jklmno", "\"abc\"defgh\"", "\"i\"", "Hello9"
+        };
+    }
+
+    @Override
+    public String[] getValidTestResults() {
+        return new String[] {
+                "abcdefgh", "i", "jklmno", "\"abc\"defgh\"", "\"i\"", "Hello9"
+        };
+    }
+
+    @Override
+    public String[] getInvalidTestValues() {
+        return new String[] { };
+    }
+
+    @Override
+    public FieldParser<String> getParser() {
+        return new StringParser();
+    }
+
+    @Override
+    public Class<String> getTypeClass() {
+        return String.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java
new file mode 100644
index 0000000..d66e852
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringValueParserTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import org.apache.flink.types.StringValue;
+
+
+public class UnquotedStringValueParserTest extends ParserTestBase<StringValue> 
{
+
+    @Override
+    public String[] getValidTestValues() {
+        return new String[] {
+                "abcdefgh", "i", "jklmno", "\"abc\"defgh\"", "\"i\"", "Hello9"
+        };
+    }
+
+    @Override
+    public StringValue[] getValidTestResults() {
+        return new StringValue[] {
+                new StringValue("abcdefgh"), new StringValue("i"), new 
StringValue("jklmno"),
+                new StringValue("\"abc\"defgh\""), new StringValue("\"i\""), 
new StringValue("Hello9")
+        };
+    }
+
+    @Override
+    public String[] getInvalidTestValues() {
+        return new String[] { };
+    }
+
+    @Override
+    public FieldParser<StringValue> getParser() {
+        return new StringValueParser();
+    }
+
+    @Override
+    public Class<StringValue> getTypeClass() {
+        return StringValue.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 56b1dd0..4f9069e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -38,6 +38,8 @@ public class VarLengthStringParserTest {
        
        @Test
        public void testParseValidUnquotedStrings() {
+
+               this.parser = new StringValueParser();
                
                // check valid strings with out whitespaces and trailing 
delimiter
                byte[] recBytes = "abcdefgh|i|jklmno|".getBytes();
@@ -77,8 +79,11 @@ public class VarLengthStringParserTest {
        }
        
        @Test
-       public void testParseValidQuotedStringsWithoutWhitespaces() {
-               
+       public void testParseValidQuotedStrings() {
+
+               this.parser = new StringValueParser();
+               this.parser.enableQuotedStringParsing((byte)'"');
+
                // check valid strings with out whitespaces and trailing 
delimiter
                byte[] recBytes = "\"abcdefgh\"|\"i\"|\"jklmno\"|".getBytes();
                StringValue s = new StringValue();
@@ -137,68 +142,57 @@ public class VarLengthStringParserTest {
                assertTrue(startPos == 25);
                assertTrue(s.getValue().equals("hij|kl|mn|op"));
        }
-       
+
        @Test
-       public void testParseValidQuotedStringsWithWhitespaces() {
-               
+       public void testParseValidMixedStrings() {
+
+               this.parser = new StringValueParser();
+               this.parser.enableQuotedStringParsing((byte)'@');
+
                // check valid strings with out whitespaces and trailing 
delimiter
-               byte[] recBytes = "  \"abcdefgh\"|     \"i\"\t\t\t|\t 
\t\"jklmno\"  |".getBytes();
+               byte[] recBytes = "@abcde|gh@|@i@|jklmnopq|@rs@|tuv".getBytes();
                StringValue s = new StringValue();
-               
+
                int startPos = 0;
                startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
-               assertTrue(startPos == 13);
-               assertTrue(s.getValue().equals("abcdefgh"));
-               
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
-               assertTrue(startPos == 25);
-               assertTrue(s.getValue().equals("i"));
-               
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
-               assertTrue(startPos == 39);
-               assertTrue(s.getValue().equals("jklmno"));
-               
-               // check valid strings with out whitespaces without trailing 
delimiter
-               recBytes = "  \"abcdefgh\"|     \"i\"\t\t\t|\t \t\"jklmno\"  
".getBytes();
-               s = new StringValue();
-               
-               startPos = 0;
-               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
-               assertTrue(startPos == 13);
-               assertTrue(s.getValue().equals("abcdefgh"));
-               
+               assertTrue(startPos == 11);
+               assertTrue(s.getValue().equals("abcde|gh"));
+
                startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
-               assertTrue(startPos == 25);
+               assertTrue(startPos == 15);
                assertTrue(s.getValue().equals("i"));
-               
+
                startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
-               assertTrue(startPos == 38);
-               assertTrue(s.getValue().equals("jklmno"));
-               
-               // check single field not terminated
-               recBytes = "  \t\"abcde\"\t\t  \t ".getBytes();
-               startPos = 0;
+               assertTrue(startPos == 24);
+               assertTrue(s.getValue().equals("jklmnopq"));
+
                startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
-               assertTrue(startPos == 16);
-               assertTrue(s.getValue().equals("abcde"));
-               
-               // check single field terminated
-               recBytes = "  \t\"abcde\"\t\t  \t |".getBytes();
-               startPos = 0;
+               assertTrue(startPos == 29);
+               assertTrue(s.getValue().equals("rs"));
+
                startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
-               assertTrue(startPos == 17);
-               assertTrue(s.getValue().equals("abcde"));
+               assertTrue(startPos == 32);
+               assertTrue(s.getValue().equals("tuv"));
+
        }
-       
+
+
        @Test
        public void testParseInvalidQuotedStrings() {
-               
+
+               this.parser = new StringValueParser();
+               this.parser.enableQuotedStringParsing((byte)'"');
+
                // check valid strings with out whitespaces and trailing 
delimiter
-               byte[] recBytes = "  \"abcdefgh\" gh |     \"i\"\t\t\t|\t 
\t\"jklmno\"  |".getBytes();
+               byte[] recBytes = "\"abcdefgh\"-|\"jklmno  ".getBytes();
                StringValue s = new StringValue();
-               
+
                int startPos = 0;
                startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
                assertTrue(startPos < 0);
+
+               startPos = 12;
+               startPos = parser.parseField(recBytes, startPos, 
recBytes.length, new byte[] {'|'}, s);
+               assertTrue(startPos < 0);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
index b31c9a4..ac879b7 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
@@ -50,9 +50,13 @@ public class CsvReader {
        protected String lineDelimiter = CsvInputFormat.DEFAULT_LINE_DELIMITER;
        
        protected String fieldDelimiter = 
CsvInputFormat.DEFAULT_FIELD_DELIMITER;
-       
+
        protected String commentPrefix = null; //default: no comments
 
+       protected boolean parseQuotedStrings = false;
+
+       protected char quoteCharacter = '"';
+
        protected boolean skipFirstLineAsHeader = false;
        
        protected boolean ignoreInvalidLines = false;
@@ -117,7 +121,21 @@ public class CsvReader {
                this.fieldDelimiter = delimiter;
                return this;
        }
-       
+
+       /**
+        * Enables quoted String parsing. Field delimiters in quoted Strings 
are ignored.
+        * A String is parsed as quoted if it starts and ends with a quoting 
character and as unquoted otherwise.
+        * Leading or trailing whitespaces are not allowed.
+        *
+        * @param quoteCharacter The character which is used as quoting 
character.
+        * @return The CSV reader instance itself, to allow for fluent function 
chaining.
+        */
+       public CsvReader parseQuotedStrings(char quoteCharacter) {
+               this.parseQuotedStrings = true;
+               this.quoteCharacter = quoteCharacter;
+               return this;
+       }
+
        /**
         * Configures the string that starts comments.
         * By default comments will be treated as invalid lines.
@@ -297,6 +315,9 @@ public class CsvReader {
                format.setCommentPrefix(this.commentPrefix);
                format.setSkipFirstLineAsHeader(skipFirstLineAsHeader);
                format.setLenient(ignoreInvalidLines);
+               if (this.parseQuotedStrings) {
+                       format.enableQuotedStringParsing(this.quoteCharacter);
+               }
                if (this.includedMask == null) {
                        format.setFieldTypes(types);
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 906e6d9..6306f6e 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -232,6 +232,49 @@ public class CsvInputFormatTest {
                        fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
                }
        }
+
+       @Test
+       public void readMixedQuotedStringFields() {
+               try {
+                       final String fileContent = 
"@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
+                       final FileInputSplit split = 
createTempFile(fileContent);
+
+                       final CsvInputFormat<Tuple3<String, String, String>> 
format = new CsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", 
String.class, String.class, String.class);
+
+                       final Configuration parameters = new Configuration();
+                       format.configure(parameters);
+                       format.enableQuotedStringParsing('@');
+                       format.open(split);
+
+                       Tuple3<String, String, String> result = new 
Tuple3<String, String, String>();
+
+                       result = format.nextRecord(result);
+                       assertNotNull(result);
+                       assertEquals("a|b|c", result.f0);
+                       assertEquals("def", result.f1);
+                       assertEquals("ghijk", result.f2);
+
+                       result = format.nextRecord(result);
+                       assertNotNull(result);
+                       assertEquals("abc", result.f0);
+                       assertEquals("", result.f1);
+                       assertEquals("|hhg", result.f2);
+
+                       result = format.nextRecord(result);
+                       assertNotNull(result);
+                       assertEquals("", result.f0);
+                       assertEquals("", result.f1);
+                       assertEquals("", result.f2);
+
+                       result = format.nextRecord(result);
+                       assertNull(result);
+                       assertTrue(format.reachedEnd());
+               }
+               catch (Exception ex) {
+                       ex.printStackTrace();
+                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
+               }
+       }
        
        @Test
        public void readStringFieldsWithTrailingDelimiters() {
@@ -493,6 +536,7 @@ public class CsvInputFormatTest {
        @Test
        public void testParseStringErrors() throws Exception {
                StringParser stringParser = new StringParser();
+               stringParser.enableQuotedStringParsing((byte)'"');
 
                Object[][] failures = {
                                {"\"string\" trailing", 
FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
@@ -511,7 +555,8 @@ public class CsvInputFormatTest {
 
        }
 
-       @Test
+       // Test disabled because we do not support double-quote escaped quotes 
right now.
+       // @Test
        public void testParserCorrectness() throws Exception {
                // RFC 4180 Compliance Test content
                // Taken from 
http://en.wikipedia.org/wiki/Comma-separated_values#Example

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 63e5ba1..6305619 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -197,6 +197,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    *                 "hdfs://host:port/file/path").   * @param lineDelimiter
    * @param lineDelimiter The string that separates lines, defaults to newline.
    * @param fieldDelimiter The string that separates individual fields, 
defaults to ",".
+   * @param quoteCharacter The character to use for quoted String parsing, 
disabled by default.
    * @param ignoreFirstLine Whether the first line in the file should be 
ignored.
    * @param ignoreComments Lines that start with the given String are ignored, 
disabled by default.
    * @param lenient Whether the parser should silently ignore malformed lines.
@@ -207,6 +208,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
       filePath: String,
       lineDelimiter: String = "\n",
       fieldDelimiter: String = ",",
+      quoteCharacter: Character = null,
       ignoreFirstLine: Boolean = false,
       ignoreComments: String = null,
       lenient: Boolean = false,
@@ -221,6 +223,10 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
     inputFormat.setLenient(lenient)
     inputFormat.setCommentPrefix(ignoreComments)
 
+    if (quoteCharacter != null) {
+      inputFormat.enableQuotedStringParsing(quoteCharacter);
+    }
+
     val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity)
     for (i <- 0 until typeInfo.getArity) {
       classes(i) = typeInfo.getTypeAt(i).getTypeClass

http://git-wip-us.apache.org/repos/asf/flink/blob/2665cf4e/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index e6e1e21..9b60f3f 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -162,6 +162,47 @@ class CsvInputFormatTest {
   }
 
   @Test
+  def readMixedQuotedStringFields():Unit = {
+    try {
+      val fileContent = "abc|\"de|f\"|ghijk\n\"a|bc\"||hhg\n|||"
+      val split = createTempFile(fileContent)
+      val format = new ScalaCsvInputFormat[(String, String, String)](
+        PATH, createTypeInformation[(String, String, String)])
+      format.setDelimiter("\n")
+      format.enableQuotedStringParsing('"')
+      format.setFieldDelimiter("|")
+      val parameters = new Configuration
+      format.configure(parameters)
+      format.open(split)
+      var result: (String, String, String) = null
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("abc", result._1)
+      assertEquals("de|f", result._2)
+      assertEquals("ghijk", result._3)
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("a|bc", result._1)
+      assertEquals("", result._2)
+      assertEquals("hhg", result._3)
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals("", result._1)
+      assertEquals("", result._2)
+      assertEquals("", result._3)
+      result = format.nextRecord(result)
+      assertNull(result)
+      assertTrue(format.reachedEnd)
+    }
+    catch {
+      case ex: Exception => {
+        ex.printStackTrace()
+        fail("Test failed due to a " + ex.getClass.getName + ": " + 
ex.getMessage)
+      }
+    }
+  }
+
+  @Test
   def readStringFieldsWithTrailingDelimiters(): Unit = {
     try {
       val fileContent = "abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n"

Reply via email to