http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE
index 1848020..f5f0bb1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE
@@ -64,11 +64,14 @@ The following binary components are provided under the Apache Software License v
     The following NOTICE information applies:
       Copyright 2011 JSON-SMART authors
 
-   (ASLv2) JsonPath
-     The following NOTICE information applies:
-       Copyright 2011 JsonPath authors
+  (ASLv2) JsonPath
+    The following NOTICE information applies:
+      Copyright 2011 JsonPath authors
 
-  (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3)
+  (ASLv2) Apache Commons CSV
+       The following NOTICE information applies:
+         Apache Commons CSV
+         Copyright 2005-2016 The Apache Software Foundation
 
   (ASLv2) Apache Avro
     The following NOTICE information applies:

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore
deleted file mode 100644
index ae3c172..0000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/bin/

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 9b2a56c..d86a8c5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -34,6 +34,10 @@
             <artifactId>nifi-record-serialization-service-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.jayway.jsonpath</groupId>
             <artifactId>json-path</artifactId>
         </dependency>
@@ -50,9 +54,13 @@
             <artifactId>commons-lang3</artifactId>
         </dependency>
         <dependency>
-            <groupId>net.sf.opencsv</groupId>
-            <artifactId>opencsv</artifactId>
-            <version>2.3</version>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-csv</artifactId>
+            <version>1.4</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
         </dependency>
         <dependency>
             <groupId>io.thekraken</groupId>
@@ -62,6 +70,7 @@
         <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
+            <version>1.8.1</version>
         </dependency>
     </dependencies>
 
@@ -72,6 +81,8 @@
                 <artifactId>apache-rat-plugin</artifactId>
                 <configuration>
                     <excludes combine.children="append">
+                        <exclude>src/test/resources/avro/datatypes.avsc</exclude>
+                        <exclude>src/test/resources/avro/logical-types.avsc</exclude>
                         <exclude>src/test/resources/csv/extra-white-space.csv</exclude>
                         <exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
                         <exclude>src/test/resources/csv/single-bank-account.csv</exclude>
@@ -80,12 +91,14 @@
                         <exclude>src/test/resources/grok/nifi-log-sample.log</exclude>
                         <exclude>src/test/resources/grok/single-line-log-messages.txt</exclude>
                         <exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude>
+                        <exclude>src/test/resources/json/bank-account-array-optional-balance.json</exclude>
                         <exclude>src/test/resources/json/bank-account-array.json</exclude>
                         <exclude>src/test/resources/json/json-with-unicode.json</exclude>
                         <exclude>src/test/resources/json/primitive-type-array.json</exclude>
                         <exclude>src/test/resources/json/single-bank-account.json</exclude>
                         <exclude>src/test/resources/json/single-element-nested-array.json</exclude>
                         <exclude>src/test/resources/json/single-element-nested.json</exclude>
+                        <exclude>src/test/resources/json/output/dataTypes.json</exclude>
                     </excludes>
                 </configuration>
             </plugin>

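The dependency change above swaps net.sf.opencsv 2.3 for Apache Commons CSV 1.4 (with commons-io alongside). For reference, a minimal sketch of the Commons CSV parsing pattern the new reader builds on; the file name and column name here are illustrative, borrowed from the test resources, not code from this commit:

    import java.io.FileReader;
    import java.io.Reader;

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;

    public class CommonsCsvSketch {
        public static void main(final String[] args) throws Exception {
            // Treat the first line as the header, matching the reader's assumption.
            final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim();

            try (final Reader reader = new FileReader("single-bank-account.csv");
                final CSVParser parser = new CSVParser(reader, format)) {
                for (final CSVRecord record : parser) {
                    // Columns are addressed by header name rather than index.
                    System.out.println(record.get("balance"));
                }
            }
        }
    }
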
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index fc0c598..f92816f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -23,18 +23,27 @@ import java.io.InputStream;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
 
 @Tags({"avro", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"})
-@CapabilityDescription("Parses Avro data and returns each Avro record as an separate record.")
+@CapabilityDescription("Parses Avro data and returns each Avro record as an separate Record object. The Avro data must contain "
+    + "the schema itself.")
 public class AvroReader extends AbstractControllerService implements RowRecordReaderFactory {
 
     @Override
-    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException {
+    public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException {
         return new AvroRecordReader(in);
     }
 
+    @Override
+    public RecordSchema getSchema(final FlowFile flowFile) throws MalformedRecordException, IOException {
+        // TODO: Need to support retrieving schema from registry instead of requiring that it be in the Avro file.
+        return null;
+    }
+
 }

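AvroReader's description now states that the Avro data must contain the schema itself, while getSchema(FlowFile) remains a TODO stub. For context, a sketch of how an embedded schema can be obtained from an Avro container file with the core Avro API, assuming the stream holds Avro data-file content:

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    public class EmbeddedSchemaSketch {
        // The caller remains responsible for closing the stream.
        public static Schema readEmbeddedSchema(final InputStream in) throws IOException {
            // DataFileStream parses the schema out of the container file header,
            // which is why the reader can require the data to carry its own schema.
            final DataFileStream<GenericRecord> stream =
                new DataFileStream<>(in, new GenericDatumReader<GenericRecord>());
            return stream.getSchema();
        }
    }
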
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
index e98a5ad..d725cbf 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
@@ -24,11 +24,12 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Array;
@@ -44,8 +45,8 @@ import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
 public class AvroRecordReader implements RecordReader {
     private final InputStream in;
@@ -53,6 +54,7 @@ public class AvroRecordReader implements RecordReader {
     private final DataFileStream<GenericRecord> dataFileStream;
     private RecordSchema recordSchema;
 
+
     public AvroRecordReader(final InputStream in) throws IOException, MalformedRecordException {
         this.in = in;
 
@@ -79,65 +81,78 @@ public class AvroRecordReader implements RecordReader {
         }
 
         final RecordSchema schema = getSchema();
-        final Map<String, Object> values = convertRecordToObjectArray(record, schema);
+        final Map<String, Object> values = convertAvroRecordToMap(record, schema);
         return new MapRecord(schema, values);
     }
 
 
-    private Map<String, Object> convertRecordToObjectArray(final GenericRecord record, final RecordSchema schema) {
-        final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
+    private Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema) {
+        final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());
 
-        for (final String fieldName : schema.getFieldNames()) {
-            final Object value = record.get(fieldName);
+        for (final String fieldName : recordSchema.getFieldNames()) {
+            final Object value = avroRecord.get(fieldName);
 
-            final Field avroField = record.getSchema().getField(fieldName);
+            final Field avroField = avroRecord.getSchema().getField(fieldName);
             if (avroField == null) {
                 values.put(fieldName, null);
                 continue;
             }
 
             final Schema fieldSchema = avroField.schema();
-            final DataType dataType = schema.getDataType(fieldName).orElse(null);
-            final Object converted = convertValue(value, fieldSchema, avroField.name(), dataType);
-            values.put(fieldName, converted);
-        }
+            final Object rawValue = normalizeValue(value, fieldSchema);
 
-        return values;
-    }
-
-
-    @Override
-    public RecordSchema getSchema() throws MalformedRecordException {
-        if (recordSchema != null) {
-            return recordSchema;
-        }
-
-        recordSchema = createSchema(avroSchema);
-        return recordSchema;
-    }
+            final DataType desiredType = recordSchema.getDataType(fieldName).get();
+            final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType);
 
-    private RecordSchema createSchema(final Schema avroSchema) {
-        final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
-        for (final Field field : avroSchema.getFields()) {
-            final String fieldName = field.name();
-            final DataType dataType = determineDataType(field.schema());
-            recordFields.add(new RecordField(fieldName, dataType));
+            values.put(fieldName, coercedValue);
         }
 
-        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+        return values;
     }
 
-    private Object convertValue(final Object value, final Schema avroSchema, final String fieldName, final DataType desiredType) {
+    private Object normalizeValue(final Object value, final Schema avroSchema) {
         if (value == null) {
             return null;
         }
 
         switch (avroSchema.getType()) {
+            case INT: {
+                final LogicalType logicalType = avroSchema.getLogicalType();
+                if (logicalType == null) {
+                    return value;
+                }
+
+                final String logicalName = logicalType.getName();
+                if (LogicalTypes.date().getName().equals(logicalName)) {
+                    // date logical name means that the value is number of days since Jan 1, 1970
+                    return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value));
+                } else if (LogicalTypes.timeMillis().equals(logicalName)) {
+                    // time-millis logical name means that the value is number of milliseconds since midnight.
+                    return new java.sql.Time((int) value);
+                }
+
+                break;
+            }
+            case LONG: {
+                final LogicalType logicalType = avroSchema.getLogicalType();
+                if (logicalType == null) {
+                    return value;
+                }
+
+                final String logicalName = logicalType.getName();
+                if (LogicalTypes.timeMicros().getName().equals(logicalName)) {
+                    return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value));
+                } else if (LogicalTypes.timestampMillis().getName().equals(logicalName)) {
+                    return new java.sql.Timestamp((long) value);
+                } else if (LogicalTypes.timestampMicros().getName().equals(logicalName)) {
+                    return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value));
+                }
+                break;
+            }
             case UNION:
                 if (value instanceof GenericData.Record) {
-                    final GenericData.Record record = (GenericData.Record) value;
-                    return convertValue(value, record.getSchema(), fieldName, desiredType);
+                    final GenericData.Record avroRecord = (GenericData.Record) value;
+                    return normalizeValue(value, avroRecord.getSchema());
                 }
                 break;
             case RECORD:
@@ -146,19 +161,18 @@ public class AvroRecordReader implements RecordReader {
                 final List<Field> recordFields = recordSchema.getFields();
                 final Map<String, Object> values = new HashMap<>(recordFields.size());
                 for (final Field field : recordFields) {
-                    final DataType desiredFieldType = determineDataType(field.schema());
                     final Object avroFieldValue = record.get(field.name());
-                    final Object fieldValue = convertValue(avroFieldValue, field.schema(), field.name(), desiredFieldType);
+                    final Object fieldValue = normalizeValue(avroFieldValue, field.schema());
                     values.put(field.name(), fieldValue);
                 }
-                final RecordSchema childSchema = createSchema(recordSchema);
+                final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema);
                 return new MapRecord(childSchema, values);
             case BYTES:
                 final ByteBuffer bb = (ByteBuffer) value;
-                return bb.array();
+                return AvroTypeUtil.convertByteArray(bb.array());
             case FIXED:
                 final GenericFixed fixed = (GenericFixed) value;
-                return fixed.bytes();
+                return AvroTypeUtil.convertByteArray(fixed.bytes());
             case ENUM:
                 return value.toString();
             case NULL:
@@ -170,7 +184,7 @@ public class AvroRecordReader implements RecordReader {
                 final Object[] valueArray = new Object[array.size()];
                 for (int i = 0; i < array.size(); i++) {
                     final Schema elementSchema = avroSchema.getElementType();
-                    valueArray[i] = convertValue(array.get(i), elementSchema, fieldName, determineDataType(elementSchema));
+                    valueArray[i] = normalizeValue(array.get(i), elementSchema);
                 }
                 return valueArray;
             case MAP:
@@ -182,73 +196,32 @@ public class AvroRecordReader implements RecordReader {
                         obj = obj.toString();
                     }
 
-                    map.put(entry.getKey().toString(), obj);
+                    final String key = entry.getKey().toString();
+                    obj = normalizeValue(obj, avroSchema.getValueType());
+
+                    map.put(key, obj);
+                }
+
+                final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType());
+                final List<RecordField> mapFields = new ArrayList<>();
+                for (final String key : map.keySet()) {
+                    mapFields.add(new RecordField(key, elementType));
                 }
-                return map;
+                final RecordSchema mapSchema = new SimpleRecordSchema(mapFields);
+                return new MapRecord(mapSchema, map);
         }
 
         return value;
     }
 
 
-    private DataType determineDataType(final Schema avroSchema) {
-        final Type avroType = avroSchema.getType();
-
-        switch (avroType) {
-            case ARRAY:
-            case BYTES:
-            case FIXED:
-                return RecordFieldType.ARRAY.getDataType();
-            case BOOLEAN:
-                return RecordFieldType.BOOLEAN.getDataType();
-            case DOUBLE:
-                return RecordFieldType.DOUBLE.getDataType();
-            case ENUM:
-            case STRING:
-                return RecordFieldType.STRING.getDataType();
-            case FLOAT:
-                return RecordFieldType.FLOAT.getDataType();
-            case INT:
-                return RecordFieldType.INT.getDataType();
-            case LONG:
-                return RecordFieldType.LONG.getDataType();
-            case RECORD: {
-                final List<Field> avroFields = avroSchema.getFields();
-                final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
-
-                for (final Field field : avroFields) {
-                    final String fieldName = field.name();
-                    final Schema fieldSchema = field.schema();
-                    final DataType fieldType = determineDataType(fieldSchema);
-                    recordFields.add(new RecordField(fieldName, fieldType));
-                }
-
-                final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
-                return RecordFieldType.RECORD.getDataType(recordSchema);
-            }
-            case NULL:
-            case MAP:
-                return RecordFieldType.RECORD.getDataType();
-            case UNION: {
-                final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
-                    .filter(s -> s.getType() != Type.NULL)
-                    .collect(Collectors.toList());
-
-                if (nonNullSubSchemas.size() == 1) {
-                    return determineDataType(nonNullSubSchemas.get(0));
-                }
-
-                final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
-                for (final Schema subSchema : nonNullSubSchemas) {
-                    final DataType childDataType = determineDataType(subSchema);
-                    possibleChildTypes.add(childDataType);
-                }
-
-                return RecordFieldType.CHOICE.getDataType(possibleChildTypes);
-            }
+    @Override
+    public RecordSchema getSchema() throws MalformedRecordException {
+        if (recordSchema != null) {
+            return recordSchema;
         }
 
-        return null;
+        recordSchema = AvroTypeUtil.createSchema(avroSchema);
+        return recordSchema;
     }
-
 }

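In normalizeValue() above, Avro logical types are mapped onto java.sql types: 'date' arrives as an int counting days since Jan 1, 1970, and 'time-millis' as milliseconds since midnight. A standalone sketch of the date conversion (the day count is illustrative); it matches the logical type by name via getName(), the pattern most of the branches above use:

    import java.util.concurrent.TimeUnit;

    public class AvroDateSketch {
        public static void main(final String[] args) {
            // Avro 'date': an int holding days since the epoch (Jan 1, 1970).
            final int daysSinceEpoch = 17000;

            // Same conversion as the reader: widen days to millis, wrap in java.sql.Date.
            final java.sql.Date date = new java.sql.Date(TimeUnit.DAYS.toMillis(daysSinceEpoch));

            // Prints 2016-07-18 when the JVM default time zone is UTC.
            System.out.println(date);
        }
    }
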
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index d56c716..03d766c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -25,18 +25,15 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 
-@Tags({"avro", "result", "set", "writer", "serializer", "record", "row"})
-@CapabilityDescription("Writes the contents of a Database ResultSet in Binary Avro format. The data types in the Result Set must match those "
-    + "specified by the Avro Schema. No type coercion will occur, with the exception of Date, Time, and Timestamps fields because Avro does not provide "
-    + "support for these types specifically. As a result, they will be converted to String fields using the configured formats. In addition, the label"
-    + "of the column must be a valid Avro field name.")
-public class AvroRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory {
+@Tags({"avro", "result", "set", "writer", "serializer", "record", "recordset", "row"})
+@CapabilityDescription("Writes the contents of a RecordSet in Binary Avro format.")
+public class AvroRecordSetWriter extends AbstractControllerService implements RecordSetWriterFactory {
     static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
         .name("Avro Schema")
         .description("The Avro Schema to use when writing out the Result Set")
@@ -49,7 +46,7 @@ public class AvroRecordSetWriter extends AbstractRecordSetWriter implements Reco
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(SCHEMA);
         return properties;
     }
@@ -61,7 +58,7 @@ public class AvroRecordSetWriter extends AbstractRecordSetWriter implements Reco
 
     @Override
     public RecordSetWriter createWriter(final ComponentLog logger) {
-        return new WriteAvroResult(schema, getDateFormat(), getTimeFormat(), getTimestampFormat());
+        return new WriteAvroResult(schema);
     }
 
 }

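The writer keeps its "Avro Schema" property, and WriteAvroResult now receives only the parsed Schema. Presumably the property text is parsed in the @OnEnabled method not shown in this hunk; a sketch of the core Avro API that performs such parsing, with an illustrative record definition:

    import org.apache.avro.Schema;

    public class SchemaParseSketch {
        public static void main(final String[] args) {
            // The property value is plain JSON schema text.
            final String json = "{\"type\":\"record\",\"name\":\"account\","
                + "\"fields\":[{\"name\":\"balance\",\"type\":\"double\"}]}";

            final Schema schema = new Schema.Parser().parse(json);
            System.out.println(schema.getField("balance").schema().getType()); // DOUBLE
        }
    }
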
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
new file mode 100644
index 0000000..1810c83
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+
+public class AvroTypeUtil {
+
+    public static DataType determineDataType(final Schema avroSchema) {
+        final Type avroType = avroSchema.getType();
+
+        switch (avroType) {
+            case BYTES:
+            case FIXED:
+                return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
+            case ARRAY:
+                final DataType elementType = determineDataType(avroSchema.getElementType());
+                return RecordFieldType.ARRAY.getArrayDataType(elementType);
+            case BOOLEAN:
+                return RecordFieldType.BOOLEAN.getDataType();
+            case DOUBLE:
+                return RecordFieldType.DOUBLE.getDataType();
+            case ENUM:
+            case STRING:
+                return RecordFieldType.STRING.getDataType();
+            case FLOAT:
+                return RecordFieldType.FLOAT.getDataType();
+            case INT: {
+                final LogicalType logicalType = avroSchema.getLogicalType();
+                if (logicalType == null) {
+                    return RecordFieldType.INT.getDataType();
+                }
+
+                if (LogicalTypes.date().getName().equals(logicalType.getName())) {
+                    return RecordFieldType.DATE.getDataType();
+                } else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) {
+                    return RecordFieldType.TIME.getDataType();
+                }
+
+                return RecordFieldType.INT.getDataType();
+            }
+            case LONG: {
+                final LogicalType logicalType = avroSchema.getLogicalType();
+                if (logicalType == null) {
+                    return RecordFieldType.LONG.getDataType();
+                }
+
+                if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) {
+                    return RecordFieldType.TIMESTAMP.getDataType();
+                } else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) {
+                    return RecordFieldType.TIMESTAMP.getDataType();
+                } else if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) {
+                    return RecordFieldType.TIME.getDataType();
+                }
+
+                return RecordFieldType.LONG.getDataType();
+            }
+            case RECORD: {
+                final List<Field> avroFields = avroSchema.getFields();
+                final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
+
+                for (final Field field : avroFields) {
+                    final String fieldName = field.name();
+                    final Schema fieldSchema = field.schema();
+                    final DataType fieldType = determineDataType(fieldSchema);
+                    recordFields.add(new RecordField(fieldName, fieldType));
+                }
+
+                final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+                return RecordFieldType.RECORD.getRecordDataType(recordSchema);
+            }
+            case NULL:
+            case MAP:
+                return RecordFieldType.RECORD.getDataType();
+            case UNION: {
+                final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
+                    .filter(s -> s.getType() != Type.NULL)
+                    .collect(Collectors.toList());
+
+                if (nonNullSubSchemas.size() == 1) {
+                    return determineDataType(nonNullSubSchemas.get(0));
+                }
+
+                final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
+                for (final Schema subSchema : nonNullSubSchemas) {
+                    final DataType childDataType = determineDataType(subSchema);
+                    possibleChildTypes.add(childDataType);
+                }
+
+                return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
+            }
+        }
+
+        return null;
+    }
+
+    public static RecordSchema createSchema(final Schema avroSchema) {
+        final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
+        for (final Field field : avroSchema.getFields()) {
+            final String fieldName = field.name();
+            final DataType dataType = AvroTypeUtil.determineDataType(field.schema());
+            recordFields.add(new RecordField(fieldName, dataType));
+        }
+
+        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+        return recordSchema;
+    }
+
+    public static Object[] convertByteArray(final byte[] bytes) {
+        final Object[] array = new Object[bytes.length];
+        for (int i = 0; i < bytes.length; i++) {
+            array[i] = Byte.valueOf(bytes[i]);
+        }
+        return array;
+    }
+
+    public static ByteBuffer convertByteArray(final Object[] bytes) {
+        final ByteBuffer bb = ByteBuffer.allocate(bytes.length);
+        for (final Object o : bytes) {
+            if (o instanceof Byte) {
+                bb.put(((Byte) o).byteValue());
+            } else {
+                throw new IllegalTypeConversionException("Cannot convert value " + bytes + " of type " + bytes.getClass() + " to ByteBuffer");
+            }
+        }
+        bb.flip();
+        return bb;
+    }
+}

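AvroTypeUtil carries BYTES/FIXED content as an Object[] of Byte on the record side and turns it back into a ByteBuffer for Avro. A quick round trip through the two convertByteArray() overloads defined above (the input bytes are illustrative):

    import java.nio.ByteBuffer;

    import org.apache.nifi.avro.AvroTypeUtil;

    public class ByteArrayRoundTripSketch {
        public static void main(final String[] args) {
            final byte[] raw = new byte[] { 0x01, 0x02, 0x03 };

            // byte[] -> Object[] of Byte: the in-memory record representation.
            final Object[] boxed = AvroTypeUtil.convertByteArray(raw);

            // Object[] -> ByteBuffer: the representation Avro expects on write.
            final ByteBuffer bb = AvroTypeUtil.convertByteArray(boxed);
            System.out.println(bb.remaining()); // 3
        }
    }
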
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
index d75d86d..b512b82 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
@@ -18,47 +18,41 @@
 package org.apache.nifi.avro;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.sql.Blob;
-import java.sql.Clob;
-import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.EnumSymbol;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 
 public class WriteAvroResult implements RecordSetWriter {
     private final Schema schema;
-    private final DateFormat dateFormat;
-    private final DateFormat timeFormat;
-    private final DateFormat timestampFormat;
 
-    public WriteAvroResult(final Schema schema, final String dateFormat, final String timeFormat, final String timestampFormat) {
+    public WriteAvroResult(final Schema schema) {
         this.schema = schema;
-        this.dateFormat = new SimpleDateFormat(dateFormat);
-        this.timeFormat = new SimpleDateFormat(timeFormat);
-        this.timestampFormat = new SimpleDateFormat(timestampFormat);
     }
 
     @Override
@@ -68,34 +62,13 @@ public class WriteAvroResult implements RecordSetWriter {
             return WriteResult.of(0, Collections.emptyMap());
         }
 
-        final GenericRecord rec = new GenericData.Record(schema);
-
         int nrOfRows = 0;
         final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
         try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
             dataFileWriter.create(schema, outStream);
 
-            final RecordSchema recordSchema = rs.getSchema();
-
             do {
-                for (final String fieldName : recordSchema.getFieldNames()) {
-                    final Object value = record.getValue(fieldName);
-
-                    final Field field = schema.getField(fieldName);
-                    if (field == null) {
-                        continue;
-                    }
-
-                    final Object converted;
-                    try {
-                        converted = convert(value, field.schema(), fieldName);
-                    } catch (final SQLException e) {
-                        throw new IOException("Failed to write records to stream", e);
-                    }
-
-                    rec.put(fieldName, converted);
-                }
-
+                final GenericRecord rec = createAvroRecord(record, schema);
                 dataFileWriter.append(rec);
                 nrOfRows++;
             } while ((record = rs.next()) != null);
@@ -104,169 +77,149 @@ public class WriteAvroResult implements RecordSetWriter {
         return WriteResult.of(nrOfRows, Collections.emptyMap());
     }
 
-    @Override
-    public WriteResult write(final Record record, final OutputStream out) throws IOException {
-        final GenericRecord rec = new GenericData.Record(schema);
+    private GenericRecord createAvroRecord(final Record record, final Schema avroSchema) throws IOException {
+        final GenericRecord rec = new GenericData.Record(avroSchema);
+        final RecordSchema recordSchema = record.getSchema();
 
-        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
-        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
-            dataFileWriter.create(schema, out);
-            final RecordSchema recordSchema = record.getSchema();
-
-            for (final String fieldName : recordSchema.getFieldNames()) {
-                final Object value = record.getValue(fieldName);
-
-                final Field field = schema.getField(fieldName);
-                if (field == null) {
-                    continue;
-                }
+        for (final String fieldName : recordSchema.getFieldNames()) {
+            final Object rawValue = record.getValue(fieldName);
 
-                final Object converted;
-                try {
-                    converted = convert(value, field.schema(), fieldName);
-                } catch (final SQLException e) {
-                    throw new IOException("Failed to write records to stream", e);
-                }
-
-                rec.put(fieldName, converted);
+            final Field field = avroSchema.getField(fieldName);
+            if (field == null) {
+                continue;
             }
 
-            dataFileWriter.append(rec);
+            final Object converted = convertToAvroObject(rawValue, field.schema());
+            rec.put(fieldName, converted);
         }
 
-        return WriteResult.of(1, Collections.emptyMap());
+        return rec;
     }
 
-
-    private Object convert(final Object value, final Schema schema, final String fieldName) throws SQLException, IOException {
-        if (value == null) {
+    private Object convertToAvroObject(final Object rawValue, final Schema fieldSchema) throws IOException {
+        if (rawValue == null) {
             return null;
         }
 
-        // Need to handle CLOB and BLOB before getObject() is called, due to ResultSet's maximum portability statement
-        if (value instanceof Clob) {
-            final Clob clob = (Clob) value;
-
-            long numChars = clob.length();
-            char[] buffer = new char[(int) numChars];
-            InputStream is = clob.getAsciiStream();
-            int index = 0;
-            int c = is.read();
-            while (c > 0) {
-                buffer[index++] = (char) c;
-                c = is.read();
-            }
-
-            clob.free();
-            return new String(buffer);
-        }
+        switch (fieldSchema.getType()) {
+            case INT: {
+                final LogicalType logicalType = fieldSchema.getLogicalType();
+                if (logicalType == null) {
+                    return DataTypeUtils.toInteger(rawValue);
+                }
 
-        if (value instanceof Blob) {
-            final Blob blob = (Blob) value;
+                if (LogicalTypes.date().getName().equals(logicalType.getName())) {
+                    final long longValue = DataTypeUtils.toLong(rawValue);
+                    final Date date = new Date(longValue);
+                    final Duration duration = Duration.between(new Date(0L).toInstant(), date.toInstant());
+                    final long days = duration.toDays();
+                    return (int) days;
+                } else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) {
+                    final long longValue = DataTypeUtils.toLong(rawValue);
+                    final Date date = new Date(longValue);
+                    final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
+                    final long millisSinceMidnight = duration.toMillis();
+                    return (int) millisSinceMidnight;
+                }
 
-            final long numChars = blob.length();
-            final byte[] buffer = new byte[(int) numChars];
-            final InputStream is = blob.getBinaryStream();
-            int index = 0;
-            int c = is.read();
-            while (c > 0) {
-                buffer[index++] = (byte) c;
-                c = is.read();
+                return DataTypeUtils.toInteger(rawValue);
             }
+            case LONG: {
+                final LogicalType logicalType = fieldSchema.getLogicalType();
+                if (logicalType == null) {
+                    return DataTypeUtils.toLong(rawValue);
+                }
 
-            final ByteBuffer bb = ByteBuffer.wrap(buffer);
-            blob.free();
-            return bb;
-        }
+                if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) {
+                    final long longValue = DataTypeUtils.toLong(rawValue);
+                    final Date date = new Date(longValue);
+                    final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
+                    return duration.toMillis() * 1000L;
+                } else if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) {
+                    return DataTypeUtils.toLong(rawValue);
+                } else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) {
+                    return DataTypeUtils.toLong(rawValue) * 1000L;
+                }
 
-        if (value instanceof byte[]) {
-            // bytes requires little bit different handling
-            return ByteBuffer.wrap((byte[]) value);
-        } else if (value instanceof Byte) {
-            // tinyint(1) type is returned by JDBC driver as java.sql.Types.TINYINT
-            // But value is returned by JDBC as java.lang.Byte
-            // (at least H2 JDBC works this way)
-            // direct put to avro record results:
-            // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
-            return ((Byte) value).intValue();
-        } else if (value instanceof Short) {
-            //MS SQL returns TINYINT as a Java Short, which Avro doesn't understand.
-            return ((Short) value).intValue();
-        } else if (value instanceof BigDecimal) {
-            // Avro can't handle BigDecimal as a number - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
-            return value.toString();
-        } else if (value instanceof BigInteger) {
-            // Check the precision of the BIGINT. Some databases allow arbitrary precision (> 19), but Avro won't handle that.
-            // It the SQL type is BIGINT and the precision is between 0 and 19 (inclusive); if so, the BigInteger is likely a
-            // long (and the schema says it will be), so try to get its value as a long.
-            // Otherwise, Avro can't handle BigInteger as a number - it will throw an AvroRuntimeException
-            // such as: "Unknown datum type: java.math.BigInteger: 38". In this case the schema is expecting a string.
-            final BigInteger bigInt = (BigInteger) value;
-            if (bigInt.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
-                return value.toString();
-            } else {
-                return bigInt.longValue();
+                return DataTypeUtils.toLong(rawValue);
             }
-        } else if (value instanceof Boolean) {
-            return value;
-        } else if (value instanceof Map) {
-            // TODO: Revisit how we handle a lot of these cases....
-            switch (schema.getType()) {
-                case MAP:
-                    return value;
-                case RECORD:
-                    final GenericData.Record avroRecord = new GenericData.Record(schema);
+            case BYTES:
+            case FIXED:
+                if (rawValue instanceof byte[]) {
+                    return ByteBuffer.wrap((byte[]) rawValue);
+                }
+                if (rawValue instanceof Object[]) {
+                    return AvroTypeUtil.convertByteArray((Object[]) rawValue);
+                } else {
+                    throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
+                }
+            case MAP:
+                if (rawValue instanceof Record) {
+                    final Record recordValue = (Record) rawValue;
+                    final Map<String, Object> map = new HashMap<>();
+                    for (final String recordFieldName : recordValue.getSchema().getFieldNames()) {
+                        final Object v = recordValue.getValue(recordFieldName);
+                        if (v != null) {
+                            map.put(recordFieldName, v);
+                        }
+                    }
 
-                    final Record record = (Record) value;
-                    for (final String recordFieldName : record.getSchema().getFieldNames()) {
-                        final Object recordFieldValue = record.getValue(recordFieldName);
+                    return map;
+                } else {
+                    throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map");
+                }
+            case RECORD:
+                final GenericData.Record avroRecord = new GenericData.Record(fieldSchema);
 
-                        final Field field = schema.getField(recordFieldName);
-                        if (field == null) {
-                            continue;
-                        }
+                final Record record = (Record) rawValue;
+                for (final String recordFieldName : record.getSchema().getFieldNames()) {
+                    final Object recordFieldValue = record.getValue(recordFieldName);
 
-                        final Object converted = convert(recordFieldValue, field.schema(), recordFieldName);
-                        avroRecord.put(recordFieldName, converted);
+                    final Field field = fieldSchema.getField(recordFieldName);
+                    if (field == null) {
+                        continue;
                     }
-                    return avroRecord;
-            }
 
-            return value.toString();
+                    final Object converted = convertToAvroObject(recordFieldValue, field.schema());
+                    avroRecord.put(recordFieldName, converted);
+                }
+                return avroRecord;
+            case ARRAY:
+                final Object[] objectArray = (Object[]) rawValue;
+                final List<Object> list = new ArrayList<>(objectArray.length);
+                for (final Object o : objectArray) {
+                    final Object converted = convertToAvroObject(o, fieldSchema.getElementType());
+                    list.add(converted);
+                }
+                return list;
+            case BOOLEAN:
+                return DataTypeUtils.toBoolean(rawValue);
+            case DOUBLE:
+                return DataTypeUtils.toDouble(rawValue);
+            case FLOAT:
+                return DataTypeUtils.toFloat(rawValue);
+            case NULL:
+                return null;
+            case ENUM:
+                return new EnumSymbol(fieldSchema, rawValue);
+            case STRING:
+                return DataTypeUtils.toString(rawValue, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat());
+        }
+
+        return rawValue;
+    }
 
-        } else if (value instanceof List) {
-            return value;
-        } else if (value instanceof Object[]) {
-            final List<Object> list = new ArrayList<>();
-            for (final Object o : ((Object[]) value)) {
-                final Object converted = convert(o, schema.getElementType(), fieldName);
-                list.add(converted);
-            }
-            return list;
-        } else if (value instanceof Number) {
-            return value;
-        } else if (value instanceof java.util.Date) {
-            final java.util.Date date = (java.util.Date) value;
-            return dateFormat.format(date);
-        } else if (value instanceof java.sql.Date) {
-            final java.sql.Date sqlDate = (java.sql.Date) value;
-            final java.util.Date date = new java.util.Date(sqlDate.getTime());
-            return dateFormat.format(date);
-        } else if (value instanceof Time) {
-            final Time time = (Time) value;
-            final java.util.Date date = new java.util.Date(time.getTime());
-            return timeFormat.format(date);
-        } else if (value instanceof Timestamp) {
-            final Timestamp time = (Timestamp) value;
-            final java.util.Date date = new java.util.Date(time.getTime());
-            return timestampFormat.format(date);
+    @Override
+    public WriteResult write(final Record record, final OutputStream out) throws IOException {
+        final GenericRecord rec = createAvroRecord(record, schema);
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, out);
+            dataFileWriter.append(rec);
         }
 
-        // The different types that we support are numbers (int, long, double, float),
-        // as well as boolean values and Strings. Since Avro doesn't provide
-        // timestamp types, we want to convert those to Strings. So we will cast anything other
-        // than numbers or booleans to strings by using the toString() method.
-        return value.toString();
+        return WriteResult.of(1, Collections.emptyMap());
     }
 
 
@@ -275,7 +228,6 @@ public class WriteAvroResult implements RecordSetWriter {
         return "application/avro-binary";
     }
 
-
     public static String normalizeNameForAvro(String inputName) {
         String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_");
         if (Character.isDigit(normalizedName.charAt(0))) {

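convertToAvroObject() above reverses the reader's normalization: a millis-since-epoch value becomes whole days for the 'date' logical type and milliseconds past midnight for 'time-millis'. A standalone sketch of that Duration arithmetic (the instant is illustrative; truncatedTo(ChronoUnit.DAYS) operates in UTC, an implicit assumption of this approach):

    import java.time.Duration;
    import java.time.Instant;
    import java.time.temporal.ChronoUnit;
    import java.util.Date;

    public class EpochToLogicalTypeSketch {
        public static void main(final String[] args) {
            // 2016-07-18T10:30:00Z as milliseconds since the epoch.
            final long millisSinceEpoch = 1468837800000L;
            final Date date = new Date(millisSinceEpoch);

            // 'date' logical type: whole days between the epoch and the value.
            final long days = Duration.between(new Date(0L).toInstant(), date.toInstant()).toDays();

            // 'time-millis' logical type: millis since the start of the UTC day.
            final Instant midnight = date.toInstant().truncatedTo(ChronoUnit.DAYS);
            final long millisSinceMidnight = Duration.between(midnight, date.toInstant()).toMillis();

            System.out.println(days);                // 17000
            System.out.println(millisSinceMidnight); // 37800000, i.e. 10:30:00
        }
    }
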
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index eccad7d..6b06ebf 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -19,31 +19,63 @@ package org.apache.nifi.csv;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
 
-import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.commons.csv.CSVFormat;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.DateTimeUtils;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RowRecordReaderFactory;
-import org.apache.nifi.serialization.UserTypeOverrideRowReader;
+import org.apache.nifi.serialization.SchemaRegistryRecordReader;
+import org.apache.nifi.serialization.record.RecordSchema;
 
 @Tags({"csv", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"})
 @CapabilityDescription("Parses CSV-formatted data, returning each row in the CSV file as a separate record. "
     + "This reader assumes that the first line in the content is the column names and all subsequent lines are "
-    + "the values. By default, the reader will assume that all columns are of 'String' type, but this can be "
-    + "overridden by adding a user-defined Property where the key is the name of a column and the value is the "
-    + "type of the column. For example, if a Property has the name \"balance\" with a value of float, it the "
-    + "reader will attempt to coerce all values in the \"balance\" column into a floating-point number. See "
-    + "Controller Service's Usage for further documentation.")
-@DynamicProperty(name = "<name of column in CSV>", value = "<type of column values in CSV>",
-    description = "User-defined properties are used to indicate that the values of a specific column should be interpreted as a "
-    + "user-defined data type (e.g., int, double, float, date, etc.)", supportsExpressionLanguage = false)
-public class CSVReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory {
+    + "the values. See Controller Service's Usage for further documentation.")
+public class CSVReader extends SchemaRegistryRecordReader implements RowRecordReaderFactory {
+
+    private volatile CSVFormat csvFormat;
+    private volatile String dateFormat;
+    private volatile String timeFormat;
+    private volatile String timestampFormat;
+
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(DateTimeUtils.DATE_FORMAT);
+        properties.add(DateTimeUtils.TIME_FORMAT);
+        properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
+        properties.add(CSVUtils.CSV_FORMAT);
+        properties.add(CSVUtils.VALUE_SEPARATOR);
+        properties.add(CSVUtils.QUOTE_CHAR);
+        properties.add(CSVUtils.ESCAPE_CHAR);
+        properties.add(CSVUtils.COMMENT_MARKER);
+        properties.add(CSVUtils.NULL_STRING);
+        properties.add(CSVUtils.TRIM_FIELDS);
+        return properties;
+    }
+
+    @OnEnabled
+    public void storeCsvFormat(final ConfigurationContext context) {
+        this.csvFormat = CSVUtils.createCSVFormat(context);
+        this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
+        this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
+        this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+    }
 
     @Override
-    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException {
-        return new CSVRecordReader(in, logger, getFieldTypeOverrides());
+    public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException {
+        final RecordSchema schema = getSchema(flowFile);
+        return new CSVRecordReader(in, logger, schema, csvFormat, dateFormat, timeFormat, timestampFormat);
     }
 
 }
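
For readers who don't know Commons CSV (the library this commit swaps in for opencsv), here is a minimal, self-contained sketch of the parser calls the new reader is built on. It uses only org.apache.commons.csv types; the class name, sample data, and format settings are illustrative and not part of the commit:

    import java.io.Reader;
    import java.io.StringReader;

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;

    public class CsvFormatDemo {
        public static void main(final String[] args) throws Exception {
            // Roughly mirrors the custom format CSVUtils builds: the first
            // record supplies the header, fields are trimmed, and values may
            // be quoted with double-quotes.
            final CSVFormat format = CSVFormat.newFormat(',')
                .withQuote('"')
                .withEscape('\\')
                .withTrim()
                .withFirstRecordAsHeader();

            final Reader reader = new StringReader("name,balance\n\"John Doe\",42.50\n");
            try (CSVParser parser = new CSVParser(reader, format)) {
                for (final CSVRecord record : parser) {
                    // Fields are looked up by header name, as CSVRecordReader does.
                    System.out.println(record.get("name") + " -> " + record.get("balance"));
                }
            }
        }
    }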

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
index c2e8963..d02768c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
@@ -17,200 +17,91 @@
 
 package org.apache.nifi.csv;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
+import java.io.Reader;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.io.input.BOMInputStream;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
-import au.com.bytecode.opencsv.CSVReader;
 
 public class CSVRecordReader implements RecordReader {
-    private final ComponentLog logger;
-    private final CSVReader reader;
-    private final String[] firstLine;
-    private final Map<String, DataType> fieldTypeOverrides;
-    private RecordSchema schema;
-
-    public CSVRecordReader(final InputStream in, final ComponentLog logger, final Map<String, DataType> fieldTypeOverrides) throws IOException {
-        this.logger = logger;
-        reader = new CSVReader(new InputStreamReader(new BufferedInputStream(in)));
-        firstLine = reader.readNext();
-        this.fieldTypeOverrides = fieldTypeOverrides;
+    private final CSVParser csvParser;
+    private final RecordSchema schema;
+    private final String dateFormat;
+    private final String timeFormat;
+    private final String timestampFormat;
+
+    public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat,
+        final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
+
+        final Reader reader = new InputStreamReader(new BOMInputStream(in));
+        csvParser = new CSVParser(reader, csvFormat);
+
+        this.schema = schema;
+        this.dateFormat = dateFormat;
+        this.timeFormat = timeFormat;
+        this.timestampFormat = timestampFormat;
     }
 
     @Override
     public Record nextRecord() throws IOException, MalformedRecordException {
         final RecordSchema schema = getSchema();
 
-        while (true) {
-            final String[] line = reader.readNext();
-            if (line == null) {
-                return null;
-            }
-
-            final List<DataType> fieldTypes = schema.getDataTypes();
-            if (fieldTypes.size() != line.length) {
-                logger.warn("Found record with incorrect number of fields. Expected {} but found {}; skipping record", new Object[] {fieldTypes.size(), line.length});
-                continue;
-            }
-
-            try {
-                final Map<String, Object> rowValues = new HashMap<>(schema.getFieldCount());
+        for (final CSVRecord csvRecord : csvParser) {
+            final Map<String, Object> rowValues = new HashMap<>(schema.getFieldCount());
 
-                int i = 0;
-                for (final String fieldName : schema.getFieldNames()) {
-                    if (i >= line.length) {
-                        rowValues.put(fieldName, null);
-                        continue;
-                    }
-
-                    final String rawValue = line[i++].trim();
-                    final Object converted = convert(schema.getDataType(fieldName).orElse(null), rawValue);
-                    rowValues.put(fieldName, converted);
+            for (final String fieldName : schema.getFieldNames()) {
+                final String rawValue = csvRecord.get(fieldName);
+                if (rawValue == null) {
+                    rowValues.put(fieldName, null);
+                    continue;
                 }
 
-                return new MapRecord(schema, rowValues);
-            } catch (final Exception e) {
-                throw new MalformedRecordException("Found invalid CSV record", e);
+                final Object converted = convert(rawValue, schema.getDataType(fieldName).orElse(null));
+                rowValues.put(fieldName, converted);
             }
+
+            return new MapRecord(schema, rowValues);
         }
+
+        return null;
     }
 
     @Override
     public RecordSchema getSchema() {
-        if (schema != null) {
-            return schema;
-        }
-
-        final List<RecordField> recordFields = new ArrayList<>();
-        for (final String element : firstLine) {
-
-            final String name = element.trim();
-            final DataType dataType;
-
-            final DataType overriddenDataType = fieldTypeOverrides.get(name);
-            if (overriddenDataType != null) {
-                dataType = overriddenDataType;
-            } else {
-                dataType = RecordFieldType.STRING.getDataType();
-            }
-
-            final RecordField field = new RecordField(name, dataType);
-            recordFields.add(field);
-        }
-
-        if (recordFields.isEmpty()) {
-            recordFields.add(new RecordField("line", RecordFieldType.STRING.getDataType()));
-        }
-
-        schema = new SimpleRecordSchema(recordFields);
         return schema;
     }
 
-    protected Object convert(final DataType dataType, final String value) {
-        if (dataType == null) {
+    protected Object convert(final String value, final DataType dataType) {
+        if (dataType == null || value == null) {
             return value;
         }
 
-        switch (dataType.getFieldType()) {
-            case BOOLEAN:
-                if (value.length() == 0) {
-                    return null;
-                }
-                return Boolean.parseBoolean(value);
-            case BYTE:
-                if (value.length() == 0) {
-                    return null;
-                }
-                return Byte.parseByte(value);
-            case SHORT:
-                if (value.length() == 0) {
-                    return null;
-                }
-                return Short.parseShort(value);
-            case INT:
-                if (value.length() == 0) {
-                    return null;
-                }
-                return Integer.parseInt(value);
-            case LONG:
-            case BIGINT:
-                if (value.length() == 0) {
-                    return null;
-                }
-                return Long.parseLong(value);
-            case FLOAT:
-                if (value.length() == 0) {
-                    return null;
-                }
-                return Float.parseFloat(value);
-            case DOUBLE:
-                if (value.length() == 0) {
-                    return null;
-                }
-                return Double.parseDouble(value);
-            case DATE:
-                if (value.length() == 0) {
-                    return null;
-                }
-                try {
-                    final Date date = new SimpleDateFormat(dataType.getFormat()).parse(value);
-                    return new java.sql.Date(date.getTime());
-                } catch (final ParseException e) {
-                    logger.warn("Found invalid value for DATE field: " + value + " does not match expected format of "
-                        + dataType.getFormat() + "; will substitute a NULL value for this field");
-                    return null;
-                }
-            case TIME:
-                if (value.length() == 0) {
-                    return null;
-                }
-                try {
-                    final Date date = new SimpleDateFormat(dataType.getFormat()).parse(value);
-                    return new java.sql.Time(date.getTime());
-                } catch (final ParseException e) {
-                    logger.warn("Found invalid value for TIME field: " + value + " does not match expected format of "
-                        + dataType.getFormat() + "; will substitute a NULL value for this field");
-                    return null;
-                }
-            case TIMESTAMP:
-                if (value.length() == 0) {
-                    return null;
-                }
-                try {
-                    final Date date = new SimpleDateFormat(dataType.getFormat()).parse(value);
-                    return new java.sql.Timestamp(date.getTime());
-                } catch (final ParseException e) {
-                    logger.warn("Found invalid value for TIMESTAMP field: " + value + " does not match expected format of "
-                        + dataType.getFormat() + "; will substitute a NULL value for this field");
-                    return null;
-                }
-            case STRING:
-            default:
-                return value;
+        final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
+
+        if (trimmed.isEmpty()) {
+            return null;
         }
+
+        return DataTypeUtils.convertType(trimmed, dataType, dateFormat, timeFormat, timestampFormat);
     }
 
     @Override
     public void close() throws IOException {
-        reader.close();
+        csvParser.close();
     }
 }
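
The rewritten convert() above does three things: it strips one layer of surrounding double-quotes, maps the resulting empty string to null, and only then coerces the value to the schema's type. A standalone restatement of that pre-processing, with Integer.parseInt standing in for NiFi's DataTypeUtils.convertType (the demo class name is hypothetical and not part of the commit):

    public class ConvertSketch {
        static Object convert(final String value) {
            if (value == null) {
                return null;
            }
            // Strip one layer of surrounding double-quotes, as convert() does.
            final String trimmed = value.startsWith("\"") && value.endsWith("\"")
                ? value.substring(1, value.length() - 1) : value;
            if (trimmed.isEmpty()) {
                return null; // empty fields become null rather than empty string
            }
            // Stand-in for DataTypeUtils.convertType(trimmed, dataType, ...).
            return Integer.parseInt(trimmed);
        }

        public static void main(final String[] args) {
            System.out.println(convert("\"42\"")); // prints 42
            System.out.println(convert(""));       // prints null
        }
    }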

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
index 906e9c4..6a7b758 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -17,21 +17,51 @@
 
 package org.apache.nifi.csv;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.csv.CSVFormat;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.serialization.AbstractRecordSetWriter;
+import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 
-@Tags({"csv", "result", "set", "writer", "serializer", "record", "row"})
-@CapabilityDescription("Writes the contents of a Database ResultSet as CSV 
data. The first line written "
+@Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", 
"row", "tsv", "tab", "separated", "delimited"})
+@CapabilityDescription("Writes the contents of a RecordSet as CSV data. The 
first line written "
     + "will be the column names. All subsequent lines will be the values 
corresponding to those columns.")
-public class CSVRecordSetWriter extends AbstractRecordSetWriter implements 
RecordSetWriterFactory {
+public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements 
RecordSetWriterFactory {
+
+    private volatile CSVFormat csvFormat;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(CSVUtils.CSV_FORMAT);
+        properties.add(CSVUtils.VALUE_SEPARATOR);
+        properties.add(CSVUtils.QUOTE_CHAR);
+        properties.add(CSVUtils.ESCAPE_CHAR);
+        properties.add(CSVUtils.COMMENT_MARKER);
+        properties.add(CSVUtils.NULL_STRING);
+        properties.add(CSVUtils.TRIM_FIELDS);
+        properties.add(CSVUtils.QUOTE_MODE);
+        properties.add(CSVUtils.RECORD_SEPARATOR);
+        properties.add(CSVUtils.TRAILING_DELIMITER);
+        return properties;
+    }
+
+    @OnEnabled
+    public void storeCsvFormat(final ConfigurationContext context) {
+        this.csvFormat = CSVUtils.createCSVFormat(context);
+    }
 
     @Override
     public RecordSetWriter createWriter(final ComponentLog logger) {
-        return new WriteCSVResult(getDateFormat(), getTimeFormat(), getTimestampFormat());
+        return new WriteCSVResult(csvFormat, getDateFormat(), getTimeFormat(), getTimestampFormat());
     }
 
 }
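
On the write side, the cached CSVFormat is handed to WriteCSVResult along with the configured date/time patterns. The Commons CSV calls underneath look roughly like this sketch (the class name and property values are illustrative; WriteCSVResult itself is not shown in this hunk):

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVPrinter;
    import org.apache.commons.csv.QuoteMode;

    public class CsvWriteDemo {
        public static void main(final String[] args) throws Exception {
            // Minimal quoting plus a "\n" record separator, as the writer's
            // Quote Mode and Record Separator properties would configure.
            final CSVFormat format = CSVFormat.DEFAULT
                .withQuoteMode(QuoteMode.MINIMAL)
                .withRecordSeparator("\n");

            final StringBuilder out = new StringBuilder();
            try (CSVPrinter printer = new CSVPrinter(out, format)) {
                printer.printRecord("name", "balance"); // header line
                printer.printRecord("John Doe", 42.50); // one row per record
            }
            System.out.print(out);
        }
    }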

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
new file mode 100644
index 0000000..e23b6e1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.QuoteMode;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public class CSVUtils {
+
+    static final AllowableValue CUSTOM = new AllowableValue("custom", "Custom Format",
+        "The format of the CSV is configured by using the properties of this Controller Service, such as Value Separator");
+    static final AllowableValue RFC_4180 = new AllowableValue("rfc-4180", "RFC 4180", "CSV data follows the RFC 4180 Specification defined at https://tools.ietf.org/html/rfc4180");
+    static final AllowableValue EXCEL = new AllowableValue("excel", "Microsoft Excel", "CSV data follows the format used by Microsoft Excel");
+    static final AllowableValue TDF = new AllowableValue("tdf", "Tab-Delimited", "CSV data is Tab-Delimited instead of Comma Delimited");
+    static final AllowableValue INFORMIX_UNLOAD = new AllowableValue("informix-unload", "Informix Unload", "The format used by Informix when issuing the UNLOAD TO file_name command");
+    static final AllowableValue INFORMIX_UNLOAD_CSV = new AllowableValue("informix-unload-csv", "Informix Unload Escape Disabled",
+        "The format used by Informix when issuing the UNLOAD TO file_name command with escaping disabled");
+    static final AllowableValue MYSQL = new AllowableValue("mysql", "MySQL Format", "CSV data follows the format used by MySQL");
+
+    static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
+        .name("CSV Format")
+        .description("Specifies which \"format\" the CSV data is in, or specifies if custom formatting should be used.")
+        .expressionLanguageSupported(false)
+        .allowableValues(CUSTOM, RFC_4180, EXCEL, TDF, MYSQL, INFORMIX_UNLOAD, INFORMIX_UNLOAD_CSV)
+        .defaultValue(CUSTOM.getValue())
+        .required(true)
+        .build();
+    static final PropertyDescriptor VALUE_SEPARATOR = new PropertyDescriptor.Builder()
+        .name("Value Separator")
+        .description("The character that is used to separate values/fields in a CSV Record")
+        .addValidator(new SingleCharacterValidator())
+        .expressionLanguageSupported(false)
+        .defaultValue(",")
+        .required(true)
+        .build();
+    static final PropertyDescriptor QUOTE_CHAR = new PropertyDescriptor.Builder()
+        .name("Quote Character")
+        .description("The character that is used to quote values so that escape characters do not have to be used")
+        .addValidator(new SingleCharacterValidator())
+        .expressionLanguageSupported(false)
+        .defaultValue("\"")
+        .required(true)
+        .build();
+    static final PropertyDescriptor COMMENT_MARKER = new PropertyDescriptor.Builder()
+        .name("Comment Marker")
+        .description("The character that is used to denote the start of a comment. Any line that begins with this comment will be ignored.")
+        .addValidator(new SingleCharacterValidator())
+        .expressionLanguageSupported(false)
+        .required(false)
+        .build();
+    static final PropertyDescriptor ESCAPE_CHAR = new PropertyDescriptor.Builder()
+        .name("Escape Character")
+        .description("The character that is used to escape characters that would otherwise have a specific meaning to the CSV Parser.")
+        .addValidator(new SingleCharacterValidator())
+        .expressionLanguageSupported(false)
+        .defaultValue("\\")
+        .required(true)
+        .build();
+    static final PropertyDescriptor NULL_STRING = new PropertyDescriptor.Builder()
+        .name("Null String")
+        .description("Specifies a String that, if present as a value in the CSV, should be considered a null field instead of using the literal value.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(false)
+        .build();
+    static final PropertyDescriptor TRIM_FIELDS = new PropertyDescriptor.Builder()
+        .name("Trim Fields")
+        .description("Whether or not white space should be removed from the beginning and end of fields")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
+
+    // CSV Format fields for writers only
+    static final AllowableValue QUOTE_ALL = new AllowableValue("ALL", "Quote All Values", "All values will be quoted using the configured quote character.");
+    static final AllowableValue QUOTE_MINIMAL = new AllowableValue("MINIMAL", "Quote Minimal",
+        "Values will be quoted only if they contain special characters such as newline characters or field separators.");
+    static final AllowableValue QUOTE_NON_NUMERIC = new AllowableValue("NON_NUMERIC", "Quote Non-Numeric Values", "Values will be quoted unless the value is a number.");
+    static final AllowableValue QUOTE_NONE = new AllowableValue("NONE", "Do Not Quote Values",
+        "Values will not be quoted. Instead, all special characters will be escaped using the configured escape character.");
+
+    static final PropertyDescriptor QUOTE_MODE = new PropertyDescriptor.Builder()
+        .name("Quote Mode")
+        .description("Specifies how fields should be quoted when they are written")
+        .expressionLanguageSupported(false)
+        .allowableValues(QUOTE_ALL, QUOTE_MINIMAL, QUOTE_NON_NUMERIC, QUOTE_NONE)
+        .defaultValue(QUOTE_MINIMAL.getValue())
+        .required(true)
+        .build();
+    static final PropertyDescriptor TRAILING_DELIMITER = new PropertyDescriptor.Builder()
+        .name("Include Trailing Delimiter")
+        .description("If true, a trailing delimiter will be added to each CSV Record that is written. If false, the trailing delimiter will be omitted.")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
+    static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder()
+        .name("Record Separator")
+        .description("Specifies the characters to use in order to separate CSV Records")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("\\n")
+        .required(true)
+        .build();
+
+
+    static CSVFormat createCSVFormat(final ConfigurationContext context) {
+        final String formatName = context.getProperty(CSV_FORMAT).getValue();
+        if (formatName.equalsIgnoreCase(CUSTOM.getValue())) {
+            return buildCustomFormat(context);
+        }
+        if (formatName.equalsIgnoreCase(RFC_4180.getValue())) {
+            return CSVFormat.RFC4180;
+        } else if (formatName.equalsIgnoreCase(EXCEL.getValue())) {
+            return CSVFormat.EXCEL;
+        } else if (formatName.equalsIgnoreCase(TDF.getValue())) {
+            return CSVFormat.TDF;
+        } else if (formatName.equalsIgnoreCase(MYSQL.getValue())) {
+            return CSVFormat.MYSQL;
+        } else if (formatName.equalsIgnoreCase(INFORMIX_UNLOAD.getValue())) {
+            return CSVFormat.INFORMIX_UNLOAD;
+        } else if (formatName.equalsIgnoreCase(INFORMIX_UNLOAD_CSV.getValue())) {
+            return CSVFormat.INFORMIX_UNLOAD_CSV;
+        } else {
+            return CSVFormat.DEFAULT;
+        }
+    }
+
+    private static char getChar(final ConfigurationContext context, final PropertyDescriptor property) {
+        return CSVUtils.unescape(context.getProperty(property).getValue()).charAt(0);
+    }
+
+    private static CSVFormat buildCustomFormat(final ConfigurationContext context) {
+        final char valueSeparator = getChar(context, VALUE_SEPARATOR);
+        CSVFormat format = CSVFormat.newFormat(valueSeparator)
+            .withAllowMissingColumnNames()
+            .withIgnoreEmptyLines()
+            .withFirstRecordAsHeader();
+
+        format = format.withQuote(getChar(context, QUOTE_CHAR));
+        format = format.withEscape(getChar(context, ESCAPE_CHAR));
+        format = format.withTrim(context.getProperty(TRIM_FIELDS).asBoolean());
+
+        if (context.getProperty(COMMENT_MARKER).isSet()) {
+            format = format.withCommentMarker(getChar(context, COMMENT_MARKER));
+        }
+        if (context.getProperty(NULL_STRING).isSet()) {
+            format = format.withNullString(CSVUtils.unescape(context.getProperty(NULL_STRING).getValue()));
+        }
+
+        final PropertyValue quoteValue = context.getProperty(QUOTE_MODE);
+        if (quoteValue != null) {
+            final QuoteMode quoteMode = QuoteMode.valueOf(quoteValue.getValue());
+            format = format.withQuoteMode(quoteMode);
+        }
+
+        final PropertyValue trailingDelimiterValue = context.getProperty(TRAILING_DELIMITER);
+        if (trailingDelimiterValue != null) {
+            final boolean trailingDelimiter = trailingDelimiterValue.asBoolean();
+            format = format.withTrailingDelimiter(trailingDelimiter);
+        }
+
+        final PropertyValue recordSeparator = context.getProperty(RECORD_SEPARATOR);
+        if (recordSeparator != null) {
+            final String separator = unescape(recordSeparator.getValue());
+            format = format.withRecordSeparator(separator);
+        }
+
+        return format;
+    }
+
+
+    public static String unescape(final String input) {
+        if (input == null) {
+            return input;
+        }
+
+        return input.replace("\\t", "\t")
+            .replace("\\n", "\n")
+            .replace("\\r", "\r");
+    }
+}
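
The unescape() helper is what lets a user type the two-character sequence \t into the Value Separator or Record Separator property and have it treated as a real tab or newline. A quick standalone check of that behavior (the demo class is not part of the commit; its unescape method is copied from CSVUtils above):

    public class UnescapeDemo {
        static String unescape(final String input) {
            if (input == null) {
                return input;
            }
            return input.replace("\\t", "\t")
                .replace("\\n", "\n")
                .replace("\\r", "\r");
        }

        public static void main(final String[] args) {
            // The user typed a backslash followed by 't' (two characters).
            final String separator = unescape("\\t");
            System.out.println(separator.length());        // 1
            System.out.println((int) separator.charAt(0)); // 9, i.e. the tab character
        }
    }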

http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/SingleCharacterValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/SingleCharacterValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/SingleCharacterValidator.java
new file mode 100644
index 0000000..b24dea9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/SingleCharacterValidator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+public class SingleCharacterValidator implements Validator {
+    private static final Set<String> illegalChars = new HashSet<>();
+    static {
+        illegalChars.add("\r");
+        illegalChars.add("\n");
+    }
+
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        final String unescaped = CSVUtils.unescape(input);
+        if (unescaped.length() != 1) {
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(false)
+                .explanation("Value must be exactly 1 character but was " + 
input.length() + " in length")
+                .build();
+        }
+
+        if (illegalChars.contains(input)) {
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(false)
+                .explanation(input + " is not a valid character for this property")
+                .build();
+        }
+
+        return new ValidationResult.Builder()
+            .input(input)
+            .subject(subject)
+            .valid(true)
+            .build();
+    }
+
+}
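
A small exercise of the validator follows. It passes null for the ValidationContext, which is safe only because this particular implementation never touches the context; the demo class is not part of the commit:

    import org.apache.nifi.components.ValidationResult;
    import org.apache.nifi.csv.SingleCharacterValidator;

    public class ValidatorDemo {
        public static void main(final String[] args) {
            final SingleCharacterValidator validator = new SingleCharacterValidator();

            // "\\t" is backslash + 't', which CSVUtils.unescape() collapses
            // to a single tab character: exactly one character, so valid.
            final ValidationResult ok = validator.validate("Value Separator", "\\t", null);
            System.out.println(ok.isValid());  // true

            // Two ordinary characters stay two characters after unescaping.
            final ValidationResult bad = validator.validate("Value Separator", "ab", null);
            System.out.println(bad.isValid()); // false
        }
    }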
