Repository: flink
Updated Branches:
  refs/heads/master 7cb35ea91 -> 12cd15622


[FLINK-2061] [java api] Fix GenericCsvInputFormat skipping fields error with quoted string

This closes #734


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12cd1562
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12cd1562
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12cd1562

Branch: refs/heads/master
Commit: 12cd15622afbbcbb40c9dd80151f4ca459a26922
Parents: 7cb35ea
Author: Chiwan Park <chiwanp...@icloud.com>
Authored: Wed May 27 22:24:59 2015 +0900
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri May 29 21:34:25 2015 +0200

----------------------------------------------------------------------
 .../api/common/io/GenericCsvInputFormat.java    |  1 +
 .../flink/api/java/io/CsvInputFormatTest.java   | 32 ++++++++++++++++++++
 2 files changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/12cd1562/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 1803a2b..b132ca2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -444,6 +444,7 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 
                        // quoted string parsing enabled and field is quoted
                        // search for ending quote character
+                       i++;
                        while(i < limit && bytes[i] != quoteCharacter) {
                                i++;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/12cd1562/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 3d87984..11cd6da 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -981,6 +981,38 @@ public class CsvInputFormatTest {
                }
        }
 
+       @Test
+       public void testQuotedStringParsingWithIncludeFields() throws Exception {
+               final String fileContent = "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" +
+                               "\"Blahblah <b...@blahblah.org>\"|\"bla\"|\"blubb\"";
+
+               final File tempFile = File.createTempFile("CsvReaderQuotedString", "tmp");
+               tempFile.deleteOnExit();
+               tempFile.setWritable(true);
+
+               OutputStreamWriter writer = new OutputStreamWriter(new FileOutputStream(tempFile));
+               writer.write(fileContent);
+               writer.close();
+
+               TypeInformation<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
+               CsvInputFormat<Tuple2<String, String>> inputFormat = new CsvInputFormat<Tuple2<String, String>>(new Path(tempFile.toURI().toString()), typeInfo);
+
+               inputFormat.enableQuotedStringParsing('"');
+               inputFormat.setFieldDelimiter('|');
+               inputFormat.setDelimiter('\n');
+               inputFormat.setFields(new boolean[]{true, false, true}, new Class[]{String.class, String.class});
+
+               inputFormat.configure(new Configuration());
+               FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+               inputFormat.open(splits[0]);
+
+               Tuple2<String, String> record = inputFormat.nextRecord(new Tuple2<String, String>());
+
+               assertEquals("20:41:52-1-3-2015", record.f0);
+               assertEquals("Blahblah <b...@blahblah.org>", record.f1);
+       }
+
        // --------------------------------------------------------------------------------------------
        // Custom types for testing
        // --------------------------------------------------------------------------------------------

Reply via email to