Repository: nifi
Updated Branches:
  refs/heads/master ad4c886fb -> 2e1005e88


NIFI-5640: Improved efficiency of Avro Reader and some methods of AvroTypeUtil. 
Also switched ServiceStateTransition to using read/write locks instead of 
synchronized blocks because profiling showed that significant time was spent in 
determining state of a Controller Service when attempting to use it. Switching 
to a ReadLock should provide better performance there.

Signed-off-by: Matthew Burgess <mattyb...@apache.org>

This closes #3036


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2e1005e8
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2e1005e8
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2e1005e8

Branch: refs/heads/master
Commit: 2e1005e884cef70ea9c2eb1152d70e546ad2b5c3
Parents: ad4c886
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Sep 27 10:10:48 2018 -0400
Committer: Matthew Burgess <mattyb...@apache.org>
Committed: Thu Sep 27 15:38:47 2018 -0400

----------------------------------------------------------------------
 .../nifi/serialization/SimpleRecordSchema.java  |  39 ++++--
 .../java/org/apache/nifi/avro/AvroTypeUtil.java | 126 ++++++++++---------
 .../service/ServiceStateTransition.java         |  85 +++++++++----
 .../java/org/apache/nifi/avro/AvroReader.java   |  19 ++-
 .../nifi/avro/AvroReaderWithEmbeddedSchema.java |  12 +-
 .../nifi/avro/AvroReaderWithExplicitSchema.java |  17 ++-
 .../apache/nifi/avro/NonCachingDatumReader.java |  65 ++++++++++
 7 files changed, 241 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
