Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2718#discussion_r194590103
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java ---
    @@ -17,33 +17,61 @@
     
     package org.apache.nifi.avro;
     
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
     import java.io.EOFException;
     import java.io.IOException;
     import java.io.InputStream;
    +import java.io.SequenceInputStream;
     
     import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
     import org.apache.avro.generic.GenericDatumReader;
     import org.apache.avro.generic.GenericRecord;
     import org.apache.avro.io.BinaryDecoder;
     import org.apache.avro.io.DatumReader;
     import org.apache.avro.io.DecoderFactory;
    -import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.commons.io.input.TeeInputStream;
     import org.apache.nifi.serialization.MalformedRecordException;
     import org.apache.nifi.serialization.record.RecordSchema;
     
     public class AvroReaderWithExplicitSchema extends AvroRecordReader {
         private final InputStream in;
         private final RecordSchema recordSchema;
         private final DatumReader<GenericRecord> datumReader;
    -    private final BinaryDecoder decoder;
    +    private BinaryDecoder decoder;
         private GenericRecord genericRecord;
    +    private DataFileStream<GenericRecord> dataFileStream;
     
    -    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException {
    +    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException {
             this.in = in;
             this.recordSchema = recordSchema;
     
    -        datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
    -        decoder = DecoderFactory.get().binaryDecoder(in, null);
    +        datumReader = new GenericDatumReader<>(avroSchema);
    +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        TeeInputStream teeInputStream = new TeeInputStream(in, baos);
    +        // Try to parse as a DataFileStream; if it works, glue the streams back together and delegate calls to the DataFileStream
    +        try {
    +            dataFileStream = new DataFileStream<>(teeInputStream, new GenericDatumReader<>());
    +        } catch (IOException ioe) {
    +            // Carry on, hopefully a raw Avro file
    +            // Need to be able to re-read the bytes read so far, and the InputStream passed in doesn't support reset. Use the TeeInputStream in
    +            // conjunction with SequenceInputStream to glue the two streams back together for future reading
    +            ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
    +            SequenceInputStream sis = new SequenceInputStream(bais, in);
    +            decoder = DecoderFactory.get().binaryDecoder(sis, null);
    +        }
    +        if (dataFileStream != null) {
    +            // Verify the schemas are the same
    +            Schema embeddedSchema = dataFileStream.getSchema();
    +            if (!embeddedSchema.equals(avroSchema)) {
    +                throw new IOException("Explicit schema does not match embedded schema");
    --- End diff ---
    
    I thought schema evolution was supported in other ways, such as including optional (possibly missing) fields to support a transition to/from added/deleted fields, but I admit I don't have my mind wrapped around the whole thing. In this case the design was driven by the Avro API: if the file has an embedded schema, there is a much more fluent API for reading the records than if it does not. That path is not for the case where someone wants to impose a schema on a file that already has one; I'm not sure that's really a schema evolution case anyway (i.e. is the embedded schema incorrect?). The alternate API is for "raw" Avro files that don't have an embedded schema and instead need an external one for processing. TBH I don't know how to parse an Avro file that has an embedded schema while imposing an external one via their API, so this was the middle ground: allow it as long as the external schema matches the embedded one. Thoughts?
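    
    For reference, here's a minimal sketch of the two read paths I mean (the class and method names like `AvroReadPaths`, `readDataFile`, and `readRawAvro` are made up for illustration, not part of this PR):
    
        import java.io.IOException;
        import java.io.InputStream;
    
        import org.apache.avro.Schema;
        import org.apache.avro.file.DataFileStream;
        import org.apache.avro.generic.GenericDatumReader;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.avro.io.BinaryDecoder;
        import org.apache.avro.io.DatumReader;
        import org.apache.avro.io.DecoderFactory;
    
        public class AvroReadPaths {
    
            // Embedded-schema path: DataFileStream reads the schema from the
            // container header itself, so no external schema is required
            static void readDataFile(InputStream in) throws IOException {
                try (DataFileStream<GenericRecord> dfs =
                         new DataFileStream<>(in, new GenericDatumReader<>())) {
                    Schema embedded = dfs.getSchema(); // comes from the file header
                    while (dfs.hasNext()) {
                        GenericRecord record = dfs.next();
                        // process record...
                    }
                }
            }
    
            // Raw path: datum bytes with no container header, so the caller must
            // supply the schema and decode datum after datum
            static void readRawAvro(InputStream in, Schema externalSchema) throws IOException {
                DatumReader<GenericRecord> reader = new GenericDatumReader<>(externalSchema);
                BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
                while (!decoder.isEnd()) {
                    GenericRecord record = reader.read(null, decoder);
                    // process record...
                }
            }
        }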

