http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
new file mode 100644
index 0000000..286326a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
+
+
+public abstract class AbstractJsonRowRecordReader implements RecordReader {
+    private final ComponentLog logger;
+    private final JsonParser jsonParser;
+    private final JsonFactory jsonFactory;
+    private final boolean array;
+    private final JsonNode firstJsonNode;
+
+    private boolean firstObjectConsumed = false;
+
+    private static final TimeZone gmt = TimeZone.getTimeZone("GMT");
+
+
+    public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException {
+        this.logger = logger;
+
+        jsonFactory = new JsonFactory();
+        try {
+            jsonParser = jsonFactory.createJsonParser(in);
+            jsonParser.setCodec(new ObjectMapper());
+
+            JsonToken token = jsonParser.nextToken();
+            if (token == JsonToken.START_ARRAY) {
+                array = true;
+                token = jsonParser.nextToken(); // advance to START_OBJECT token
+            } else {
+                array = false;
+            }
+
+            if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also
+                firstJsonNode = jsonParser.readValueAsTree();
+            } else {
+                firstJsonNode = null;
+            }
+        } catch (final JsonParseException e) {
+            throw new MalformedRecordException("Could not parse data as JSON", e);
+        }
+    }
+
+    @Override
+    public Record nextRecord() throws IOException, MalformedRecordException {
+        if (firstObjectConsumed && !array) {
+            return null;
+        }
+
+        final JsonNode nextNode = getNextJsonNode();
+        final RecordSchema schema = getSchema();
+        try {
+            return convertJsonNodeToRecord(nextNode, schema);
+        } catch (final MalformedRecordException mre) {
+            throw mre;
+        } catch (final IOException ioe) {
+            throw ioe;
+        } catch (final Exception e) {
+            logger.debug("Failed to convert JSON Element {} into a Record object using schema {} due to {}", new Object[] {nextNode, schema, e.toString(), e});
+            throw new MalformedRecordException("Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema", e);
+        }
+    }
+
+    protected DataType determineFieldType(final JsonNode node) {
+        if (node.isDouble()) {
+            return RecordFieldType.DOUBLE.getDataType();
+        }
+        if (node.isBoolean()) {
+            return RecordFieldType.BOOLEAN.getDataType();
+        }
+        if (node.isFloatingPointNumber()) {
+            return RecordFieldType.FLOAT.getDataType();
+        }
+        if (node.isBigInteger()) {
+            return RecordFieldType.BIGINT.getDataType();
+        }
+        if (node.isBigDecimal()) {
+            return RecordFieldType.DOUBLE.getDataType();
+        }
+        if (node.isLong()) {
+            return RecordFieldType.LONG.getDataType();
+        }
+        if (node.isInt()) {
+            return RecordFieldType.INT.getDataType();
+        }
+        if (node.isTextual()) {
+            return RecordFieldType.STRING.getDataType();
+        }
+        if (node.isArray()) {
+            return RecordFieldType.ARRAY.getDataType();
+        }
+
+        final RecordSchema childSchema = determineSchema(node);
+        return RecordFieldType.RECORD.getDataType(childSchema);
+    }
+
+    protected RecordSchema determineSchema(final JsonNode jsonNode) {
+        final List<RecordField> recordFields = new ArrayList<>();
+
+        final Iterator<Map.Entry<String, JsonNode>> itr = jsonNode.getFields();
+        while (itr.hasNext()) {
+            final Map.Entry<String, JsonNode> entry = itr.next();
+            final String elementName = entry.getKey();
+            final JsonNode node = entry.getValue();
+
+            DataType dataType = determineFieldType(node);
+            recordFields.add(new RecordField(elementName, dataType));
+        }
+
+        return new SimpleRecordSchema(recordFields);
+    }
+
+    protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType) throws IOException, MalformedRecordException {
+        if (fieldNode == null || fieldNode.isNull()) {
+            return null;
+        }
+
+        switch (desiredType.getFieldType()) {
+            case BOOLEAN:
+                return fieldNode.asBoolean();
+            case BYTE:
+                return (byte) fieldNode.asInt();
+            case CHAR:
+                final String text = fieldNode.asText();
+                if (text.isEmpty()) {
+                    return null;
+                }
+                return text.charAt(0);
+            case DOUBLE:
+                return fieldNode.asDouble();
+            case FLOAT:
+                return (float) fieldNode.asDouble();
+            case INT:
+                return fieldNode.asInt();
+            case LONG:
+                return fieldNode.asLong();
+            case SHORT:
+                return (short) fieldNode.asInt();
+            case STRING:
+                return fieldNode.asText();
+            case DATE: {
+                final String string = fieldNode.asText();
+                if (string.isEmpty()) {
+                    return null;
+                }
+
+                try {
+                    final DateFormat dateFormat = new SimpleDateFormat(desiredType.getFormat());
+                    dateFormat.setTimeZone(gmt);
+                    final Date date = dateFormat.parse(string);
+                    return new java.sql.Date(date.getTime());
+                } catch (ParseException e) {
+                    logger.warn("Failed to convert JSON field to Date for field {} (value {})", new Object[] {fieldName, string, e});
+                    return null;
+                }
+            }
+            case TIME: {
+                final String string = fieldNode.asText();
+                if (string.isEmpty()) {
+                    return null;
+                }
+
+                try {
+                    final DateFormat dateFormat = new SimpleDateFormat(desiredType.getFormat());
+                    dateFormat.setTimeZone(gmt);
+                    final Date date = dateFormat.parse(string);
+                    return new java.sql.Time(date.getTime());
+                } catch (ParseException e) {
+                    logger.warn("Failed to convert JSON field to Time for field {} (value {})", new Object[] {fieldName, string, e});
+                    return null;
+                }
+            }
+            case TIMESTAMP: {
+                final String string = fieldNode.asText();
+                if (string.isEmpty()) {
+                    return null;
+                }
+
+                try {
+                    final DateFormat dateFormat = new SimpleDateFormat(desiredType.getFormat());
+                    dateFormat.setTimeZone(gmt);
+                    final Date date = dateFormat.parse(string);
+                    return new java.sql.Timestamp(date.getTime());
+                } catch (ParseException e) {
+                    logger.warn("Failed to convert JSON field to Timestamp for field {} (value {})", new Object[] {fieldName, string, e});
+                    return null;
+                }
+            }
+            case ARRAY: {
+                final ArrayNode arrayNode = (ArrayNode) fieldNode;
+                final int numElements = arrayNode.size();
+                final Object[] arrayElements = new Object[numElements];
+                int count = 0;
+                for (final JsonNode node : arrayNode) {
+                    final Object converted = convertField(node, fieldName, determineFieldType(node));
+                    arrayElements[count++] = converted;
+                }
+
+                return arrayElements;
+            }
+            case RECORD: {
+                if (fieldNode.isObject()) {
+                    final Optional<RecordSchema> childSchema = desiredType.getChildRecordSchema();
+                    if (!childSchema.isPresent()) {
+                        return null;
+                    }
+
+                    return convertJsonNodeToRecord(fieldNode, childSchema.get());
+                } else {
+                    return fieldNode.toString();
+                }
+            }
+        }
+
+        return fieldNode.toString();
+    }
+
+    private JsonNode getNextJsonNode() throws JsonParseException, IOException, MalformedRecordException {
+        if (!firstObjectConsumed) {
+            firstObjectConsumed = true;
+            return firstJsonNode;
+        }
+
+        while (true) {
+            final JsonToken token = jsonParser.nextToken();
+            if (token == null) {
+                return null;
+            }
+
+            switch (token) {
+                case END_OBJECT:
+                    continue;
+                case START_OBJECT:
+                    return jsonParser.readValueAsTree();
+                case END_ARRAY:
+                case START_ARRAY:
+                    return null;
+                default:
+                    throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name());
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        jsonParser.close();
+    }
+
+    protected JsonParser getJsonParser() {
+        return jsonParser;
+    }
+
+    protected JsonFactory getJsonFactory() {
+        return jsonFactory;
+    }
+
+    protected Optional<JsonNode> getFirstJsonNode() {
+        return Optional.ofNullable(firstJsonNode);
+    }
+
+    protected abstract Record convertJsonNodeToRecord(final JsonNode nextNode, final RecordSchema schema) throws IOException, MalformedRecordException;
+}

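For reference, a minimal sketch (not part of this commit) of the type inference that determineFieldType/determineSchema perform on a parsed node; the JSON literal and variable names below are hypothetical:

    // Parse a single JSON object with the same Jackson 1.x API the reader uses.
    final ObjectMapper mapper = new ObjectMapper();
    final JsonNode node = mapper.readTree("{\"id\": 17, \"balance\": 123.45, \"name\": \"unit-test\"}");

    // From within a concrete subclass, determineSchema(node) would yield a schema
    // with fields: id -> INT, balance -> DOUBLE, name -> STRING.
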
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
new file mode 100644
index 0000000..b43b1c1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.record.DataType;
+
+import com.jayway.jsonpath.JsonPath;
+
+@Tags({"json", "jsonpath", "record", "reader", "parser"})
+@CapabilityDescription("Parses JSON records and evaluates user-defined JSON Paths against each JSON object. The root element may be either "
+    + "a single JSON object or a JSON array. If a JSON array is found, each JSON object within that array is treated as a separate record. "
+    + "User-defined properties define the fields that should be extracted from the JSON in order to form the fields of a Record. Any JSON field "
+    + "that is not extracted via a JSONPath will not be returned in the JSON Records.")
+@SeeAlso(JsonTreeReader.class)
+@DynamicProperty(name = "The field name for the record. If it is desirable to enforce that the value be coerced into a given type, its type can be included "
+    + "in the name by using a syntax of <field name>:<field type>. For example, \"balance:double\".",
+    value="A JSONPath Expression that will be evaluated against each JSON record. The result of the JSONPath will be the value of the "
+        + "field whose name is the same as the property name.",
+    description="User-defined properties identify how to extract specific fields from a JSON object in order to create a Record",
+    supportsExpressionLanguage=false)
+public class JsonPathReader extends AbstractControllerService implements RowRecordReaderFactory {
+
+    private volatile LinkedHashMap<String, JsonPath> jsonPaths;
+    private volatile Map<String, DataType> fieldTypeOverrides;
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .description("JsonPath Expression that indicates how to retrieve the value from a JSON Object for the '" + propertyDescriptorName + "' column")
+            .dynamic(true)
+            .required(false)
+            .addValidator(new JsonPathValidator())
+            .build();
+    }
+
+    @OnEnabled
+    public void compileJsonPaths(final ConfigurationContext context) {
+        final Map<String, DataType> fieldTypes = new HashMap<>(context.getProperties().size());
+
+        final LinkedHashMap<String, JsonPath> compiled = new LinkedHashMap<>();
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (!descriptor.isDynamic()) {
+                continue;
+            }
+
+            final String fieldName = PropertyNameUtil.getFieldName(descriptor.getName());
+            final Optional<DataType> dataTypeOption = PropertyNameUtil.getDataType(descriptor.getName());
+            if (dataTypeOption.isPresent()) {
+                fieldTypes.put(fieldName, dataTypeOption.get());
+            }
+
+            final String expression = context.getProperty(descriptor).getValue();
+            final JsonPath jsonPath = JsonPath.compile(expression);
+            compiled.put(fieldName, jsonPath);
+        }
+
+        jsonPaths = compiled;
+        fieldTypeOverrides = fieldTypes;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        boolean pathSpecified = false;
+        for (final PropertyDescriptor property : validationContext.getProperties().keySet()) {
+            if (property.isDynamic()) {
+                pathSpecified = true;
+                break;
+            }
+        }
+
+        if (pathSpecified) {
+            return Collections.emptyList();
+        }
+
+        return Collections.singleton(new ValidationResult.Builder()
+            .subject("JSON Paths")
+            .valid(false)
+            .explanation("No JSON Paths were specified")
+            .build());
+    }
+
+    @Override
+    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException {
+        return new JsonPathRowRecordReader(jsonPaths, fieldTypeOverrides, in, logger);
+    }
+
+}

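As a rough illustration (not part of this commit), the mapping compileJsonPaths builds for a single dynamic property named "balance:double" whose value is "$.account.balance" would look roughly like this; the property name and expression are hypothetical:

    final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>();
    final Map<String, DataType> fieldTypeOverrides = new HashMap<>();

    final String propertyName = "balance:double";            // <field name>:<field type>
    final String expression = "$.account.balance";           // the property value

    final String fieldName = PropertyNameUtil.getFieldName(propertyName);    // "balance"
    PropertyNameUtil.getDataType(propertyName)
        .ifPresent(type -> fieldTypeOverrides.put(fieldName, type));         // DOUBLE override
    jsonPaths.put(fieldName, JsonPath.compile(expression));
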
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
new file mode 100644
index 0000000..9654b97
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.DataTypeUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.codehaus.jackson.JsonNode;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.PathNotFoundException;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+
+public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
+    private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
+
+    private static final String TIME_FORMAT_DATE = "yyyy-MM-dd";
+    private static final String TIME_FORMAT_TIME = "HH:mm:ss";
+    private static final String TIME_FORMAT_TIMESTAMP = "yyyy-MM-dd HH:mm:ss";
+    private static final TimeZone gmt = TimeZone.getTimeZone("GMT");
+
+    private final LinkedHashMap<String, JsonPath> jsonPaths;
+    private final Map<String, DataType> fieldTypeOverrides;
+    private final InputStream in;
+    private RecordSchema schema;
+
+    public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final Map<String, DataType> fieldTypeOverrides, final InputStream in, final ComponentLog logger)
+        throws MalformedRecordException, IOException {
+        super(in, logger);
+
+        this.jsonPaths = jsonPaths;
+        this.fieldTypeOverrides = fieldTypeOverrides;
+        this.in = in;
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+
+        final Optional<JsonNode> firstNodeOption = getFirstJsonNode();
+
+        final List<RecordField> recordFields = new ArrayList<>();
+        if (firstNodeOption.isPresent()) {
+            final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(firstNodeOption.get().toString());
+            for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
+                final String fieldName = PropertyNameUtil.getFieldName(entry.getKey());
+                final JsonPath jsonPath = entry.getValue();
+
+                final DataType dataType;
+                final DataType dataTypeOverride = fieldTypeOverrides.get(fieldName);
+                if (dataTypeOverride == null) {
+                    Object value;
+                    try {
+                        value = ctx.read(jsonPath);
+                    } catch (final PathNotFoundException pnfe) {
+                        value = null;
+                    }
+
+                    if (value == null) {
+                        dataType = RecordFieldType.STRING.getDataType();
+                    } else {
+                        dataType = DataTypeUtils.inferDataType(value);
+                    }
+                } else {
+                    dataType = dataTypeOverride;
+                }
+
+                recordFields.add(new RecordField(fieldName, dataType));
+            }
+        }
+
+        // If there are any overridden field types that we didn't find, add as the last fields.
+        final Set<String> knownFieldNames = recordFields.stream()
+            .map(f -> f.getFieldName())
+            .collect(Collectors.toSet());
+
+        for (final Map.Entry<String, DataType> entry : fieldTypeOverrides.entrySet()) {
+            if (!knownFieldNames.contains(entry.getKey())) {
+                recordFields.add(new RecordField(entry.getKey(), entry.getValue()));
+            }
+        }
+
+        schema = new SimpleRecordSchema(recordFields);
+        return schema;
+    }
+
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException {
+        if (jsonNode == null) {
+            return null;
+        }
+
+        final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(jsonNode.toString());
+        final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
+
+        for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
+            final JsonPath jsonPath = entry.getValue();
+
+            Object value;
+            try {
+                value = ctx.read(jsonPath);
+            } catch (final PathNotFoundException pnfe) {
+                value = null;
+            }
+
+            final String fieldName = entry.getKey();
+            if (value != null) {
+                final DataType determinedType = DataTypeUtils.inferDataType(value);
+                final DataType desiredType = schema.getDataType(fieldName).orElse(null);
+
+                if (value instanceof List) {
+                    value = ((List<Object>) value).toArray();
+                } else if (value instanceof Map && desiredType.getFieldType() == RecordFieldType.RECORD) {
+                    value = convert(desiredType, value);
+                } else if (desiredType != null && !determinedType.equals(desiredType) && shouldConvert(value, determinedType.getFieldType())) {
+                    value = convert(desiredType, value);
+                }
+            }
+
+            values.put(fieldName, value);
+        }
+
+        return new MapRecord(schema, values);
+    }
+
+    private boolean shouldConvert(final Object value, final RecordFieldType determinedType) {
+        return determinedType != null
+            && determinedType != RecordFieldType.ARRAY;
+    }
+
+
+    protected Object convert(final DataType dataType, final Object value) {
+        if (dataType.getFieldType() == RecordFieldType.RECORD && dataType.getChildRecordSchema().isPresent() && value instanceof Map) {
+            @SuppressWarnings("unchecked")
+            final Map<String, Object> map = (Map<String, Object>) value;
+            return new MapRecord(dataType.getChildRecordSchema().get(), map);
+        } else {
+            return convertString(dataType, value.toString());
+        }
+    }
+
+    /**
+     * Coerces the given string into the provided data type, if possible
+     *
+     * @param dataType the desired type
+     * @param string the string representation of the value
+     * @return an Object representing the same value as the given string but in the requested data type
+     */
+    protected Object convertString(final DataType dataType, final String string) {
+        if (dataType == null) {
+            return string;
+        }
+
+        switch (dataType.getFieldType()) {
+            case BOOLEAN:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Boolean.parseBoolean(string);
+            case BYTE:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Byte.parseByte(string);
+            case SHORT:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Short.parseShort(string);
+            case INT:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Integer.parseInt(string);
+            case LONG:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Long.parseLong(string);
+            case FLOAT:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Float.parseFloat(string);
+            case DOUBLE:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Double.parseDouble(string);
+            case DATE:
+                if (string.length() == 0) {
+                    return null;
+                }
+                try {
+                    final DateFormat format = new SimpleDateFormat(TIME_FORMAT_DATE);
+                    format.setTimeZone(gmt);
+                    Date date = format.parse(string);
+                    return new java.sql.Date(date.getTime());
+                } catch (ParseException e) {
+                    return null;
+                }
+            case TIME:
+                if (string.length() == 0) {
+                    return null;
+                }
+                try {
+                    final DateFormat format = new SimpleDateFormat(TIME_FORMAT_TIME);
+                    format.setTimeZone(gmt);
+                    Date date = format.parse(string);
+                    return new java.sql.Time(date.getTime());
+                } catch (ParseException e) {
+                    return null;
+                }
+            case TIMESTAMP:
+                if (string.length() == 0) {
+                    return null;
+                }
+                try {
+                    final DateFormat format = new SimpleDateFormat(TIME_FORMAT_TIMESTAMP);
+                    format.setTimeZone(gmt);
+                    Date date = format.parse(string);
+                    return new java.sql.Timestamp(date.getTime());
+                } catch (ParseException e) {
+                    return null;
+                }
+            case STRING:
+            default:
+                return string;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java
new file mode 100644
index 0000000..626f56c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathValidator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+import com.jayway.jsonpath.JsonPath;
+
+public class JsonPathValidator implements Validator {
+
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        if (PropertyNameUtil.hasFieldType(subject) && !PropertyNameUtil.isFieldTypeValid(subject)) {
+            final String fieldType = PropertyNameUtil.getFieldTypeName(subject).get();
+
+            return new ValidationResult.Builder()
+                .subject(subject)
+                .input(input)
+                .valid(false)
+                .explanation("Invalid field type. If property name contains a colon (:) it must use syntax of "
+                    + "<field name>:<field type> but the specified field type ('" + fieldType + "') is not a valid field type")
+                .build();
+        }
+
+        try {
+            JsonPath.compile(input);
+        } catch (final Exception e) {
+            return new ValidationResult.Builder()
+                .subject(subject)
+                .input(input)
+                .valid(false)
+                .explanation("Invalid JSON Path Expression: " + e.getMessage())
+                .build();
+        }
+
+        return new ValidationResult.Builder()
+            .subject(subject)
+            .input(input)
+            .valid(true)
+            .build();
+    }
+
+}

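A minimal sketch (hypothetical inputs, not part of this commit) of how this validator responds to a malformed expression:

    final JsonPathValidator validator = new JsonPathValidator();
    final ValidationResult result = validator.validate("balance:double", "$.account[", null);
    // JsonPath.compile rejects the unterminated bracket, so the result is invalid and its
    // explanation carries the "Invalid JSON Path Expression: ..." message.
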
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
new file mode 100644
index 0000000..dc75a51
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+@Tags({"json", "resultset", "writer", "serialize", "record", "row"})
+@CapabilityDescription("Writes the results of a Database ResultSet as a JSON Array. Even if the ResultSet "
+    + "consists of a single row, it will be written as an array with a single element.")
+public class JsonRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory {
+
+    static final PropertyDescriptor PRETTY_PRINT_JSON = new PropertyDescriptor.Builder()
+        .name("Pretty Print JSON")
+        .description("Specifies whether or not the JSON should be pretty printed")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
+
+    private boolean prettyPrint;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(PRETTY_PRINT_JSON);
+        return properties;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        prettyPrint = context.getProperty(PRETTY_PRINT_JSON).asBoolean();
+    }
+
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger) {
+        return new WriteJsonResult(logger, prettyPrint, getDateFormat(), getTimeFormat(), getTimestampFormat());
+    }
+
+}

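A minimal standalone sketch (not part of this commit; logger, recordSet, and out are assumed to exist, and the format strings are only illustrative) of using the writer this service creates:

    // Pretty-print a RecordSet as a JSON array to an OutputStream.
    final RecordSetWriter writer = new WriteJsonResult(logger, true, "yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd HH:mm:ss");
    final WriteResult result = writer.write(recordSet, out);
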
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
new file mode 100644
index 0000000..2d7072a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.UserTypeOverrideRowReader;
+
+@Tags({"json", "tree", "record", "reader", "parser"})
+@CapabilityDescription("Parses JSON into individual Record objects. The Record that is produced will contain all top-level "
+    + "elements of the corresponding JSON Object. If the JSON has nested arrays, those values will be represented as an Object array for that field. "
+    + "Nested JSON objects will be represented as a Map. "
+    + "The root JSON element can be either a single element or an array of JSON elements, and each "
+    + "element in that array will be treated as a separate record. If any of the elements has a nested array or a nested "
+    + "element, they will be returned as OBJECT or ARRAY types (respectively), not flattened out into individual fields. "
+    + "The schema for the record is determined by the first JSON element in the array, if the incoming FlowFile is a JSON array. "
+    + "This means that if a field does not exist in the first JSON object, then it will be skipped in all subsequent JSON objects. "
+    + "The data type of a field can be overridden by adding a property to "
+    + "the controller service where the name of the property matches the JSON field name and the value of the property is "
+    + "the data type to use. If that field does not exist in a JSON element, the field will be assumed to be null. "
+    + "See the Usage of the Controller Service for more information.")
+@SeeAlso(JsonPathReader.class)
+@DynamicProperty(name = "<name of JSON field>", value = "<data type of JSON field>",
+    description = "User-defined properties are used to indicate that the values of a specific field should be interpreted as a "
+    + "user-defined data type (e.g., int, double, float, date, etc.)", supportsExpressionLanguage = false)
+public class JsonTreeReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory {
+
+    @Override
+    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException {
+        return new JsonTreeRowRecordReader(in, logger, getFieldTypeOverrides());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
new file mode 100644
index 0000000..4a2d212
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.codehaus.jackson.JsonNode;
+
+
+public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
+    private final Map<String, DataType> fieldTypeOverrides;
+    private RecordSchema schema;
+
+    public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final Map<String, DataType> fieldTypeOverrides) throws IOException, MalformedRecordException {
+        super(in, logger);
+        this.fieldTypeOverrides = fieldTypeOverrides;
+    }
+
+    @Override
+    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException, MalformedRecordException {
+        if (jsonNode == null) {
+            return null;
+        }
+
+        final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
+        for (int i = 0; i < schema.getFieldCount(); i++) {
+            final RecordField field = schema.getField(i);
+            final String fieldName = field.getFieldName();
+            final JsonNode fieldNode = jsonNode.get(fieldName);
+
+            final DataType desiredType = field.getDataType();
+            final Object value = convertField(fieldNode, fieldName, desiredType);
+            values.put(fieldName, value);
+        }
+
+        return new MapRecord(schema, values);
+    }
+
+
+    @Override
+    public RecordSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+
+        final List<RecordField> recordFields = new ArrayList<>();
+        final Optional<JsonNode> firstNodeOption = getFirstJsonNode();
+
+        if (firstNodeOption.isPresent()) {
+            final Iterator<Map.Entry<String, JsonNode>> itr = firstNodeOption.get().getFields();
+            while (itr.hasNext()) {
+                final Map.Entry<String, JsonNode> entry = itr.next();
+                final String elementName = entry.getKey();
+                final JsonNode node = entry.getValue();
+
+                DataType dataType;
+                final DataType overriddenDataType = fieldTypeOverrides.get(elementName);
+                if (overriddenDataType == null) {
+                    dataType = determineFieldType(node);
+                } else {
+                    dataType = overriddenDataType;
+                }
+
+                recordFields.add(new RecordField(elementName, dataType));
+            }
+        }
+
+        // If there are any overridden field types that we didn't find, add as the last fields.
+        final Set<String> knownFieldNames = recordFields.stream()
+            .map(f -> f.getFieldName())
+            .collect(Collectors.toSet());
+
+        for (final Map.Entry<String, DataType> entry : fieldTypeOverrides.entrySet()) {
+            if (!knownFieldNames.contains(entry.getKey())) {
+                recordFields.add(new RecordField(entry.getKey(), entry.getValue()));
+            }
+        }
+
+        schema = new SimpleRecordSchema(recordFields);
+        return schema;
+    }
+
+}

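A minimal usage sketch (not part of this commit; the input stream and logger are assumed to exist) of reading each top-level JSON object as a Record with no type overrides:

    final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, logger, Collections.emptyMap());
    try {
        Record record;
        while ((record = reader.nextRecord()) != null) {
            // each JSON object becomes one Record, e.g. record.getValue("id")
        }
    } finally {
        reader.close();
    }
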
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java
new file mode 100644
index 0000000..3b7dcf9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import java.util.Optional;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+public class PropertyNameUtil {
+
+    public static String getFieldName(final String propertyName) {
+        final int colonIndex = propertyName.indexOf(":");
+        if (colonIndex > -1 && colonIndex < propertyName.length() - 1) {
+            return propertyName.substring(0, colonIndex);
+        }
+
+        return propertyName;
+    }
+
+    public static boolean hasFieldType(final String propertyName) {
+        final int colonIndex = propertyName.indexOf(":");
+        return (colonIndex > -1 && colonIndex < propertyName.length() - 1);
+    }
+
+    public static Optional<String> getFieldTypeName(final String propertyName) {
+        if (hasFieldType(propertyName)) {
+            final String[] splits = propertyName.split("\\:");
+            if (splits.length > 1) {
+                return Optional.of(splits[1]);
+            }
+            return Optional.empty();
+        }
+
+        return Optional.empty();
+    }
+
+    public static Optional<String> getFieldFormat(final String propertyName) {
+        final String[] splits = propertyName.split("\\:");
+        if (splits.length != 3) {
+            return Optional.empty();
+        }
+
+        return Optional.of(splits[2]);
+    }
+
+    public static boolean isFieldTypeValid(final String propertyName) {
+        final Optional<String> fieldType = getFieldTypeName(propertyName);
+        if (!fieldType.isPresent()) {
+            return false;
+        }
+
+        final String typeName = fieldType.get();
+        final RecordFieldType recordFieldType = RecordFieldType.of(typeName);
+        return recordFieldType != null;
+    }
+
+    public static Optional<DataType> getDataType(final String propertyName) {
+        if (isFieldTypeValid(propertyName)) {
+            final String typeName = getFieldTypeName(propertyName).get();
+            final RecordFieldType fieldType = RecordFieldType.of(typeName);
+
+            final Optional<String> format = getFieldFormat(propertyName);
+            if (format.isPresent()) {
+                return Optional.of(fieldType.getDataType(format.get()));
+            } else {
+                return Optional.of(fieldType.getDataType());
+            }
+        }
+
+        return Optional.empty();
+    }
+}

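For clarity, a small sketch (not part of this commit, with a hypothetical property name) of how the parsing above behaves when the name carries both a type and a format; this assumes "date" is a recognized RecordFieldType name:

    PropertyNameUtil.getFieldName("lastLogin:date:yyyy-MM-dd");      // "lastLogin"
    PropertyNameUtil.getFieldTypeName("lastLogin:date:yyyy-MM-dd");  // Optional.of("date")
    PropertyNameUtil.getFieldFormat("lastLogin:date:yyyy-MM-dd");    // Optional.of("yyyy-MM-dd")
    PropertyNameUtil.getDataType("lastLogin:date:yyyy-MM-dd");       // Optional of the DATE data type with that format
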
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
new file mode 100644
index 0000000..cf72b19
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigInteger;
+import java.sql.Array;
+import java.sql.SQLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.DataTypeUtils;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonGenerator;
+
+public class WriteJsonResult implements RecordSetWriter {
+    private final boolean prettyPrint;
+
+    private final ComponentLog logger;
+    private final JsonFactory factory = new JsonFactory();
+    private final DateFormat dateFormat;
+    private final DateFormat timeFormat;
+    private final DateFormat timestampFormat;
+
+    public WriteJsonResult(final ComponentLog logger, final boolean prettyPrint, final String dateFormat, final String timeFormat, final String timestampFormat) {
+        this.prettyPrint = prettyPrint;
+        this.dateFormat = new SimpleDateFormat(dateFormat);
+        this.timeFormat = new SimpleDateFormat(timeFormat);
+        this.timestampFormat = new SimpleDateFormat(timestampFormat);
+        this.logger = logger;
+    }
+
+    @Override
+    public WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException {
+        int count = 0;
+
+        try (final JsonGenerator generator = factory.createJsonGenerator(new NonCloseableOutputStream(rawOut))) {
+            if (prettyPrint) {
+                generator.useDefaultPrettyPrinter();
+            }
+
+            generator.writeStartArray();
+
+            Record record;
+            while ((record = rs.next()) != null) {
+                count++;
+                writeRecord(record, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
+            }
+
+            generator.writeEndArray();
+        } catch (final SQLException e) {
+            throw new IOException("Failed to serialize Result Set to stream", e);
+        }
+
+        return WriteResult.of(count, Collections.emptyMap());
+    }
+
+    @Override
+    public WriteResult write(final Record record, final OutputStream rawOut) throws IOException {
+        try (final JsonGenerator generator = factory.createJsonGenerator(new NonCloseableOutputStream(rawOut))) {
+            if (prettyPrint) {
+                generator.useDefaultPrettyPrinter();
+            }
+
+            writeRecord(record, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
+        } catch (final SQLException e) {
+            throw new IOException("Failed to write records to stream", e);
+        }
+
+        return WriteResult.of(1, Collections.emptyMap());
+    }
+
+    private void writeRecord(final Record record, final JsonGenerator generator, final GeneratorTask startTask, final GeneratorTask endTask)
+        throws JsonGenerationException, IOException, SQLException {
+
+        try {
+            final RecordSchema schema = record.getSchema();
+            startTask.apply(generator);
+            for (int i = 0; i < schema.getFieldCount(); i++) {
+                final String fieldName = schema.getField(i).getFieldName();
+                final Object value = record.getValue(fieldName);
+                if (value == null) {
+                    generator.writeNullField(fieldName);
+                    continue;
+                }
+
+                generator.writeFieldName(fieldName);
+                final DataType dataType = schema.getDataType(fieldName).get();
+
+                writeValue(generator, value, dataType, i < schema.getFieldCount() - 1);
+            }
+
+            endTask.apply(generator);
+        } catch (final Exception e) {
+            logger.error("Failed to write {} with schema {} as a JSON Object due to {}", new Object[] {record, record.getSchema(), e.toString(), e});
+            throw e;
+        }
+    }
+
+    private String createDate(final Object value, final DateFormat format) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Date) {
+            return format.format((Date) value);
+        }
+        if (value instanceof java.sql.Date) {
+            return format.format(new Date(((java.sql.Date) value).getTime()));
+        }
+        if (value instanceof java.sql.Time) {
+            return format.format(new Date(((java.sql.Time) value).getTime()));
+        }
+        if (value instanceof java.sql.Timestamp) {
+            return format.format(new Date(((java.sql.Timestamp) value).getTime()));
+        }
+
+        return null;
+    }
+
+    private void writeValue(final JsonGenerator generator, final Object value, final DataType dataType, final boolean moreCols)
+        throws JsonGenerationException, IOException, SQLException {
+        if (value == null) {
+            generator.writeNull();
+            return;
+        }
+
+        final DataType resolvedDataType;
+        if (dataType.getFieldType() == RecordFieldType.CHOICE) {
+            resolvedDataType = DataTypeUtils.inferDataType(value);
+        } else {
+            resolvedDataType = dataType;
+        }
+
+        switch (resolvedDataType.getFieldType()) {
+            case DATE:
+                generator.writeString(createDate(value, dateFormat));
+                break;
+            case TIME:
+                generator.writeString(createDate(value, timeFormat));
+                break;
+            case TIMESTAMP:
+                generator.writeString(createDate(value, timestampFormat));
+                break;
+            case DOUBLE:
+                generator.writeNumber(DataTypeUtils.toDouble(value, 0D));
+                break;
+            case FLOAT:
+                generator.writeNumber(DataTypeUtils.toFloat(value, 0F));
+                break;
+            case LONG:
+                generator.writeNumber(DataTypeUtils.toLong(value, 0L));
+                break;
+            case INT:
+            case BYTE:
+            case SHORT:
+                generator.writeNumber(DataTypeUtils.toInteger(value, 0));
+                break;
+            case CHAR:
+            case STRING:
+                generator.writeString(value.toString());
+                break;
+            case BIGINT:
+                if (value instanceof Long) {
+                    generator.writeNumber(((Long) value).longValue());
+                } else {
+                    generator.writeNumber((BigInteger) value);
+                }
+                break;
+            case BOOLEAN:
+                final String stringValue = value.toString();
+                if ("true".equalsIgnoreCase(stringValue)) {
+                    generator.writeBoolean(true);
+                } else if ("false".equalsIgnoreCase(stringValue)) {
+                    generator.writeBoolean(false);
+                } else {
+                    generator.writeString(stringValue);
+                }
+                break;
+            case RECORD: {
+                final Record record = (Record) value;
+                writeRecord(record, generator, gen -> gen.writeStartObject(), gen -> gen.writeEndObject());
+                break;
+            }
+            case ARRAY:
+            default:
+                if ("null".equals(value.toString())) {
+                    generator.writeNull();
+                } else if (value instanceof Map) {
+                    final Map<?, ?> map = (Map<?, ?>) value;
+                    generator.writeStartObject();
+
+                    int i = 0;
+                    for (final Map.Entry<?, ?> entry : map.entrySet()) {
+                        generator.writeFieldName(entry.getKey().toString());
+                        final boolean moreEntries = ++i < map.size();
+                        writeValue(generator, entry.getValue(), getColType(entry.getValue()), moreEntries);
+                    }
+                    generator.writeEndObject();
+                } else if (value instanceof List) {
+                    final List<?> list = (List<?>) value;
+                    writeArray(list.toArray(), generator);
+                } else if (value instanceof Array) {
+                    final Array array = (Array) value;
+                    final Object[] values = (Object[]) array.getArray();
+                    writeArray(values, generator);
+                } else if (value instanceof Object[]) {
+                    final Object[] values = (Object[]) value;
+                    writeArray(values, generator);
+                } else {
+                    generator.writeString(value.toString());
+                }
+                break;
+        }
+    }
+
+    private void writeArray(final Object[] values, final JsonGenerator generator) throws JsonGenerationException, IOException, SQLException {
+        generator.writeStartArray();
+        for (int i = 0; i < values.length; i++) {
+            final boolean moreEntries = i < values.length - 1;
+            final Object element = values[i];
+            writeValue(generator, element, getColType(element), moreEntries);
+        }
+        generator.writeEndArray();
+    }
+
+    private DataType getColType(final Object value) {
+        if (value instanceof String) {
+            return RecordFieldType.STRING.getDataType();
+        }
+        if (value instanceof Double) {
+            return RecordFieldType.DOUBLE.getDataType();
+        }
+        if (value instanceof Float) {
+            return RecordFieldType.FLOAT.getDataType();
+        }
+        if (value instanceof Integer) {
+            return RecordFieldType.INT.getDataType();
+        }
+        if (value instanceof Long) {
+            return RecordFieldType.LONG.getDataType();
+        }
+        if (value instanceof BigInteger) {
+            return RecordFieldType.BIGINT.getDataType();
+        }
+        if (value instanceof Boolean) {
+            return RecordFieldType.BOOLEAN.getDataType();
+        }
+        if (value instanceof Byte || value instanceof Short) {
+            return RecordFieldType.INT.getDataType();
+        }
+        if (value instanceof Character) {
+            return RecordFieldType.STRING.getDataType();
+        }
+        if (value instanceof java.util.Date || value instanceof java.sql.Date) {
+            return RecordFieldType.DATE.getDataType();
+        }
+        if (value instanceof java.sql.Time) {
+            return RecordFieldType.TIME.getDataType();
+        }
+        if (value instanceof java.sql.Timestamp) {
+            return RecordFieldType.TIMESTAMP.getDataType();
+        }
+        if (value instanceof Object[] || value instanceof List || value instanceof Array) {
+            return RecordFieldType.ARRAY.getDataType();
+        }
+
+        return RecordFieldType.RECORD.getDataType();
+    }
+
+    @Override
+    public String getMimeType() {
+        return "application/json";
+    }
+
+    private static interface GeneratorTask {
+        void apply(JsonGenerator generator) throws JsonGenerationException, IOException;
+    }
+}
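
For anyone wiring this writer into a processor, here is a minimal sketch of the call pattern. It is only an illustration: the class name WriteJsonResultExample, the chosen SimpleDateFormat patterns, and the assumption that the caller already has a RecordSet, an OutputStream, and a ComponentLog are mine, not part of the patch.

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.nifi.json.WriteJsonResult;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.serialization.WriteResult;
    import org.apache.nifi.serialization.record.RecordSet;

    public class WriteJsonResultExample {
        // Serializes the given RecordSet as a pretty-printed JSON array.
        // The three format strings are ordinary SimpleDateFormat patterns,
        // picked arbitrarily here for illustration.
        static WriteResult writeAsJson(final RecordSet recordSet, final OutputStream out,
                final ComponentLog logger) throws IOException {
            final WriteJsonResult writer = new WriteJsonResult(logger, true,
                "yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd HH:mm:ss");
            return writer.write(recordSet, out);
        }
    }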

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
new file mode 100644
index 0000000..b58a22e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+public abstract class AbstractRecordSetWriter extends AbstractControllerService {
+    static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
+        .name("Date Format")
+        .description("Specifies the format to use when writing out Date fields")
+        .expressionLanguageSupported(false)
+        .defaultValue(RecordFieldType.DATE.getDefaultFormat())
+        .addValidator(new SimpleDateFormatValidator())
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
+        .name("Time Format")
+        .description("Specifies the format to use when writing out Time fields")
+        .expressionLanguageSupported(false)
+        .defaultValue(RecordFieldType.TIME.getDefaultFormat())
+        .addValidator(new SimpleDateFormatValidator())
+        .required(true)
+        .build();
+
+    static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
+        .name("Timestamp Format")
+        .description("Specifies the format to use when writing out Timestamp (date/time) fields")
+        .expressionLanguageSupported(false)
+        .defaultValue(RecordFieldType.TIMESTAMP.getDefaultFormat())
+        .addValidator(new SimpleDateFormatValidator())
+        .required(true)
+        .build();
+
+    private volatile String dateFormat;
+    private volatile String timeFormat;
+    private volatile String timestampFormat;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Arrays.asList(DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT);
+    }
+
+    @OnEnabled
+    public void captureValues(final ConfigurationContext context) {
+        this.dateFormat = context.getProperty(DATE_FORMAT).getValue();
+        this.timeFormat = context.getProperty(TIME_FORMAT).getValue();
+        this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).getValue();
+    }
+
+    protected String getDateFormat() {
+        return dateFormat;
+    }
+
+    protected String getTimeFormat() {
+        return timeFormat;
+    }
+
+    protected String getTimestampFormat() {
+        return timestampFormat;
+    }
+}
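
A concrete writer service would extend this base class to inherit the three format properties and hand the captured values to its writer. The sketch below is hypothetical (the class name and the hard-coded pretty-print flag are mine, not the patch's), assuming the JSON writer from earlier in this commit:

    package org.apache.nifi.json;

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.serialization.AbstractRecordSetWriter;
    import org.apache.nifi.serialization.RecordSetWriter;
    import org.apache.nifi.serialization.RecordSetWriterFactory;

    public class JsonRecordSetWriterExample extends AbstractRecordSetWriter implements RecordSetWriterFactory {
        @Override
        public RecordSetWriter createWriter(final ComponentLog logger) {
            // Formats come from the Date/Time/Timestamp Format properties captured @OnEnabled.
            return new WriteJsonResult(logger, true, getDateFormat(), getTimeFormat(), getTimestampFormat());
        }
    }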

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DataTypeUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DataTypeUtils.java
new file mode 100644
index 0000000..de207f4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DataTypeUtils.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class DataTypeUtils {
+
+    public static Double toDouble(final Object value, final Double defaultValue) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Number) {
+            return ((Number) value).doubleValue();
+        }
+
+        if (value instanceof String) {
+            return Double.parseDouble((String) value);
+        }
+
+        return defaultValue;
+    }
+
+    public static Float toFloat(final Object value, final Float defaultValue) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Number) {
+            return ((Number) value).floatValue();
+        }
+
+        if (value instanceof String) {
+            return Float.parseFloat((String) value);
+        }
+
+        return defaultValue;
+    }
+
+    public static Long toLong(final Object value, final Long defaultValue) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Number) {
+            return ((Number) value).longValue();
+        }
+
+        if (value instanceof String) {
+            return Long.parseLong((String) value);
+        }
+
+        return defaultValue;
+    }
+
+
+
+    public static Integer toInteger(final Object value, final Integer defaultValue) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+
+        if (value instanceof String) {
+            return Integer.parseInt((String) value);
+        }
+
+        return defaultValue;
+    }
+
+
+    /**
+     * Deduces the type of RecordFieldType that should be used for a value of the given type,
+     * or returns <code>null</code> if the value is null
+     *
+     * @param value the value whose type should be deduced
+     * @return the type of RecordFieldType that should be used for a value of the given type,
+     *         or <code>null</code> if the value is null
+     */
+    public static DataType inferDataType(final Object value) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return RecordFieldType.STRING.getDataType();
+        }
+        if (value instanceof Long) {
+            return RecordFieldType.LONG.getDataType();
+        }
+        if (value instanceof Integer) {
+            return RecordFieldType.INT.getDataType();
+        }
+        if (value instanceof Double) {
+            return RecordFieldType.DOUBLE.getDataType();
+        }
+        if (value instanceof Float) {
+            return RecordFieldType.FLOAT.getDataType();
+        }
+        if (value instanceof Boolean) {
+            return RecordFieldType.BOOLEAN.getDataType();
+        }
+        if (value instanceof Byte) {
+            return RecordFieldType.BYTE.getDataType();
+        }
+        if (value instanceof Character) {
+            return RecordFieldType.CHAR.getDataType();
+        }
+        if (value instanceof Short) {
+            return RecordFieldType.SHORT.getDataType();
+        }
+        if (value instanceof Date) {
+            return RecordFieldType.DATE.getDataType();
+        }
+        if (value instanceof Object[] || value instanceof List) {
+            return RecordFieldType.ARRAY.getDataType();
+        }
+        if (value instanceof Map) {
+            @SuppressWarnings("unchecked")
+            final Map<String, Object> map = (Map<String, Object>) value;
+            final RecordSchema childSchema = determineSchema(map);
+            return RecordFieldType.RECORD.getDataType(childSchema);
+        }
+
+        return RecordFieldType.RECORD.getDataType();
+    }
+
+    public static RecordSchema determineSchema(final Map<String, Object> valueMap) {
+        final List<RecordField> fields = new ArrayList<>(valueMap.size());
+        for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
+            final DataType valueType = inferDataType(entry.getValue());
+            final String fieldName = entry.getKey();
+            final RecordField field = new RecordField(fieldName, valueType);
+            fields.add(field);
+        }
+        return new SimpleRecordSchema(fields);
+    }
+}
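
A quick, self-contained look at how the coercion and schema-inference helpers behave; the values and field names below are made up for the example:

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.nifi.serialization.DataTypeUtils;
    import org.apache.nifi.serialization.record.RecordSchema;

    public class DataTypeUtilsExample {
        public static void main(final String[] args) {
            // Strings are parsed; unrecognized types fall back to the supplied default.
            final Integer count = DataTypeUtils.toInteger("42", 0);

            // determineSchema infers one RecordField per map entry via inferDataType.
            final Map<String, Object> row = new LinkedHashMap<>();
            row.put("name", "nifi");
            row.put("records", 3L);
            final RecordSchema schema = DataTypeUtils.determineSchema(row);

            System.out.println(count + " fields=" + schema.getFieldCount());
        }
    }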

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java
new file mode 100644
index 0000000..f25749b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.text.SimpleDateFormat;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+public class SimpleDateFormatValidator implements Validator {
+
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        try {
+            new SimpleDateFormat(input);
+        } catch (final Exception e) {
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(false)
+                .explanation("Invalid Date format: " + e.getMessage())
+                .build();
+        }
+
+        return new ValidationResult.Builder()
+            .input(input)
+            .subject(subject)
+            .valid(true)
+            .build();
+    }
+
+}
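
The validator simply tries to construct a SimpleDateFormat from the property value. A small standalone check follows; passing null for the ValidationContext is safe only because this particular implementation never touches it:

    import org.apache.nifi.components.ValidationResult;
    import org.apache.nifi.serialization.SimpleDateFormatValidator;

    public class SimpleDateFormatValidatorExample {
        public static void main(final String[] args) {
            final SimpleDateFormatValidator validator = new SimpleDateFormatValidator();
            final ValidationResult good = validator.validate("Date Format", "yyyy-MM-dd", null);
            final ValidationResult bad = validator.validate("Date Format", "not a pattern", null);
            // Expected: good is valid, bad is not (unquoted letters such as 'n' are illegal pattern characters).
            System.out.println(good.isValid() + " " + bad.isValid());
        }
    }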

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/UserTypeOverrideRowReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/UserTypeOverrideRowReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/UserTypeOverrideRowReader.java
new file mode 100644
index 0000000..be0b8ad
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/UserTypeOverrideRowReader.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.serialization.DataTypeValidator;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+public abstract class UserTypeOverrideRowReader extends AbstractControllerService {
+    private volatile Map<String, DataType> fieldTypeOverrides;
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .dynamic(true)
+            .addValidator(new DataTypeValidator())
+            .build();
+    }
+
+    @OnEnabled
+    public void createFieldTypeOverrides(final ConfigurationContext context) {
+        final Map<String, DataType> overrides = new HashMap<>(context.getProperties().size());
+        for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            if (!entry.getKey().isDynamic()) {
+                continue;
+            }
+
+            final String fieldName = entry.getKey().getName();
+            final String dataTypeName = entry.getValue();
+            if (dataTypeName == null) {
+                continue;
+            }
+
+            final DataType dataType;
+            final String[] splits = dataTypeName.split("\\:");
+            if (splits.length == 2) {
+                final RecordFieldType fieldType = RecordFieldType.of(splits[0]);
+                final String format = splits[1];
+                dataType = fieldType.getDataType(format);
+            } else {
+                final RecordFieldType fieldType = RecordFieldType.of(dataTypeName);
+                dataType = fieldType.getDataType();
+            }
+
+            overrides.put(fieldName, dataType);
+        }
+
+        this.fieldTypeOverrides = Collections.unmodifiableMap(overrides);
+    }
+
+    protected Map<String, DataType> getFieldTypeOverrides() {
+        return fieldTypeOverrides;
+    }
+}
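
Dynamic property values are parsed as either a bare type name ("int") or a type plus format separated by a colon ("date:MM/dd/yyyy"). A hypothetical subclass, just to show where the captured overrides surface; the class name and the debug logging are illustrative only:

    import java.util.Map;

    import org.apache.nifi.serialization.UserTypeOverrideRowReader;
    import org.apache.nifi.serialization.record.DataType;

    public class ExampleOverridingReaderFactory extends UserTypeOverrideRowReader {
        protected void logOverrides() {
            // The override map is populated @OnEnabled from the user's dynamic properties.
            for (final Map.Entry<String, DataType> entry : getFieldTypeOverrides().entrySet()) {
                getLogger().debug("Field {} overridden to {}", new Object[] {entry.getKey(), entry.getValue()});
            }
        }
    }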

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
new file mode 100644
index 0000000..07da00e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.text;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+@Tags({"text", "freeform", "expression", "language", "el", "resultset", "writer", "serialize"})
+@CapabilityDescription("Writes the contents of a Database ResultSet as free-form text. The configured "
+    + "text is able to make use of the Expression Language to reference each of the columns that are available "
+    + "in the ResultSet. Each record in the ResultSet will be separated by a single newline character.")
+public class FreeFormTextRecordSetWriter extends AbstractControllerService implements RecordSetWriterFactory {
+    static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder()
+        .name("Text")
+        .description("The text to use when writing the results. This property 
will evaluate the Expression Language using any of the columns available to the 
Result Set. For example, if the "
+            + "following SQL Query is used: \"SELECT Name, COUNT(*) AS Count\" 
then the Expression can reference \"Name\" and \"Count\", such as 
\"${Name:toUpper()} ${Count:minus(1)}\"")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(true)
+        .build();
+    static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
+        .name("Character Set")
+        .description("The Character set to use when writing the data to the FlowFile")
+        .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+        .defaultValue("UTF-8")
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+
+    private volatile PropertyValue textValue;
+    private volatile Charset characterSet;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(TEXT);
+        properties.add(CHARACTER_SET);
+        return properties;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        textValue = context.getProperty(TEXT);
+        characterSet = Charset.forName(context.getProperty(CHARACTER_SET).getValue());
+    }
+
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger) {
+        return new FreeFormTextWriter(textValue, characterSet);
+    }
+
+}
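
Since this service only implements RecordSetWriterFactory, a processor would obtain a writer per use via createWriter(...). The helper below is a sketch under that assumption; the class and method names are illustrative, and the FreeFormTextWriter instance the factory actually returns is not shown in this hunk:

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.serialization.RecordSetWriter;
    import org.apache.nifi.serialization.RecordSetWriterFactory;
    import org.apache.nifi.serialization.WriteResult;
    import org.apache.nifi.serialization.record.RecordSet;

    public class RecordSetWriterFactoryExample {
        // The factory would normally be a configured controller service,
        // such as FreeFormTextRecordSetWriter, looked up from a processor property.
        static WriteResult writeWith(final RecordSetWriterFactory factory, final RecordSet recordSet,
                final OutputStream out, final ComponentLog logger) throws IOException {
            final RecordSetWriter writer = factory.createWriter(logger);
            return writer.write(recordSet, out);
        }
    }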
