dan-s1 commented on code in PR #8610:
URL: https://github.com/apache/nifi/pull/8610#discussion_r1572451044


##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java:
##########
@@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, 
true));
 
             // parse
-            final Object doc = (mode.equals(MODE_INSERT) || 
(mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+            final Object doc = (processorMode.equals(MODE_INSERT) || 
(processorMode.equals(MODE_UPDATE) && 
flowfileType.equals(UPDATE_WITH_DOC.getValue())))
                     ? Document.parse(new String(content, charset)) : 
BasicDBObject.parse(new String(content, charset));
 
-            if (MODE_INSERT.equalsIgnoreCase(mode)) {
+            if (MODE_INSERT.equalsIgnoreCase(processorMode)) {
                 collection.insertOne((Document)doc);
                 logger.info("inserted {} into MongoDB", new Object[] { 
flowFile });
             } else {
                 // update
                 final boolean upsert = context.getProperty(UPSERT).asBoolean();
                 final String updateKey = 
context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
                 final String filterQuery = 
context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
-                final Document query;
+                final Document updateQuery;
 
-                if (!StringUtils.isBlank(updateKey)) {
-                    query = parseUpdateKey(updateKey, (Map)doc);
+                if (StringUtils.isNotBlank(updateKey)) {
+                    updateQuery = parseUpdateKey(updateKey, (Map)doc);
                     removeUpdateKeys(updateKey, (Map)doc);
                 } else {
-                    query = Document.parse(filterQuery);
+                    updateQuery = Document.parse(filterQuery);
                 }
-
-                if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
-                    collection.replaceOne(query, (Document)doc, new 
ReplaceOptions().upsert(upsert));
+                UpdateResult updateResult;
+                if (flowfileType.equals(UPDATE_WITH_DOC.getValue())) {
+                    updateResult = collection.replaceOne(updateQuery, 
(Document)doc, new ReplaceOptions().upsert(upsert));
                 } else {
                     BasicDBObject update = (BasicDBObject)doc;
                     update.remove(updateKey);
-                    collection.updateOne(query, update, new 
UpdateOptions().upsert(upsert));
+                    UpdateOptions updateOptions = new 
UpdateOptions().upsert(upsert);
+                    PropertyValue updateQueryMode = 
context.getProperty(MONGO_UPDATE_MODE);
+
+                    if 
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_ONE, 
updateQueryMode, flowFile)) {
+                        updateResult = collection.updateOne(updateQuery, 
update, updateOptions);
+                    } else if 
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_MANY, 
updateQueryMode, flowFile)) {
+                        updateResult = collection.updateMany(updateQuery, 
update,updateOptions);
+                    } else {
+                        String flowfileUpdateMode = 
flowFile.getAttribute(PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE);
+                        throw new ProcessException("Unrecognized '" + 
PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode 
+ "'");
+                    }
+                }
+                flowFile = session.putAttribute(flowFile, 
ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount()));

Review Comment:
   ```suggestion
                   }
                   
                   flowFile = session.putAttribute(flowFile, 
ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount()));
   ```



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java:
##########
@@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, 
true));
 
             // parse
-            final Object doc = (mode.equals(MODE_INSERT) || 
(mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+            final Object doc = (processorMode.equals(MODE_INSERT) || 
(processorMode.equals(MODE_UPDATE) && 
flowfileType.equals(UPDATE_WITH_DOC.getValue())))
                     ? Document.parse(new String(content, charset)) : 
BasicDBObject.parse(new String(content, charset));
 
-            if (MODE_INSERT.equalsIgnoreCase(mode)) {
+            if (MODE_INSERT.equalsIgnoreCase(processorMode)) {
                 collection.insertOne((Document)doc);
                 logger.info("inserted {} into MongoDB", new Object[] { 
flowFile });
             } else {
                 // update
                 final boolean upsert = context.getProperty(UPSERT).asBoolean();
                 final String updateKey = 
context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
                 final String filterQuery = 
context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
-                final Document query;
+                final Document updateQuery;
 
-                if (!StringUtils.isBlank(updateKey)) {
-                    query = parseUpdateKey(updateKey, (Map)doc);
+                if (StringUtils.isNotBlank(updateKey)) {
+                    updateQuery = parseUpdateKey(updateKey, (Map)doc);
                     removeUpdateKeys(updateKey, (Map)doc);
                 } else {
-                    query = Document.parse(filterQuery);
+                    updateQuery = Document.parse(filterQuery);
                 }
-
-                if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
-                    collection.replaceOne(query, (Document)doc, new 
ReplaceOptions().upsert(upsert));
+                UpdateResult updateResult;
+                if (flowfileType.equals(UPDATE_WITH_DOC.getValue())) {
+                    updateResult = collection.replaceOne(updateQuery, 
(Document)doc, new ReplaceOptions().upsert(upsert));
                 } else {
                     BasicDBObject update = (BasicDBObject)doc;
                     update.remove(updateKey);
-                    collection.updateOne(query, update, new 
UpdateOptions().upsert(upsert));
+                    UpdateOptions updateOptions = new 
UpdateOptions().upsert(upsert);
+                    PropertyValue updateQueryMode = 
context.getProperty(MONGO_UPDATE_MODE);
+
+                    if 
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_ONE, 
updateQueryMode, flowFile)) {
+                        updateResult = collection.updateOne(updateQuery, 
update, updateOptions);
+                    } else if 
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_MANY, 
updateQueryMode, flowFile)) {
+                        updateResult = collection.updateMany(updateQuery, 
update,updateOptions);
+                    } else {
+                        String flowfileUpdateMode = 
flowFile.getAttribute(PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE);
+                        throw new ProcessException("Unrecognized '" + 
PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode 
+ "'");
+                    }
+                }
+                flowFile = session.putAttribute(flowFile, 
ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount()));
+                flowFile = session.putAttribute(flowFile, 
ATTRIBUTE_UPDATE_MODIFY_COUNT, String.valueOf(updateResult.getModifiedCount()));
+                BsonValue upsertedId = updateResult.getUpsertedId();
+                if (upsertedId != null) {
+                    String id = upsertedId.isString()? 
upsertedId.asString().getValue(): upsertedId.asObjectId().getValue().toString();

Review Comment:
   ```suggestion
                       String id = upsertedId.isString() ? 
upsertedId.asString().getValue() : 
upsertedId.asObjectId().getValue().toString();
   ```



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java:
##########
@@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, 
true));
 
             // parse
