This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 24e50953a3e2dde4cda0261e4f7dba083d5ed602
Author: Peter Turcsanyi
AuthorDate: Thu Jul 4 18:48:36 2019 +0200
NIFI-6419: Fixed data loss when AvroWriter writes a single record with an
external schema
This closes #3573.
Signed-off-by: Koji Kawamura
---
.../nifi-record-serialization-services/pom.xml | 1 +
.../org/apache/nifi/avro/TestWriteAvroResult.java | 69 ++
.../nifi/avro/TestWriteAvroResultWithSchema.java | 18 +-
.../avro/TestWriteAvroResultWithoutSchema.java | 15 -
.../src/test/resources/avro/simple.avsc | 8 +++
5 files changed, 109 insertions(+), 2 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 06fdb6d..e32ec28 100755
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -136,6 +136,7 @@
src/test/resources/avro/logical-types.avsc
src/test/resources/avro/logical-types-nullable.avsc
src/test/resources/avro/multiple-types.avsc
+src/test/resources/avro/simple.avsc
src/test/resources/csv/extra-white-space.csv
src/test/resources/csv/multi-bank-account.csv
src/test/resources/csv/single-bank-account.csv
diff --git
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index 4751f74..d3e5f6c 100644
---
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -26,6 +26,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.ListRecordSet;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@@ -66,10 +67,78 @@ public abstract class TestWriteAvroResult {
protected abstract GenericRecord readRecord(InputStream in, Schema schema)
throws IOException;
+protected abstract List<GenericRecord> readRecords(InputStream in, Schema
schema, int recordCount) throws IOException;
+
protected void verify(final WriteResult writeResult) {
}
@Test
+public void testWriteRecord() throws IOException {
+final Schema schema = new Schema.Parser().parse(new
File("src/test/resources/avro/simple.avsc"));
+
+final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+final List<RecordField> fields = new ArrayList<>();
+fields.add(new RecordField("msg",
RecordFieldType.STRING.getDataType()));
+final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+final Map<String, Object> values = new HashMap<>();
+values.put("msg", "nifi");
+final Record record = new MapRecord(recordSchema, values);
+
+try (final RecordSetWriter writer = createWriter(schema, baos)) {
+writer.write(record);
+}
+
+final byte[] data = baos.toByteArray();
+
+try (final InputStream in = new ByteArrayInputStream(data)) {
+final GenericRecord avroRecord = readRecord(in, schema);
+
+assertNotNull(avroRecord);
+assertNotNull(avroRecord.get("msg"));
+assertEquals("nifi", avroRecord.get("msg").toString());
+}
+}
+
+@Test
+public void testWriteRecordSet() throws IOException {
+final Schema schema = new Schema.Parser().parse(new
File("src/test/resources/avro/simple.avsc"));
+
+final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+final List<RecordField> fields = new ArrayList<>();
+fields.add(new RecordField("msg",
RecordFieldType.STRING.getDataType()));
+