This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new a7a281c215 NIFI-12023: Add FastCSV parser to CSVReader
a7a281c215 is described below

commit a7a281c215b2ca2c172981159cb7ca9399ba6486
Author: Matt Burgess <mattyb...@apache.org>
AuthorDate: Tue Sep 12 22:04:47 2023 -0400

    NIFI-12023: Add FastCSV parser to CSVReader
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #7685.
---
 .../main/java/org/apache/nifi/csv/CSVUtils.java    |   2 +
 .../nifi-record-serialization-services/pom.xml     |   8 +
 .../apache/nifi/csv/AbstractCSVRecordReader.java   |   2 +-
 .../main/java/org/apache/nifi/csv/CSVReader.java   |  10 +-
 .../org/apache/nifi/csv/FastCSVRecordReader.java   | 199 ++++++++
 .../apache/nifi/csv/TestFastCSVRecordReader.java   | 506 +++++++++++++++++++++
 .../resources/csv/multi-bank-account_RFC4180.csv   |   3 +
 .../csv/multi-bank-account_escapechar_RFC4180.csv  |   3 +
 .../resources/csv/single-bank-account_RFC4180.csv  |   2 +
 9 files changed, 733 insertions(+), 2 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java
index 50efc8dc8b..cd3b16a3f1 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/csv/CSVUtils.java
@@ -151,6 +151,8 @@ public class CSVUtils {
                 "* Apache Commons CSV - duplicate headers will result in 
column data \"shifting\" right with new fields " +
                 "created for \"unknown_field_index_X\" where \"X\" is the CSV 
column index number\n" +
                 "* Jackson CSV - duplicate headers will be de-duplicated with 
the field value being that of the right-most " +
+                "duplicate CSV column\n" +
+                "* FastCSV - duplicate headers will be de-duplicated with the 
field value being that of the left-most " +
                 "duplicate CSV column")
         .expressionLanguageSupported(ExpressionLanguageScope.NONE)
         .allowableValues("true", "false")
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index c636c23f63..5bc6e4e968 100755
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -75,6 +75,11 @@
             <groupId>com.fasterxml.jackson.dataformat</groupId>
             <artifactId>jackson-dataformat-csv</artifactId>
         </dependency>
+        <dependency>
+            <groupId>de.siegmar</groupId>
+            <artifactId>fastcsv</artifactId>
+            <version>2.2.2</version>
+        </dependency>
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
@@ -183,8 +188,11 @@
 
                         
<exclude>src/test/resources/csv/extra-white-space.csv</exclude>
                         
<exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
+                        
<exclude>src/test/resources/csv/multi-bank-account_RFC4180.csv</exclude>
                         
<exclude>src/test/resources/csv/single-bank-account.csv</exclude>
+                        
<exclude>src/test/resources/csv/single-bank-account_RFC4180.csv</exclude>
                         
<exclude>src/test/resources/csv/multi-bank-account_escapechar.csv</exclude>
+                        
<exclude>src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv</exclude>
                         
<exclude>src/test/resources/csv/multi-bank-account_spec_delimiter.csv</exclude>
                         
<exclude>src/test/resources/csv/prov-events.csv</exclude>
                         
<exclude>src/test/resources/grok/error-with-stack-trace.log</exclude>
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
index 4174e9d476..6a2651bd9e 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/AbstractCSVRecordReader.java
@@ -150,7 +150,7 @@ abstract public class AbstractCSVRecordReader implements 
RecordReader {
         return value;
     }
 
