Repository: nifi
Updated Branches:
  refs/heads/master cf3c66668 -> 794a64231


NIFI-5284: Added JSON_TYPE support to RunMongoAggregation

This closes #2776

Signed-off-by: Mike Thomsen <mikerthom...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/794a6423
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/794a6423
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/794a6423

Branch: refs/heads/master
Commit: 794a642310bc147885bbb6212563b1c2417f3f18
Parents: cf3c666
Author: zenfenan <sivaprasanna...@gmail.com>
Authored: Sat Jun 9 17:35:59 2018 +0530
Committer: Mike Thomsen <mikerthom...@gmail.com>
Committed: Sun Jun 10 06:45:21 2018 -0400

----------------------------------------------------------------------
 .../mongodb/AbstractMongoProcessor.java         | 38 ++++++++++++++++++++
 .../nifi/processors/mongodb/GetMongo.java       | 38 ++------------------
 .../processors/mongodb/RunMongoAggregation.java | 23 ++++++------
 .../mongodb/RunMongoAggregationIT.java          | 36 +++++++++++++++++--
 4 files changed, 88 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/794a6423/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
index 339bee7..fa3bc10 100644
--- 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
+++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nifi.processors.mongodb;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.mongodb.MongoClient;
 import com.mongodb.MongoClientOptions;
 import com.mongodb.MongoClientOptions.Builder;
@@ -29,6 +30,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.authentication.exception.ProviderCreationException;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
@@ -45,6 +47,8 @@ import javax.net.ssl.SSLContext;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +61,13 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
     static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = 
"REPLICA_ACKNOWLEDGED";
     static final String WRITE_CONCERN_MAJORITY = "MAJORITY";
 
