NIFI-945 Create a new property (JSON Container) in ConvertAvroToJSON, which 
determines how a stream of records is exposed: either as a sequence of single 
Objects ("none"), writing every Object to a new line, or as an array of 
Objects ("array").

Let's assume you have Avro content as a stream of records (record1, record2, 
...). If the JSON container is "none", the converter will expose the records 
as a sequence of single JSON objects, one per line:

record1
record2
...
recordN

Please bear in mind that the final output as a whole is not valid JSON 
content. You can then forward this content to, e.g., Kafka, where every record 
will be a single Kafka message.

If the JSON container is "array", the output looks like this:

[record1,record2,...,recordN]

It is useful when you want to convert your Avro content to a valid JSON array.

This closes #88

Reviewed and amended (amendments reviewed by the original patch author on 
GitHub) by Tony Kurc (tk...@apache.org)


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a5a5badb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a5a5badb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a5a5badb

Branch: refs/heads/NIFI-655
Commit: a5a5badb88311cb29cbe39088b57b9686314a1c6
Parents: da28b81
Author: Joe <joe.mesza...@impresstv.com>
Authored: Wed Oct 21 20:11:06 2015 -0400
Committer: Tony Kurc <trk...@gmail.com>
Committed: Wed Oct 21 20:15:29 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/avro/ConvertAvroToJSON.java | 49 ++++++++++++++++++--
 .../processors/avro/TestConvertAvroToJSON.java  | 47 +++++++++++++++++--
 2 files changed, 89 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a5a5badb/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index 8832a73..016750b 100644
