This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 24e50953a3e2dde4cda0261e4f7dba083d5ed602 Author: Peter Turcsanyi <turcsa...@cloudera.com> AuthorDate: Thu Jul 4 18:48:36 2019 +0200 NIFI-6419: Fixed AvroWriter single record with external schema results in data loss This closes #3573. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> --- .../nifi-record-serialization-services/pom.xml | 1 + .../org/apache/nifi/avro/TestWriteAvroResult.java | 69 ++++++++++++++++++++++ .../nifi/avro/TestWriteAvroResultWithSchema.java | 18 +++++- .../avro/TestWriteAvroResultWithoutSchema.java | 15 ++++- .../src/test/resources/avro/simple.avsc | 8 +++ 5 files changed, 109 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 06fdb6d..e32ec28 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -136,6 +136,7 @@ <exclude>src/test/resources/avro/logical-types.avsc</exclude> <exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude> <exclude>src/test/resources/avro/multiple-types.avsc</exclude> + <exclude>src/test/resources/avro/simple.avsc</exclude> <exclude>src/test/resources/csv/extra-white-space.csv</exclude> <exclude>src/test/resources/csv/multi-bank-account.csv</exclude> <exclude>src/test/resources/csv/single-bank-account.csv</exclude> diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java index 4751f74..d3e5f6c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java @@ -26,6 +26,7 @@ import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.ListRecordSet; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; @@ -66,10 +67,78 @@ public abstract class TestWriteAvroResult { protected abstract GenericRecord readRecord(InputStream in, Schema schema) throws IOException; + protected abstract List<GenericRecord> readRecords(InputStream in, Schema schema, int recordCount) throws IOException; + protected void verify(final WriteResult writeResult) { } @Test + public void testWriteRecord() throws IOException { + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/simple.avsc")); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("msg", RecordFieldType.STRING.getDataType())); + final RecordSchema recordSchema = new SimpleRecordSchema(fields); + + final Map<String, Object> values = new HashMap<>(); + values.put("msg", "nifi"); + final Record record = new MapRecord(recordSchema, values); + + try (final RecordSetWriter writer = createWriter(schema, baos)) { + writer.write(record); + } + + final byte[] data = baos.toByteArray(); + + try (final InputStream in = new ByteArrayInputStream(data)) { + final GenericRecord avroRecord = readRecord(in, schema); + + assertNotNull(avroRecord); + assertNotNull(avroRecord.get("msg")); + assertEquals("nifi", avroRecord.get("msg").toString()); + } + } + + @Test + public void testWriteRecordSet() throws IOException { + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/simple.avsc")); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("msg", RecordFieldType.STRING.getDataType())); + final RecordSchema recordSchema = new SimpleRecordSchema(fields); + + final int recordCount = 3; + List<Record> records = new ArrayList<>(); + for (int i = 0; i < recordCount; i++){ + final Map<String, Object> values = new HashMap<>(); + values.put("msg", "nifi" + i); + final Record record = new MapRecord(recordSchema, values); + records.add(record); + } + + try (final RecordSetWriter writer = createWriter(schema, baos)) { + writer.write(new ListRecordSet(recordSchema, records)); + } + + final byte[] data = baos.toByteArray(); + + try (final InputStream in = new ByteArrayInputStream(data)) { + final List<GenericRecord> avroRecords = readRecords(in, schema, recordCount); + for (int i = 0; i < recordCount; i++) { + final GenericRecord avroRecord = avroRecords.get(i); + + assertNotNull(avroRecord); + assertNotNull(avroRecord.get("msg")); + assertEquals("nifi" + i, avroRecord.get("msg").toString()); + } + } + } + + @Test public void testLogicalTypes() throws IOException, ParseException { final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc")); testLogicalTypes(schema); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java index b3eecde..21787b4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java @@ -20,6 +20,8 @@ package org.apache.nifi.avro; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; @@ -39,11 +41,25 @@ public class TestWriteAvroResultWithSchema extends TestWriteAvroResult { @Override protected GenericRecord readRecord(final InputStream in, final Schema schema) throws IOException { - final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>()); + final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<>()); final Schema avroSchema = dataFileStream.getSchema(); GenericData.setStringType(avroSchema, StringType.String); final GenericRecord avroRecord = dataFileStream.next(); return avroRecord; } + + @Override + protected List<GenericRecord> readRecords(final InputStream in, final Schema schema, final int recordCount) throws IOException { + final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<>()); + final Schema avroSchema = dataFileStream.getSchema(); + GenericData.setStringType(avroSchema, StringType.String); + + List<GenericRecord> records = new ArrayList<>(); + for (int i = 0; i < recordCount; i++) { + records.add(dataFileStream.next()); + } + + return records; + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java index c592df0..9d86df5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java @@ -65,11 +65,24 @@ public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult { @Override protected GenericRecord readRecord(final InputStream in, final Schema schema) throws IOException { final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null); - final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); + final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); return reader.read(null, decoder); } @Override + protected List<GenericRecord> readRecords(final InputStream in, final Schema schema, final int recordCount) throws IOException { + final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null); + final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema); + + List<GenericRecord> records = new ArrayList<>(); + for (int i = 0; i < recordCount; i++) { + records.add(reader.read(null, decoder)); + } + + return records; + } + + @Override protected void verify(final WriteResult writeResult) { final Map<String, String> attributes = writeResult.getAttributes(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/simple.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/simple.avsc new file mode 100644 index 0000000..06f7aa0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/simple.avsc @@ -0,0 +1,8 @@ +{ + "namespace": "nifi", + "name": "simple", + "type": "record", + "fields": [ + {"name": "msg", "type": "string"} + ] +}