+    protected static final String JSON_TYPE_EXTENDED = "Extended";
+    protected static final String JSON_TYPE_STANDARD   = "Standard";
+    protected static final AllowableValue JSON_EXTENDED = new 
AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
+            "Use MongoDB's \"extended JSON\". This is the JSON generated with 
toJson() on a MongoDB Document from the Java driver");
+    protected static final AllowableValue JSON_STANDARD = new 
AllowableValue(JSON_TYPE_STANDARD, "Standard JSON",
+            "Generate a JSON document that conforms to typical JSON 
conventions instead of Mongo-specific conventions.");
+
     protected static final PropertyDescriptor URI = new 
PropertyDescriptor.Builder()
         .name("Mongo URI")
         .displayName("Mongo URI")
@@ -65,6 +76,7 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
         .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
+
     protected static final PropertyDescriptor DATABASE_NAME = new 
PropertyDescriptor.Builder()
         .name("Mongo Database Name")
         .displayName("Mongo Database Name")
@@ -73,6 +85,7 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
+
     protected static final PropertyDescriptor COLLECTION_NAME = new 
PropertyDescriptor.Builder()
         .name("Mongo Collection Name")
         .description("The name of the collection to use")
@@ -80,6 +93,19 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
         
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
+
+    protected static final PropertyDescriptor JSON_TYPE = new 
PropertyDescriptor.Builder()
+            .allowableValues(JSON_EXTENDED, JSON_STANDARD)
+            .defaultValue(JSON_TYPE_EXTENDED)
+            .displayName("JSON Type")
+            .name("json-type")
+            .description("By default, MongoDB's Java driver returns \"extended 
JSON\". Some of the features of this variant of JSON" +
+                    " may cause problems for other JSON parsers that expect 
only standard JSON types and conventions. This configuration setting " +
+                    " controls whether to use extended JSON or provide a clean 
view that conforms to standard JSON.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .build();
+
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
         .name("ssl-context-service")
         .displayName("SSL Context Service")
@@ -88,6 +114,7 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
         .required(false)
         .identifiesControllerService(SSLContextService.class)
         .build();
+
     public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
         .name("ssl-client-auth")
         .displayName("Client Auth")
@@ -155,6 +182,7 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
         descriptors.add(CLIENT_AUTH);
     }
 
+    protected ObjectMapper objectMapper;
     protected MongoClient mongoClient;
 
     @OnScheduled
@@ -275,4 +303,14 @@ public abstract class AbstractMongoProcessor extends 
AbstractProcessor {
         session.getProvenanceReporter().receive(flowFile, getURI(context));
         session.transfer(flowFile, rel);
     }
+
+    protected synchronized void configureMapper(String setting) {
+        objectMapper = new ObjectMapper();
+
+        if (setting.equals(JSON_TYPE_STANDARD)) {
+            objectMapper.registerModule(ObjectIdSerializer.getModule());
+            DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+            objectMapper.setDateFormat(df);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/794a6423/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
index 10eb0c7..356f3f4 100644
--- 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
+++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java
@@ -45,8 +45,6 @@ import org.bson.json.JsonWriterSettings;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -55,7 +53,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-
 @Tags({ "mongodb", "read", "get" })
 @InputRequirement(Requirement.INPUT_ALLOWED)
 @CapabilityDescription("Creates FlowFiles from documents in MongoDB")
@@ -134,24 +131,6 @@ public class GetMongo extends AbstractMongoProcessor {
             .addValidator(Validator.VALID)
             .build();
 
-    static final String JSON_TYPE_EXTENDED = "Extended";
-    static final String JSON_TYPE_STANDARD   = "Standard";
-    static final AllowableValue JSON_EXTENDED = new 
AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON",
-            "Use MongoDB's \"extended JSON\". This is the JSON generated with 
toJson() on a MongoDB Document from the Java driver");
-    static final AllowableValue JSON_STANDARD = new 
AllowableValue(JSON_TYPE_STANDARD, "Standard JSON",
-            "Generate a JSON document that conforms to typical JSON 
conventions instead of Mongo-specific conventions.");
-    static final PropertyDescriptor JSON_TYPE = new 
PropertyDescriptor.Builder()
-            .allowableValues(JSON_EXTENDED, JSON_STANDARD)
-            .defaultValue(JSON_TYPE_EXTENDED)
-            .displayName("JSON Type")
-            .name("json-type")
-            .description("By default, MongoDB's Java driver returns \"extended 
JSON\". Some of the features of this variant of JSON" +
-                    " may cause problems for other JSON parsers that expect 
only standard JSON types and conventions. This configuration setting " +
-                    " controls whether to use extended JSON or provide a clean 
view that conforms to standard JSON.")
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .required(true)
-            .build();
-
     private final static Set<Relationship> relationships;
     private final static List<PropertyDescriptor> propertyDescriptors;
 
@@ -189,8 +168,6 @@ public class GetMongo extends AbstractMongoProcessor {
         return propertyDescriptors;
     }
 
-    private ObjectMapper mapper;
-
     //Turn a list of Mongo result documents into a String representation of a 
JSON array
     private String buildBatch(List<Document> documents, String 
jsonTypeSetting, String prettyPrintSetting) throws IOException {
         StringBuilder builder = new StringBuilder();
@@ -198,7 +175,7 @@ public class GetMongo extends AbstractMongoProcessor {
             Document document = documents.get(index);
             String asJson;
             if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
-                asJson = getObjectWriter(mapper, 
prettyPrintSetting).writeValueAsString(document);
+                asJson = getObjectWriter(objectMapper, 
prettyPrintSetting).writeValueAsString(document);
             } else {
                 asJson = document.toJson(new JsonWriterSettings(true));
             }
@@ -210,15 +187,6 @@ public class GetMongo extends AbstractMongoProcessor {
         return "[" + builder.toString() + "]";
     }
 
-    private void configureMapper(String setting) {
-        mapper = new ObjectMapper();
-        if (setting.equals(JSON_TYPE_STANDARD)) {
-            mapper.registerModule(ObjectIdSerializer.getModule());
-            DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
-            mapper.setDateFormat(df);
-        }
-    }
-
     private ObjectWriter getObjectWriter(ObjectMapper mapper, String 
ppSetting) {
         return ppSetting.equals(YES_PP.getValue()) ? 
mapper.writerWithDefaultPrettyPrinter()
                 : mapper.writer();
@@ -237,7 +205,7 @@ public class GetMongo extends AbstractMongoProcessor {
 
         final ComponentLog logger = getLogger();
 
-        Map<String, String> attributes = new HashMap<String, String>();
+        Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
 
         final Document query;
@@ -334,7 +302,7 @@ public class GetMongo extends AbstractMongoProcessor {
                         flowFile = session.write(flowFile, out -> {
                             String json;
                             if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) {
-                                json = getObjectWriter(mapper, 
usePrettyPrint).writeValueAsString(cursor.next());
+                                json = getObjectWriter(objectMapper, 
usePrettyPrint).writeValueAsString(cursor.next());
                             } else {
                                 json = cursor.next().toJson();
                             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/794a6423/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
index 19969c4..684c5f5 100644
--- 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
+++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java
@@ -96,6 +96,7 @@ public class RunMongoAggregation extends 
AbstractMongoProcessor {
         _propertyDescriptors.addAll(descriptors);
         _propertyDescriptors.add(CHARSET);
         _propertyDescriptors.add(QUERY);
+        _propertyDescriptors.add(JSON_TYPE);
         _propertyDescriptors.add(QUERY_ATTRIBUTE);
         _propertyDescriptors.add(BATCH_SIZE);
         _propertyDescriptors.add(RESULTS_PER_FLOWFILE);
@@ -120,11 +121,10 @@ public class RunMongoAggregation extends 
AbstractMongoProcessor {
         return propertyDescriptors;
     }
 
-    static String buildBatch(List<Document> batch) {
-        ObjectMapper mapper = new ObjectMapper();
+    private String buildBatch(List<Document> batch) {
         String retVal;
         try {
-            retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : 
batch.get(0));
+            retVal = objectMapper.writeValueAsString(batch.size() > 1 ? batch 
: batch.get(0));
         } catch (Exception e) {
             retVal = null;
         }
@@ -143,12 +143,15 @@ public class RunMongoAggregation extends 
AbstractMongoProcessor {
             }
         }
 
-        String query = 
context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
-        String queryAttr = 
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
-        Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
-        Integer resultsPerFlowfile = 
context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
+        final String query = 
context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
+        final String queryAttr = 
context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
+        final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final Integer resultsPerFlowfile = 
context.getProperty(RESULTS_PER_FLOWFILE).asInteger();
+        final String jsonTypeSetting = 
context.getProperty(JSON_TYPE).getValue();
+
+        configureMapper(jsonTypeSetting);
 
-        Map<String, String> attrs = new HashMap<String, String>();
+        Map<String, String> attrs = new HashMap<>();
         if (queryAttr != null && queryAttr.trim().length() > 0) {
             attrs.put(queryAttr, query);
         }
@@ -162,13 +165,13 @@ public class RunMongoAggregation extends 
AbstractMongoProcessor {
             it.batchSize(batchSize != null ? batchSize : 1);
 
             iter = it.iterator();
-            List<Document> batch = new ArrayList<Document>();
+            List<Document> batch = new ArrayList<>();
 
             while (iter.hasNext()) {
                 batch.add(iter.next());
                 if (batch.size() == resultsPerFlowfile) {
                     writeBatch(buildBatch(batch), flowFile, context, session, 
attrs, REL_RESULTS);
-                    batch = new ArrayList<Document>();
+                    batch = new ArrayList<>();
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/794a6423/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
index f2ddbca..02d9ad4 100644
--- 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
+++ 
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java
@@ -33,6 +33,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
@@ -48,6 +49,7 @@ public class RunMongoAggregationIT {
     private TestRunner runner;
     private MongoClient mongoClient;
     private Map<String, Integer> mappings;
+    private Calendar now = Calendar.getInstance();
 
     @Before
     public void setup() {
@@ -68,7 +70,7 @@ public class RunMongoAggregationIT {
 
         for (int x = 0; x < values.length; x++) {
             for (int y = 0; y < x + 2; y++) {
-                Document doc = new Document().append("val", values[x]);
+                Document doc = new Document().append("val", 
values[x]).append("date", now.getTime());
                 collection.insertOne(doc);
             }
             mappings.put(values[x], x + 2);
@@ -78,7 +80,6 @@ public class RunMongoAggregationIT {
     @After
     public void teardown() {
         runner = null;
-
         mongoClient.getDatabase(DB_NAME).drop();
     }
 
@@ -164,6 +165,37 @@ public class RunMongoAggregationIT {
         runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testJsonTypes() throws IOException {
+
+        runner.setProperty(RunMongoAggregation.JSON_TYPE, 
RunMongoAggregation.JSON_STANDARD);
+        runner.setProperty(RunMongoAggregation.QUERY, "[ { \"$project\": { 
\"myArray\": [ \"$val\", \"$date\" ] } } ]");
+        runner.enqueue("test");
+        runner.run(1, true, true);
+
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS);
+        ObjectMapper mapper = new ObjectMapper();
+        SimpleDateFormat format = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+        for (MockFlowFile mockFlowFile : flowFiles) {
+            byte[] raw = runner.getContentAsByteArray(mockFlowFile);
+            Map<String, List<String>> read = mapper.readValue(raw, Map.class);
+            Assert.assertTrue(read.get("myArray").get(1).equalsIgnoreCase( 
format.format(now.getTime())));
+        }
+
+        runner.clearTransferState();
+
+        runner.setProperty(RunMongoAggregation.JSON_TYPE, 
RunMongoAggregation.JSON_EXTENDED);
+        runner.enqueue("test");
+        runner.run(1, true, true);
+
+        flowFiles = 
runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS);
+        for (MockFlowFile mockFlowFile : flowFiles) {
+            byte[] raw = runner.getContentAsByteArray(mockFlowFile);
+            Map<String, List<Long>> read = mapper.readValue(raw, Map.class);
+            Assert.assertTrue(read.get("myArray").get(1) == 
now.getTimeInMillis());
+        }
+    }
+
     private void evaluateRunner(int original) throws IOException {
         runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 
mappings.size());
         runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, original);

Reply via email to