index 5b85f03..6926c93 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
@@ -17,24 +17,25 @@
 
 package org.apache.nifi.serialization;
 
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 public class SimpleRecordSchema implements RecordSchema {
     private List<RecordField> fields = null;
     private Map<String, RecordField> fieldMap = null;
     private final boolean textAvailable;
-    private final String text;
+    private final AtomicReference<String> text = new AtomicReference<>();
     private final String schemaFormat;
     private final SchemaIdentifier schemaIdentifier;
 
@@ -50,6 +51,10 @@ public class SimpleRecordSchema implements RecordSchema {
         this(text, schemaFormat, true, id);
     }
 
+    public SimpleRecordSchema(final SchemaIdentifier id) {
+        this(null, null, false, id);
+    }
+
     public SimpleRecordSchema(final List<RecordField> fields, final String 
text, final String schemaFormat, final SchemaIdentifier id) {
         this(fields, text, schemaFormat, true, id);
     }
@@ -60,7 +65,7 @@ public class SimpleRecordSchema implements RecordSchema {
     }
 
     private SimpleRecordSchema(final String text, final String schemaFormat, 
final boolean textAvailable, final SchemaIdentifier id) {
-        this.text = text;
+        this.text.set(text);
         this.schemaFormat = schemaFormat;
         this.schemaIdentifier = id;
         this.textAvailable = textAvailable;
@@ -69,7 +74,7 @@ public class SimpleRecordSchema implements RecordSchema {
     @Override
     public Optional<String> getSchemaText() {
         if (textAvailable) {
-            return Optional.ofNullable(text);
+            return Optional.ofNullable(text.get());
         } else {
             return Optional.empty();
         }
@@ -121,13 +126,13 @@ public class SimpleRecordSchema implements RecordSchema {
 
     @Override
     public List<DataType> getDataTypes() {
-        return getFields().stream().map(recordField -> 
recordField.getDataType())
+        return getFields().stream().map(RecordField::getDataType)
             .collect(Collectors.toList());
     }
 
     @Override
     public List<String> getFieldNames() {
-        return getFields().stream().map(recordField -> 
recordField.getFieldName())
+        return getFields().stream().map(RecordField::getFieldName)
             .collect(Collectors.toList());
     }
 
@@ -189,7 +194,19 @@ public class SimpleRecordSchema implements RecordSchema {
 
     @Override
     public String toString() {
-        return text;
+        String textValue = text.get();
+        if (textValue != null) {
+            return textValue;
+        }
+
+        textValue = createText(fields);
+        final boolean updated = text.compareAndSet(null, textValue);
+
+        if (updated) {
+            return textValue;
+        } else {
+            return text.get();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 23f74b8..2e8898a 100755
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -17,28 +17,6 @@
 
 package org.apache.nifi.avro;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 import org.apache.avro.Conversions;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.LogicalType;
@@ -72,6 +50,27 @@ import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
 public class AvroTypeUtil {
     private static final Logger logger = 
LoggerFactory.getLogger(AvroTypeUtil.class);
     public static final String AVRO_SCHEMA_FORMAT = "avro";
@@ -308,7 +307,7 @@ public class AvroTypeUtil {
                 if (knownRecordTypes.containsKey(schemaFullName)) {
                     return knownRecordTypes.get(schemaFullName);
                 } else {
-                    SimpleRecordSchema recordSchema = new 
SimpleRecordSchema(avroSchema.toString(), AVRO_SCHEMA_FORMAT, 
SchemaIdentifier.EMPTY);
+                    SimpleRecordSchema recordSchema = new 
SimpleRecordSchema(SchemaIdentifier.EMPTY);
                     DataType recordSchemaType = 
RecordFieldType.RECORD.getRecordDataType(recordSchema);
                     knownRecordTypes.put(schemaFullName, recordSchemaType);
 
@@ -353,23 +352,33 @@ public class AvroTypeUtil {
         return null;
     }
 
-    private static List<Schema> getNonNullSubSchemas(Schema avroSchema) {
-        List<Schema> unionFieldSchemas = avroSchema.getTypes();
+    private static List<Schema> getNonNullSubSchemas(final Schema avroSchema) {
+        final List<Schema> unionFieldSchemas = avroSchema.getTypes();
         if (unionFieldSchemas == null) {
             return Collections.emptyList();
         }
-        return unionFieldSchemas.stream()
-                .filter(s -> s.getType() != Type.NULL)
-                .collect(Collectors.toList());
+
+        final List<Schema> nonNullTypes = new 
ArrayList<>(unionFieldSchemas.size());
+        for (final Schema fieldSchema : unionFieldSchemas) {
+            if (fieldSchema.getType() != Type.NULL) {
+                nonNullTypes.add(fieldSchema);
+            }
+        }
+
+        return nonNullTypes;
     }
 
     public static RecordSchema createSchema(final Schema avroSchema) {
+        return createSchema(avroSchema, true);
+    }
+
+    public static RecordSchema createSchema(final Schema avroSchema, final 
boolean includeText) {
         if (avroSchema == null) {
             throw new IllegalArgumentException("Avro Schema cannot be null");
         }
 
         SchemaIdentifier identifier = new 
StandardSchemaIdentifier.Builder().name(avroSchema.getName()).build();
-        return createSchema(avroSchema, avroSchema.toString(), identifier);
+        return createSchema(avroSchema, includeText ? avroSchema.toString() : 
null, identifier);
     }
 
     /**
@@ -385,10 +394,10 @@ public class AvroTypeUtil {
             throw new IllegalArgumentException("Avro Schema cannot be null");
         }
 
-        String schemaFullName = avroSchema.getNamespace() + "." + 
avroSchema.getName();
-        SimpleRecordSchema recordSchema = new SimpleRecordSchema(schemaText, 
AVRO_SCHEMA_FORMAT, schemaId);
-        DataType recordSchemaType = 
RecordFieldType.RECORD.getRecordDataType(recordSchema);
-        Map<String, DataType> knownRecords = new HashMap<>();
+        final String schemaFullName = avroSchema.getNamespace() + "." + 
avroSchema.getName();
+        final SimpleRecordSchema recordSchema = schemaText == null ? new 
SimpleRecordSchema(schemaId) : new SimpleRecordSchema(schemaText, 
AVRO_SCHEMA_FORMAT, schemaId);
+        final DataType recordSchemaType = 
RecordFieldType.RECORD.getRecordDataType(recordSchema);
+        final Map<String, DataType> knownRecords = new HashMap<>();
         knownRecords.put(schemaFullName, recordSchemaType);
 
         final List<RecordField> recordFields = new 
ArrayList<>(avroSchema.getFields().size());
@@ -752,36 +761,39 @@ public class AvroTypeUtil {
      * @param conversion the conversion function which takes a non-null field 
schema within the union field and returns a converted value
      * @return a converted value
      */
-    private static Object convertUnionFieldValue(Object originalValue, Schema 
fieldSchema, Function<Schema, Object> conversion, final String fieldName) {
-        // Ignore null types in union
-        final List<Schema> nonNullFieldSchemas = 
getNonNullSubSchemas(fieldSchema);
-
-        // If at least one non-null type exists, find the first compatible type
-        if (nonNullFieldSchemas.size() >= 1) {
-            for (final Schema nonNullFieldSchema : nonNullFieldSchemas) {
-                final DataType desiredDataType = 
AvroTypeUtil.determineDataType(nonNullFieldSchema);
-                try {
-                    final Object convertedValue = 
conversion.apply(nonNullFieldSchema);
-
-                    if (isCompatibleDataType(convertedValue, desiredDataType)) 
{
-                        return convertedValue;
-                    }
+    private static Object convertUnionFieldValue(final Object originalValue, 
final Schema fieldSchema, final Function<Schema, Object> conversion, final 
String fieldName) {
+        boolean foundNonNull = false;
+        for (final Schema subSchema : fieldSchema.getTypes()) {
+            if (subSchema.getType() == Type.NULL) {
+                continue;
+            }
 
-                    // For logical types those store with different type (e.g. 
BigDecimal as ByteBuffer), check compatibility using the original rawValue
-                    if (nonNullFieldSchema.getLogicalType() != null && 
DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) {
-                        return convertedValue;
-                    }
-                } catch (Exception e) {
-                    // If failed with one of possible types, continue with the 
next available option.
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Cannot convert value {} to type {}", 
originalValue, desiredDataType, e);
-                    }
+            foundNonNull = true;
+            final DataType desiredDataType = 
AvroTypeUtil.determineDataType(subSchema);
+            try {
+                final Object convertedValue = conversion.apply(subSchema);
+
+                if (isCompatibleDataType(convertedValue, desiredDataType)) {
+                    return convertedValue;
+                }
+
+                // For logical types those store with different type (e.g. 
BigDecimal as ByteBuffer), check compatibility using the original rawValue
+                if (subSchema.getLogicalType() != null && 
DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) {
+                    return convertedValue;
+                }
+            } catch (Exception e) {
+                // If failed with one of possible types, continue with the 
next available option.
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Cannot convert value {} to type {}", 
originalValue, desiredDataType, e);
                 }
             }
+        }
 
+        if (foundNonNull) {
             throw new IllegalTypeConversionException("Cannot convert value " + 
originalValue + " of type " + originalValue.getClass()
                 + " because no compatible types exist in the UNION for field " 
+ fieldName);
         }
+
         return null;
     }
 
@@ -875,7 +887,7 @@ public class AvroTypeUtil {
                     final Object fieldValue = normalizeValue(avroFieldValue, 
field.schema(), fieldName + "/" + field.name());
                     values.put(field.name(), fieldValue);
                 }
-                final RecordSchema childSchema = 
AvroTypeUtil.createSchema(recordSchema);
+                final RecordSchema childSchema = 
AvroTypeUtil.createSchema(recordSchema, false);
                 return new MapRecord(childSchema, values);
             case BYTES:
                 final ByteBuffer bb = (ByteBuffer) value;

http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
index 26e3b82..f35550a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java
@@ -22,6 +22,9 @@ import org.apache.nifi.controller.ComponentNode;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class ServiceStateTransition {
     private ControllerServiceState state = ControllerServiceState.DISABLED;
@@ -29,32 +32,45 @@ public class ServiceStateTransition {
     private final List<CompletableFuture<?>> disabledFutures = new 
ArrayList<>();
 
     private final ControllerServiceNode serviceNode;
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    private final Lock writeLock = rwLock.writeLock();
+    private final Lock readLock = rwLock.readLock();
 
     public ServiceStateTransition(final ControllerServiceNode serviceNode) {
         this.serviceNode = serviceNode;
     }
 
-    public synchronized boolean transitionToEnabling(final 
ControllerServiceState expectedState, final CompletableFuture<?> enabledFuture) 
{
-        if (expectedState != state) {
-            return false;
+    public boolean transitionToEnabling(final ControllerServiceState 
expectedState, final CompletableFuture<?> enabledFuture) {
+        writeLock.lock();
+        try {
+            if (expectedState != state) {
+                return false;
+            }
+
+            state = ControllerServiceState.ENABLING;
+            enabledFutures.add(enabledFuture);
+            return true;
+        } finally {
+            writeLock.unlock();
         }
-
-        state = ControllerServiceState.ENABLING;
-        enabledFutures.add(enabledFuture);
-        return true;
     }
 
-    public synchronized boolean enable() {
-        if (state != ControllerServiceState.ENABLING) {
-            return false;
-        }
+    public boolean enable() {
+        writeLock.lock();
+        try {
+            if (state != ControllerServiceState.ENABLING) {
+                return false;
+            }
 
-        state = ControllerServiceState.ENABLED;
+            state = ControllerServiceState.ENABLED;
 
-        validateReferences(serviceNode);
+            validateReferences(serviceNode);
 
-        enabledFutures.stream().forEach(future -> future.complete(null));
-        return true;
+            enabledFutures.stream().forEach(future -> future.complete(null));
+            return true;
+        } finally {
+            writeLock.unlock();
+        }
     }
 
     private void validateReferences(final ControllerServiceNode service) {
@@ -64,22 +80,37 @@ public class ServiceStateTransition {
         }
     }
 
-    public synchronized boolean transitionToDisabling(final 
ControllerServiceState expectedState, final CompletableFuture<?> 
disabledFuture) {
-        if (expectedState != state) {
-            return false;
+    public boolean transitionToDisabling(final ControllerServiceState 
expectedState, final CompletableFuture<?> disabledFuture) {
+        writeLock.lock();
+        try {
+            if (expectedState != state) {
+                return false;
+            }
+
+            state = ControllerServiceState.DISABLING;
+            disabledFutures.add(disabledFuture);
+            return true;
+        } finally {
+            writeLock.unlock();
         }
-
-        state = ControllerServiceState.DISABLING;
-        disabledFutures.add(disabledFuture);
-        return true;
     }
 
-    public synchronized void disable() {
-        state = ControllerServiceState.DISABLED;
-        disabledFutures.stream().forEach(future -> future.complete(null));
+    public void disable() {
+        writeLock.lock();
+        try {
+            state = ControllerServiceState.DISABLED;
+            disabledFutures.stream().forEach(future -> future.complete(null));
+        } finally {
+            writeLock.unlock();
+        }
     }
 
-    public synchronized ControllerServiceState getState() {
-        return state;
+    public ControllerServiceState getState() {
+        readLock.lock();
+        try {
+            return state;
+        } finally {
+            readLock.unlock();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index eed37f8..97643aa 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -17,14 +17,6 @@
 
 package org.apache.nifi.avro;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
 import org.apache.avro.Schema;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -35,12 +27,19 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.SchemaRegistryService;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
 @Tags({"avro", "parse", "record", "row", "reader", "delimited", "comma", 
"separated", "values"})
 @CapabilityDescription("Parses Avro data and returns each Avro record as an 
separate Record object. The Avro data may contain the schema itself, "
     + "or the schema can be externalized and accessed by one of the methods 
offered by the 'Schema Access Strategy' property.")
@@ -83,7 +82,7 @@ public class AvroReader extends SchemaRegistryService 
implements RecordReaderFac
     }
 
     @Override
-    public RecordReader createRecordReader(final Map<String, String> 
variables, final InputStream in, final ComponentLog logger) throws 
MalformedRecordException, IOException, SchemaNotFoundException {
+    public RecordReader createRecordReader(final Map<String, String> 
variables, final InputStream in, final ComponentLog logger) throws IOException, 
SchemaNotFoundException {
         final String schemaAccessStrategy = 
getConfigurationContext().getProperty(getSchemaAcessStrategyDescriptor()).getValue();
         if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
             return new AvroReaderWithEmbeddedSchema(in);

http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java
index aa61e4c..a5e5ce7 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java
@@ -17,16 +17,14 @@
 
 package org.apache.nifi.avro;
 
-import java.io.IOException;
-import java.io.InputStream;
-
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.IOException;
+import java.io.InputStream;
+
 public class AvroReaderWithEmbeddedSchema extends AvroRecordReader {
     private final DataFileStream<GenericRecord> dataFileStream;
     private final InputStream in;
@@ -35,7 +33,7 @@ public class AvroReaderWithEmbeddedSchema extends 
AvroRecordReader {
 
     public AvroReaderWithEmbeddedSchema(final InputStream in) throws 
IOException {
         this.in = in;
-        dataFileStream = new DataFileStream<>(in, new 
GenericDatumReader<GenericRecord>());
+        dataFileStream = new DataFileStream<>(in, new 
NonCachingDatumReader<>());
         this.avroSchema = dataFileStream.getSchema();
         recordSchema = AvroTypeUtil.createSchema(avroSchema);
     }
@@ -56,7 +54,7 @@ public class AvroReaderWithEmbeddedSchema extends 
AvroRecordReader {
     }
 
     @Override
-    public RecordSchema getSchema() throws MalformedRecordException {
+    public RecordSchema getSchema() {
         return recordSchema;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
index ce49443..9197e47 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
@@ -17,20 +17,17 @@
 
 package org.apache.nifi.avro;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
 public class AvroReaderWithExplicitSchema extends AvroRecordReader {
     private final InputStream in;
     private final RecordSchema recordSchema;
@@ -38,11 +35,11 @@ public class AvroReaderWithExplicitSchema extends 
AvroRecordReader {
     private final BinaryDecoder decoder;
     private GenericRecord genericRecord;
 
-    public AvroReaderWithExplicitSchema(final InputStream in, final 
RecordSchema recordSchema, final Schema avroSchema) throws IOException, 
SchemaNotFoundException {
+    public AvroReaderWithExplicitSchema(final InputStream in, final 
RecordSchema recordSchema, final Schema avroSchema) {
         this.in = in;
         this.recordSchema = recordSchema;
 
-        datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
+        datumReader = new NonCachingDatumReader<>(avroSchema);
         decoder = DecoderFactory.get().binaryDecoder(in, null);
     }
 
@@ -67,7 +64,7 @@ public class AvroReaderWithExplicitSchema extends 
AvroRecordReader {
     }
 
     @Override
-    public RecordSchema getSchema() throws MalformedRecordException {
+    public RecordSchema getSchema() {
         return recordSchema;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java
new file mode 100644
index 0000000..fa4ab7d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.Decoder;
+
+import java.io.IOException;
+
+/**
+ * Override the GenericDatumReader to provide a much more efficient 
implementation of #readString. The one that is provided by
+ * GenericDatumReader performs very poorly in some cases because it uses an 
IdentityHashMap with the key being the Schema so that
+ * it can stash away the "stringClass" but that performs far worse than just 
calling Schema#getProp. I.e., {@link #readString(Object, Schema, Decoder)}
+ * in GenericDatumReader calls #getStringClass, which uses an IdentityHashMap 
to cache results in order to avoid calling {@link #findStringClass(Schema)}.
+ * However, {@link #findStringClass(Schema)} is much more efficient than using 
an IdentityHashMap anyway. Additionally, the performance of {@link 
#findStringClass(Schema)}
+ * can be improved slightly and made more readable.
+ */
+public class NonCachingDatumReader<T> extends GenericDatumReader<T> {
+    public NonCachingDatumReader() {
+        super();
+    }
+
+    public NonCachingDatumReader(final Schema schema) {
+        super(schema);
+    }
+
+    @Override
+    protected Object readString(final Object old, final Schema expected, final 
Decoder in) throws IOException {
+        final Class<?> stringClass = findStringClass(expected);
+        if (stringClass == String.class) {
+            return in.readString();
+        }
+
+        if (stringClass == CharSequence.class) {
+            return readString(old, in);
+        }
+
+        return newInstanceFromString(stringClass, in.readString());
+    }
+
+    protected Class findStringClass(Schema schema) {
+        final String name = schema.getProp(GenericData.STRING_PROP);
+        if ("String".equals(name)) {
+            return String.class;
+        }
+
+        return CharSequence.class;
+    }
+}

Reply via email to