-            final Object doc = (mode.equals(MODE_INSERT) || 
(mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+            final Object doc = (processorMode.equals(MODE_INSERT) || 
(processorMode.equals(MODE_UPDATE) && 
flowfileType.equals(UPDATE_WITH_DOC.getValue())))
                     ? Document.parse(new String(content, charset)) : 
BasicDBObject.parse(new String(content, charset));
 
-            if (MODE_INSERT.equalsIgnoreCase(mode)) {
+            if (MODE_INSERT.equalsIgnoreCase(processorMode)) {

Review Comment:
   Pick either `.equals` or `.equalsIgnoreCase` for both lines. I am leaning 
though towards `.equals`.



##########
nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java:
##########
@@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, 
true));
 
             // parse
-            final Object doc = (mode.equals(MODE_INSERT) || 
(mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
+            final Object doc = (processorMode.equals(MODE_INSERT) || 
(processorMode.equals(MODE_UPDATE) && 
flowfileType.equals(UPDATE_WITH_DOC.getValue())))
                     ? Document.parse(new String(content, charset)) : 
BasicDBObject.parse(new String(content, charset));
 
-            if (MODE_INSERT.equalsIgnoreCase(mode)) {
+            if (MODE_INSERT.equalsIgnoreCase(processorMode)) {
                 collection.insertOne((Document)doc);
                 logger.info("inserted {} into MongoDB", new Object[] { 
flowFile });
             } else {
                 // update
                 final boolean upsert = context.getProperty(UPSERT).asBoolean();
                 final String updateKey = 
context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue();
                 final String filterQuery = 
context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue();
-                final Document query;
+                final Document updateQuery;
 
-                if (!StringUtils.isBlank(updateKey)) {
-                    query = parseUpdateKey(updateKey, (Map)doc);
+                if (StringUtils.isNotBlank(updateKey)) {
+                    updateQuery = parseUpdateKey(updateKey, (Map)doc);
                     removeUpdateKeys(updateKey, (Map)doc);
                 } else {
-                    query = Document.parse(filterQuery);
+                    updateQuery = Document.parse(filterQuery);
                 }
-
-                if (updateMode.equals(UPDATE_WITH_DOC.getValue())) {
-                    collection.replaceOne(query, (Document)doc, new 
ReplaceOptions().upsert(upsert));
+                UpdateResult updateResult;
+                if (flowfileType.equals(UPDATE_WITH_DOC.getValue())) {
+                    updateResult = collection.replaceOne(updateQuery, 
(Document)doc, new ReplaceOptions().upsert(upsert));
                 } else {
                     BasicDBObject update = (BasicDBObject)doc;
                     update.remove(updateKey);
-                    collection.updateOne(query, update, new 
UpdateOptions().upsert(upsert));
+                    UpdateOptions updateOptions = new 
UpdateOptions().upsert(upsert);
+                    PropertyValue updateQueryMode = 
context.getProperty(MONGO_UPDATE_MODE);
+
+                    if 
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_ONE, 
updateQueryMode, flowFile)) {
+                        updateResult = collection.updateOne(updateQuery, 
update, updateOptions);
+                    } else if 
(PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_MANY, 
updateQueryMode, flowFile)) {
+                        updateResult = collection.updateMany(updateQuery, 
update,updateOptions);

Review Comment:
   ```suggestion
                           updateResult = collection.updateMany(updateQuery, 
update, updateOptions);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to