-    private String trim(String value) {
+    protected String trim(String value) {
         return (value.length() > 1) && value.startsWith("\"") && 
value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
     }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index 56ad3b6331..9974b07f39 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -68,6 +68,12 @@ public class CSVReader extends SchemaRegistryService 
implements RecordReaderFact
     public static final AllowableValue JACKSON_CSV = new 
AllowableValue("jackson-csv", "Jackson CSV",
             "The CSV parser implementation from the Jackson Dataformats 
library.");
 
+    public static final AllowableValue FAST_CSV = new 
AllowableValue("fast-csv", "FastCSV",
+            "The CSV parser implementation from the FastCSV library. NOTE: 
This parser only officially supports RFC-4180, so it recommended to "
+                    + "set the 'CSV Format' property to 'RFC 4180'. It does 
handle some non-compliant CSV data, for that case set the 'CSV Format' property 
to "
+                    + "'CUSTOM' and the other custom format properties (such 
as 'Trim Fields', 'Trim double quote', etc.) as appropriate. Be aware that this 
"
+                    + "may cause errors if FastCSV doesn't handle the property 
settings correctly (such as 'Ignore Header'), but otherwise may process the 
input as expected even "
+                    + "if the data is not fully RFC-4180 compliant.");
 
     public static final PropertyDescriptor CSV_PARSER = new 
PropertyDescriptor.Builder()
             .name("csv-reader-csv-parser")
@@ -75,7 +81,7 @@ public class CSVReader extends SchemaRegistryService 
implements RecordReaderFact
             .description("Specifies which parser to use to read CSV records. 
NOTE: Different parsers may support different subsets of functionality "
                     + "and may also exhibit different levels of performance.")
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV)
+            .allowableValues(APACHE_COMMONS_CSV, JACKSON_CSV, FAST_CSV)
             .defaultValue(APACHE_COMMONS_CSV.getValue())
             .required(true)
             .build();
@@ -175,6 +181,8 @@ public class CSVReader extends SchemaRegistryService 
implements RecordReaderFact
             return new CSVRecordReader(in, logger, schema, format, 
firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, 
charSet, trimDoubleQuote);
         } else if (JACKSON_CSV.getValue().equals(csvParser)) {
             return new JacksonCSVRecordReader(in, logger, schema, format, 
firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, 
charSet, trimDoubleQuote);
+        } else if (FAST_CSV.getValue().equals(csvParser)) {
+            return new FastCSVRecordReader(in, logger, schema, format, 
firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat, 
charSet, trimDoubleQuote);
         } else {
             throw new IOException("Parser not supported");
         }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java
