This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2730a90 NIFI-9334 Add support for upsert in 'PutMongoRecord'. Use 'bulkWrite' for both insert and upsert.
2730a90 is described below
commit 2730a9000e7996a0f05735854e28507e499166ac
Author: Tamas Palfy <[email protected]>
AuthorDate: Fri Oct 15 12:46:11 2021 +0200
NIFI-9334 Add support for upsert in 'PutMongoRecord'. Use 'bulkWrite' for both insert and upsert.
This closes #5482.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../nifi/processors/mongodb/PutMongoRecord.java | 205 +++++++--
.../nifi/processors/mongodb/PutMongoRecordIT.java | 457 +++++++++++++++++++++
2 files changed, 639 insertions(+), 23 deletions(-)
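
For readers skimming the diff below: the commit replaces insertMany() with bulkWrite(), turning each record into either an InsertOneModel or an upserting UpdateOneModel/UpdateManyModel. A minimal standalone sketch of that driver pattern follows (not part of the commit; the connection string, database and collection names are illustrative, and the mongodb-driver-sync client is assumed):

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.BulkWriteOptions;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.UpdateOneModel;
    import com.mongodb.client.model.UpdateOptions;
    import com.mongodb.client.model.WriteModel;
    import org.bson.Document;

    import java.util.Arrays;
    import java.util.List;

    public class BulkUpsertSketch {
        public static void main(String[] args) {
            try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
                MongoCollection<Document> collection = client.getDatabase("demo").getCollection("people");

                // Each record becomes an UpdateOneModel with upsert(true): a matching
                // document is updated in place, a non-matching one is inserted.
                List<WriteModel<Document>> writeModels = Arrays.asList(
                        upsert(new Document("id", 1).append("name", "name1")),
                        upsert(new Document("id", 2).append("name", "name2")));

                // One round trip applies the whole batch; ordered(false) lets the
                // server continue past individual failures.
                collection.bulkWrite(writeModels, new BulkWriteOptions().ordered(false));
            }
        }

        private static WriteModel<Document> upsert(Document document) {
            return new UpdateOneModel<>(
                    Filters.eq("id", document.get("id")),  // filter built from the key field
                    new Document("$set", document),        // $set leaves unrelated fields intact
                    new UpdateOptions().upsert(true));
        }
    }

The $set update mirrors what the processor builds for each record, so fields outside the incoming record are preserved on update.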
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
index 5068671..9c3df04 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
@@ -19,10 +19,19 @@ package org.apache.nifi.processors.mongodb;
import com.mongodb.MongoException;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
@@ -39,30 +48,45 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.bson.Document;
+import org.bson.conversions.Bson;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@EventDriven
-@Tags({"mongodb", "insert", "record", "put"})
+@Tags({"mongodb", "insert", "update", "upsert", "record", "put"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@CapabilityDescription("This processor is a record-aware processor for
inserting data into MongoDB. It uses a configured record reader and " +
- "schema to read an incoming record set from the body of a flowfile and
then inserts batches of those records into " +
- "a configured MongoDB collection. This processor does not support
updates, deletes or upserts. The number of documents to insert at a time is
controlled " +
- "by the \"Insert Batch Size\" configuration property. This value
should be set to a reasonable size to ensure " +
- "that MongoDB is not overloaded with too many inserts at once.")
+@CapabilityDescription("This processor is a record-aware processor for
inserting/upserting data into MongoDB. It uses a configured record reader and "
+
+ "schema to read an incoming record set from the body of a flowfile and
then inserts/upserts batches of those records into " +
+ "a configured MongoDB collection. This processor does not support
deletes. The number of documents to insert/upsert at a time is controlled " +
+ "by the \"Batch Size\" configuration property. This value should be
set to a reasonable size to ensure " +
+ "that MongoDB is not overloaded with too many operations at once.")
+@ReadsAttribute(
+ attribute = PutMongoRecord.MONGODB_UPDATE_MODE,
+ description = "Configurable parameter for controlling update mode on a
per-flowfile basis." +
+ " Acceptable values are 'one' and 'many' and controls whether a single
incoming record should update a single or multiple Mongo documents."
+)
public class PutMongoRecord extends AbstractMongoProcessor {
+ static final String MONGODB_UPDATE_MODE = "mongodb.update.mode";
+
static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFiles that are written to MongoDB are routed to this relationship").build();
static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be written to MongoDB are routed to this relationship").build();
+ static final AllowableValue UPDATE_ONE = new AllowableValue("one", "Update One", "Updates only the first document that matches the query.");
+ static final AllowableValue UPDATE_MANY = new AllowableValue("many", "Update Many", "Updates every document that matches the query.");
+ static final AllowableValue UPDATE_FF_ATTRIBUTE = new AllowableValue("flowfile-attribute", "Use '" + MONGODB_UPDATE_MODE + "' flowfile attribute.",
+ "Use the value of the '" + MONGODB_UPDATE_MODE + "' attribute of the incoming flowfile. Acceptable values are 'one' and 'many'.");
+
static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
@@ -70,15 +94,55 @@ public class PutMongoRecord extends AbstractMongoProcessor {
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
+
static final PropertyDescriptor INSERT_COUNT = new PropertyDescriptor.Builder()
.name("insert_count")
- .displayName("Insert Batch Size")
- .description("The number of records to group together for one single insert operation against MongoDB.")
+ .displayName("Batch Size")
+ .description("The number of records to group together for a single insert/upsert operation against MongoDB.")
.defaultValue("100")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
+ static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+ .name("ordered")
+ .displayName("Ordered")
+ .description("Perform ordered or unordered operations")
+ .allowableValues("True", "False")
+ .defaultValue("False")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor BYPASS_VALIDATION = new PropertyDescriptor.Builder()
+ .name("bypass-validation")
+ .displayName("Bypass Validation")
+ .description("Bypass schema validation during insert/upsert")
+ .allowableValues("True", "False")
+ .defaultValue("True")
+ .required(true)
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor UPDATE_KEY_FIELDS = new PropertyDescriptor.Builder()
+ .name("update-key-fields")
+ .displayName("Update Key Fields")
+ .description("Comma-separated list of fields based on which to identify documents that need to be updated. " +
+ "If this property is set, NiFi will attempt an upsert operation on all documents. " +
+ "If this property is not set, all documents will be inserted.")
+ .required(false)
+ .addValidator(StandardValidators.createListValidator(true, false, StandardValidators.NON_EMPTY_VALIDATOR))
+ .build();
+
+ static final PropertyDescriptor UPDATE_MODE = new PropertyDescriptor.Builder()
+ .name("update-mode")
+ .displayName("Update Mode")
+ .dependsOn(UPDATE_KEY_FIELDS)
+ .description("Choose between updating a single document or multiple documents per incoming record.")
+ .allowableValues(UPDATE_ONE, UPDATE_MANY, UPDATE_FF_ATTRIBUTE)
+ .defaultValue(UPDATE_ONE.getValue())
+ .build();
+
private final static Set<Relationship> relationships;
private final static List<PropertyDescriptor> propertyDescriptors;
@@ -88,6 +152,10 @@ public class PutMongoRecord extends AbstractMongoProcessor {
_propertyDescriptors.add(WRITE_CONCERN);
_propertyDescriptors.add(RECORD_READER_FACTORY);
_propertyDescriptors.add(INSERT_COUNT);
+ _propertyDescriptors.add(ORDERED);
+ _propertyDescriptors.add(BYPASS_VALIDATION);
+ _propertyDescriptors.add(UPDATE_KEY_FIELDS);
+ _propertyDescriptors.add(UPDATE_MODE);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
final Set<Relationship> _relationships = new HashSet<>();
@@ -118,15 +186,36 @@ public class PutMongoRecord extends AbstractMongoProcessor {
final WriteConcern writeConcern = getWriteConcern(context);
- List<Document> inserts = new ArrayList<>();
int ceiling = context.getProperty(INSERT_COUNT).asInteger();
- int added = 0;
+ int written = 0;
boolean error = false;
- try (final InputStream inStream = session.read(flowFile);
- final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) {
- final MongoCollection<Document> collection = getCollection(context, flowFile).withWriteConcern(writeConcern);
+ boolean ordered = context.getProperty(ORDERED).asBoolean();
+ boolean bypass = context.getProperty(BYPASS_VALIDATION).asBoolean();
+
+ Map<String, List<String>> updateKeyFieldPathToFieldChain = new
LinkedHashMap<>();
+ if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
+
Arrays.stream(context.getProperty(UPDATE_KEY_FIELDS).getValue().split("\\s*,\\s*"))
+ .forEach(updateKeyField -> updateKeyFieldPathToFieldChain.put(
+ updateKeyField,
+ Arrays.asList(updateKeyField.split("\\."))
+ ));
+ }
+
+ BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
+ bulkWriteOptions.ordered(ordered);
+ bulkWriteOptions.bypassDocumentValidation(bypass);
+
+ try (
+ final InputStream inStream = session.read(flowFile);
+ final RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, getLogger());
+ ) {
RecordSchema schema = reader.getSchema();
+
+ final MongoCollection<Document> collection =
getCollection(context, flowFile).withWriteConcern(writeConcern);
+
+ List<WriteModel<Document>> writeModels = new ArrayList<>();
+
Record record;
while ((record = reader.nextRecord()) != null) {
// Convert each Record to HashMap and put into the Mongo document
@@ -135,17 +224,43 @@ public class PutMongoRecord extends AbstractMongoProcessor {
for (String name : schema.getFieldNames()) {
document.put(name, contentMap.get(name));
}
- inserts.add(convertArrays(document));
- if (inserts.size() == ceiling) {
- collection.insertMany(inserts);
- added += inserts.size();
- inserts = new ArrayList<>();
+ Document readyToUpsert = convertArrays(document);
+
+ WriteModel<Document> writeModel;
+ if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
+ Bson[] filters = buildFilters(updateKeyFieldPathToFieldChain, readyToUpsert);
+
+ if (updateModeMatches(UPDATE_ONE.getValue(), context, flowFile)) {
+ writeModel = new UpdateOneModel<>(
+ Filters.and(filters),
+ new Document("$set", readyToUpsert),
+ new UpdateOptions().upsert(true)
+ );
+ } else if (updateModeMatches(UPDATE_MANY.getValue(), context, flowFile)) {
+ writeModel = new UpdateManyModel<>(
+ Filters.and(filters),
+ new Document("$set", readyToUpsert),
+ new UpdateOptions().upsert(true)
+ );
+ } else {
+ String flowfileUpdateMode = flowFile.getAttribute(MONGODB_UPDATE_MODE);
+ throw new ProcessException("Unrecognized '" + MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
+ }
+ } else {
+ writeModel = new InsertOneModel<>(readyToUpsert);
+ }
+
+ writeModels.add(writeModel);
+ if (writeModels.size() == ceiling) {
+ collection.bulkWrite(writeModels, bulkWriteOptions);
+ written += writeModels.size();
+ writeModels = new ArrayList<>();
}
}
- if (inserts.size() > 0) {
- collection.insertMany(inserts);
+ if (writeModels.size() > 0) {
+ collection.bulkWrite(writeModels, bulkWriteOptions);
}
- } catch (SchemaNotFoundException | IOException | MalformedRecordException | MongoException e) {
+ } catch (ProcessException | SchemaNotFoundException | IOException | MalformedRecordException | MongoException e) {
getLogger().error("PutMongoRecord failed with error:", e);
session.transfer(flowFile, REL_FAILURE);
error = true;
@@ -154,9 +269,9 @@ public class PutMongoRecord extends AbstractMongoProcessor {
String url = clientService != null
? clientService.getURI()
: context.getProperty(URI).evaluateAttributeExpressions().getValue();
- session.getProvenanceReporter().send(flowFile, url, String.format("Added %d documents to MongoDB.", added));
+ session.getProvenanceReporter().send(flowFile, url, String.format("Wrote %d documents to MongoDB.", written));
session.transfer(flowFile, REL_SUCCESS);
- getLogger().info("Inserted {} records into MongoDB", new Object[]{ added });
+ getLogger().info("Wrote {} records to MongoDB", new Object[]{ written });
}
}
}
@@ -190,4 +305,48 @@ public class PutMongoRecord extends AbstractMongoProcessor {
return retVal;
}
+
+ private Bson[] buildFilters(Map<String, List<String>>
updateKeyFieldPathToFieldChain, Document readyToUpsert) {
+ Bson[] filters = updateKeyFieldPathToFieldChain.entrySet()
+ .stream()
+ .map(updateKeyFieldPath__fieldChain -> {
+ String fieldPath = updateKeyFieldPath__fieldChain.getKey();
+ List<String> fieldChain =
updateKeyFieldPath__fieldChain.getValue();
+
+ Object value = readyToUpsert;
+ String previousField = null;
+ for (String field : fieldChain) {
+ if (!(value instanceof Map)) {
+ throw new ProcessException("field '" + previousField +
"' (from field expression '" + fieldPath + "') is not an embedded document");
+ }
+
+ value = ((Map) value).get(field);
+
+ if (value == null) {
+ throw new ProcessException("field '" + field + "'
(from field expression '" + fieldPath + "') has no value");
+ }
+
+ previousField = field;
+ }
+
+ Bson filter = Filters.eq(fieldPath, value);
+ return filter;
+ })
+ .toArray(Bson[]::new);
+
+ return filters;
+ }
+
+ private boolean updateModeMatches(String updateModeToMatch, ProcessContext context, FlowFile flowFile) {
+ String updateMode = context.getProperty(UPDATE_MODE).getValue();
+
+ boolean updateModeMatches = updateMode.equals(updateModeToMatch)
+ ||
+ (
+ updateMode.equals(UPDATE_FF_ATTRIBUTE.getValue())
+ &&
updateModeToMatch.equalsIgnoreCase(flowFile.getAttribute(MONGODB_UPDATE_MODE))
+ );
+
+ return updateModeMatches;
+ }
}
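
The Update Key Fields property accepts dotted paths into embedded documents; buildFilters() above walks each path through the converted Document and fails fast when a segment is missing or is not an embedded document. A small self-contained illustration of that resolution, under assumed inputs (not part of the commit):

    import com.mongodb.client.model.Filters;
    import org.bson.Document;
    import org.bson.conversions.Bson;

    import java.util.Arrays;
    import java.util.Map;

    public class UpdateKeyFieldSketch {
        public static void main(String[] args) {
            // Hypothetical record, already converted to a Mongo Document.
            Document document = new Document("id", 1)
                    .append("person", new Document("name", "name1").append("age", 21));

            // "person.name" splits into a field chain that is walked through the
            // embedded documents, mirroring the loop in buildFilters().
            String fieldPath = "person.name";
            Object value = document;
            for (String field : Arrays.asList(fieldPath.split("\\."))) {
                if (!(value instanceof Map)) {
                    throw new IllegalStateException("'" + field + "' is not inside an embedded document");
                }
                value = ((Map<?, ?>) value).get(field);
                if (value == null) {
                    throw new IllegalStateException("'" + field + "' has no value");
                }
            }

            // The filter keeps the dotted path, so MongoDB matches the nested
            // field: { "person.name": "name1" }
            Bson filter = Filters.eq(fieldPath, value);
        }
    }
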
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
index a30a76c..e57769b 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
@@ -45,12 +45,16 @@ import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -243,4 +247,457 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
runner.assertTransferCount(PutMongoRecord.REL_FAILURE, 0);
runner.assertTransferCount(PutMongoRecord.REL_SUCCESS, 1);
}
+
+ @Test
+ void testUpsertAsInsert() throws Exception {
+ // GIVEN
+ TestRunner runner = init();
+
+ runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
+
+ recordReader.addSchemaField("id", RecordFieldType.INT);
+ recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+ final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("name", RecordFieldType.STRING.getDataType()),
+ new RecordField("age", RecordFieldType.INT.getDataType())
+ ));
+
+ List<List<Object[]>> inputs = Arrays.asList(
+ Arrays.asList(
+ new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+ put("name", "name1");
+ put("age", 21);
+ }})},
+ new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
+ put("name", "name2");
+ put("age", 22);
+ }})}
+ )
+ );
+
+ Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+ new HashMap<String, Object>() {{
+ put("id", 1);
+ put("person", new Document(new HashMap<String, Object>() {{
+ put("name", "name1");
+ put("age", 21);
+ }}));
+ }},
+ new HashMap<String, Object>() {{
+ put("id", 2);
+ put("person", new Document(new HashMap<String, Object>() {{
+ put("name", "name2");
+ put("age", 22);
+ }}));
+ }}
+ ));
+
+ // WHEN
+ // THEN
+ testUpsertSuccess(runner, inputs, expected);
+ }
+
+ @Test
+ void testUpsertAsUpdate() throws Exception {
+ // GIVEN
+ TestRunner runner = init();
+
+ runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
+
+ recordReader.addSchemaField("id", RecordFieldType.INT);
+ recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+ final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("name", RecordFieldType.STRING.getDataType()),
+ new RecordField("age", RecordFieldType.INT.getDataType())
+ ));
+
+ List<List<Object[]>> inputs = Arrays.asList(
+ Arrays.asList(
+ new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+ put("name", "updating_name1");
+ put("age", "age1".length());
+ }})},
+ new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
+ put("name", "name2");
+ put("age", "updating_age2".length());
+ }})}
+ ),
+ Arrays.asList(
+ new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+ put("name", "updated_name1");
+ put("age", "age1".length());
+ }})},
+ new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
+ put("name", "name2");
+ put("age", "updated_age2".length());
+ }})}
+ )
+ );
+
+ Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+ new HashMap<String, Object>() {{
+ put("id", 1);
+ put("person", new Document(new HashMap<String, Object>() {{
+ put("name", "updated_name1");
+ put("age", "age1".length());
+ }}));
+ }},
+ new HashMap<String, Object>() {{
+ put("id", 2);
+ put("person", new Document(new HashMap<String, Object>() {{
+ put("name", "name2");
+ put("age", "updated_age2".length());
+ }}));
+ }}
+ ));
+
+ // WHEN
+ testUpsertSuccess(runner, inputs, expected);
+ }
+
+ @Test
+ void testUpsertAsInsertAndUpdate() throws Exception {
+ // GIVEN
+ TestRunner runner = init();
+
+ runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
+
+ recordReader.addSchemaField("id", RecordFieldType.INT);
+ recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+ final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("name", RecordFieldType.STRING.getDataType()),
+ new RecordField("age", RecordFieldType.INT.getDataType())
+ ));
+
+ List<List<Object[]>> inputs = Arrays.asList(
+ Collections.singletonList(
+ new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+ put("name", "updating_name1");
+ put("age", "updating_age1".length());
+ }})}
+ ),
+ Arrays.asList(
+ new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+ put("name", "updated_name1");
+ put("age", "updated_age1".length());
+ }})},
+ new Object[]{2, new MapRecord(personSchema, new HashMap<String, Object>() {{
+ put("name", "inserted_name2");
+ put("age", "inserted_age2".length());
+ }})}
+ )
+ );
+
+ Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+ new HashMap<String, Object>() {{
+ put("id", 1);
+ put("person", new Document(new HashMap<String, Object>() {{
+ put("name", "updated_name1");
+ put("age", "updated_age1".length());
+ }}));
+ }},
+ new HashMap<String, Object>() {{
+ put("id", 2);
+ put("person", new Document(new HashMap<String, Object>() {{
+ put("name", "inserted_name2");
+ put("age", "inserted_age2".length());
+ }}));
+ }}
+ ));
+
+ // WHEN
+ testUpsertSuccess(runner, inputs, expected);
+ }
+
+ @Test
+ void testRouteToFailureWhenKeyFieldDoesNotExist() throws Exception {
+ // GIVEN
+ TestRunner runner = init();
+
+ runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id,non_existent_field");
+
+ recordReader.addSchemaField("id", RecordFieldType.INT);
+ recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+ final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("name", RecordFieldType.STRING.getDataType()),
+ new RecordField("age", RecordFieldType.INT.getDataType())
+ ));
+
+ List<List<Object[]>> inputs = Arrays.asList(
+ Collections.singletonList(
+ new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+ put("name", "unimportant");
+ put("age", "unimportant".length());
+ }})}
+ )
+ );
+
+ // WHEN
+ // THEN
+ testUpsertFailure(runner, inputs);
+ }
+
+ @Test
+ void testUpdateMany() throws Exception {
+ // GIVEN
+ TestRunner initRunner = init();
+
+ // Init Mongo data
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("team", RecordFieldType.STRING);
+ recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+ List<Object[]> init = Arrays.asList(
+ new Object[]{"Joe", "A", "green"},
+ new Object[]{"Jane", "A", "green"},
+ new Object[]{"Jeff", "B", "blue"},
+ new Object[]{"Janet", "B", "blue"}
+ );
+
+ init.forEach(recordReader::addRecord);
+
+ initRunner.enqueue("");
+ initRunner.run();
+
+ // Update Mongo data
+ setup();
+ TestRunner updateRunner = init();
+
+ updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
+ updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, PutMongoRecord.UPDATE_MANY.getValue());
+
+ recordReader.addSchemaField("team", RecordFieldType.STRING);
+ recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+ List<List<Object[]>> inputs = Arrays.asList(
+ Arrays.asList(
+ new Object[]{"A", "yellow"},
+ new Object[]{"B", "red"}
+ )
+ );
+
+ Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+ new HashMap<String, Object>() {{
+ put("name", "Joe");
+ put("team", "A");
+ put("color", "yellow");
+ }},
+ new HashMap<String, Object>() {{
+ put("name", "Jane");
+ put("team", "A");
+ put("color", "yellow");
+ }},
+ new HashMap<String, Object>() {{
+ put("name", "Jeff");
+ put("team", "B");
+ put("color", "red");
+ }},
+ new HashMap<String, Object>() {{
+ put("name", "Janet");
+ put("team", "B");
+ put("color", "red");
+ }}
+ ));
+
+ // WHEN
+ // THEN
+ testUpsertSuccess(updateRunner, inputs, expected);
+ }
+
+ @Test
+ void testUpdateModeFFAttributeSetToMany() throws Exception {
+ // GIVEN
+ TestRunner initRunner = init();
+
+ // Init Mongo data
+ recordReader.addSchemaField("name", RecordFieldType.STRING);
+ recordReader.addSchemaField("team", RecordFieldType.STRING);
+ recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+ List<Object[]> init = Arrays.asList(
+ new Object[]{"Joe", "A", "green"},
+ new Object[]{"Jane", "A", "green"},
+ new Object[]{"Jeff", "B", "blue"},
+ new Object[]{"Janet", "B", "blue"}
+ );
+
+ init.forEach(recordReader::addRecord);
+
+ initRunner.enqueue("");
+ initRunner.run();
+
+ // Update Mongo data
+ setup();
+ TestRunner updateRunner = init();
+
+ updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
+ updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
+
+ recordReader.addSchemaField("team", RecordFieldType.STRING);
+ recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+ List<List<Object[]>> inputs = Arrays.asList(
+ Arrays.asList(
+ new Object[]{"A", "yellow"},
+ new Object[]{"B", "red"}
+ )
+ );
+
+ Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+ new HashMap<String, Object>() {{
+ put("name", "Joe");
+ put("team", "A");
+ put("color", "yellow");
+ }},
+ new HashMap<String, Object>() {{
+ put("name", "Jane");
+ put("team", "A");
+ put("color", "yellow");
+ }},
+ new HashMap<String, Object>() {{
+ put("name", "Jeff");
+ put("team", "B");
+ put("color", "red");
+ }},
+ new HashMap<String, Object>() {{
+ put("name", "Janet");
+ put("team", "B");
+ put("color", "red");
+ }}
+ ));
+
+ // WHEN
+ inputs.forEach(input -> {
+ input.forEach(recordReader::addRecord);
+
+ MockFlowFile flowFile = new MockFlowFile(1);
+ flowFile.putAttributes(new HashMap<String, String>(){{
+ put(PutMongoRecord.MONGODB_UPDATE_MODE, "many");
+ }});
+ updateRunner.enqueue(flowFile);
+ updateRunner.run();
+ });
+
+ // THEN
+ assertEquals(0, updateRunner.getQueueSize().getObjectCount());
+
+ updateRunner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, inputs.size());
+
+ Set<Map<String, Object>> actual = new HashSet<>();
+ for (Document document : collection.find()) {
+ actual.add(document.entrySet().stream()
+ .filter(key__value -> !key__value.getKey().equals("_id"))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ }
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ void testRouteToFailureWhenUpdateModeFFAttributeSetToInvalid() throws Exception {
+ TestRunner runner = init();
+
+ runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
+ runner.setProperty(PutMongoRecord.UPDATE_MODE, PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
+
+ recordReader.addSchemaField("team", RecordFieldType.STRING);
+ recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+ List<List<Object[]>> inputs = Arrays.asList(
+ Arrays.asList(
+ new Object[]{"A", "yellow"},
+ new Object[]{"B", "red"}
+ )
+ );
+
+ // WHEN
+ // THEN
+ testUpsertFailure(runner, inputs);
+ }
+
+ @Test
+ void testRouteToFailureWhenKeyFieldReferencesNonEmbeddedDocument() throws Exception {
+ // GIVEN
+ TestRunner runner = init();
+
+ runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id,id.is_not_an_embedded_document");
+
+ recordReader.addSchemaField("id", RecordFieldType.INT);
+ recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+ final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+ new RecordField("name", RecordFieldType.STRING.getDataType()),
+ new RecordField("age", RecordFieldType.INT.getDataType())
+ ));
+
+ List<List<Object[]>> inputs = Arrays.asList(
+ Collections.singletonList(
+ new Object[]{1, new MapRecord(personSchema, new HashMap<String, Object>() {{
+ put("name", "unimportant");
+ put("age", "unimportant".length());
+ }})}
+ )
+ );
+
+ // WHEN
+ // THEN
+ testUpsertFailure(runner, inputs);
+ }
+
+ private void testUpsertSuccess(TestRunner runner, List<List<Object[]>>
inputs, Set<Map<String, Object>> expected) {
+ // GIVEN
+
+ // WHEN
+ inputs.forEach(input -> {
+ input.forEach(recordReader::addRecord);
+
+ runner.enqueue("");
+ runner.run();
+ });
+
+ // THEN
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+
+ runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, inputs.size());
+
+ Set<Map<String, Object>> actual = new HashSet<>();
+ for (Document document : collection.find()) {
+ actual.add(document.entrySet().stream()
+ .filter(key__value -> !key__value.getKey().equals("_id"))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ }
+
+ assertEquals(expected, actual);
+ }
+
+ private void testUpsertFailure(TestRunner runner, List<List<Object[]>>
inputs) {
+ // GIVEN
+ Set<Object> expected = Collections.emptySet();
+
+ // WHEN
+ inputs.forEach(input -> {
+ input.forEach(recordReader::addRecord);
+
+ runner.enqueue("");
+ runner.run();
+ });
+
+ // THEN
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+
+ runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_FAILURE, inputs.size());
+
+ Set<Map<String, Object>> actual = new HashSet<>();
+ for (Document document : collection.find()) {
+ actual.add(document.entrySet().stream()
+ .filter(key__value -> !key__value.getKey().equals("_id"))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
+ }
+
+ assertEquals(expected, actual);
+ }
}
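
As the last two tests show, the 'flowfile-attribute' update mode defers the one-versus-many decision to each flowfile via the 'mongodb.update.mode' attribute. A hedged sketch of the same wiring using enqueue-time attributes instead of a hand-built MockFlowFile; the test name is hypothetical, and the MongoWriteTestBase fixture, init() helper, and mock reader from the tests above are assumed:

    // Sketch only (not from the commit): attach the attribute when enqueueing.
    @Test
    void testUpdateModeFromFlowFileAttribute() throws Exception {
        TestRunner runner = init();
        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
        runner.setProperty(PutMongoRecord.UPDATE_MODE, PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());

        recordReader.addSchemaField("team", RecordFieldType.STRING);
        recordReader.addSchemaField("color", RecordFieldType.STRING);
        recordReader.addRecord("A", "yellow");

        // The processor reads 'mongodb.update.mode' ("one" or "many") per flowfile.
        Map<String, String> attributes = new HashMap<>();
        attributes.put(PutMongoRecord.MONGODB_UPDATE_MODE, "many");

        runner.enqueue(new byte[0], attributes); // record content comes from the mock reader
        runner.run();

        runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 1);
    }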