dan-s1 commented on code in PR #8610: URL: https://github.com/apache/nifi/pull/8610#discussion_r1572451044
########## nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java: ########## @@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final ProcessSession session session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true)); // parse - final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue()))) + final Object doc = (processorMode.equals(MODE_INSERT) || (processorMode.equals(MODE_UPDATE) && flowfileType.equals(UPDATE_WITH_DOC.getValue()))) ? Document.parse(new String(content, charset)) : BasicDBObject.parse(new String(content, charset)); - if (MODE_INSERT.equalsIgnoreCase(mode)) { + if (MODE_INSERT.equalsIgnoreCase(processorMode)) { collection.insertOne((Document)doc); logger.info("inserted {} into MongoDB", new Object[] { flowFile }); } else { // update final boolean upsert = context.getProperty(UPSERT).asBoolean(); final String updateKey = context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue(); final String filterQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue(); - final Document query; + final Document updateQuery; - if (!StringUtils.isBlank(updateKey)) { - query = parseUpdateKey(updateKey, (Map)doc); + if (StringUtils.isNotBlank(updateKey)) { + updateQuery = parseUpdateKey(updateKey, (Map)doc); removeUpdateKeys(updateKey, (Map)doc); } else { - query = Document.parse(filterQuery); + updateQuery = Document.parse(filterQuery); } - - if (updateMode.equals(UPDATE_WITH_DOC.getValue())) { - collection.replaceOne(query, (Document)doc, new ReplaceOptions().upsert(upsert)); + UpdateResult updateResult; + if (flowfileType.equals(UPDATE_WITH_DOC.getValue())) { + updateResult = collection.replaceOne(updateQuery, (Document)doc, new ReplaceOptions().upsert(upsert)); } else { BasicDBObject update = (BasicDBObject)doc; update.remove(updateKey); - collection.updateOne(query, update, new UpdateOptions().upsert(upsert)); + UpdateOptions updateOptions = new UpdateOptions().upsert(upsert); + PropertyValue updateQueryMode = context.getProperty(MONGO_UPDATE_MODE); + + if (PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_ONE, updateQueryMode, flowFile)) { + updateResult = collection.updateOne(updateQuery, update, updateOptions); + } else if (PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_MANY, updateQueryMode, flowFile)) { + updateResult = collection.updateMany(updateQuery, update,updateOptions); + } else { + String flowfileUpdateMode = flowFile.getAttribute(PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE); + throw new ProcessException("Unrecognized '" + PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'"); + } + } + flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount())); Review Comment: ```suggestion } flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount())); ``` ########## nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java: ########## @@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final ProcessSession session session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true)); // parse - final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue()))) + final Object doc = (processorMode.equals(MODE_INSERT) || (processorMode.equals(MODE_UPDATE) && flowfileType.equals(UPDATE_WITH_DOC.getValue()))) ? Document.parse(new String(content, charset)) : BasicDBObject.parse(new String(content, charset)); - if (MODE_INSERT.equalsIgnoreCase(mode)) { + if (MODE_INSERT.equalsIgnoreCase(processorMode)) { collection.insertOne((Document)doc); logger.info("inserted {} into MongoDB", new Object[] { flowFile }); } else { // update final boolean upsert = context.getProperty(UPSERT).asBoolean(); final String updateKey = context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue(); final String filterQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue(); - final Document query; + final Document updateQuery; - if (!StringUtils.isBlank(updateKey)) { - query = parseUpdateKey(updateKey, (Map)doc); + if (StringUtils.isNotBlank(updateKey)) { + updateQuery = parseUpdateKey(updateKey, (Map)doc); removeUpdateKeys(updateKey, (Map)doc); } else { - query = Document.parse(filterQuery); + updateQuery = Document.parse(filterQuery); } - - if (updateMode.equals(UPDATE_WITH_DOC.getValue())) { - collection.replaceOne(query, (Document)doc, new ReplaceOptions().upsert(upsert)); + UpdateResult updateResult; + if (flowfileType.equals(UPDATE_WITH_DOC.getValue())) { + updateResult = collection.replaceOne(updateQuery, (Document)doc, new ReplaceOptions().upsert(upsert)); } else { BasicDBObject update = (BasicDBObject)doc; update.remove(updateKey); - collection.updateOne(query, update, new UpdateOptions().upsert(upsert)); + UpdateOptions updateOptions = new UpdateOptions().upsert(upsert); + PropertyValue updateQueryMode = context.getProperty(MONGO_UPDATE_MODE); + + if (PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_ONE, updateQueryMode, flowFile)) { + updateResult = collection.updateOne(updateQuery, update, updateOptions); + } else if (PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_MANY, updateQueryMode, flowFile)) { + updateResult = collection.updateMany(updateQuery, update,updateOptions); + } else { + String flowfileUpdateMode = flowFile.getAttribute(PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE); + throw new ProcessException("Unrecognized '" + PutMongoHelper.ATTRIBUTE_MONGODB_UPDATE_MODE + "' value '" + flowfileUpdateMode + "'"); + } + } + flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPDATE_MATCH_COUNT, String.valueOf(updateResult.getMatchedCount())); + flowFile = session.putAttribute(flowFile, ATTRIBUTE_UPDATE_MODIFY_COUNT, String.valueOf(updateResult.getModifiedCount())); + BsonValue upsertedId = updateResult.getUpsertedId(); + if (upsertedId != null) { + String id = upsertedId.isString()? upsertedId.asString().getValue(): upsertedId.asObjectId().getValue().toString(); Review Comment: ```suggestion String id = upsertedId.isString() ? upsertedId.asString().getValue() : upsertedId.asObjectId().getValue().toString(); ``` ########## nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java: ########## @@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final ProcessSession session session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true)); // parse - final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue()))) + final Object doc = (processorMode.equals(MODE_INSERT) || (processorMode.equals(MODE_UPDATE) && flowfileType.equals(UPDATE_WITH_DOC.getValue()))) ? Document.parse(new String(content, charset)) : BasicDBObject.parse(new String(content, charset)); - if (MODE_INSERT.equalsIgnoreCase(mode)) { + if (MODE_INSERT.equalsIgnoreCase(processorMode)) { Review Comment: Pick either `.equals` or `.equalsIgnoreCase` for both lines. I am leaning though towards `.equals`. ########## nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java: ########## @@ -202,32 +225,49 @@ public void onTrigger(final ProcessContext context, final ProcessSession session session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true)); // parse - final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue()))) + final Object doc = (processorMode.equals(MODE_INSERT) || (processorMode.equals(MODE_UPDATE) && flowfileType.equals(UPDATE_WITH_DOC.getValue()))) ? Document.parse(new String(content, charset)) : BasicDBObject.parse(new String(content, charset)); - if (MODE_INSERT.equalsIgnoreCase(mode)) { + if (MODE_INSERT.equalsIgnoreCase(processorMode)) { collection.insertOne((Document)doc); logger.info("inserted {} into MongoDB", new Object[] { flowFile }); } else { // update final boolean upsert = context.getProperty(UPSERT).asBoolean(); final String updateKey = context.getProperty(UPDATE_QUERY_KEY).evaluateAttributeExpressions(flowFile).getValue(); final String filterQuery = context.getProperty(UPDATE_QUERY).evaluateAttributeExpressions(flowFile).getValue(); - final Document query; + final Document updateQuery; - if (!StringUtils.isBlank(updateKey)) { - query = parseUpdateKey(updateKey, (Map)doc); + if (StringUtils.isNotBlank(updateKey)) { + updateQuery = parseUpdateKey(updateKey, (Map)doc); removeUpdateKeys(updateKey, (Map)doc); } else { - query = Document.parse(filterQuery); + updateQuery = Document.parse(filterQuery); } - - if (updateMode.equals(UPDATE_WITH_DOC.getValue())) { - collection.replaceOne(query, (Document)doc, new ReplaceOptions().upsert(upsert)); + UpdateResult updateResult; + if (flowfileType.equals(UPDATE_WITH_DOC.getValue())) { + updateResult = collection.replaceOne(updateQuery, (Document)doc, new ReplaceOptions().upsert(upsert)); } else { BasicDBObject update = (BasicDBObject)doc; update.remove(updateKey); - collection.updateOne(query, update, new UpdateOptions().upsert(upsert)); + UpdateOptions updateOptions = new UpdateOptions().upsert(upsert); + PropertyValue updateQueryMode = context.getProperty(MONGO_UPDATE_MODE); + + if (PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_ONE, updateQueryMode, flowFile)) { + updateResult = collection.updateOne(updateQuery, update, updateOptions); + } else if (PutMongoHelper.updateModeMatches(MongoUpdateOption.UPDATE_MANY, updateQueryMode, flowFile)) { + updateResult = collection.updateMany(updateQuery, update,updateOptions); Review Comment: ```suggestion updateResult = collection.updateMany(updateQuery, update, updateOptions); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org