AaronLeon commented on a change in pull request #2718: NIFI-5213: Allow AvroReader to process files w embedded schema even when the access strategy is explicit schema
URL: https://github.com/apache/nifi/pull/2718#discussion_r261772316
##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
##########
@@ -17,33 +17,61 @@
 package org.apache.nifi.avro;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.SequenceInputStream;
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.record.RecordSchema;
 public class AvroReaderWithExplicitSchema extends AvroRecordReader {
     private final InputStream in;
     private final RecordSchema recordSchema;
     private final DatumReader<GenericRecord> datumReader;
-    private final BinaryDecoder decoder;
+    private BinaryDecoder decoder;
     private GenericRecord genericRecord;
+    private DataFileStream<GenericRecord> dataFileStream;
-    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException {
+    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException {
         this.in = in;
         this.recordSchema = recordSchema;
-        datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
-        decoder = DecoderFactory.get().binaryDecoder(in, null);
+        datumReader = new GenericDatumReader<>(avroSchema);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        TeeInputStream teeInputStream = new TeeInputStream(in, baos);
+        // Try to parse as a DataFileStream, if it works, glue the streams back together and delegate calls to the DataFileStream

Review comment:
It might be worth reversing the try/catch block. I understand the intuition of checking first whether the schema is embedded, since that is the order in which the bytes are encoded, but there is a dedicated embedded-schema access strategy for exactly that case. As I understand it, a user who chooses AvroReaderWithExplicitSchema over AvroReaderWithEmbeddedSchema expects to decode Avro data that does not carry an embedded schema, so an embedded schema is the exceptional case. With the current ordering, that user's flow throws and catches an exception every time a reader is created for data without an embedded schema, i.e. in the common case, which may be expensive. The cost may be negligible when records are batched sufficiently; this is just a nitpick. The TeeInputStream construct is pretty neat, though :)
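One way to keep the exception off the common path regardless of ordering is to peek at the header instead of attempting a full parse. The following is a minimal, stdlib-only sketch, not the PR's code: it reads the first four bytes, compares them with the Avro object container magic ('O', 'b', 'j', 1), and then glues the consumed prefix back in front of the rest of the stream with SequenceInputStream, the same way the diff restores bytes. The class AvroFormatSniffer and its Probe holder are hypothetical names. The match is a heuristic, because raw binary-encoded data could in principle begin with the same four bytes. On the embedded path the restored stream would presumably be handed to DataFileStream, and on the other path to the explicit-schema BinaryDecoder.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Arrays;

// Hypothetical helper (not part of NiFi or Avro): picks a decoding path by
// peeking at the header instead of relying on a caught exception.
public class AvroFormatSniffer {
    // Avro object container files begin with 'O', 'b', 'j' followed by version byte 1.
    static final byte[] AVRO_MAGIC = {'O', 'b', 'j', 1};

    // Result of probing: the detected format plus a stream that still yields every original byte.
    static final class Probe {
        final boolean embeddedSchema;
        final InputStream stream;

        Probe(boolean embeddedSchema, InputStream stream) {
            this.embeddedSchema = embeddedSchema;
            this.stream = stream;
        }
    }

    static Probe probe(InputStream in) throws IOException {
        // Reads up to four bytes; at end of stream a shorter array is returned, no exception.
        byte[] head = in.readNBytes(AVRO_MAGIC.length);
        // Heuristic: raw binary-encoded data could, in principle, start with the same bytes.
        boolean embedded = Arrays.equals(head, AVRO_MAGIC);
        // Glue the consumed prefix back in front of the unread remainder.
        InputStream restored = new SequenceInputStream(new ByteArrayInputStream(head), in);
        return new Probe(embedded, restored);
    }

    public static void main(String[] args) throws IOException {
        Probe container = probe(new ByteArrayInputStream(new byte[] {'O', 'b', 'j', 1, 42}));
        Probe raw = probe(new ByteArrayInputStream(new byte[] {2, 4, 6}));
        System.out.println(container.embeddedSchema + " " + container.stream.readAllBytes().length); // true 5
        System.out.println(raw.embeddedSchema + " " + raw.stream.readAllBytes().length); // false 3
    }
}
```

Because the probe never throws for either format, the question of which branch to try first stops mattering for performance; only the rare false-positive match would still need a fallback.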