new file mode 100644
index 0000000000..649e62f5b9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/FastCSVRecordReader.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import de.siegmar.fastcsv.reader.CommentStrategy;
+import de.siegmar.fastcsv.reader.CsvReader;
+import de.siegmar.fastcsv.reader.CsvRow;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class FastCSVRecordReader extends AbstractCSVRecordReader {
+    private final CsvReader csvReader;
+    private final Iterator<CsvRow> csvRowIterator;
+
+    private List<RecordField> recordFields;
+
+    private Map<String, Integer> headerMap;
+
+    private final boolean ignoreHeader;
+    private final boolean trimDoubleQuote;
+    private final CSVFormat csvFormat;
+
+    public FastCSVRecordReader(final InputStream in, final ComponentLog 
logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean 
hasHeader, final boolean ignoreHeader,
+                               final String dateFormat, final String 
timeFormat, final String timestampFormat, final String encoding, final boolean 
trimDoubleQuote) throws IOException {
+        super(logger, schema, hasHeader, ignoreHeader, dateFormat, timeFormat, 
timestampFormat, trimDoubleQuote);
+        this.ignoreHeader = ignoreHeader;
+        this.trimDoubleQuote = trimDoubleQuote;
+        this.csvFormat = csvFormat;
+
+        CsvReader.CsvReaderBuilder builder = CsvReader.builder()
+                .fieldSeparator(csvFormat.getDelimiter())
+                .quoteCharacter(csvFormat.getQuoteCharacter())
+                .commentStrategy(CommentStrategy.SKIP)
+                .skipEmptyRows(csvFormat.getIgnoreEmptyLines())
+                
.errorOnDifferentFieldCount(!csvFormat.getAllowMissingColumnNames());
+
+        if (csvFormat.getCommentMarker() != null) {
+            builder.commentCharacter(csvFormat.getCommentMarker());
+        }
+
+        if (hasHeader && !ignoreHeader) {
+            headerMap = null;
+        } else {
+            headerMap = new HashMap<>();
+            for (int i = 0; i < schema.getFieldCount(); i++) {
+                headerMap.put(schema.getField(i).getFieldName(), i);
+            }
+        }
+
+        csvReader = builder.build(new InputStreamReader(in, encoding));
+        csvRowIterator = csvReader.iterator();
+    }
+
+    @Override
+    public Record nextRecord(final boolean coerceTypes, final boolean 
dropUnknownFields) throws IOException, MalformedRecordException {
+
+        try {
+            final RecordSchema schema = getSchema();
+
+            final List<RecordField> recordFields = getRecordFields();
+            final int numFieldNames = recordFields.size();
+            if (!csvRowIterator.hasNext()) {
+                return null;
+            }
+            final CsvRow csvRecord = csvRowIterator.next();
+            final Map<String, Object> values = new 
LinkedHashMap<>(recordFields.size() * 2);
+            for (int i = 0; i < csvRecord.getFieldCount(); i++) {
+                String rawValue = csvRecord.getField(i);
+                if (csvFormat.getTrim()) {
+                    rawValue = rawValue.trim();
+                }
+                if (trimDoubleQuote) {
+                    rawValue = trim(rawValue);
+                }
+
+                final String rawFieldName;
+                final DataType dataType;
+                if (i >= numFieldNames) {
+                    if (!dropUnknownFields) {
+                        values.put("unknown_field_index_" + i, rawValue);
+                    }
+                    continue;
+                } else {
+                    final RecordField recordField = recordFields.get(i);
+                    rawFieldName = recordField.getFieldName();
+                    dataType = recordField.getDataType();
+                }
+
+                final Object value;
+                if (coerceTypes) {
+                    value = convert(rawValue, dataType, rawFieldName);
+                } else {
+                    // The CSV Reader is going to return all fields as 
Strings, because CSV doesn't have any way to
+                    // dictate a field type. As a result, we will use the 
schema that we have to attempt to convert
+                    // the value into the desired type if it's a simple type.
+                    value = convertSimpleIfPossible(rawValue, dataType, 
rawFieldName);
+                }
+
+                values.putIfAbsent(rawFieldName, value);
+            }
+
+            return new MapRecord(schema, values, coerceTypes, 
dropUnknownFields);
+        } catch (Exception e) {
+            throw new MalformedRecordException("Error while getting next 
record", e);
+        }
+    }
+
+
+    private List<RecordField> getRecordFields() {
+        if (this.recordFields != null) {
+            return this.recordFields;
+        }
+
+        if (ignoreHeader) {
+            logger.debug("With 'Ignore Header' set to true, FastCSV still 
reads the header and keeps track "
+                    + "of the number of fields in the header. This will cause 
an error if the provided schema does not "
+                    + "have the same number of fields, as this is not 
conformant to RFC-4180");
+        }
+
+        // When getting the field names from the first record, it has to be 
read in
+        if (!csvRowIterator.hasNext()) {
+            return Collections.emptyList();
+        }
+        CsvRow headerRow = csvRowIterator.next();
+        headerMap = new HashMap<>();
+        for (int i = 0; i < headerRow.getFieldCount(); i++) {
+            String rawValue = headerRow.getField(i);
+            if (csvFormat.getTrim()) {
+                rawValue = rawValue.trim();
+            }
+            if (this.trimDoubleQuote) {
+                rawValue = trim(rawValue);
+            }
+            headerMap.put(rawValue, i);
+        }
+
+
+        // Use a SortedMap keyed by index of the field so that we can get a 
List of field names in the correct order
+        final SortedMap<Integer, String> sortedMap = new TreeMap<>();
+        for (final Map.Entry<String, Integer> entry : headerMap.entrySet()) {
+            sortedMap.put(entry.getValue(), entry.getKey());
+        }
+
+        final List<RecordField> fields = new ArrayList<>();
+        final List<String> rawFieldNames = new ArrayList<>(sortedMap.values());
+        for (final String rawFieldName : rawFieldNames) {
+            final Optional<RecordField> option = schema.getField(rawFieldName);
+            if (option.isPresent()) {
+                fields.add(option.get());
+            } else {
+                fields.add(new RecordField(rawFieldName, 
RecordFieldType.STRING.getDataType()));
+            }
+        }
+
+        this.recordFields = fields;
+        return fields;
+    }
+
+    @Override
+    public void close() throws IOException {
+        csvReader.close();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestFastCSVRecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestFastCSVRecordReader.java
new file mode 100644
index 0000000000..929113eaf8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestFastCSVRecordReader.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.text.StringEscapeUtils;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestFastCSVRecordReader {
+    private final DataType doubleDataType = 
RecordFieldType.DOUBLE.getDataType();
+    private CSVFormat format;
+
+    @BeforeEach
+    public void setUp() {
+        format = CSVFormat.RFC4180;
+    }
+
+    private List<RecordField> getDefaultFields() {
+        final List<RecordField> fields = new ArrayList<>();
+        for (final String fieldName : new String[]{"id", "name", "balance", 
"address", "city", "state", "zipCode", "country"}) {
+            fields.add(new RecordField(fieldName, 
RecordFieldType.STRING.getDataType()));
+        }
+        return fields;
+    }
+
+    private FastCSVRecordReader createReader(final InputStream in, final 
RecordSchema schema, CSVFormat format) throws IOException {
+        return new FastCSVRecordReader(in, Mockito.mock(ComponentLog.class), 
schema, format, true, false,
+                RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", false);
+    }
+
+    private FastCSVRecordReader createReader(final InputStream in, final 
RecordSchema schema, CSVFormat format, final boolean trimDoubleQuote) throws 
IOException {
+        return new FastCSVRecordReader(in, Mockito.mock(ComponentLog.class), 
schema, format, true, false,
+                RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), "ASCII", trimDoubleQuote);
+    }
+
+    @Test
+    public void testUTF8() throws IOException, MalformedRecordException {
+        final String text = "name\n黃凱揚";
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream bais = new 
ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8));
+             final FastCSVRecordReader reader = new FastCSVRecordReader(bais, 
Mockito.mock(ComponentLog.class), schema, format, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", false)) {
+
+            final Record record = reader.nextRecord();
+            final String name = (String) record.getValue("name");
+
+            assertEquals("黃凱揚", name);
+        }
+    }
+
+    @Test
+    public void testISO8859() throws IOException, MalformedRecordException {
+        final String text = "name\nÄËÖÜ";
+        final byte[] bytesUTF = text.getBytes(StandardCharsets.UTF_8);
+        final byte[] bytes8859 = text.getBytes(StandardCharsets.ISO_8859_1);
+        assertEquals(13, bytesUTF.length, "expected size=13 for UTF-8 
representation of test data");
+        assertEquals(9, bytes8859.length, "expected size=9 for ISO-8859-1 
representation of test data");
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("name", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream bais = new 
ByteArrayInputStream(text.getBytes(StandardCharsets.ISO_8859_1));
+             final FastCSVRecordReader reader = new FastCSVRecordReader(bais, 
Mockito.mock(ComponentLog.class), schema, format, true, false,
+                     RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(),
+                     StandardCharsets.ISO_8859_1.name(), true)) {
+
+            final Record record = reader.nextRecord();
+            final String name = (String) record.getValue("name");
+
+            assertEquals("ÄËÖÜ", name);
+        }
+    }
+
+    @Test
+    public void testDate() throws IOException, MalformedRecordException {
+        final String dateValue = "1983-11-30";
+        final String text = "date\n11/30/1983";
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("date", 
RecordFieldType.DATE.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream bais = new 
ByteArrayInputStream(text.getBytes());
+             final FastCSVRecordReader reader = new FastCSVRecordReader(bais, 
Mockito.mock(ComponentLog.class), schema, format, true, false,
+                     "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", true)) {
+
+            final Record record = reader.nextRecord();
+            final Object date = record.getValue("date");
+            assertEquals(java.sql.Date.valueOf(dateValue), date);
+        }
+    }
+
+    @Test
+    public void testSimpleParse() throws IOException, MalformedRecordException 
{
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new 
RecordField("balance", doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream fis = new 
FileInputStream("src/test/resources/csv/single-bank-account_RFC4180.csv");
+             final FastCSVRecordReader reader = createReader(fis, schema, 
format)) {
+
+            final Object[] record = reader.nextRecord().getValues();
+            final Object[] expectedValues = new Object[]{"1", "John Doe", 
4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
+            assertArrayEquals(expectedValues, record);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testExcelFormat() throws IOException, MalformedRecordException 
{
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("fieldA", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("fieldB", 
RecordFieldType.STRING.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "fieldA,fieldB";
+        final String inputRecord = "valueA,valueB";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final FastCSVRecordReader reader = createReader(bais, schema, 
CSVFormat.EXCEL)) {
+
+            final Object[] record = reader.nextRecord().getValues();
+            final Object[] expectedValues = new Object[]{"valueA", "valueB"};
+            assertArrayEquals(expectedValues, record);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testMultipleRecords() throws IOException, 
MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new 
RecordField("balance", doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream fis = new 
FileInputStream("src/test/resources/csv/multi-bank-account_RFC4180.csv");
+             final FastCSVRecordReader reader = createReader(fis, schema, 
format)) {
+
+            final Object[] firstRecord = reader.nextRecord().getValues();
+            final Object[] firstExpectedValues = new Object[]{"1", "John Doe", 
4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
+            assertArrayEquals(firstExpectedValues, firstRecord);
+
+            final Object[] secondRecord = reader.nextRecord().getValues();
+            final Object[] secondExpectedValues = new Object[]{"2", "Jane 
Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
+            assertArrayEquals(secondExpectedValues, secondRecord);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testMissingField() throws IOException {
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new 
RecordField("balance", doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, 
zipCode, country";
+        final String inputRecord = "1, John, 40.80, \"\"\"123 My Street\"\"\", 
My City, MS, 11111";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final FastCSVRecordReader reader = createReader(bais, schema, 
format)) {
+
+            // RFC-4180 does not allow missing column names
+            assertThrows(MalformedRecordException.class, reader::nextRecord);
+        }
+    }
+
+    @Test
+    public void testMissingField_withoutDoubleQuoteTrimming() throws 
IOException, MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new 
RecordField("balance", doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, 
zipCode, country";
+        final String inputRecord = "1, John, 40.80, \"123 My Street\", My 
City, MS, 11111";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final FastCSVRecordReader reader = createReader(bais, schema, 
CSVFormat.RFC4180.withTrim().withAllowMissingColumnNames(), false)) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals(40.8D, record.getValue("balance"));
+            assertEquals("\"123 My Street\"", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testReadRawWithDifferentFieldName() throws IOException, 
MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = 
"id,name,balance,address,city,state,zipCode,continent";
+        final String inputRecord = "1,John,40.80,\"123 My Street\",My 
City,MS,11111,North America";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // test nextRecord does not contain a 'continent' field
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final FastCSVRecordReader reader = createReader(bais, schema, 
format)) {
+
+            final Record record = reader.nextRecord(true, true);
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+            assertNull(record.getValue("continent"));
+
+            assertNull(reader.nextRecord());
+        }
+
+        // test nextRawRecord does contain 'continent' field
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final FastCSVRecordReader reader = createReader(bais, schema, 
format)) {
+
+            final Record record = reader.nextRecord(false, false);
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+            assertEquals("North America", record.getValue("continent"));
+
+            assertNull(reader.nextRecord(false, false));
+        }
+    }
+
+    @Test
+    public void testReadRawWithDifferentFieldName_withoutDoubleQuoteTrimming() 
throws IOException, MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, 
zipCode, continent";
+        final String inputRecord = "1, John, 40.80, \"123 My Street\", My 
City, MS, 11111, North America";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // test nextRecord does not contain a 'continent' field
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final FastCSVRecordReader reader = createReader(bais, schema, 
CSVFormat.RFC4180.withTrim(), false)) {
+
+            final Record record = reader.nextRecord(true, true);
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("\"123 My Street\"", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+            assertNull(record.getValue("continent"));
+
+            assertNull(reader.nextRecord());
+        }
+
+        // test nextRawRecord does contain 'continent' field
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final FastCSVRecordReader reader = createReader(bais, schema, 
CSVFormat.RFC4180.withTrim(), false)) {
+
+            final Record record = reader.nextRecord(false, false);
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("\"123 My Street\"", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+            assertNull(record.getValue("country"));
+            assertEquals("North America", record.getValue("continent"));
+
+            assertNull(reader.nextRecord(false, false));
+        }
+    }
+
+
+    @Test
+    public void testFieldInSchemaButNotHeader() throws IOException, 
MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, 
zipCode";
+        final String inputRecord = "1, John, 40.80, \"\"123 My Street\"\", My 
City, MS, 11111, USA";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // Use a CUSTOM format as FastCSV sometimes handles non-compliant data 
such as trimming spaces from values
+        format = CSVFormat.RFC4180.builder()
+                .setTrim(true)
+                .setIgnoreSurroundingSpaces(true)
+                .setAllowMissingColumnNames(true)
+                .build();
+
+        // Create another Record Reader that indicates that the header line is 
present but should be ignored. This should cause
+        // our schema to be the definitive list of what fields exist.
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final FastCSVRecordReader reader = new FastCSVRecordReader(bais, 
Mockito.mock(ComponentLog.class), schema, format, true, true,
+                     RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", true)) {
+
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+
+            assertEquals("1", record.getValue("id"));
+            assertEquals("John", record.getValue("name"));
+            assertEquals("40.80", record.getValue("balance"));
+            assertEquals("123 My Street", record.getValue("address"));
+            assertEquals("My City", record.getValue("city"));
+            assertEquals("MS", record.getValue("state"));
+            assertEquals("11111", record.getValue("zipCode"));
+
+            // If schema says that there are fields a, b, c
+            // and the CSV has a header line that says field names are a, b
+            // and then the data has values 1,2,3
+            // then a=1, b=2, c=null
+            assertNull(record.getValue("country"));
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testFieldInSchemaButNotHeader_withoutDoubleQuoteTrimming() 
throws IOException, MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, 
zipCode";
+        final String inputRecord = "1, John, 40.80, \"\"\"123 My Street\"\"\", 
My City, MS, 11111, USA";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final FastCSVRecordReader reader = createReader(bais, schema, 
CSVFormat.RFC4180.withTrim(), false)) {
+
+            try {
+                final Record record = reader.nextRecord();
+                fail("Should have thrown MalformedRecordException");
+            } catch (MalformedRecordException mre) {
+                // Expected behavior
+            }
+        }
+
+        // Create another Record Reader that indicates that the header line is 
present but should be ignored. This should cause
+        // our schema to be the definitive list of what fields exist.
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final FastCSVRecordReader reader = new FastCSVRecordReader(bais, 
Mockito.mock(ComponentLog.class), schema, CSVFormat.RFC4180.withTrim(), true, 
true,
+                     RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8", false)) {
+
+            // RFC-4180 does not allow missing column names
+            assertThrows(MalformedRecordException.class, reader::nextRecord);
+        }
+    }
+
+    @Test
+    public void testExtraFieldNotInHeader() throws IOException, 
MalformedRecordException {
+        final List<RecordField> fields = getDefaultFields();
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        final String headerLine = "id, name, balance, address, city, state, 
zipCode, country";
+        final String inputRecord = "1, John, 40.80, \"\"\"123 My Street\"\"\", 
My City, MS, 11111, USA, North America";
+        final String csvData = headerLine + "\n" + inputRecord;
+        final byte[] inputData = csvData.getBytes();
+
+        // test nextRecord does not contain a 'continent' field
+        try (final InputStream bais = new ByteArrayInputStream(inputData);
+             final FastCSVRecordReader reader = createReader(bais, schema, 
format)) {
+
+            // RFC-4180 does not allow missing column names
+            assertThrows(MalformedRecordException.class, () -> 
reader.nextRecord(false, false));
+        }
+    }
+
+    @Test
+    public void testMultipleRecordsDelimitedWithSpecialChar() throws 
IOException, MalformedRecordException {
+
+        char delimiter = StringEscapeUtils.unescapeJava("\u0001").charAt(0);
+
+        final CSVFormat format = 
CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"').withDelimiter(delimiter);
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new 
RecordField("balance", doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream fis = new 
FileInputStream("src/test/resources/csv/multi-bank-account_spec_delimiter.csv");
+             final FastCSVRecordReader reader = createReader(fis, schema, 
format)) {
+
+            final Object[] firstRecord = reader.nextRecord().getValues();
+            final Object[] firstExpectedValues = new Object[]{"1", "John Doe", 
4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
+            assertArrayEquals(firstExpectedValues, firstRecord);
+
+            final Object[] secondRecord = reader.nextRecord().getValues();
+            final Object[] secondExpectedValues = new Object[]{"2", "Jane 
Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
+            assertArrayEquals(secondExpectedValues, secondRecord);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    public void testMultipleRecordsEscapedWithNull() throws IOException, 
MalformedRecordException {
+
+        final CSVFormat format = CSVFormat.DEFAULT.builder()
+                .setHeader()
+                .setSkipHeaderRecord(true)
+                .setTrim(true)
+                .setQuote('"')
+                .setDelimiter(",")
+                .setEscape(null)
+                .build();
+        final List<RecordField> fields = getDefaultFields();
+        fields.replaceAll(f -> f.getFieldName().equals("balance") ? new 
RecordField("balance", doubleDataType) : f);
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream fis = new 
FileInputStream("src/test/resources/csv/multi-bank-account_escapechar.csv");
+             final FastCSVRecordReader reader = createReader(fis, schema, 
format, true)) {
+
+            final Object[] firstRecord = reader.nextRecord().getValues();
+            final Object[] firstExpectedValues = new Object[]{"1", "John 
Doe\\", 4750.89D, "123 My Street", "My City", "MS", "11111", "USA"};
+            assertArrayEquals(firstExpectedValues, firstRecord);
+
+            final Object[] secondRecord = reader.nextRecord().getValues();
+            final Object[] secondExpectedValues = new Object[]{"2", "Jane 
Doe", 4820.09D, "321 Your Street", "Your City", "NY", "33333", "USA"};
+            assertArrayEquals(secondExpectedValues, secondRecord);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_RFC4180.csv
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_RFC4180.csv
new file mode 100644
index 0000000000..7800751e61
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_RFC4180.csv
@@ -0,0 +1,3 @@
+id,name,balance,address,city,state,zipCode,country
+1,John Doe,"4750.89",123 My Street,My City,MS,11111,USA
+2,Jane Doe,4820.09,321 Your Street,Your City,NY,33333,USA
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv
new file mode 100644
index 0000000000..1e6c6effef
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/multi-bank-account_escapechar_RFC4180.csv
@@ -0,0 +1,3 @@
+id,name,balance,address,city,state,zipCode,country
+1,John Doe\,"4750.89","123 My Street",My City,MS,11111,USA
+2,Jane Doe,4820.09,321 Your Street,Your City,NY,33333,USA
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/single-bank-account_RFC4180.csv
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/single-bank-account_RFC4180.csv
new file mode 100644
index 0000000000..d0489e07e8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/csv/single-bank-account_RFC4180.csv
@@ -0,0 +1,2 @@
+id,name,balance,address,city,state,zipCode,country
+1,John Doe,4750.89,123 My Street,My City,MS,11111,USA
\ No newline at end of file


Reply via email to