Repository: nifi Updated Branches: refs/heads/master ad4c886fb -> 2e1005e88
NIFI-5640: Improved efficiency of Avro Reader and some methods of AvroTypeUtil. Also switched ServiceStateTransition to using read/write locks instead of synchronized blocks because profiling showed that significant time was spent in determining state of a Controller Service when attempting to use it. Switching to a ReadLock should provide better performance there. Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #3036 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2e1005e8 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2e1005e8 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2e1005e8 Branch: refs/heads/master Commit: 2e1005e884cef70ea9c2eb1152d70e546ad2b5c3 Parents: ad4c886 Author: Mark Payne <marka...@hotmail.com> Authored: Thu Sep 27 10:10:48 2018 -0400 Committer: Matthew Burgess <mattyb...@apache.org> Committed: Thu Sep 27 15:38:47 2018 -0400 ---------------------------------------------------------------------- .../nifi/serialization/SimpleRecordSchema.java | 39 ++++-- .../java/org/apache/nifi/avro/AvroTypeUtil.java | 126 ++++++++++--------- .../service/ServiceStateTransition.java | 85 +++++++++---- .../java/org/apache/nifi/avro/AvroReader.java | 19 ++- .../nifi/avro/AvroReaderWithEmbeddedSchema.java | 12 +- .../nifi/avro/AvroReaderWithExplicitSchema.java | 17 ++- .../apache/nifi/avro/NonCachingDatumReader.java | 65 ++++++++++ 7 files changed, 241 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java index 5b85f03..6926c93 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java @@ -17,24 +17,25 @@ package org.apache.nifi.serialization; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - public class SimpleRecordSchema implements RecordSchema { private List<RecordField> fields = null; private Map<String, RecordField> fieldMap = null; private final boolean textAvailable; - private final String text; + private final AtomicReference<String> text = new AtomicReference<>(); private final String schemaFormat; private final SchemaIdentifier schemaIdentifier; @@ -50,6 +51,10 @@ public class SimpleRecordSchema implements RecordSchema { this(text, schemaFormat, true, id); } + public SimpleRecordSchema(final SchemaIdentifier id) { + this(null, null, false, id); + } + public SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final SchemaIdentifier id) { this(fields, text, schemaFormat, true, id); } @@ -60,7 +65,7 @@ public class SimpleRecordSchema implements RecordSchema { } private SimpleRecordSchema(final String text, final String 
schemaFormat, final boolean textAvailable, final SchemaIdentifier id) { - this.text = text; + this.text.set(text); this.schemaFormat = schemaFormat; this.schemaIdentifier = id; this.textAvailable = textAvailable; @@ -69,7 +74,7 @@ public class SimpleRecordSchema implements RecordSchema { @Override public Optional<String> getSchemaText() { if (textAvailable) { - return Optional.ofNullable(text); + return Optional.ofNullable(text.get()); } else { return Optional.empty(); } @@ -121,13 +126,13 @@ public class SimpleRecordSchema implements RecordSchema { @Override public List<DataType> getDataTypes() { - return getFields().stream().map(recordField -> recordField.getDataType()) + return getFields().stream().map(RecordField::getDataType) .collect(Collectors.toList()); } @Override public List<String> getFieldNames() { - return getFields().stream().map(recordField -> recordField.getFieldName()) + return getFields().stream().map(RecordField::getFieldName) .collect(Collectors.toList()); } @@ -189,7 +194,19 @@ public class SimpleRecordSchema implements RecordSchema { @Override public String toString() { - return text; + String textValue = text.get(); + if (textValue != null) { + return textValue; + } + + textValue = createText(fields); + final boolean updated = text.compareAndSet(null, textValue); + + if (updated) { + return textValue; + } else { + return text.get(); + } } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 23f74b8..2e8898a 100755 --- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -17,28 +17,6 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.sql.Time; -import java.sql.Timestamp; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; - import org.apache.avro.Conversions; import org.apache.avro.JsonProperties; import org.apache.avro.LogicalType; @@ -72,6 +50,27 @@ import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + public class AvroTypeUtil { private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class); public static final String AVRO_SCHEMA_FORMAT = "avro"; @@ -308,7 
+307,7 @@ public class AvroTypeUtil { if (knownRecordTypes.containsKey(schemaFullName)) { return knownRecordTypes.get(schemaFullName); } else { - SimpleRecordSchema recordSchema = new SimpleRecordSchema(avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY); + SimpleRecordSchema recordSchema = new SimpleRecordSchema(SchemaIdentifier.EMPTY); DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema); knownRecordTypes.put(schemaFullName, recordSchemaType); @@ -353,23 +352,33 @@ public class AvroTypeUtil { return null; } - private static List<Schema> getNonNullSubSchemas(Schema avroSchema) { - List<Schema> unionFieldSchemas = avroSchema.getTypes(); + private static List<Schema> getNonNullSubSchemas(final Schema avroSchema) { + final List<Schema> unionFieldSchemas = avroSchema.getTypes(); if (unionFieldSchemas == null) { return Collections.emptyList(); } - return unionFieldSchemas.stream() - .filter(s -> s.getType() != Type.NULL) - .collect(Collectors.toList()); + + final List<Schema> nonNullTypes = new ArrayList<>(unionFieldSchemas.size()); + for (final Schema fieldSchema : unionFieldSchemas) { + if (fieldSchema.getType() != Type.NULL) { + nonNullTypes.add(fieldSchema); + } + } + + return nonNullTypes; } public static RecordSchema createSchema(final Schema avroSchema) { + return createSchema(avroSchema, true); + } + + public static RecordSchema createSchema(final Schema avroSchema, final boolean includeText) { if (avroSchema == null) { throw new IllegalArgumentException("Avro Schema cannot be null"); } SchemaIdentifier identifier = new StandardSchemaIdentifier.Builder().name(avroSchema.getName()).build(); - return createSchema(avroSchema, avroSchema.toString(), identifier); + return createSchema(avroSchema, includeText ? 
avroSchema.toString() : null, identifier); } /** @@ -385,10 +394,10 @@ public class AvroTypeUtil { throw new IllegalArgumentException("Avro Schema cannot be null"); } - String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName(); - SimpleRecordSchema recordSchema = new SimpleRecordSchema(schemaText, AVRO_SCHEMA_FORMAT, schemaId); - DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema); - Map<String, DataType> knownRecords = new HashMap<>(); + final String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName(); + final SimpleRecordSchema recordSchema = schemaText == null ? new SimpleRecordSchema(schemaId) : new SimpleRecordSchema(schemaText, AVRO_SCHEMA_FORMAT, schemaId); + final DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema); + final Map<String, DataType> knownRecords = new HashMap<>(); knownRecords.put(schemaFullName, recordSchemaType); final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size()); @@ -752,36 +761,39 @@ public class AvroTypeUtil { * @param conversion the conversion function which takes a non-null field schema within the union field and returns a converted value * @return a converted value */ - private static Object convertUnionFieldValue(Object originalValue, Schema fieldSchema, Function<Schema, Object> conversion, final String fieldName) { - // Ignore null types in union - final List<Schema> nonNullFieldSchemas = getNonNullSubSchemas(fieldSchema); - - // If at least one non-null type exists, find the first compatible type - if (nonNullFieldSchemas.size() >= 1) { - for (final Schema nonNullFieldSchema : nonNullFieldSchemas) { - final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema); - try { - final Object convertedValue = conversion.apply(nonNullFieldSchema); - - if (isCompatibleDataType(convertedValue, desiredDataType)) { - return convertedValue; - } + private static Object 
convertUnionFieldValue(final Object originalValue, final Schema fieldSchema, final Function<Schema, Object> conversion, final String fieldName) { + boolean foundNonNull = false; + for (final Schema subSchema : fieldSchema.getTypes()) { + if (subSchema.getType() == Type.NULL) { + continue; + } - // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue - if (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) { - return convertedValue; - } - } catch (Exception e) { - // If failed with one of possible types, continue with the next available option. - if (logger.isDebugEnabled()) { - logger.debug("Cannot convert value {} to type {}", originalValue, desiredDataType, e); - } + foundNonNull = true; + final DataType desiredDataType = AvroTypeUtil.determineDataType(subSchema); + try { + final Object convertedValue = conversion.apply(subSchema); + + if (isCompatibleDataType(convertedValue, desiredDataType)) { + return convertedValue; + } + + // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue + if (subSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) { + return convertedValue; + } + } catch (Exception e) { + // If failed with one of possible types, continue with the next available option. 
+ if (logger.isDebugEnabled()) { + logger.debug("Cannot convert value {} to type {}", originalValue, desiredDataType, e); } } + } + if (foundNonNull) { throw new IllegalTypeConversionException("Cannot convert value " + originalValue + " of type " + originalValue.getClass() + " because no compatible types exist in the UNION for field " + fieldName); } + return null; } @@ -875,7 +887,7 @@ public class AvroTypeUtil { final Object fieldValue = normalizeValue(avroFieldValue, field.schema(), fieldName + "/" + field.name()); values.put(field.name(), fieldValue); } - final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema); + final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema, false); return new MapRecord(childSchema, values); case BYTES: final ByteBuffer bb = (ByteBuffer) value; http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java index 26e3b82..f35550a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java @@ -22,6 +22,9 @@ import org.apache.nifi.controller.ComponentNode; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import 
java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class ServiceStateTransition { private ControllerServiceState state = ControllerServiceState.DISABLED; @@ -29,32 +32,45 @@ public class ServiceStateTransition { private final List<CompletableFuture<?>> disabledFutures = new ArrayList<>(); private final ControllerServiceNode serviceNode; + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock writeLock = rwLock.writeLock(); + private final Lock readLock = rwLock.readLock(); public ServiceStateTransition(final ControllerServiceNode serviceNode) { this.serviceNode = serviceNode; } - public synchronized boolean transitionToEnabling(final ControllerServiceState expectedState, final CompletableFuture<?> enabledFuture) { - if (expectedState != state) { - return false; + public boolean transitionToEnabling(final ControllerServiceState expectedState, final CompletableFuture<?> enabledFuture) { + writeLock.lock(); + try { + if (expectedState != state) { + return false; + } + + state = ControllerServiceState.ENABLING; + enabledFutures.add(enabledFuture); + return true; + } finally { + writeLock.unlock(); } - - state = ControllerServiceState.ENABLING; - enabledFutures.add(enabledFuture); - return true; } - public synchronized boolean enable() { - if (state != ControllerServiceState.ENABLING) { - return false; - } + public boolean enable() { + writeLock.lock(); + try { + if (state != ControllerServiceState.ENABLING) { + return false; + } - state = ControllerServiceState.ENABLED; + state = ControllerServiceState.ENABLED; - validateReferences(serviceNode); + validateReferences(serviceNode); - enabledFutures.stream().forEach(future -> future.complete(null)); - return true; + enabledFutures.stream().forEach(future -> future.complete(null)); + return true; + } finally { + writeLock.unlock(); + } } private void validateReferences(final ControllerServiceNode service) { @@ -64,22 +80,37 @@ public 
class ServiceStateTransition { } } - public synchronized boolean transitionToDisabling(final ControllerServiceState expectedState, final CompletableFuture<?> disabledFuture) { - if (expectedState != state) { - return false; + public boolean transitionToDisabling(final ControllerServiceState expectedState, final CompletableFuture<?> disabledFuture) { + writeLock.lock(); + try { + if (expectedState != state) { + return false; + } + + state = ControllerServiceState.DISABLING; + disabledFutures.add(disabledFuture); + return true; + } finally { + writeLock.unlock(); } - - state = ControllerServiceState.DISABLING; - disabledFutures.add(disabledFuture); - return true; } - public synchronized void disable() { - state = ControllerServiceState.DISABLED; - disabledFutures.stream().forEach(future -> future.complete(null)); + public void disable() { + writeLock.lock(); + try { + state = ControllerServiceState.DISABLED; + disabledFutures.stream().forEach(future -> future.complete(null)); + } finally { + writeLock.unlock(); + } } - public synchronized ControllerServiceState getState() { - return state; + public ControllerServiceState getState() { + readLock.lock(); + try { + return state; + } finally { + readLock.unlock(); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java index eed37f8..97643aa 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java @@ -17,14 +17,6 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - import org.apache.avro.Schema; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -35,12 +27,19 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.SchemaRegistryService; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + @Tags({"avro", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"}) @CapabilityDescription("Parses Avro data and returns each Avro record as an separate Record object. 
The Avro data may contain the schema itself, " + "or the schema can be externalized and accessed by one of the methods offered by the 'Schema Access Strategy' property.") @@ -83,7 +82,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac } @Override - public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { final String schemaAccessStrategy = getConfigurationContext().getProperty(getSchemaAcessStrategyDescriptor()).getValue(); if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) { return new AvroReaderWithEmbeddedSchema(in); http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java index aa61e4c..a5e5ce7 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java +++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java @@ -17,16 +17,14 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.io.InputStream; - import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; -import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.IOException; +import java.io.InputStream; + public class AvroReaderWithEmbeddedSchema extends AvroRecordReader { private final DataFileStream<GenericRecord> dataFileStream; private final InputStream in; @@ -35,7 +33,7 @@ public class AvroReaderWithEmbeddedSchema extends AvroRecordReader { public AvroReaderWithEmbeddedSchema(final InputStream in) throws IOException { this.in = in; - dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>()); + dataFileStream = new DataFileStream<>(in, new NonCachingDatumReader<>()); this.avroSchema = dataFileStream.getSchema(); recordSchema = AvroTypeUtil.createSchema(avroSchema); } @@ -56,7 +54,7 @@ public class AvroReaderWithEmbeddedSchema extends AvroRecordReader { } @Override - public RecordSchema getSchema() throws MalformedRecordException { + public RecordSchema getSchema() { return recordSchema; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java index ce49443..9197e47 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java @@ -17,20 +17,17 @@ package org.apache.nifi.avro; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + public class AvroReaderWithExplicitSchema extends AvroRecordReader { private final InputStream in; private final RecordSchema recordSchema; @@ -38,11 +35,11 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader { private final BinaryDecoder decoder; private GenericRecord genericRecord; - public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException { + public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) { this.in = in; this.recordSchema = recordSchema; - datumReader = new GenericDatumReader<GenericRecord>(avroSchema); + datumReader = new 
NonCachingDatumReader<>(avroSchema); decoder = DecoderFactory.get().binaryDecoder(in, null); } @@ -67,7 +64,7 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader { } @Override - public RecordSchema getSchema() throws MalformedRecordException { + public RecordSchema getSchema() { return recordSchema; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/2e1005e8/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java new file mode 100644 index 0000000..fa4ab7d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.Decoder; + +import java.io.IOException; + +/** + * Override the GenericDatumReader to provide a much more efficient implementation of #readString. The one that is provided by + * GenericDatumReader performs very poorly in some cases because it uses an IdentityHashMap with the key being the Schema so that + * it can stash away the "stringClass" but that performs far worse than just calling JsonNode#getProp. I.e., {@link #readString(Object, Schema, Decoder)} + * in GenericDatumReader calls #getStringClass, which uses an IdentityHashMap to cache results in order to avoid calling {@link #findStringClass(Schema)}. + * However, {@link #findStringClass(Schema)} is much more efficient than using an IdentityHashMap anyway. Additionally, the performance of {@link #findStringClass(Schema)} + * can be improved slightly and made more readable. + */ +public class NonCachingDatumReader<T> extends GenericDatumReader<T> { + public NonCachingDatumReader() { + super(); + } + + public NonCachingDatumReader(final Schema schema) { + super(schema); + } + + @Override + protected Object readString(final Object old, final Schema expected, final Decoder in) throws IOException { + final Class<?> stringClass = findStringClass(expected); + if (stringClass == String.class) { + return in.readString(); + } + + if (stringClass == CharSequence.class) { + return readString(old, in); + } + + return newInstanceFromString(stringClass, in.readString()); + } + + protected Class findStringClass(Schema schema) { + final String name = schema.getProp(GenericData.STRING_PROP); + if ("String".equals(name)) { + return String.class; + } + + return CharSequence.class; + } +}