Repository: nifi
Updated Branches:
  refs/heads/master cf3c66668 -> 794a64231
NIFI-5284: Added JSON_TYPE support to RunMongoAggregation This closes #2776 Signed-off-by: Mike Thomsen <mikerthom...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/794a6423 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/794a6423 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/794a6423 Branch: refs/heads/master Commit: 794a642310bc147885bbb6212563b1c2417f3f18 Parents: cf3c666 Author: zenfenan <sivaprasanna...@gmail.com> Authored: Sat Jun 9 17:35:59 2018 +0530 Committer: Mike Thomsen <mikerthom...@gmail.com> Committed: Sun Jun 10 06:45:21 2018 -0400 ---------------------------------------------------------------------- .../mongodb/AbstractMongoProcessor.java | 38 ++++++++++++++++++++ .../nifi/processors/mongodb/GetMongo.java | 38 ++------------------ .../processors/mongodb/RunMongoAggregation.java | 23 ++++++------ .../mongodb/RunMongoAggregationIT.java | 36 +++++++++++++++++-- 4 files changed, 88 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/794a6423/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index 339bee7..fa3bc10 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -18,6 +18,7 
@@ */ package org.apache.nifi.processors.mongodb; +import com.fasterxml.jackson.databind.ObjectMapper; import com.mongodb.MongoClient; import com.mongodb.MongoClientOptions; import com.mongodb.MongoClientOptions.Builder; @@ -29,6 +30,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.authentication.exception.ProviderCreationException; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; @@ -45,6 +47,8 @@ import javax.net.ssl.SSLContext; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -57,6 +61,13 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { static final String WRITE_CONCERN_REPLICA_ACKNOWLEDGED = "REPLICA_ACKNOWLEDGED"; static final String WRITE_CONCERN_MAJORITY = "MAJORITY"; + protected static final String JSON_TYPE_EXTENDED = "Extended"; + protected static final String JSON_TYPE_STANDARD = "Standard"; + protected static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON", + "Use MongoDB's \"extended JSON\". 
This is the JSON generated with toJson() on a MongoDB Document from the Java driver"); + protected static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON", + "Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions."); + protected static final PropertyDescriptor URI = new PropertyDescriptor.Builder() .name("Mongo URI") .displayName("Mongo URI") @@ -65,6 +76,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + protected static final PropertyDescriptor DATABASE_NAME = new PropertyDescriptor.Builder() .name("Mongo Database Name") .displayName("Mongo Database Name") @@ -73,6 +85,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + protected static final PropertyDescriptor COLLECTION_NAME = new PropertyDescriptor.Builder() .name("Mongo Collection Name") .description("The name of the collection to use") @@ -80,6 +93,19 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + + protected static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder() + .allowableValues(JSON_EXTENDED, JSON_STANDARD) + .defaultValue(JSON_TYPE_EXTENDED) + .displayName("JSON Type") + .name("json-type") + .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + + " may cause problems for other JSON parsers that expect only standard JSON types and conventions. 
This configuration setting " + + " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(true) + .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("ssl-context-service") .displayName("SSL Context Service") @@ -88,6 +114,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .required(false) .identifiesControllerService(SSLContextService.class) .build(); + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() .name("ssl-client-auth") .displayName("Client Auth") @@ -155,6 +182,7 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { descriptors.add(CLIENT_AUTH); } + protected ObjectMapper objectMapper; protected MongoClient mongoClient; @OnScheduled @@ -275,4 +303,14 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { session.getProvenanceReporter().receive(flowFile, getURI(context)); session.transfer(flowFile, rel); } + + protected synchronized void configureMapper(String setting) { + objectMapper = new ObjectMapper(); + + if (setting.equals(JSON_TYPE_STANDARD)) { + objectMapper.registerModule(ObjectIdSerializer.getModule()); + DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + objectMapper.setDateFormat(df); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/794a6423/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index 10eb0c7..356f3f4 100644 --- 
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -45,8 +45,6 @@ import org.bson.json.JsonWriterSettings; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -55,7 +53,6 @@ import java.util.List; import java.util.Map; import java.util.Set; - @Tags({ "mongodb", "read", "get" }) @InputRequirement(Requirement.INPUT_ALLOWED) @CapabilityDescription("Creates FlowFiles from documents in MongoDB") @@ -134,24 +131,6 @@ public class GetMongo extends AbstractMongoProcessor { .addValidator(Validator.VALID) .build(); - static final String JSON_TYPE_EXTENDED = "Extended"; - static final String JSON_TYPE_STANDARD = "Standard"; - static final AllowableValue JSON_EXTENDED = new AllowableValue(JSON_TYPE_EXTENDED, "Extended JSON", - "Use MongoDB's \"extended JSON\". This is the JSON generated with toJson() on a MongoDB Document from the Java driver"); - static final AllowableValue JSON_STANDARD = new AllowableValue(JSON_TYPE_STANDARD, "Standard JSON", - "Generate a JSON document that conforms to typical JSON conventions instead of Mongo-specific conventions."); - static final PropertyDescriptor JSON_TYPE = new PropertyDescriptor.Builder() - .allowableValues(JSON_EXTENDED, JSON_STANDARD) - .defaultValue(JSON_TYPE_EXTENDED) - .displayName("JSON Type") - .name("json-type") - .description("By default, MongoDB's Java driver returns \"extended JSON\". Some of the features of this variant of JSON" + - " may cause problems for other JSON parsers that expect only standard JSON types and conventions. 
This configuration setting " + - " controls whether to use extended JSON or provide a clean view that conforms to standard JSON.") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .required(true) - .build(); - private final static Set<Relationship> relationships; private final static List<PropertyDescriptor> propertyDescriptors; @@ -189,8 +168,6 @@ public class GetMongo extends AbstractMongoProcessor { return propertyDescriptors; } - private ObjectMapper mapper; - //Turn a list of Mongo result documents into a String representation of a JSON array private String buildBatch(List<Document> documents, String jsonTypeSetting, String prettyPrintSetting) throws IOException { StringBuilder builder = new StringBuilder(); @@ -198,7 +175,7 @@ public class GetMongo extends AbstractMongoProcessor { Document document = documents.get(index); String asJson; if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { - asJson = getObjectWriter(mapper, prettyPrintSetting).writeValueAsString(document); + asJson = getObjectWriter(objectMapper, prettyPrintSetting).writeValueAsString(document); } else { asJson = document.toJson(new JsonWriterSettings(true)); } @@ -210,15 +187,6 @@ public class GetMongo extends AbstractMongoProcessor { return "[" + builder.toString() + "]"; } - private void configureMapper(String setting) { - mapper = new ObjectMapper(); - if (setting.equals(JSON_TYPE_STANDARD)) { - mapper.registerModule(ObjectIdSerializer.getModule()); - DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); - mapper.setDateFormat(df); - } - } - private ObjectWriter getObjectWriter(ObjectMapper mapper, String ppSetting) { return ppSetting.equals(YES_PP.getValue()) ? 
mapper.writerWithDefaultPrettyPrinter() : mapper.writer(); @@ -237,7 +205,7 @@ public class GetMongo extends AbstractMongoProcessor { final ComponentLog logger = getLogger(); - Map<String, String> attributes = new HashMap<String, String>(); + Map<String, String> attributes = new HashMap<>(); attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); final Document query; @@ -334,7 +302,7 @@ public class GetMongo extends AbstractMongoProcessor { flowFile = session.write(flowFile, out -> { String json; if (jsonTypeSetting.equals(JSON_TYPE_STANDARD)) { - json = getObjectWriter(mapper, usePrettyPrint).writeValueAsString(cursor.next()); + json = getObjectWriter(objectMapper, usePrettyPrint).writeValueAsString(cursor.next()); } else { json = cursor.next().toJson(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/794a6423/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java index 19969c4..684c5f5 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/RunMongoAggregation.java @@ -96,6 +96,7 @@ public class RunMongoAggregation extends AbstractMongoProcessor { _propertyDescriptors.addAll(descriptors); _propertyDescriptors.add(CHARSET); _propertyDescriptors.add(QUERY); + _propertyDescriptors.add(JSON_TYPE); _propertyDescriptors.add(QUERY_ATTRIBUTE); _propertyDescriptors.add(BATCH_SIZE); 
_propertyDescriptors.add(RESULTS_PER_FLOWFILE); @@ -120,11 +121,10 @@ public class RunMongoAggregation extends AbstractMongoProcessor { return propertyDescriptors; } - static String buildBatch(List<Document> batch) { - ObjectMapper mapper = new ObjectMapper(); + private String buildBatch(List<Document> batch) { String retVal; try { - retVal = mapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0)); + retVal = objectMapper.writeValueAsString(batch.size() > 1 ? batch : batch.get(0)); } catch (Exception e) { retVal = null; } @@ -143,12 +143,15 @@ public class RunMongoAggregation extends AbstractMongoProcessor { } } - String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue(); - String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); - Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); - Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); + final String query = context.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue(); + final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); + final Integer batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final Integer resultsPerFlowfile = context.getProperty(RESULTS_PER_FLOWFILE).asInteger(); + final String jsonTypeSetting = context.getProperty(JSON_TYPE).getValue(); + + configureMapper(jsonTypeSetting); - Map<String, String> attrs = new HashMap<String, String>(); + Map<String, String> attrs = new HashMap<>(); if (queryAttr != null && queryAttr.trim().length() > 0) { attrs.put(queryAttr, query); } @@ -162,13 +165,13 @@ public class RunMongoAggregation extends AbstractMongoProcessor { it.batchSize(batchSize != null ? 
batchSize : 1); iter = it.iterator(); - List<Document> batch = new ArrayList<Document>(); + List<Document> batch = new ArrayList<>(); while (iter.hasNext()) { batch.add(iter.next()); if (batch.size() == resultsPerFlowfile) { writeBatch(buildBatch(batch), flowFile, context, session, attrs, REL_RESULTS); - batch = new ArrayList<Document>(); + batch = new ArrayList<>(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/794a6423/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java index f2ddbca..02d9ad4 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/RunMongoAggregationIT.java @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.HashMap; import java.util.List; @@ -48,6 +49,7 @@ public class RunMongoAggregationIT { private TestRunner runner; private MongoClient mongoClient; private Map<String, Integer> mappings; + private Calendar now = Calendar.getInstance(); @Before public void setup() { @@ -68,7 +70,7 @@ public class RunMongoAggregationIT { for (int x = 0; x < values.length; x++) { for (int y = 0; y < x + 2; y++) { - Document doc = new Document().append("val", values[x]); + Document doc = new Document().append("val", values[x]).append("date", now.getTime()); collection.insertOne(doc); } 
mappings.put(values[x], x + 2); @@ -78,7 +80,6 @@ public class RunMongoAggregationIT { @After public void teardown() { runner = null; - mongoClient.getDatabase(DB_NAME).drop(); } @@ -164,6 +165,37 @@ public class RunMongoAggregationIT { runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 1); } + @Test + public void testJsonTypes() throws IOException { + + runner.setProperty(RunMongoAggregation.JSON_TYPE, RunMongoAggregation.JSON_STANDARD); + runner.setProperty(RunMongoAggregation.QUERY, "[ { \"$project\": { \"myArray\": [ \"$val\", \"$date\" ] } } ]"); + runner.enqueue("test"); + runner.run(1, true, true); + + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS); + ObjectMapper mapper = new ObjectMapper(); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + for (MockFlowFile mockFlowFile : flowFiles) { + byte[] raw = runner.getContentAsByteArray(mockFlowFile); + Map<String, List<String>> read = mapper.readValue(raw, Map.class); + Assert.assertTrue(read.get("myArray").get(1).equalsIgnoreCase( format.format(now.getTime()))); + } + + runner.clearTransferState(); + + runner.setProperty(RunMongoAggregation.JSON_TYPE, RunMongoAggregation.JSON_EXTENDED); + runner.enqueue("test"); + runner.run(1, true, true); + + flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS); + for (MockFlowFile mockFlowFile : flowFiles) { + byte[] raw = runner.getContentAsByteArray(mockFlowFile); + Map<String, List<Long>> read = mapper.readValue(raw, Map.class); + Assert.assertTrue(read.get("myArray").get(1) == now.getTimeInMillis()); + } + } + private void evaluateRunner(int original) throws IOException { runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size()); runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, original);