Repository: nifi
Updated Branches:
  refs/heads/master c59a96762 -> c91d99884


NIFI-4717: Several minor bug fixes and performance improvements around record-oriented processors

Signed-off-by: Matthew Burgess <mattyb...@apache.org>

This closes #2359


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c91d9988
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c91d9988
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c91d9988

Branch: refs/heads/master
Commit: c91d99884a3263d88908c97a1a48ca6178ea7379
Parents: c59a967
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Dec 21 12:31:32 2017 -0500
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Fri Dec 29 10:43:21 2017 -0500

----------------------------------------------------------------------
 .../nifi/serialization/SimpleRecordSchema.java  | 31 +++------
 .../nifi/serialization/record/MapRecord.java    | 11 +++-
 .../nifi/serialization/record/RecordField.java  | 10 ++-
 .../record/ResultSetRecordSet.java              | 10 ++-
 .../record/util/DataTypeUtils.java              |  3 +
 .../org/apache/nifi/util/MockPropertyValue.java |  2 +-
 .../java/org/apache/nifi/avro/AvroTypeUtil.java | 26 +++++++-
 .../nifi/processors/standard/QueryRecord.java   |  2 +
 .../nifi/processors/standard/UpdateRecord.java  | 24 ++++---
 .../apache/nifi/queryrecord/FlowFileTable.java  | 36 ++++++++++-
 .../processors/standard/TestUpdateRecord.java   |  3 +
 .../org/apache/nifi/csv/CSVRecordReader.java    | 57 ++++++++++------
 .../apache/nifi/csv/JacksonCSVRecordReader.java | 39 +++++------
 .../nifi/json/JsonTreeRowRecordReader.java      | 68 ++++++++++++++------
 .../org/apache/nifi/json/WriteJsonResult.java   | 10 ++-
 15 files changed, 236 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
index 871c7bf..5b85f03 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
@@ -23,7 +23,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.OptionalInt;
 import java.util.stream.Collectors;
 
 import org.apache.nifi.serialization.record.DataType;
@@ -33,7 +32,7 @@ import org.apache.nifi.serialization.record.SchemaIdentifier;
 
 public class SimpleRecordSchema implements RecordSchema {
     private List<RecordField> fields = null;
-    private Map<String, Integer> fieldIndices = null;
+    private Map<String, RecordField> fieldMap = null;
     private final boolean textAvailable;
     private final String text;
     private final String schemaFormat;
@@ -88,29 +87,25 @@ public class SimpleRecordSchema implements RecordSchema {
     }
 
     public void setFields(final List<RecordField> fields) {
-
         if (this.fields != null) {
             throw new IllegalArgumentException("Fields have already been set.");
         }
 
         this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
-        this.fieldIndices = new HashMap<>(fields.size());
+        this.fieldMap = new HashMap<>(fields.size() * 2);
 
-        int index = 0;
         for (final RecordField field : fields) {
-            Integer previousValue = fieldIndices.put(field.getFieldName(), index);
+            RecordField previousValue = fieldMap.put(field.getFieldName(), field);
             if (previousValue != null) {
                 throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
             }
 
             for (final String alias : field.getAliases()) {
-                previousValue = fieldIndices.put(alias, index);
+                previousValue = fieldMap.put(alias, field);
                 if (previousValue != null) {
                     throw new IllegalArgumentException("Two fields are given with the same name (or alias) of '" + field.getFieldName() + "'");
                 }
             }
-
-            index++;
         }
     }
 
@@ -138,24 +133,18 @@ public class SimpleRecordSchema implements RecordSchema {
 
     @Override
     public Optional<DataType> getDataType(final String fieldName) {
-        final OptionalInt idx = getFieldIndex(fieldName);
-        return idx.isPresent() ? Optional.of(fields.get(idx.getAsInt()).getDataType()) : Optional.empty();
+        final RecordField field = fieldMap.get(fieldName);
+        if (field == null) {
+            return Optional.empty();
+        }
+        return Optional.of(field.getDataType());
     }
 
     @Override
     public Optional<RecordField> getField(final String fieldName) {
-        final OptionalInt indexOption = getFieldIndex(fieldName);
-        if (indexOption.isPresent()) {
-            return Optional.of(fields.get(indexOption.getAsInt()));
-        }
-
-        return Optional.empty();
+        return Optional.ofNullable(fieldMap.get(fieldName));
     }
 
-    private OptionalInt getFieldIndex(final String fieldName) {
-        final Integer index = fieldIndices.get(fieldName);
-        return index == null ? OptionalInt.empty() : OptionalInt.of(index);
-    }
 
     @Override
     public boolean equals(final Object obj) {
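
For context, a minimal sketch of the lookup pattern this hunk introduces: the name-to-index map becomes a map keyed directly by field name and alias, so getField()/getDataType() are a single HashMap lookup. Field and FieldLookup below are simplified stand-ins for illustration, not the NiFi classes.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    import java.util.Set;

    // Simplified stand-in: Field mirrors only the name/alias part of RecordField.
    final class Field {
        final String name;
        final Set<String> aliases;

        Field(final String name, final Set<String> aliases) {
            this.name = name;
            this.aliases = aliases;
        }
    }

    final class FieldLookup {
        private final Map<String, Field> fieldMap = new HashMap<>();

        FieldLookup(final List<Field> fields) {
            // Key the map by the field name and by every alias so that a single
            // HashMap.get() resolves either form, with no index indirection.
            for (final Field field : fields) {
                register(field.name, field);
                for (final String alias : field.aliases) {
                    register(alias, field);
                }
            }
        }

        private void register(final String key, final Field field) {
            if (fieldMap.put(key, field) != null) {
                throw new IllegalArgumentException("Duplicate field name or alias: " + key);
            }
        }

        Optional<Field> getField(final String nameOrAlias) {
            return Optional.ofNullable(fieldMap.get(nameOrAlias));
        }
    }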

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index c3444ed..0335bd2 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -70,10 +70,10 @@ public class MapRecord implements Record {
 
     private Map<String, Object> checkTypes(final Map<String, Object> values, final RecordSchema schema) {
         for (final RecordField field : schema.getFields()) {
-            final Object value = getExplicitValue(field, values);
+            Object value = getExplicitValue(field, values);
 
             if (value == null) {
-                if (field.isNullable()) {
+                if (field.isNullable() || field.getDefaultValue() != null) {
                     continue;
                 }
 
@@ -109,7 +109,12 @@ public class MapRecord implements Record {
         final Object[] values = new Object[schema.getFieldCount()];
         int i = 0;
         for (final RecordField recordField : schema.getFields()) {
-            values[i++] = getValue(recordField);
+            Object value = getExplicitValue(recordField);
+            if (value == null) {
+                value = recordField.getDefaultValue();
+            }
+
+            values[i++] = value;
         }
         return values;
     }
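
A short sketch of the default-value behavior the two hunks above add: a missing value is tolerated when the field is nullable or declares a default, and reads fall back to that default. FieldSpec is a hypothetical stand-in for RecordField.

    import java.util.Map;

    final class DefaultAwareValues {
        static final class FieldSpec {
            final String name;
            final boolean nullable;
            final Object defaultValue;

            FieldSpec(final String name, final boolean nullable, final Object defaultValue) {
                this.name = name;
                this.nullable = nullable;
                this.defaultValue = defaultValue;
            }
        }

        // Mirrors the checkTypes()/toArray() logic: an explicit value wins, otherwise the
        // field's default is used; only non-nullable fields without a default are an error.
        static Object read(final Map<String, Object> values, final FieldSpec field) {
            final Object explicit = values.get(field.name);
            if (explicit != null) {
                return explicit;
            }
            if (field.nullable || field.defaultValue != null) {
                return field.defaultValue; // may be null for nullable fields with no default
            }
            throw new IllegalArgumentException("Field '" + field.name + "' requires a value");
        }
    }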

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
index 41da6be..b4ff848 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordField.java
@@ -68,7 +68,15 @@ public class RecordField {
 
         this.fieldName = Objects.requireNonNull(fieldName);
         this.dataType = Objects.requireNonNull(dataType);
-        this.aliases = Collections.unmodifiableSet(Objects.requireNonNull(aliases));
+
+        // If aliases is the empty set, don't bother with the expense of wrapping in an unmodifiableSet.
+        Objects.requireNonNull(aliases);
+        if ((Set<?>) aliases == Collections.EMPTY_SET) {
+            this.aliases = aliases;
+        } else {
+            this.aliases = Collections.unmodifiableSet(aliases);
+        }
+
         this.defaultValue = defaultValue;
         this.nullable = nullable;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
index ad26d79..571bf77 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
@@ -31,6 +31,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.slf4j.Logger;
@@ -174,7 +176,13 @@ public class ResultSetRecordSet implements RecordSet, Closeable {
 
                 final Object obj = rs.getObject(columnIndex);
                 if (obj == null || !(obj instanceof Record)) {
-                    return RecordFieldType.RECORD.getDataType();
+                    final List<DataType> dataTypes = Stream.of(RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR, RecordFieldType.DATE,
+                        RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT, RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING, RecordFieldType.TIME,
+                        RecordFieldType.TIMESTAMP)
+                    .map(recordFieldType -> recordFieldType.getDataType())
+                    .collect(Collectors.toList());
+
+                    return RecordFieldType.CHOICE.getChoiceDataType(dataTypes);
                 }
 
                 final Record record = (Record) obj;
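
The fallback above builds a CHOICE of the simple field types when a column's type can't be derived from the ResultSet value. Roughly the same construction, pulled out on its own; it assumes only the RecordFieldType/DataType calls already shown in the hunk.

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    import org.apache.nifi.serialization.record.DataType;
    import org.apache.nifi.serialization.record.RecordFieldType;

    final class ChoiceFallback {
        // A "wide" CHOICE covering the simple types, used when the driver gives us
        // no usable type information for a column.
        static DataType anySimpleType() {
            final List<DataType> dataTypes = Stream.of(
                    RecordFieldType.BIGINT, RecordFieldType.BOOLEAN, RecordFieldType.BYTE, RecordFieldType.CHAR,
                    RecordFieldType.DATE, RecordFieldType.DOUBLE, RecordFieldType.FLOAT, RecordFieldType.INT,
                    RecordFieldType.LONG, RecordFieldType.SHORT, RecordFieldType.STRING, RecordFieldType.TIME,
                    RecordFieldType.TIMESTAMP)
                .map(RecordFieldType::getDataType)
                .collect(Collectors.toList());
            return RecordFieldType.CHOICE.getChoiceDataType(dataTypes);
        }
    }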

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index ccd9270..6063d3b 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -891,6 +891,9 @@ public class DataTypeUtils {
         if (otherSchema == null) {
             return thisSchema;
         }
+        if (thisSchema == otherSchema) {
+            return thisSchema;
+        }
 
         final List<RecordField> otherFields = otherSchema.getFields();
         if (otherFields.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
index c55ad23..08c8e97 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockPropertyValue.java
@@ -225,7 +225,7 @@ public class MockPropertyValue implements PropertyValue {
 
     @Override
     public boolean isExpressionLanguagePresent() {
-        if (!expectExpressions) {
+        if (!Boolean.TRUE.equals(expectExpressions)) {
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index c5256c4..cc1fd38 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -27,9 +27,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -150,9 +152,20 @@ public class AvroTypeUtil {
                 final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
                 final List<DataType> options = choiceDataType.getPossibleSubTypes();
 
+                // We need to keep track of which types have been added to the union, because if we have
+                // two elements in the UNION with the same type, it will fail - even if the logical type is
+                // different. So if we have an int and a logical type date (which also has a 'concrete type' of int)
+                // then an Exception will be thrown when we try to create the union. To avoid this, we just keep track
+                // of the Types and avoid adding it in such a case.
                 final List<Schema> unionTypes = new ArrayList<>(options.size());
+                final Set<Type> typesAdded = new HashSet<>();
+
                 for (final DataType option : options) {
-                    unionTypes.add(buildAvroSchema(option, fieldName, false));
+                    final Schema optionSchema = buildAvroSchema(option, fieldName, false);
+                    if (!typesAdded.contains(optionSchema.getType())) {
+                        unionTypes.add(optionSchema);
+                        typesAdded.add(optionSchema.getType());
+                    }
                 }
 
                 schema = Schema.createUnion(unionTypes);
@@ -213,6 +226,17 @@ public class AvroTypeUtil {
     }
 
     private static Schema nullable(final Schema schema) {
+        if (schema.getType() == Type.UNION) {
+            final List<Schema> unionTypes = new ArrayList<>(schema.getTypes());
+            final Schema nullSchema = Schema.create(Type.NULL);
+            if (unionTypes.contains(nullSchema)) {
+                return schema;
+            }
+
+            unionTypes.add(nullSchema);
+            return Schema.createUnion(unionTypes);
+        }
+
         return Schema.createUnion(Schema.create(Type.NULL), schema);
     }
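
Two standalone sketches of the union handling added above, written against the regular Avro Schema API; the helper class and method names are illustrative, not NiFi's.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.avro.Schema;
    import org.apache.avro.Schema.Type;

    final class UnionHelpers {
        // Avro rejects a union containing two members of the same concrete Type, even
        // when their logical types differ (e.g. int and date), so members are deduped
        // by Type before the union is created.
        static Schema dedupedUnion(final List<Schema> candidates) {
            final List<Schema> unionTypes = new ArrayList<>(candidates.size());
            final Set<Type> typesAdded = new HashSet<>();
            for (final Schema candidate : candidates) {
                if (typesAdded.add(candidate.getType())) {
                    unionTypes.add(candidate);
                }
            }
            return Schema.createUnion(unionTypes);
        }

        // Making an existing union nullable must add NULL to its members (if absent)
        // rather than wrapping it in another union, since Avro forbids nested unions.
        static Schema nullable(final Schema schema) {
            final Schema nullSchema = Schema.create(Type.NULL);
            if (schema.getType() == Type.UNION) {
                final List<Schema> unionTypes = new ArrayList<>(schema.getTypes());
                if (unionTypes.contains(nullSchema)) {
                    return schema;
                }
                unionTypes.add(nullSchema);
                return Schema.createUnion(unionTypes);
            }
            return Schema.createUnion(nullSchema, schema);
        }
    }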
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index 3084b72..5798323 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -453,6 +453,8 @@ public class QueryRecord extends AbstractProcessor {
         return new QueryResult() {
             @Override
             public void close() throws IOException {
+                table.close();
+
                 final BlockingQueue<CachedStatement> statementQueue = statementQueues.get(sql);
                 if (statementQueue == null || !statementQueue.offer(cachedStatement)) {
                     try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
index b2c8002..63e05ab 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
@@ -165,17 +165,23 @@ public class UpdateRecord extends AbstractRecordProcessor {
                 }
             } else {
                 final PropertyValue replacementValue = context.getProperty(recordPathText);
-                final Map<String, String> fieldVariables = new HashMap<>(4);
 
-                result.getSelectedFields().forEach(fieldVal -> {
-                    fieldVariables.clear();
-                    fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName());
-                    fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null));
-                    fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name());
+                if (replacementValue.isExpressionLanguagePresent()) {
+                    final Map<String, String> fieldVariables = new HashMap<>();
 
-                    final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue();
-                    fieldVal.updateValue(evaluatedReplacementVal);
-                });
+                    result.getSelectedFields().forEach(fieldVal -> {
+                        fieldVariables.clear();
+                        fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName());
+                        fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null));
+                        fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name());
+
+                        final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue();
+                        fieldVal.updateValue(evaluatedReplacementVal);
+                    });
+                } else {
+                    final String evaluatedReplacementVal = replacementValue.getValue();
+                    result.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(evaluatedReplacementVal));
+                }
             }
         }
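
The reworked block only builds the per-field variable map and re-evaluates Expression Language when the replacement property actually contains expressions; a literal is resolved once and applied to every selected field. A compact sketch of that shape, with Property and FieldValue as hypothetical stand-ins for NiFi's PropertyValue and record-path FieldValue:

    import java.util.List;

    // Hypothetical, minimal stand-ins for illustration only.
    interface Property {
        boolean isExpressionLanguagePresent();
        String evaluateFor(String fieldName); // per-field EL evaluation
        String literalValue();
    }

    interface FieldValue {
        String fieldName();
        void updateValue(String newValue);
    }

    final class ReplacementApplier {
        static void apply(final Property replacement, final List<FieldValue> selectedFields) {
            if (replacement.isExpressionLanguagePresent()) {
                // EL present: the result can differ per field, so evaluate for each one.
                for (final FieldValue fieldValue : selectedFields) {
                    fieldValue.updateValue(replacement.evaluateFor(fieldValue.fieldName()));
                }
            } else {
                // No EL: resolve once, reuse for every field.
                final String literal = replacement.literalValue();
                for (final FieldValue fieldValue : selectedFields) {
                    fieldValue.updateValue(literal);
                }
            }
        }
    }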
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index bd15dc2..c40e364 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -18,9 +18,12 @@ package org.apache.nifi.queryrecord;
 
 import java.io.InputStream;
 import java.lang.reflect.Type;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.linq4j.AbstractEnumerable;
@@ -47,6 +50,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
 
@@ -63,6 +67,8 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
     private volatile FlowFile flowFile;
     private volatile int maxRecordsRead;
 
+    private final Set<FlowFileEnumerator<?>> enumerators = new HashSet<>();
+
     /**
      * Creates a FlowFile table.
      */
@@ -85,6 +91,14 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
         return "FlowFileTable";
     }
 
+    public void close() {
+        synchronized (enumerators) {
+            for (final FlowFileEnumerator<?> enumerator : enumerators) {
+                enumerator.close();
+            }
+        }
+    }
+
     /**
      * Returns an enumerable over a given projection of the fields.
      *
@@ -96,7 +110,7 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
             @Override
             @SuppressWarnings({"unchecked", "rawtypes"})
             public Enumerator<Object> enumerator() {
-                return new FlowFileEnumerator(session, flowFile, logger, recordParserFactory, fields) {
+                final FlowFileEnumerator flowFileEnumerator = new FlowFileEnumerator(session, flowFile, logger, recordParserFactory, fields) {
                     @Override
                     protected void onFinish() {
                         final int recordCount = getRecordsRead();
@@ -104,7 +118,21 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
                             maxRecordsRead = recordCount;
                         }
                     }
+
+                    @Override
+                    public void close() {
+                        synchronized (enumerators) {
+                            enumerators.remove(this);
+                        }
+                        super.close();
+                    }
                 };
+
+                synchronized (enumerators) {
+                    enumerators.add(flowFileEnumerator);
+                }
+
+                return flowFileEnumerator;
             }
         };
     }
@@ -203,9 +231,13 @@ public class FlowFileTable<S, E> extends AbstractTable implements QueryableTable
             case ARRAY:
                 return typeFactory.createJavaType(Object[].class);
             case RECORD:
-                return typeFactory.createJavaType(Object.class);
+                return typeFactory.createJavaType(Record.class);
             case MAP:
                 return typeFactory.createJavaType(HashMap.class);
+            case BIGINT:
+                return typeFactory.createJavaType(BigInteger.class);
+            case CHOICE:
+                return typeFactory.createJavaType(Object.class);
         }
 
         throw new IllegalArgumentException("Unknown Record Field Type: " + fieldType);

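QueryRecord now closes the FlowFileTable when a query result is closed, and the table keeps a registry of its open enumerators so it can close them in turn. The resource-tracking pattern, reduced to a self-contained sketch (Cursor stands in for FlowFileEnumerator):

    import java.io.Closeable;
    import java.util.HashSet;
    import java.util.Set;

    final class TrackingTable implements Closeable {
        private final Set<Cursor> openCursors = new HashSet<>();

        Cursor openCursor() {
            final Cursor cursor = new Cursor(this);
            synchronized (openCursors) {
                openCursors.add(cursor);
            }
            return cursor;
        }

        @Override
        public void close() {
            // Copy first: each Cursor.close() deregisters itself from the set.
            final Set<Cursor> toClose;
            synchronized (openCursors) {
                toClose = new HashSet<>(openCursors);
            }
            for (final Cursor cursor : toClose) {
                cursor.close();
            }
        }

        private void deregister(final Cursor cursor) {
            synchronized (openCursors) {
                openCursors.remove(cursor);
            }
        }

        static final class Cursor implements Closeable {
            private final TrackingTable owner;

            Cursor(final TrackingTable owner) {
                this.owner = owner;
            }

            @Override
            public void close() {
                owner.deregister(this); // then release the underlying reader, stream, etc.
            }
        }
    }
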
http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
index 6669f4b..33bec74 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java
@@ -64,6 +64,7 @@ public class TestUpdateRecord {
     public void testLiteralReplacementValue() {
         runner.setProperty("/name", "Jane Doe");
         runner.enqueue("");
+        runner.setValidateExpressionUsage(false);
 
         readerService.addRecord("John Doe", 35);
         runner.run();
@@ -188,6 +189,7 @@ public class TestUpdateRecord {
     public void testUpdateInArray() throws InitializationException, IOException {
         final JsonTreeReader jsonReader = new JsonTreeReader();
         runner.addControllerService("reader", jsonReader);
+        runner.setValidateExpressionUsage(false);
 
         final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc")));
         final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc")));
@@ -218,6 +220,7 @@ public class TestUpdateRecord {
     public void testUpdateInNullArray() throws InitializationException, IOException {
         final JsonTreeReader jsonReader = new JsonTreeReader();
         runner.addControllerService("reader", jsonReader);
+        runner.setValidateExpressionUsage(false);
 
         final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc")));
         final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-address.avsc")));

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
index 70aaba9..f01fc3e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
@@ -23,7 +23,7 @@ import java.io.InputStreamReader;
 import java.io.Reader;
 import java.text.DateFormat;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -41,6 +41,8 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
@@ -53,7 +55,7 @@ public class CSVRecordReader implements RecordReader {
     private final Supplier<DateFormat> LAZY_TIME_FORMAT;
     private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
 
-    private List<String> rawFieldNames;
+    private List<RecordField> recordFields;
 
     public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
         final String dateFormat, final String timeFormat, final String timestampFormat, final String encoding) throws IOException {
@@ -87,31 +89,37 @@ public class CSVRecordReader implements RecordReader {
     public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
         final RecordSchema schema = getSchema();
 
-        final List<String> rawFieldNames = getRawFieldNames();
-        final int numFieldNames = rawFieldNames.size();
+        final List<RecordField> recordFields = getRecordFields();
+        final int numFieldNames = recordFields.size();
 
         for (final CSVRecord csvRecord : csvParser) {
-            final Map<String, Object> values = new LinkedHashMap<>();
+            final Map<String, Object> values = new HashMap<>(recordFields.size() * 2);
             for (int i = 0; i < csvRecord.size(); i++) {
-                final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i);
                 final String rawValue = csvRecord.get(i);
 
-                final Optional<DataType> dataTypeOption = schema.getDataType(rawFieldName);
+                final String rawFieldName;
+                final DataType dataType;
+                if (i >= numFieldNames) {
+                    if (!dropUnknownFields) {
+                        values.put("unknown_field_index_" + i, rawValue);
+                    }
 
-                if (!dataTypeOption.isPresent() && dropUnknownFields) {
                     continue;
+                } else {
+                    final RecordField recordField = recordFields.get(i);
+                    rawFieldName = recordField.getFieldName();
+                    dataType = recordField.getDataType();
                 }
 
+
                 final Object value;
-                if (coerceTypes && dataTypeOption.isPresent()) {
-                    value = convert(rawValue, dataTypeOption.get(), rawFieldName);
-                } else if (dataTypeOption.isPresent()) {
+                if (coerceTypes) {
+                    value = convert(rawValue, dataType, rawFieldName);
+                } else {
                     // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
                     // dictate a field type. As a result, we will use the schema that we have to attempt to convert
                     // the value into the desired type if it's a simple type.
-                    value = convertSimpleIfPossible(rawValue, dataTypeOption.get(), rawFieldName);
-                } else {
-                    value = rawValue;
+                    value = convertSimpleIfPossible(rawValue, dataType, rawFieldName);
                 }
 
                 values.put(rawFieldName, value);
@@ -124,9 +132,9 @@ public class CSVRecordReader implements RecordReader {
     }
 
 
-    private List<String> getRawFieldNames() {
-        if (this.rawFieldNames != null) {
-            return this.rawFieldNames;
+    private List<RecordField> getRecordFields() {
+        if (this.recordFields != null) {
+            return this.recordFields;
         }
 
         // Use a SortedMap keyed by index of the field so that we can get a List of field names in the correct order
@@ -135,8 +143,19 @@ public class CSVRecordReader implements RecordReader {
             sortedMap.put(entry.getValue(), entry.getKey());
         }
 
-        this.rawFieldNames = new ArrayList<>(sortedMap.values());
-        return this.rawFieldNames;
+        final List<RecordField> fields = new ArrayList<>();
+        final List<String> rawFieldNames = new ArrayList<>(sortedMap.values());
+        for (final String rawFieldName : rawFieldNames) {
+            final Optional<RecordField> option = schema.getField(rawFieldName);
+            if (option.isPresent()) {
+                fields.add(option.get());
+            } else {
+                fields.add(new RecordField(rawFieldName, RecordFieldType.STRING.getDataType()));
+            }
+        }
+
+        this.recordFields = fields;
+        return fields;
     }
 
 

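The reader now resolves the header columns to RecordFields once, in column order, and caches the list, so each row is an index lookup instead of a per-cell schema lookup (unknown headers fall back to a String-typed field). A generic sketch of that resolve-once pattern; the functional parameters here are illustrative, not NiFi API:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Optional;
    import java.util.function.Function;

    final class ColumnFieldCache<F> {
        private List<F> cachedFields;

        // headerNames: CSV columns in order; schemaLookup: name -> schema field, if any;
        // fallback: builds a default (e.g. String-typed) field for unknown headers.
        List<F> resolve(final List<String> headerNames,
                        final Function<String, Optional<F>> schemaLookup,
                        final Function<String, F> fallback) {
            if (cachedFields != null) {
                return cachedFields; // resolved once per reader, reused for every row
            }
            final List<F> fields = new ArrayList<>(headerNames.size());
            for (final String headerName : headerNames) {
                fields.add(schemaLookup.apply(headerName).orElseGet(() -> fallback.apply(headerName)));
            }
            cachedFields = fields;
            return fields;
        }
    }
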
http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
index a273d0c..91cca81 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/JacksonCSVRecordReader.java
@@ -17,11 +17,19 @@
 
 package org.apache.nifi.csv;
 
-import com.fasterxml.jackson.databind.MappingIterator;
-import com.fasterxml.jackson.databind.ObjectReader;
-import com.fasterxml.jackson.dataformat.csv.CsvMapper;
-import com.fasterxml.jackson.dataformat.csv.CsvParser;
-import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.io.input.BOMInputStream;
 import org.apache.commons.lang3.CharUtils;
@@ -35,18 +43,11 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.text.DateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Supplier;
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
 
 
 public class JacksonCSVRecordReader implements RecordReader {
@@ -140,6 +141,7 @@ public class JacksonCSVRecordReader implements RecordReader {
                     }
                 }
             }
+
             // Check for empty lines and ignore them
             boolean foundRecord = true;
             if (csvRecord == null || (csvRecord.length == 1 && StringUtils.isEmpty(csvRecord[0]))) {
@@ -154,12 +156,13 @@ public class JacksonCSVRecordReader implements RecordReader {
                     }
                 }
             }
+
             // If we didn't find a record, then the end of the file was comprised of empty lines, so we have no record to return
             if (!foundRecord) {
                 return null;
             }
 
-            final Map<String, Object> values = new LinkedHashMap<>();
+            final Map<String, Object> values = new HashMap<>(rawFieldNames.size() * 2);
             final int numFieldNames = rawFieldNames.size();
             for (int i = 0; i < csvRecord.length; i++) {
                 final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i);

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index 489d114..9e2c965 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -23,7 +23,6 @@ import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
@@ -84,31 +83,64 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
         return convertJsonNodeToRecord(jsonNode, schema, fieldNamePrefix, coerceTypes, dropUnknown);
     }
 
+    private JsonNode getChildNode(final JsonNode jsonNode, final RecordField field) {
+        if (jsonNode.has(field.getFieldName())) {
+            return jsonNode.get(field.getFieldName());
+        }
+
+        for (final String alias : field.getAliases()) {
+            if (jsonNode.has(alias)) {
+                return jsonNode.get(alias);
+            }
+        }
+
+        return null;
+    }
 
     private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix,
             final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
 
-        final Map<String, Object> values = new LinkedHashMap<>();
-        final Iterator<String> fieldNames = jsonNode.getFieldNames();
-        while (fieldNames.hasNext()) {
-            final String fieldName = fieldNames.next();
-            final JsonNode childNode = jsonNode.get(fieldName);
+        final Map<String, Object> values = new HashMap<>(schema.getFieldCount() * 2);
 
-            final RecordField recordField = schema.getField(fieldName).orElse(null);
-            if (recordField == null && dropUnknown) {
-                continue;
-            }
+        if (dropUnknown) {
+            for (final RecordField recordField : schema.getFields()) {
+                final JsonNode childNode = getChildNode(jsonNode, recordField);
+                if (childNode == null) {
+                    continue;
+                }
+
+                final String fieldName = recordField.getFieldName();
+
+                final Object value;
+                if (coerceTypes) {
+                    final DataType desiredType = recordField.getDataType();
+                    final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
+                    value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
+                } else {
+                    value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
+                }
 
-            final Object value;
-            if (coerceTypes && recordField != null) {
-                final DataType desiredType = recordField.getDataType();
-                final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
-                value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
-            } else {
-                value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
+                values.put(fieldName, value);
             }
+        } else {
+            final Iterator<String> fieldNames = jsonNode.getFieldNames();
+            while (fieldNames.hasNext()) {
+                final String fieldName = fieldNames.next();
+                final JsonNode childNode = jsonNode.get(fieldName);
+
+                final RecordField recordField = schema.getField(fieldName).orElse(null);
+
+                final Object value;
+                if (coerceTypes && recordField != null) {
+                    final DataType desiredType = recordField.getDataType();
+                    final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
+                    value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
+                } else {
+                    value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
+                }
 
-            values.put(fieldName, value);
+                values.put(fieldName, value);
+            }
         }
 
         final Supplier<String> supplier = () -> jsonNode.toString();
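
When unknown fields are dropped, the reader now iterates the schema's fields and probes the JSON object by name and then by each alias (getChildNode above), instead of walking every JSON key and looking it up in the schema. The lookup order, sketched with a plain Map standing in for the parsed JSON object and FieldDef for RecordField:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    final class SchemaDrivenValues {
        static final class FieldDef {
            final String name;
            final List<String> aliases;

            FieldDef(final String name, final List<String> aliases) {
                this.name = name;
                this.aliases = aliases;
            }
        }

        static Map<String, Object> toValues(final Map<String, Object> jsonObject, final List<FieldDef> schemaFields) {
            final Map<String, Object> values = new HashMap<>(schemaFields.size() * 2);
            for (final FieldDef field : schemaFields) {
                // Field name first, then aliases; keys not in the schema are never visited.
                Object child = jsonObject.get(field.name);
                if (child == null) {
                    for (final String alias : field.aliases) {
                        child = jsonObject.get(alias);
                        if (child != null) {
                            break;
                        }
                    }
                }
                if (child != null) {
                    values.put(field.name, child);
                }
            }
            return values;
        }
    }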

http://git-wip-us.apache.org/repos/asf/nifi/blob/c91d9988/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 5cfd3ac..fc84181 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.math.BigInteger;
 import java.text.DateFormat;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -117,26 +118,31 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
     public Map<String, String> writeRecord(final Record record) throws IOException {
         // If we are not writing an active record set, then we need to ensure that we write the
         // schema information.
+        boolean firstRecord = false;
         if (!isActiveRecordSet()) {
             generator.flush();
             schemaAccess.writeHeader(recordSchema, getOutputStream());
+            firstRecord = true;
         }
 
         writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), true);
-        return schemaAccess.getAttributes(recordSchema);
+        return firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap();
     }
 
     @Override
     public WriteResult writeRawRecord(final Record record) throws IOException {
         // If we are not writing an active record set, then we need to ensure that we write the
         // schema information.
+        boolean firstRecord = false;
         if (!isActiveRecordSet()) {
             generator.flush();
             schemaAccess.writeHeader(recordSchema, getOutputStream());
+            firstRecord = true;
         }
 
         writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject(), false);
-        return WriteResult.of(incrementRecordCount(), schemaAccess.getAttributes(recordSchema));
+        final Map<String, String> attributes = firstRecord ? schemaAccess.getAttributes(recordSchema) : Collections.emptyMap();
+        return WriteResult.of(incrementRecordCount(), attributes);
     }
 
     private void writeRecord(final Record record, final RecordSchema writeSchema, final JsonGenerator generator,

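The writer now reports the schema-access attributes only for the record that actually wrote the header; records written as part of an active record set return an empty map, avoiding redundant attribute maps per record. The gist, as a tiny sketch (SchemaHeaderTracker is illustrative, not the NiFi class):

    import java.util.Collections;
    import java.util.Map;

    final class SchemaHeaderTracker {
        private boolean headerWritten;

        // Returns the schema attributes only when this call is the one that wrote the
        // header; later records contribute no attributes of their own.
        Map<String, String> attributesForRecord(final Map<String, String> schemaAttributes) {
            final boolean firstRecord = !headerWritten;
            headerWritten = true;
            return firstRecord ? schemaAttributes : Collections.emptyMap();
        }
    }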