We have a set of custom NiFi processors that process data.  As the data is
processed, we currently use Spring to load a DAO that accesses Mongo and
marks the file as complete.  Then we send the content to ElasticSearch.  As
it turns out, we can simplify our process quite a bit by using standard
processors instead of our custom ones, so we're trying to pull the Mongo
updates out of the custom code.  We've already got it set up to be
flow-scoped, with Expression Language defining the collection and query id,
but not the actual update itself.  We want to be able to dynamically set
different fields on the update based on NiFi attributes, like ${progress},
${priority}, etc.

The issue I'm having isn't that I need to extract JSON from the FlowFile
Content into an attribute; it's that the FlowFile Attribute values need to
be stored in Mongo.  In the current implementation, the update document
appears to come only from the FlowFile Content that is read in.
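
As a rough illustration of the goal (this is not NiFi or PutMongo code), the
sketch below turns a map of attribute values into the kind of $set document
discussed in this thread.  The class and method names are hypothetical, and
it assumes every attribute is written as a JSON string; numeric or boolean
fields would need different handling.  With PutMongo as it stands in 1.6.0,
the resulting JSON would still have to arrive as the FlowFile content.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper, not part of NiFi: renders attributes as a Mongo $set document. */
final class AttributeUpdateSketch {

    // Escape the characters that must not appear raw inside a JSON string.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Build {"$set":{...}} from the named attributes, skipping any that are absent.
    // Assumption: all values are emitted as JSON strings.
    // Note: if none of the fields are present the result is {"$set":{}},
    // which the caller should treat as "nothing to update".
    static String buildSetUpdate(Map<String, String> attributes, String... fields) {
        StringJoiner body = new StringJoiner(",", "{\"$set\":{", "}}");
        for (String field : fields) {
            String value = attributes.get(field);
            if (value != null) {
                body.add(quote(field) + ":" + quote(value));
            }
        }
        return body.toString();
    }
}
```

Calling buildSetUpdate with attributes progress=Stage_2 and priority=high,
and the field names "progress" and "priority", yields
{"$set":{"progress":"Stage_2","priority":"high"}}.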


On Wed, Jun 27, 2018 at 11:10 PM Andy LoPresto <[email protected]> wrote:

> Ryan,
>
> I believe what you are describing as the current behavior is accurate. I
> don’t quite follow your request to allow a property on the processor to
> define the content that gets used in the query, as that seems like it would
> be fairly static. However, I am not in the key demographic for the Mongo
> processors (as in, I only stand it up to review someone’s PR). In general,
> NiFi processor properties are designed to be “flow-scoped” rather than
> “single iteration of an operation-scoped” — i.e. flowfiles are flowing
> through processors and each respective operation is performed given the
> context at the time, rather than a job scheduling or “one-time activity”
> tool. Maybe that’s where the disconnect is if your request is more along
> those lines.
>
> The Update Query property does support NiFi Expression Language though, so
> you could set that property value to be “${update_query}” and ensure that
> the update_query attribute is set on incoming flowfiles. For each flowfile,
> the operation would occur with that dynamic query. You could use the
> EvaluateJsonPath processor preceding this to extract JSON components from
> flowfile content into an attribute.
>
> If you need an immediate fix, you can use the GenerateFlowFile processor
> to generate a flowfile which has the static content you’re looking for, and
> pass that to the PutMongo processor.
>
>
> Andy LoPresto
> [email protected]
> *[email protected] <[email protected]>*
> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>
> On Jun 27, 2018, at 7:45 PM, Ryan Hendrickson <
> [email protected]> wrote:
>
> I think something must be getting lost in my email.  The $set operator
> does work.  I tested it today; however, the NiFi processor requires that
> input to be passed in as the FlowFile content.  What I'm recommending is
> that instead of using the FlowFile Content as the doc that updates the db,
> it could alternatively be a NiFi Property that's set.
>
> Check out the following key lines..
>
> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
>
> Line 197            session.read(flowFile, in ->
> StreamUtils.fillBuffer(in, content, true));
> Line 200            final Object doc = (mode.equals(MODE_INSERT) ||
> (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
> Line 201                    ? Document.parse(new String(content, charset))
> : JSON.parse(new String(content, charset));
> Line 223             BasicDBObject update = (BasicDBObject)doc;
> Line 225             collection.updateOne(query, update, new
> UpdateOptions().upsert(upsert));
>
> So on Line 223, if I'm reading this right, instead of using the doc, which
> is the FlowFile content, just grab the update portion of the syntax,
> expressed as a NiFi Property UPDATE ( {"$set": {"status":"Stage_2"}} ),
> such as: context.getProperty(UPDATE).getValue(), and cast that to
> BasicDBObject, instead of doc.
>
> Does that describe a little better what I mean?
>
>
> On Wed, Jun 27, 2018 at 7:37 PM Mike Thomsen <[email protected]>
> wrote:
>
>> Ryan,
>>
>> FWIW, Mongo's own Java client doesn't work like that. See the update
>> methods it exposes here:
>>
>>
>> http://api.mongodb.com/java/current/com/mongodb/client/MongoCollection.html#findOneAndUpdate-org.bson.conversions.Bson-org.bson.conversions.Bson-
>>
>> The single parameter update() method doesn't exist on MongoCollection.
>>
>> It can be pretty tricky going back and forth between the Mongo shell and
>> the client APIs.
>>
>> On Wed, Jun 27, 2018 at 5:31 PM Ryan Hendrickson <
>> [email protected]> wrote:
>>
>>> Mike, Matt,
>>>    Thanks for the help.  Mike - the one json thing was a typo.
>>>
>>>    I just got this working as the following:
>>>
>>> File on disk, testQuery.json: {"$set":{"status":"Stage_2"}}
>>> GetFile ----FlowFile---> PutMongo
>>>
>>> NiFi Processor Properties:
>>>    Mode: update
>>>    Upsert: false
>>>    Update Query Key: No value set
>>>    Update Query: {"customId":"abc"}
>>>    Update Mode: With operators enabled
>>>
>>>   This feels like a pretty contrived example here that got it working.
>>> I'm not aware of any of the current processors that output with the Mongo
>>> Operators in a JSON file, which would mean if I'm operating on straight
>>> JSON, I'd have to manipulate it in some way to get that in there.  That
>>> seems fairly complicated.  Is there a simpler way to do this and I'm
>>> missing it?
>>>
>>> Just some background --
>>> What I'm trying to do:
>>>    I have the ID of a record in the database, and I only want to update
>>> 1 field.  I could take the ID and do a GetMongo -> JoltTransform ->
>>> PutMongo and replace the entire document, but that seems like a lot of
>>> processors for this use-case.
>>>
>>> My initial approach/recommendation:
>>>    I initially thought the Update Query property would take what Mongo's
>>> CLI is expecting {"customId":{"$customId"},{"$set": {"status":"Stage_2"}}.
>>> Instead, it works by splitting on the comma - (1) Query as Property, (2)
>>> Update with Operator as FlowFile.  I think supporting an update using
>>> operators that's set within the NiFi Processor Properties, vs in the
>>> incoming FlowFile could make this processor a lot more flexible.
>>>
>>> Thanks,
>>> Ryan
>>>
>>> On Wed, Jun 27, 2018 at 4:26 PM Matt Burgess <[email protected]>
>>> wrote:
>>>
>>>> If "customId" is a flow file attribute (and the property supports
>>>> expression language), then you just have your braces and $ swapped,
>>>> try ${customId} instead of {$customId}
>>>> On Wed, Jun 27, 2018 at 4:15 PM Mike Thomsen <[email protected]>
>>>> wrote:
>>>> >
>>>> > > can you clarify if you mean the NiFi Processor Property "Update
>>>> Query", or the FlowFile require proper json?
>>>> >
>>>> > Both.
>>>> >
>>>> > > I'm getting an error:  MongoDB due to readStartDocument can only be
>>>> called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is Array.
>>>> >
>>>> > PutMongo does not support arrays.
>>>> >
>>>> > > I'm trying to do the latter, "specify a document that contains
>>>> update operators"... On the mongo command line, the update would be:
>>>> > > db.collection.update({"customId":{$customId},{$set:
>>>> {"status":"Stage_2"}});
>>>> >
>>>> > I don't know if that's a typo, but it should be
>>>> {"customId":{$customId}} followed by {$set: {"status":"Stage_2"}}. Your
>>>> version was one big document. What we would expect is a query like {
>>>> "customId": "XYZ"}. As mentioned, $customId is not a valid JSON value.
>>>> >
>>>> > Let me know if that helps.
>>>> >
>>>> > Mike
>>>> >
>>>> > On Wed, Jun 27, 2018 at 3:15 PM Ryan Hendrickson <
>>>> [email protected]> wrote:
>>>> >>
>>>> >> Hi Mike,
>>>> >>    Just curious - any other suggestions?
>>>> >>
>>>> >> Thanks,
>>>> >> Ryan
>>>> >>
>>>> >> On Thu, Jun 21, 2018 at 5:23 PM Ryan Hendrickson <
>>>> [email protected]> wrote:
>>>> >>>
>>>> >>> Thanks for the suggestions, can you clarify if you mean the NiFi
>>>> Processor Property "Update Query", or the FlowFile require proper json?
>>>> I'm not sure how to get proper json with the $set in there.
>>>> >>>
>>>> >>> I made the following modifications based on it:
>>>> >>>
>>>> >>> NiFi Processors Properties:
>>>> >>>    Update Query: [{"customId":{$customId}},{"$set":
>>>> {"status":"Stage_2"}}]
>>>> >>>    Update Mode: With operators enabled  --- Confirmed that I've
>>>> been using this.
>>>> >>>
>>>> >>> FlowFile Contents: [{"customId":{$customId}},{"$set":
>>>> {"status":"Stage_2"}}]
>>>> >>>
>>>> >>> I'm getting an error:  MongoDB due to readStartDocument can only be
>>>> called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is Array.
>>>> >>>
>>>> >>> The NiFi Docs for Put Mongo say [1]:
>>>> >>> Update Query: Specify a full MongoDB query to be used for the
>>>> lookup query to do an update/upsert.  Supports Expression Language: true
>>>> >>> Update Mode: Choose an update mode. You can either supply a JSON
>>>> document to use as a direct replacement or specify a document that contains
>>>> update operators like $set and $unset.
>>>> >>>
>>>> >>> I'm trying to do the latter, "specify a document that contains
>>>> update operators"... On the mongo command line, the update would be:
>>>> >>>   db.collection.update({"customId":{$customId},{$set:
>>>> {"status":"Stage_2"}});
>>>> >>>
>>>> >>> In the NiFi flow, all I have is the customId, and I want to set a
>>>> status in the database when I receive it, but the database has a larger set
>>>> of doc keys/values.  I know I could do GetMongo -> JoltTransform for status
>>>> -> PutMongo, but it seems silly to use 3 processors when this PutMongo
>>>> looks like it can do it...
>>>> >>>
>>>> >>> Thanks,
>>>> >>> Ryan
>>>> >>>
>>>> >>> [1]
>>>> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-mongodb-nar/1.6.0/org.apache.nifi.processors.mongodb.PutMongo/index.html
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Thu, Jun 21, 2018 at 4:57 PM Mike Thomsen <
>>>> [email protected]> wrote:
>>>> >>>>
>>>> >>>> Two things:
>>>> >>>>
>>>> >>>> 1. You need to use valid JSON. Your query is not a valid JSON
>>>> example because some of the values are not quoted.
>>>> >>>> 2. You need to make sure the update option is set to use
>>>> operators, not use document.
>>>> >>>>
>>>> >>>> Let us know if that helps.
>>>> >>>>
>>>> >>>> Mike
>>>> >>>>
>>>> >>>> On Thu, Jun 21, 2018 at 3:19 PM Ryan Hendrickson <
>>>> [email protected]> wrote:
>>>> >>>>>
>>>> >>>>> Hi,
>>>> >>>>>    I can't seem to figure out the right combo of parameters to
>>>> get a document to update in Mongo using the PutMongo processor and the $set
>>>> operator.
>>>> >>>>>
>>>> >>>>> Try 1:
>>>> >>>>> The incoming flowfile contains the customId: abc
>>>> >>>>>
>>>> >>>>> NiFi Processor Properties:
>>>> >>>>>    Mode: update
>>>> >>>>>    Upsert: false
>>>> >>>>>    Update Query Key: No value set
>>>> >>>>>    Update Query: {"customId":{$customId}},{$set:
>>>> {"status":"Stage_2"}}
>>>> >>>>>    Update Mode: With operators enabled
>>>> >>>>>
>>>> >>>>> This consistently fails, the abbreviated log output:
>>>> >>>>> PutMongo Failed to insert into MongoDB due to
>>>> com.mongodb.util.JSONParseException:
>>>> >>>>> abc
>>>> >>>>> ...
>>>> >>>>> at com.mongodb.util.JSONParser.parse(JSON.java:230)
>>>> >>>>> ...
>>>> >>>>> at
>>>> org.apache.nifi.processors.mongodb.PutMongo.onTrigger(PutMongo.java:201)
>>>> >>>>>
>>>> >>>>>
>>>> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java#L201
>>>> >>>>>
>>>> >>>>> It looks like it's trying to parse the incoming flowfile as a
>>>> JSON document with the above parameters set.
>>>> >>>>>
>>>> >>>>> Try 2:
>>>> >>>>> With that in mind, I changed my input flowfile to be a json
>>>> object, but I don't think it should need to be, because I'm using the
>>>> Update Query with Operators.
>>>> >>>>> New incoming flow file: {"customId":"abc"}
>>>> >>>>>
>>>> >>>>> This allows it to get to line 225, before it fails with:
>>>> >>>>> PutMongo Failed to insert into MongoDB due to
>>>> java.lang.IllegalArgumentException: Invalid BSON field name customId:
>>>> >>>>> at
>>>> org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:494)
>>>> >>>>> at
>>>> org.apache.nifi.processors.mongodb.PutMongo.onTrigger(PutMongo:225)
>>>> >>>>>
>>>> >>>>>
>>>> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java#L225
>>>> >>>>>
>>>> >>>>> Line 225: //collection.updateOne(query, update, new
>>>> UpdateOptions().upsert(upsert));
>>>> >>>>>    query = {"customId":{$customId}},{$set: {"status":"Stage_2"}}
>>>> >>>>>    update = {"customId":"abc"}
>>>> >>>>> It looks like the 'update' variable is my incoming flowfile.
I'm not sure why it would be, based on my understanding of how the processor
properties work.
>>>> >>>>>
>>>> >>>>> If anyone has any insight on how to set this up for using the
>>>> operators to update a document, I'd really appreciate the insight.  I'm
>>>> lost in debugging.
>>>> >>>>>
>>>> >>>>> Thanks,
>>>> >>>>> Best,
>>>> >>>>> Ryan
>>>> >>>>>
>>>>
>>>
>
