This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2730a90  NIFI-9334 Add support for upsert in 'PutMongoRecord'. Use 'bulkWrite' for both insert and upsert.
2730a90 is described below

commit 2730a9000e7996a0f05735854e28507e499166ac
Author: Tamas Palfy <[email protected]>
AuthorDate: Fri Oct 15 12:46:11 2021 +0200

    NIFI-9334 Add support for upsert in 'PutMongoRecord'. Use 'bulkWrite' for both insert and upsert.
    
    This closes #5482.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../nifi/processors/mongodb/PutMongoRecord.java    | 205 +++++++--
 .../nifi/processors/mongodb/PutMongoRecordIT.java  | 457 +++++++++++++++++++++
 2 files changed, 639 insertions(+), 23 deletions(-)

diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
index 5068671..9c3df04 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongoRecord.java
@@ -19,10 +19,19 @@ package org.apache.nifi.processors.mongodb;
 import com.mongodb.MongoException;
 import com.mongodb.WriteConcern;
 import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.BulkWriteOptions;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.InsertOneModel;
+import com.mongodb.client.model.UpdateManyModel;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
@@ -39,30 +48,45 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.bson.Document;
+import org.bson.conversions.Bson;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 @EventDriven
-@Tags({"mongodb", "insert", "record", "put"})
+@Tags({"mongodb", "insert", "update", "upsert", "record", "put"})
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@CapabilityDescription("This processor is a record-aware processor for 
inserting data into MongoDB. It uses a configured record reader and " +
-        "schema to read an incoming record set from the body of a flowfile and 
then inserts batches of those records into " +
-        "a configured MongoDB collection. This processor does not support 
updates, deletes or upserts. The number of documents to insert at a time is 
controlled " +
-        "by the \"Insert Batch Size\" configuration property. This value 
should be set to a reasonable size to ensure " +
-        "that MongoDB is not overloaded with too many inserts at once.")
+@CapabilityDescription("This processor is a record-aware processor for 
inserting/upserting data into MongoDB. It uses a configured record reader and " 
+
+        "schema to read an incoming record set from the body of a flowfile and 
then inserts/upserts batches of those records into " +
+        "a configured MongoDB collection. This processor does not support 
deletes. The number of documents to insert/upsert at a time is controlled " +
+        "by the \"Batch Size\" configuration property. This value should be 
set to a reasonable size to ensure " +
+        "that MongoDB is not overloaded with too many operations at once.")
+@ReadsAttribute(
+    attribute = PutMongoRecord.MONGODB_UPDATE_MODE,
+    description = "Configurable parameter for controlling update mode on a 
per-flowfile basis." +
+        " Acceptable values are 'one' and 'many' and controls whether a single 
incoming record should update a single or multiple Mongo documents."
+)
 public class PutMongoRecord extends AbstractMongoProcessor {
+    static final String MONGODB_UPDATE_MODE = "mongodb.update.mode";
+
     static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
             .description("All FlowFiles that are written to MongoDB are routed 
to this relationship").build();
     static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
             .description("All FlowFiles that cannot be written to MongoDB are 
routed to this relationship").build();
 
+    static final AllowableValue UPDATE_ONE = new AllowableValue("one", "Update 
One", "Updates only the first document that matches the query.");
+    static final AllowableValue UPDATE_MANY = new AllowableValue("many", 
"Update Many", "Updates every document that matches the query.");
+    static final AllowableValue UPDATE_FF_ATTRIBUTE = new 
AllowableValue("flowfile-attribute", "Use '" + MONGODB_UPDATE_MODE + "' 
flowfile attribute.",
+        "Use the value of the '" + MONGODB_UPDATE_MODE + "' attribute of the 
incoming flowfile. Acceptable values are 'one' and 'many'.");
+
     static final PropertyDescriptor RECORD_READER_FACTORY = new 
PropertyDescriptor.Builder()
             .name("record-reader")
             .displayName("Record Reader")
@@ -70,15 +94,55 @@ public class PutMongoRecord extends AbstractMongoProcessor {
             .identifiesControllerService(RecordReaderFactory.class)
             .required(true)
             .build();
+
     static final PropertyDescriptor INSERT_COUNT = new 
PropertyDescriptor.Builder()
             .name("insert_count")
-            .displayName("Insert Batch Size")
-            .description("The number of records to group together for one 
single insert operation against MongoDB.")
+            .displayName("Batch Size")
+            .description("The number of records to group together for one 
single insert/upsert operation against MongoDB.")
             .defaultValue("100")
             .required(true)
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor ORDERED = new PropertyDescriptor.Builder()
+            .name("ordered")
+            .displayName("Ordered")
+            .description("Perform ordered or unordered operations")
+            .allowableValues("True", "False")
+            .defaultValue("False")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor BYPASS_VALIDATION = new 
PropertyDescriptor.Builder()
+            .name("bypass-validation")
+            .displayName("Bypass Validation")
+            .description("Bypass schema validation during insert/upsert")
+            .allowableValues("True", "False")
+            .defaultValue("True")
+            .required(true)
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor UPDATE_KEY_FIELDS = new 
PropertyDescriptor.Builder()
+            .name("update-key-fields")
+            .displayName("Update Key Fields")
+            .description("Comma separated list of fields based on which to 
identify documents that need to be updated. " +
+                "If this property is set NiFi will attempt an upsert operation 
on all documents. " +
+                "If this property is not set all documents will be inserted.")
+            .required(false)
+            .addValidator(StandardValidators.createListValidator(true, false, 
StandardValidators.NON_EMPTY_VALIDATOR))
+            .build();
+
+    static final PropertyDescriptor UPDATE_MODE = new 
PropertyDescriptor.Builder()
+        .name("update-mode")
+        .displayName("Update Mode")
+        .dependsOn(UPDATE_KEY_FIELDS)
+        .description("Choose between updating a single document or multiple 
documents per incoming record.")
+        .allowableValues(UPDATE_ONE, UPDATE_MANY, UPDATE_FF_ATTRIBUTE)
+        .defaultValue(UPDATE_ONE.getValue())
+        .build();
+
     private final static Set<Relationship> relationships;
     private final static List<PropertyDescriptor> propertyDescriptors;
 
@@ -88,6 +152,10 @@ public class PutMongoRecord extends AbstractMongoProcessor {
         _propertyDescriptors.add(WRITE_CONCERN);
         _propertyDescriptors.add(RECORD_READER_FACTORY);
         _propertyDescriptors.add(INSERT_COUNT);
+        _propertyDescriptors.add(ORDERED);
+        _propertyDescriptors.add(BYPASS_VALIDATION);
+        _propertyDescriptors.add(UPDATE_KEY_FIELDS);
+        _propertyDescriptors.add(UPDATE_MODE);
         propertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
 
         final Set<Relationship> _relationships = new HashSet<>();
@@ -118,15 +186,36 @@ public class PutMongoRecord extends 
AbstractMongoProcessor {
 
         final WriteConcern writeConcern = getWriteConcern(context);
 
-        List<Document> inserts = new ArrayList<>();
         int ceiling = context.getProperty(INSERT_COUNT).asInteger();
-        int added   = 0;
+        int written = 0;
         boolean error = false;
 
-        try (final InputStream inStream = session.read(flowFile);
-             final RecordReader reader = 
recordParserFactory.createRecordReader(flowFile, inStream, getLogger())) {
-            final MongoCollection<Document> collection = 
getCollection(context, flowFile).withWriteConcern(writeConcern);
+        boolean ordered = context.getProperty(ORDERED).asBoolean();
+        boolean bypass = context.getProperty(BYPASS_VALIDATION).asBoolean();
+
+        Map<String, List<String>> updateKeyFieldPathToFieldChain = new 
LinkedHashMap<>();
+        if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
+            
Arrays.stream(context.getProperty(UPDATE_KEY_FIELDS).getValue().split("\\s*,\\s*"))
+                .forEach(updateKeyField -> updateKeyFieldPathToFieldChain.put(
+                    updateKeyField,
+                    Arrays.asList(updateKeyField.split("\\."))
+                ));
+        }
+
+        BulkWriteOptions bulkWriteOptions = new BulkWriteOptions();
+        bulkWriteOptions.ordered(ordered);
+        bulkWriteOptions.bypassDocumentValidation(bypass);
+
+        try (
+            final InputStream inStream = session.read(flowFile);
+            final RecordReader reader = 
recordParserFactory.createRecordReader(flowFile, inStream, getLogger());
+        ) {
             RecordSchema schema = reader.getSchema();
+
+            final MongoCollection<Document> collection = 
getCollection(context, flowFile).withWriteConcern(writeConcern);
+
+            List<WriteModel<Document>> writeModels = new ArrayList<>();
+
             Record record;
             while ((record = reader.nextRecord()) != null) {
                 // Convert each Record to HashMap and put into the Mongo 
document
@@ -135,17 +224,43 @@ public class PutMongoRecord extends 
AbstractMongoProcessor {
                 for (String name : schema.getFieldNames()) {
                     document.put(name, contentMap.get(name));
                 }
-                inserts.add(convertArrays(document));
-                if (inserts.size() == ceiling) {
-                    collection.insertMany(inserts);
-                    added += inserts.size();
-                    inserts = new ArrayList<>();
+                Document readyToUpsert = convertArrays(document);
+
+                WriteModel<Document> writeModel;
+                if (context.getProperty(UPDATE_KEY_FIELDS).isSet()) {
+                    Bson[] filters = 
buildFilters(updateKeyFieldPathToFieldChain, readyToUpsert);
+
+                    if (updateModeMatches(UPDATE_ONE.getValue(), context, 
flowFile)) {
+                        writeModel = new UpdateOneModel<>(
+                            Filters.and(filters),
+                            new Document("$set", readyToUpsert),
+                            new UpdateOptions().upsert(true)
+                        );
+                    } else if (updateModeMatches(UPDATE_MANY.getValue(), 
context, flowFile)) {
+                        writeModel = new UpdateManyModel<>(
+                            Filters.and(filters),
+                            new Document("$set", readyToUpsert),
+                            new UpdateOptions().upsert(true)
+                        );
+                    } else {
+                        String flowfileUpdateMode = 
flowFile.getAttribute(MONGODB_UPDATE_MODE);
+                        throw new ProcessException("Unrecognized '" + 
MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'");
+                    }
+                } else {
+                    writeModel = new InsertOneModel<>(readyToUpsert);
+                }
+
+                writeModels.add(writeModel);
+                if (writeModels.size() == ceiling) {
+                    collection.bulkWrite(writeModels, bulkWriteOptions);
+                    written += writeModels.size();
+                    writeModels = new ArrayList<>();
                 }
             }
-            if (inserts.size() > 0) {
-                collection.insertMany(inserts);
+            if (writeModels.size() > 0) {
+                collection.bulkWrite(writeModels, bulkWriteOptions);
             }
-        } catch (SchemaNotFoundException | IOException | 
MalformedRecordException | MongoException e) {
+        } catch (ProcessException | SchemaNotFoundException | IOException | 
MalformedRecordException | MongoException e) {
             getLogger().error("PutMongoRecord failed with error:", e);
             session.transfer(flowFile, REL_FAILURE);
             error = true;
@@ -154,9 +269,9 @@ public class PutMongoRecord extends AbstractMongoProcessor {
                 String url = clientService != null
                         ? clientService.getURI()
                         : 
context.getProperty(URI).evaluateAttributeExpressions().getValue();
-                session.getProvenanceReporter().send(flowFile, url, 
String.format("Added %d documents to MongoDB.", added));
+                session.getProvenanceReporter().send(flowFile, url, 
String.format("Written %d documents to MongoDB.", written));
                 session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("Inserted {} records into MongoDB", new 
Object[]{ added });
+                getLogger().info("Written {} records into MongoDB", new 
Object[]{ written });
             }
         }
     }
@@ -190,4 +305,48 @@ public class PutMongoRecord extends AbstractMongoProcessor 
{
 
         return retVal;
     }
+
+    private Bson[] buildFilters(Map<String, List<String>> 
updateKeyFieldPathToFieldChain, Document readyToUpsert) {
+        Bson[] filters = updateKeyFieldPathToFieldChain.entrySet()
+            .stream()
+            .map(updateKeyFieldPath__fieldChain -> {
+                String fieldPath = updateKeyFieldPath__fieldChain.getKey();
+                List<String> fieldChain = 
updateKeyFieldPath__fieldChain.getValue();
+
+                Object value = readyToUpsert;
+                String previousField = null;
+                for (String field : fieldChain) {
+                    if (!(value instanceof Map)) {
+                        throw new ProcessException("field '" + previousField + 
"' (from field expression '" + fieldPath + "') is not an embedded document");
+                    }
+
+                    value = ((Map) value).get(field);
+
+                    if (value == null) {
+                        throw new ProcessException("field '" + field + "' 
(from field expression '" + fieldPath + "') has no value");
+                    }
+
+                    previousField = field;
+                }
+
+                Bson filter = Filters.eq(fieldPath, value);
+                return filter;
+            })
+            .toArray(Bson[]::new);
+
+        return filters;
+    }
+
+    private boolean updateModeMatches(String updateModeToMatch, ProcessContext 
context, FlowFile flowFile) {
+        String updateMode = context.getProperty(UPDATE_MODE).getValue();
+
+        boolean updateModeMatches = updateMode.equals(updateModeToMatch)
+            ||
+            (
+                updateMode.equals(UPDATE_FF_ATTRIBUTE.getValue())
+                    && 
updateModeToMatch.equalsIgnoreCase(flowFile.getAttribute(MONGODB_UPDATE_MODE))
+            );
+
+        return updateModeMatches;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
index a30a76c..e57769b 100644
--- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
+++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/PutMongoRecordIT.java
@@ -45,12 +45,16 @@ import org.junit.jupiter.api.Test;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -243,4 +247,457 @@ public class PutMongoRecordIT extends MongoWriteTestBase {
         runner.assertTransferCount(PutMongoRecord.REL_FAILURE, 0);
         runner.assertTransferCount(PutMongoRecord.REL_SUCCESS, 1);
     }
+
+    @Test
+    void testUpsertAsInsert() throws Exception {
+        // GIVEN
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
+
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())
+        ));
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Arrays.asList(
+                new Object[]{1, new MapRecord(personSchema, new 
HashMap<String, Object>() {{
+                    put("name", "name1");
+                    put("age", 21);
+                }})},
+                new Object[]{2, new MapRecord(personSchema, new 
HashMap<String, Object>() {{
+                    put("name", "name2");
+                    put("age", 22);
+                }})}
+            )
+        );
+
+        Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+            new HashMap<String, Object>() {{
+                put("id", 1);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "name1");
+                    put("age", 21);
+                }}));
+            }},
+            new HashMap<String, Object>() {{
+                put("id", 2);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "name2");
+                    put("age", 22);
+                }}));
+            }}
+        ));
+
+        // WHEN
+        // THEN
+        testUpsertSuccess(runner, inputs, expected);
+    }
+
+    @Test
+    void testUpsertAsUpdate() throws Exception {
+        // GIVEN
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
+
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())
+        ));
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Arrays.asList(
+                new Object[]{1, new MapRecord(personSchema, new 
HashMap<String, Object>() {{
+                    put("name", "updating_name1");
+                    put("age", "age1".length());
+                }})},
+                new Object[]{2, new MapRecord(personSchema, new 
HashMap<String, Object>() {{
+                    put("name", "name2");
+                    put("age", "updating_age2".length());
+                }})}
+            ),
+            Arrays.asList(
+                new Object[]{1, new MapRecord(personSchema, new 
HashMap<String, Object>() {{
+                    put("name", "updated_name1");
+                    put("age", "age1".length());
+                }})},
+                new Object[]{2, new MapRecord(personSchema, new 
HashMap<String, Object>() {{
+                    put("name", "name2");
+                    put("age", "updated_age2".length());
+                }})}
+            )
+        );
+
+        Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+            new HashMap<String, Object>() {{
+                put("id", 1);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "updated_name1");
+                    put("age", "age1".length());
+                }}));
+            }},
+            new HashMap<String, Object>() {{
+                put("id", 2);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "name2");
+                    put("age", "updated_age2".length());
+                }}));
+            }}
+        ));
+
+        // WHEN
+        testUpsertSuccess(runner, inputs, expected);
+    }
+
+    @Test
+    void testUpsertAsInsertAndUpdate() throws Exception {
+        // GIVEN
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "id");
+
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())
+        ));
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Collections.singletonList(
+                new Object[]{1, new MapRecord(personSchema, new 
HashMap<String, Object>() {{
+                    put("name", "updating_name1");
+                    put("age", "updating_age1".length());
+                }})}
+            ),
+            Arrays.asList(
+                new Object[]{1, new MapRecord(personSchema, new 
HashMap<String, Object>() {{
+                    put("name", "updated_name1");
+                    put("age", "updated_age1".length());
+                }})},
+                new Object[]{2, new MapRecord(personSchema, new 
HashMap<String, Object>() {{
+                    put("name", "inserted_name2");
+                    put("age", "inserted_age2".length());
+                }})}
+            )
+        );
+
+        Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+            new HashMap<String, Object>() {{
+                put("id", 1);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "updated_name1");
+                    put("age", "updated_age1".length());
+                }}));
+            }},
+            new HashMap<String, Object>() {{
+                put("id", 2);
+                put("person", new Document(new HashMap<String, Object>() {{
+                    put("name", "inserted_name2");
+                    put("age", "inserted_age2".length());
+                }}));
+            }}
+        ));
+
+        // WHEN
+        testUpsertSuccess(runner, inputs, expected);
+    }
+
+    @Test
+    void testRouteToFailureWhenKeyFieldDoesNotExist() throws Exception {
+        // GIVEN
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, 
"id,non_existent_field");
+
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())
+        ));
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Collections.singletonList(
+                new Object[]{1, new MapRecord(personSchema, new 
HashMap<String, Object>() {{
+                    put("name", "unimportant");
+                    put("age", "unimportant".length());
+                }})}
+            )
+        );
+
+        // WHEN
+        // THEN
+        testUpsertFailure(runner, inputs);
+    }
+
+    @Test
+    void testUpdateMany() throws Exception {
+        // GIVEN
+        TestRunner initRunner = init();
+
+        // Init Mongo data
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("team", RecordFieldType.STRING);
+        recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+        List<Object[]> init = Arrays.asList(
+            new Object[]{"Joe", "A", "green"},
+            new Object[]{"Jane", "A", "green"},
+            new Object[]{"Jeff", "B", "blue"},
+            new Object[]{"Janet", "B", "blue"}
+        );
+
+        init.forEach(recordReader::addRecord);
+
+        initRunner.enqueue("");
+        initRunner.run();
+
+        // Update Mongo data
+        setup();
+        TestRunner updateRunner = init();
+
+        updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
+        updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, 
PutMongoRecord.UPDATE_MANY.getValue());
+
+        recordReader.addSchemaField("team", RecordFieldType.STRING);
+        recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Arrays.asList(
+                new Object[]{"A", "yellow"},
+                new Object[]{"B", "red"}
+            )
+        );
+
+        Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+            new HashMap<String, Object>() {{
+                put("name", "Joe");
+                put("team", "A");
+                put("color", "yellow");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Jane");
+                put("team", "A");
+                put("color", "yellow");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Jeff");
+                put("team", "B");
+                put("color", "red");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Janet");
+                put("team", "B");
+                put("color", "red");
+            }}
+        ));
+
+        // WHEN
+        // THEN
+        testUpsertSuccess(updateRunner, inputs, expected);
+    }
+
+    @Test
+    void testUpdateModeFFAttributeSetToMany() throws Exception {
+        // GIVEN
+        TestRunner initRunner = init();
+
+        // Init Mongo data
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("team", RecordFieldType.STRING);
+        recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+        List<Object[]> init = Arrays.asList(
+            new Object[]{"Joe", "A", "green"},
+            new Object[]{"Jane", "A", "green"},
+            new Object[]{"Jeff", "B", "blue"},
+            new Object[]{"Janet", "B", "blue"}
+        );
+
+        init.forEach(recordReader::addRecord);
+
+        initRunner.enqueue("");
+        initRunner.run();
+
+        // Update Mongo data
+        setup();
+        TestRunner updateRunner = init();
+
+        updateRunner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
+        updateRunner.setProperty(PutMongoRecord.UPDATE_MODE, 
PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
+
+        recordReader.addSchemaField("team", RecordFieldType.STRING);
+        recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Arrays.asList(
+                new Object[]{"A", "yellow"},
+                new Object[]{"B", "red"}
+            )
+        );
+
+        Set<Map<String, Object>> expected = new HashSet<>(Arrays.asList(
+            new HashMap<String, Object>() {{
+                put("name", "Joe");
+                put("team", "A");
+                put("color", "yellow");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Jane");
+                put("team", "A");
+                put("color", "yellow");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Jeff");
+                put("team", "B");
+                put("color", "red");
+            }},
+            new HashMap<String, Object>() {{
+                put("name", "Janet");
+                put("team", "B");
+                put("color", "red");
+            }}
+        ));
+
+        // WHEN
+        inputs.forEach(input -> {
+            input.forEach(recordReader::addRecord);
+
+            MockFlowFile flowFile = new MockFlowFile(1);
+            flowFile.putAttributes(new HashMap<String, String>(){{
+                put(PutMongoRecord.MONGODB_UPDATE_MODE, "many");
+            }});
+            updateRunner.enqueue(flowFile);
+            updateRunner.run();
+        });
+
+        // THEN
+        assertEquals(0, updateRunner.getQueueSize().getObjectCount());
+
+        updateRunner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 
inputs.size());
+
+        Set<Map<String, Object>> actual = new HashSet<>();
+        for (Document document : collection.find()) {
+            actual.add(document.entrySet().stream()
+                .filter(key__value -> !key__value.getKey().equals("_id"))
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+        }
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    void testRouteToFailureWhenUpdateModeFFAttributeSetToInvalid() throws 
Exception {
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, "team");
+        runner.setProperty(PutMongoRecord.UPDATE_MODE, 
PutMongoRecord.UPDATE_FF_ATTRIBUTE.getValue());
+
+        recordReader.addSchemaField("team", RecordFieldType.STRING);
+        recordReader.addSchemaField("color", RecordFieldType.STRING);
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Arrays.asList(
+                new Object[]{"A", "yellow"},
+                new Object[]{"B", "red"}
+            )
+        );
+
+        // WHEN
+        // THEN
+        testUpsertFailure(runner, inputs);
+    }
+
+    @Test
+    void testRouteToFailureWhenKeyFieldReferencesNonEmbeddedDocument() throws 
Exception {
+        // GIVEN
+        TestRunner runner = init();
+
+        runner.setProperty(PutMongoRecord.UPDATE_KEY_FIELDS, 
"id,id.is_not_an_embedded_document");
+
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        recordReader.addSchemaField("person", RecordFieldType.RECORD);
+
+        final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("name", RecordFieldType.STRING.getDataType()),
+            new RecordField("age", RecordFieldType.INT.getDataType())
+        ));
+
+        List<List<Object[]>> inputs = Arrays.asList(
+            Collections.singletonList(
+                new Object[]{1, new MapRecord(personSchema, new 
HashMap<String, Object>() {{
+                    put("name", "unimportant");
+                    put("age", "unimportant".length());
+                }})}
+            )
+        );
+
+        // WHEN
+        // THEN
+        testUpsertFailure(runner, inputs);
+    }
+
+    private void testUpsertSuccess(TestRunner runner, List<List<Object[]>> 
inputs, Set<Map<String, Object>> expected) {
+        // GIVEN
+
+        // WHEN
+        inputs.forEach(input -> {
+            input.forEach(recordReader::addRecord);
+
+            runner.enqueue("");
+            runner.run();
+        });
+
+        // THEN
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+
+        runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_SUCCESS, 
inputs.size());
+
+        Set<Map<String, Object>> actual = new HashSet<>();
+        for (Document document : collection.find()) {
+            actual.add(document.entrySet().stream()
+                .filter(key__value -> !key__value.getKey().equals("_id"))
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+        }
+
+        assertEquals(expected, actual);
+    }
+
+    private void testUpsertFailure(TestRunner runner, List<List<Object[]>> 
inputs) {
+        // GIVEN
+        Set<Object> expected = Collections.emptySet();
+
+        // WHEN
+        inputs.forEach(input -> {
+            input.forEach(recordReader::addRecord);
+
+            runner.enqueue("");
+            runner.run();
+        });
+
+        // THEN
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+
+        runner.assertAllFlowFilesTransferred(PutMongoRecord.REL_FAILURE, 
inputs.size());
+
+        Set<Map<String, Object>> actual = new HashSet<>();
+        for (Document document : collection.find()) {
+            actual.add(document.entrySet().stream()
+                .filter(key__value -> !key__value.getKey().equals("_id"))
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue)));
+        }
+
+        assertEquals(expected, actual);
+    }
 }

Reply via email to