--- 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -22,7 +22,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.avro.file.DataFileStream;
@@ -34,11 +37,13 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.StreamCallback;
@@ -49,9 +54,20 @@ import org.apache.nifi.processor.io.StreamCallback;
 @CapabilityDescription("Converts a Binary Avro record into a JSON object. This 
processor provides a direct mapping of an Avro field to a JSON field, such "
     + "that the resulting JSON will have the same hierarchical structure as 
the Avro document. Note that the Avro schema information will be lost, as this "
     + "is not a translation from binary Avro to JSON formatted Avro. The 
output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a 
stream of "
-    + "multiple Avro records, the resultant FlowFile will contain a JSON Array 
containing all of the Avro records.")
+    + "multiple Avro records, the resultant FlowFile will contain a JSON Array 
containing all of the Avro records or a sequence of JSON Objects")
 @WritesAttribute(attribute = "mime.type", description = "Sets the mime type to 
application/json")
 public class ConvertAvroToJSON extends AbstractProcessor {
+    protected static final String CONTAINER_ARRAY = "array";
+    protected static final String CONTAINER_NONE = "none";
+
+    static final PropertyDescriptor CONTAINER_OPTIONS
+            = new PropertyDescriptor.Builder()
+            .name("JSON container options")
+            .description("Determines how stream of records is exposed: either 
as a sequence of single Objects (" + CONTAINER_NONE + ") (i.e. writing every 
Object to a new line), or as an array of Objects (" + CONTAINER_ARRAY + ").")
+            .allowableValues(CONTAINER_NONE, CONTAINER_ARRAY)
+            .required(true)
+            .defaultValue(CONTAINER_ARRAY)
+            .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -62,6 +78,23 @@ public class ConvertAvroToJSON extends AbstractProcessor {
             .description("A FlowFile is routed to this relationship if it 
cannot be parsed as Avro or cannot be converted to JSON for any reason")
             .build();
 
+    
+
+    private List<PropertyDescriptor> properties;
+    
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        super.init(context);
+        
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(CONTAINER_OPTIONS);
+        this.properties = Collections.unmodifiableList(properties);
+    
+    }
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
     @Override
     public Set<Relationship> getRelationships() {
         final Set<Relationship> rels = new HashSet<>();
@@ -77,11 +110,14 @@ public class ConvertAvroToJSON extends AbstractProcessor {
             return;
         }
 
+        final String containerOption = 
context.getProperty(CONTAINER_OPTIONS).getValue();
+
         try {
             flowFile = session.write(flowFile, new StreamCallback() {
                 @Override
                 public void process(final InputStream rawIn, final 
OutputStream rawOut) throws IOException {
                     try (final InputStream in = new BufferedInputStream(rawIn);
+
                          final OutputStream out = new 
BufferedOutputStream(rawOut);
                          final DataFileStream<GenericRecord> reader = new 
DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
 
@@ -90,7 +126,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
                         final String json = genericData.toString(record);
 
                         int recordCount = 0;
-                        if (reader.hasNext()) {
+                        if (reader.hasNext() && 
containerOption.equals(CONTAINER_ARRAY)) {
                             out.write('[');
                         }
 
@@ -98,13 +134,18 @@ public class ConvertAvroToJSON extends AbstractProcessor {
                         recordCount++;
 
                         while (reader.hasNext()) {
-                            out.write(',');
+                            if (containerOption.equals(CONTAINER_ARRAY)) {
+                                out.write(',');
+                            } else {
+                                
out.write(System.lineSeparator().getBytes(StandardCharsets.UTF_8));
+                            }
+
                             final GenericRecord nextRecord = 
reader.next(record);
                             
out.write(genericData.toString(nextRecord).getBytes(StandardCharsets.UTF_8));
                             recordCount++;
                         }
 
-                        if (recordCount > 1) {
+                        if (recordCount > 1 && 
containerOption.equals(CONTAINER_ARRAY)) {
                             out.write(']');
                         }
                     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a5a5badb/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
index cfd26c1..302528e 100644
--- 
a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
+++ 
b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
@@ -16,7 +16,11 @@
  */
 package org.apache.nifi.processors.avro;
 
+import java.io.File;
+import java.io.IOException;
+
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
@@ -27,9 +31,6 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-
 public class TestConvertAvroToJSON {
 
     @Test
@@ -57,6 +58,8 @@ public class TestConvertAvroToJSON {
         final TestRunner runner = TestRunners.newTestRunner(new 
ConvertAvroToJSON());
         final Schema schema = new Schema.Parser().parse(new 
File("src/test/resources/user.avsc"));
 
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, 
ConvertAvroToJSON.CONTAINER_ARRAY);
+
         final GenericRecord user1 = new GenericData.Record(schema);
         user1.put("name", "Alyssa");
         user1.put("favorite_number", 256);
@@ -85,4 +88,42 @@ public class TestConvertAvroToJSON {
         runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_FAILURE, 1);
     }
 
+    private ByteArrayOutputStream serializeAvroRecord(final Schema schema, 
final DatumWriter<GenericRecord> datumWriter, final GenericRecord... users) 
throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataFileWriter<GenericRecord> dataFileWriter = new 
DataFileWriter<GenericRecord>(datumWriter);
+        dataFileWriter.create(schema, out);
+        for (final GenericRecord user : users) {
+            dataFileWriter.append(user);
+        }
+
+        dataFileWriter.close();
+        return out;
+    }
+
+    @Test
+    public void testMultipleAvroMessagesContainerNone() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
ConvertAvroToJSON());
+        final Schema schema = new Schema.Parser().parse(new 
File("src/test/resources/user.avsc"));
+
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, 
ConvertAvroToJSON.CONTAINER_NONE);
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final GenericRecord user2 = new GenericData.Record(schema);
+        user2.put("name", "George");
+        user2.put("favorite_number", 1024);
+        user2.put("favorite_color", "red");
+
+        final DatumWriter<GenericRecord> datumWriter = new 
GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, 
datumWriter, user1, user2);
+        runner.enqueue(out1.toByteArray());
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 
256, \"favorite_color\": null}\n{\"name\": \"George\", \"favorite_number\": 
1024, \"favorite_color\": \"red\"}");
+    }
 }

Reply via email to