AaronLeon commented on a change in pull request #2718: NIFI-5213: Allow AvroReader to process files w embedded schema even when the access strategy is explicit schema
URL: https://github.com/apache/nifi/pull/2718#discussion_r261772316
 
 

 ##########
 File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
 ##########
 @@ -17,33 +17,61 @@
 
 package org.apache.nifi.avro;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.SequenceInputStream;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.record.RecordSchema;
 
 public class AvroReaderWithExplicitSchema extends AvroRecordReader {
     private final InputStream in;
     private final RecordSchema recordSchema;
     private final DatumReader<GenericRecord> datumReader;
-    private final BinaryDecoder decoder;
+    private BinaryDecoder decoder;
     private GenericRecord genericRecord;
+    private DataFileStream<GenericRecord> dataFileStream;
 
 -    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException {
 +    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException {
         this.in = in;
         this.recordSchema = recordSchema;
 
-        datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
-        decoder = DecoderFactory.get().binaryDecoder(in, null);
+        datumReader = new GenericDatumReader<>(avroSchema);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        TeeInputStream teeInputStream = new TeeInputStream(in, baos);
 +        // Try to parse as a DataFileStream, if it works, glue the streams back together and delegate calls to the DataFileStream
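
 The excerpt ends before the try/catch itself. A rough sketch of what that logic could look like, based only on the imports and fields visible above (a hypothetical reconstruction, not the actual patch):

     try {
         // If the stream is an Avro data file, it starts with the file header and
         // carries its own schema; DataFileStream will parse it directly.
         dataFileStream = new DataFileStream<>(teeInputStream, datumReader);
     } catch (final IOException notADataFile) {
         // Not a data file: the bytes consumed by the failed attempt were copied
         // into baos by the TeeInputStream, so replay them ahead of the rest of
         // the original stream and decode with the explicitly configured schema.
         final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
         final SequenceInputStream glued = new SequenceInputStream(bais, in);
         decoder = DecoderFactory.get().binaryDecoder(glued, null);
     }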
 
 Review comment:
   It might be worth reversing the try/catch block. I understand the intuition to determine first whether the schema is embedded, since that's the order in which the bytes are encoded, but there is an embedded Avro schema strategy for a reason.
   
   From my understanding, a user who chooses AvroReaderWithExplicitSchema over AvroReaderWithEmbeddedSchema expects to use an explicit schema to decode Avro data without an embedded schema; the exceptional case is data that does contain an embedded schema. The current implementation forces that user's flow to incur (and catch) an exception every time, which may be expensive. Maybe it's negligible with sufficient batching of records, but just a nit-picking thought. The TeeInputStream construct is pretty neat though :)
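   
   To make that concrete: rather than literally swapping the try/catch, one exception-free variant (purely illustrative, not part of this PR) would be to sniff Avro's public file magic (DataFileConstants.MAGIC) up front and only take the DataFileStream path when it is actually present. This assumes java.io.PushbackInputStream, java.util.Arrays, and commons-io's IOUtils, which is already on the classpath for TeeInputStream:
   
       final PushbackInputStream pushback = new PushbackInputStream(in, DataFileConstants.MAGIC.length);
       final byte[] header = new byte[DataFileConstants.MAGIC.length];
       final int read = IOUtils.read(pushback, header);   // read up to MAGIC.length bytes
       pushback.unread(header, 0, read);                   // push the sniffed bytes back either way
   
       if (read == header.length && Arrays.equals(header, DataFileConstants.MAGIC)) {
           // Embedded schema present: delegate to DataFileStream, as the PR does
           dataFileStream = new DataFileStream<>(pushback, datumReader);
       } else {
           // Common case for this reader: raw records, decode with the explicit schema
           decoder = DecoderFactory.get().binaryDecoder(pushback, null);
       }
   
   That keeps the expected explicit-schema path free of exceptions while still detecting an embedded schema when one is there.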
