Mike,

That works right now if you pass in a flowfile, right?

Ryan

On Mon, Jul 9, 2018 at 3:17 PM Mike Thomsen <[email protected]> wrote:

> Ryan,
>
> As I mentioned earlier in the thread, this:
>
> > {"id":"${id}", $set: {"field1":"anything"}
>
> is not supported by Mongo's Java driver based on my reading of the
> javadocs. All of the update and replace functions require two separate
> documents to be passed to them. If you can point me to an official Mongo
> API that supports that, I'll be happy to take a look at it as a possible
> feature for 1.8.
>
> Thanks,
>
> Mike
>
> On Mon, Jul 9, 2018 at 3:01 PM Ryan Hendrickson <
> [email protected]> wrote:
>
>> I'm not sure what the right answer here is to implement something,
>> especially thinking about the bigger picture of the Mongo processors. What
>> appears to me to be a clear need, is to set specific fields, without the
>> document being passed in requiring specific MongoDB syntax in it. There
>> must be a more flexible solution possible that can both continue to use
>> Mongo Operators and also use NiFi Expression Language.
>>
>> I could imagine a drop-down combo box in the UI that asks which Mongo
>> Operator you'd like to use: $set, $unset, etc., then a text field
>> supporting Expression Language for the JSON values.
>>
>> Example of: {"id":"${id}", $set: {"field1":"anything"}, would be
>> expressed in the UI as:
>>
>> Mode: update
>> Update Query: {"id":"${id}"}
>> Mongo Operator: $set (from DropDown)
>> Operator Update Values: {"field1":"anything"}
>> Update Mode: With operators enabled
>>
>> As an aside, the team's had some discussions on whether or not the
>> MongoURI should be set as a sensitive value since it contains the password
>> in there, or break out the password to be a separate sensitive field.
>>
>> Going back to Andy's suggestion, that sounds great, but it doesn't keep
>> my initial JSON document I need later in the flow. I could split the
>> FlowFiles using an empty UpdateAttribute, and then do that, although it's a
>> lot of hoops to jump through.
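A rough sketch of how the UI fields proposed above could map onto the two separate documents that the driver's update functions require. The class name, method name, and operator list are hypothetical and exist only for illustration; none of this is an existing PutMongo property or API, and ${id} is shown already resolved to a made-up value.

```java
import java.util.Arrays;
import java.util.List;

class OperatorUpdateSketch {
    // Operators a hypothetical drop-down might offer; not an exhaustive list.
    static final List<String> OPERATORS = Arrays.asList("$set", "$unset", "$inc");

    // Wraps the JSON from the hypothetical "Operator Update Values" field in the
    // chosen operator, producing the update document (the second of the two documents).
    static String buildUpdate(String operator, String valuesJson) {
        if (!OPERATORS.contains(operator)) {
            throw new IllegalArgumentException("Unsupported operator: " + operator);
        }
        return "{\"" + operator + "\": " + valuesJson + "}";
    }

    public static void main(String[] args) {
        // "Update Query" stays a separate document; "123" is a made-up resolved ${id}.
        String query = "{\"id\":\"123\"}";
        String update = buildUpdate("$set", "{\"field1\":\"anything\"}");
        System.out.println("query:  " + query);
        System.out.println("update: " + update);
    }
}
```

The point of the sketch is only that the query and the operator document never need to be merged into one JSON blob: they stay separate all the way to the driver call.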
>>
>> Thanks,
>> Ryan
>>
>> On Thu, Jun 28, 2018 at 11:19 AM Mike Thomsen <[email protected]>
>> wrote:
>>
>>> There's also a (can't remember the name) processor or two that generates
>>> SQL operations. Something like that for Mongo would be appropriate. The
>>> question is how to approach that because you might want to do multiple
>>> operations in the same query. Expressing that in the processor UI could
>>> be non-intuitive. At first glance, I would suggest a many-to-one mapping
>>> with dynamic properties. Something like this:
>>>
>>> customID => $set
>>> logins => $inc
>>> bad_logins => $inc
>>> some_field => $unset
>>>
>>> We couldn't support EL there because it would break $set because in this
>>> case customID should mean "fetch attribute customID and $set its value" so
>>> ${customID} just drops the value into the field and sets null(?) as the
>>> value.
>>>
>>> Doing a JSON builder for the body is probably not necessary just due to
>>> the fact that we already have good ways to read the JSON and do it manually
>>> (Jolt and ExecuteScript as examples).
>>>
>>> Thoughts?
>>>
>>> On Thu, Jun 28, 2018 at 10:17 AM Otto Fowler <[email protected]>
>>> wrote:
>>>
>>>> So you want to set state not data.
>>>>
>>>>
>>>> On June 27, 2018 at 23:32:48, Ryan Hendrickson (
>>>> [email protected]) wrote:
>>>>
>>>> What we've got is a bunch of custom NiFi processors that are processing
>>>> data. As the data is processed, we currently use Spring to load up a DAO
>>>> to access Mongo and annotate that the file is complete. Then we send the
>>>> content to ElasticSearch. As it turns out, we can simplify our process
>>>> quite a bit by using standard processors instead of our custom ones, so
>>>> we're trying to pull out the Mongo updates from occurring. We've already
>>>> got it set up to be flow-scoped with Expression Language to define the
>>>> collection and query id, but not the actual update itself.
>>>> We want to be able to dynamically set different fields on the update
>>>> based on NiFi attributes, like ${progress}, ${priority}, etc.
>>>>
>>>> The issue I'm having isn't that I need to extract JSON from the
>>>> FlowFile Content into an attribute, it's that the FlowFile Attribute values
>>>> need to be stored in Mongo - which in the current implementation, updates
>>>> to Mongo appear to only be possible from the FlowFile Content read in.
>>>>
>>>>
>>>> On Wed, Jun 27, 2018 at 11:10 PM Andy LoPresto <[email protected]>
>>>> wrote:
>>>>
>>>>> Ryan,
>>>>>
>>>>> I believe what you are describing as the current behavior is accurate.
>>>>> I don’t quite follow your request to allow a property on the processor to
>>>>> define the content that gets used in the query, as that seems like it
>>>>> would be fairly static. However, I am not in the key demographic for the
>>>>> Mongo processors (as in, I only stand it up to review someone’s PR). In
>>>>> general, NiFi processor properties are designed to be “flow-scoped” rather
>>>>> than “single iteration of an operation-scoped”, i.e. flowfiles are
>>>>> flowing through processors and each respective operation is performed
>>>>> given the context at the time, rather than a job scheduling or “one-time
>>>>> activity” tool. Maybe that’s where the disconnect is if your request is
>>>>> more along those lines.
>>>>>
>>>>> The Update Query property does support NiFi Expression Language
>>>>> though, so you could set that property value to be “${update_query}” and
>>>>> ensure that the update_query attribute is set on incoming flowfiles. For
>>>>> each flowfile, the operation would occur with that dynamic query. You
>>>>> could use the EvaluateJsonPath processor preceding this to extract JSON
>>>>> components from flowfile content into an attribute.
>>>>>
>>>>> If you need an immediate fix, you can use the GenerateFlowFile
>>>>> processor to generate a flowfile which has the static content you’re
>>>>> looking for, and pass that to the PutMongo processor.
>>>>>
>>>>>
>>>>> Andy LoPresto
>>>>> [email protected]
>>>>> *[email protected] <[email protected]>*
>>>>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>>>>>
>>>>> On Jun 27, 2018, at 7:45 PM, Ryan Hendrickson <
>>>>> [email protected]> wrote:
>>>>>
>>>>> I think something must be getting lost in my email. The $set operator
>>>>> does work. I tested it today, however, the NiFi processor requires that
>>>>> input to be passed in as a FlowFile. What I'm recommending is that,
>>>>> instead of using the FlowFile Content as the doc that updates the db, it
>>>>> could alternatively be a NiFi Property that's set.
>>>>>
>>>>> Check out the following key lines..
>>>>>
>>>>> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
>>>>>
>>>>> Line 197  session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true));
>>>>> Line 200  final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
>>>>> Line 201          ? Document.parse(new String(content, charset)) : JSON.parse(new String(content, charset));
>>>>> Line 223  BasicDBObject update = (BasicDBObject)doc;
>>>>> Line 225  collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
>>>>>
>>>>> So on Line 223, if I'm reading this right, instead of using the doc,
>>>>> which is the FlowFile content, just grab the update portion of the syntax,
>>>>> expressed as a NiFi Property UPDATE ( {"$set": {"status":"Stage_2"}}
>>>>> ), such as: context.getProperty(UPDATE).getValue(), and cast that to
>>>>> BasicDBObject, instead of doc.
>>>>>
>>>>> Does that describe a little better what I mean?
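To make the proposal above concrete, here is a minimal sketch of resolving ${attribute} references in an update document that comes from a processor property rather than from FlowFile content. It is a simplified stand-in, not NiFi's Expression Language implementation: it handles plain ${name} references only, does no JSON escaping of values, and the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class AttributeSubstitutionSketch {
    // Matches simple ${name} references only. Mongo operators such as $set are
    // left alone because they are not followed by an opening brace.
    private static final Pattern REFERENCE = Pattern.compile("\\$\\{([A-Za-z0-9_.]+)\\}");

    // Replaces each ${name} with the matching attribute value. Missing attributes
    // become an empty string, which is a choice made for this sketch.
    static String substitute(String template, Map<String, String> attributes) {
        Matcher matcher = REFERENCE.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = attributes.getOrDefault(matcher.group(1), "");
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Stand-in for the attributes of one incoming FlowFile.
        Map<String, String> attributes = Collections.singletonMap("status", "Stage_2");
        // Stand-in for the value of the proposed UPDATE property.
        String updateProperty = "{\"$set\": {\"status\":\"${status}\"}}";
        System.out.println(substitute(updateProperty, attributes));
        // prints {"$set": {"status":"Stage_2"}}
    }
}
```

With something like this in place, the FlowFile content would not have to carry the operator document at all, so the original JSON could continue down the flow unchanged.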
>>>>>
>>>>>
>>>>> On Wed, Jun 27, 2018 at 7:37 PM Mike Thomsen <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Ryan,
>>>>>>
>>>>>> FWIW, Mongo's own Java client doesn't work like that. See the update
>>>>>> methods it exposes here:
>>>>>>
>>>>>>
>>>>>> http://api.mongodb.com/java/current/com/mongodb/client/MongoCollection.html#findOneAndUpdate-org.bson.conversions.Bson-org.bson.conversions.Bson-
>>>>>>
>>>>>> The single parameter update() method doesn't exist on MongoCollection.
>>>>>>
>>>>>> It can be pretty tricky going back and forth between the Mongo shell
>>>>>> and the client APIs.
>>>>>>
>>>>>> On Wed, Jun 27, 2018 at 5:31 PM Ryan Hendrickson <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Mike, Matt,
>>>>>>> Thanks for the help. Mike - the one json thing was a typo.
>>>>>>>
>>>>>>> I just got this working as the following:
>>>>>>>
>>>>>>> File on disk, testQuery.json: {"$set":{"status":"Stage_2"}}
>>>>>>> GetFile ----FlowFile---> PutMongo
>>>>>>>
>>>>>>> NiFi Processor Properties:
>>>>>>> Mode: update
>>>>>>> Upsert: false
>>>>>>> Update Query Key: No value set
>>>>>>> Update Query: {"customId":"abc"}
>>>>>>> Update Mode: With operators enabled
>>>>>>>
>>>>>>> This feels like a pretty contrived example here that got it
>>>>>>> working. I'm not aware of any of the current processors that output
>>>>>>> with the Mongo Operators in a JSON file, which would mean if I'm
>>>>>>> operating on straight JSON, I'd have to manipulate it in some way to get
>>>>>>> that in there. That seems fairly complicated. Is there a simpler way to
>>>>>>> do this and I'm missing it?
>>>>>>>
>>>>>>> Just some background --
>>>>>>> What I'm trying to do:
>>>>>>> I have the ID of a record in the database, and I only want to
>>>>>>> update 1 field. I could take the ID and do a GetMongo -> JoltTransform
>>>>>>> -> PutMongo and replace the entire document, but that seems like a lot
>>>>>>> of processors for this use-case.
>>>>>>>
>>>>>>> My initial approach/recommendation:
>>>>>>> I initially thought the Update Query property would take what
>>>>>>> Mongo's CLI is expecting {"customId":{"$customId"},{"$set":
>>>>>>> {"status":"Stage_2"}}. Instead, it works by splitting on the comma -
>>>>>>> (1) Query as Property, (2) Update with Operator as FlowFile. I think
>>>>>>> supporting an update using operators that's set within the NiFi
>>>>>>> Processor Properties, vs in the incoming FlowFile could make this
>>>>>>> processor a lot more flexible.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ryan
>>>>>>>
>>>>>>> On Wed, Jun 27, 2018 at 4:26 PM Matt Burgess <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If "customId" is a flow file attribute (and the property supports
>>>>>>>> expression language), then you just have your braces and $ swapped,
>>>>>>>> try ${customId} instead of {$customId}
>>>>>>>> On Wed, Jun 27, 2018 at 4:15 PM Mike Thomsen <
>>>>>>>> [email protected]> wrote:
>>>>>>>> >
>>>>>>>> > > can you clarify if you mean the NiFi Processor Property "Update
>>>>>>>> > > Query", or the FlowFile require proper json?
>>>>>>>> >
>>>>>>>> > Both.
>>>>>>>> >
>>>>>>>> > > I'm getting an error: MongoDB due to readStartDocument can only
>>>>>>>> > > be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType
>>>>>>>> > > is Array.
>>>>>>>> >
>>>>>>>> > PutMongo does not support arrays.
>>>>>>>> >
>>>>>>>> > > I'm trying to do the latter, "specify a document that contains
>>>>>>>> > > update operators"... On the mongo command line, the update would be:
>>>>>>>> > > db.collection.update({"customId":{$customId},{$set:
>>>>>>>> > > {"status":"Stage_2"}});
>>>>>>>> >
>>>>>>>> > I don't know if that's a typo, but it should be
>>>>>>>> > {"customId":{$customId}} followed by {$set: {"status":"Stage_2"}}. Your
>>>>>>>> > version was one big document. What we would expect is a query like
>>>>>>>> > { "customId": "XYZ"}. As mentioned, $customId is not a valid JSON value.
>>>>>>>> >
>>>>>>>> > Let me know if that helps.
>>>>>>>> >
>>>>>>>> > Mike
>>>>>>>> >
>>>>>>>> > On Wed, Jun 27, 2018 at 3:15 PM Ryan Hendrickson <
>>>>>>>> > [email protected]> wrote:
>>>>>>>> >>
>>>>>>>> >> Hi Mike,
>>>>>>>> >> Just curious - any other suggestions?
>>>>>>>> >>
>>>>>>>> >> Thanks,
>>>>>>>> >> Ryan
>>>>>>>> >>
>>>>>>>> >> On Thu, Jun 21, 2018 at 5:23 PM Ryan Hendrickson <
>>>>>>>> >> [email protected]> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Thanks for the suggestions, can you clarify if you mean the
>>>>>>>> >>> NiFi Processor Property "Update Query", or the FlowFile require proper
>>>>>>>> >>> json? I'm not sure how to get proper json with the $set in there.
>>>>>>>> >>>
>>>>>>>> >>> I made the following modifications based on it:
>>>>>>>> >>>
>>>>>>>> >>> NiFi Processors Properties:
>>>>>>>> >>> Update Query: [{"customId":{$customId}},{"$set": {"status":"Stage_2"}}]
>>>>>>>> >>> Update Mode: With operators enabled --- Confirmed that I've
>>>>>>>> >>> been using this.
>>>>>>>> >>>
>>>>>>>> >>> FlowFile Contents: [{"customId":{$customId}},{"$set": {"status":"Stage_2"}}]
>>>>>>>> >>>
>>>>>>>> >>> I'm getting an error: MongoDB due to readStartDocument can only
>>>>>>>> >>> be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType
>>>>>>>> >>> is Array.
>>>>>>>> >>>
>>>>>>>> >>> The NiFi Docs for Put Mongo say [1]:
>>>>>>>> >>> Update Query: Specify a full MongoDB query to be used for the
>>>>>>>> >>> lookup query to do an update/upsert. Supports Expression Language:
>>>>>>>> >>> true
>>>>>>>> >>> Update Mode: Choose an update mode. You can either supply a
>>>>>>>> >>> JSON document to use as a direct replacement or specify a document that
>>>>>>>> >>> contains update operators like $set and $unset.
>>>>>>>> >>>
>>>>>>>> >>> I'm trying to do the latter, "specify a document that contains
>>>>>>>> >>> update operators"...
>>>>>>>> >>> On the mongo command line, the update would be:
>>>>>>>> >>> db.collection.update({"customId":{$customId},{$set: {"status":"Stage_2"}});
>>>>>>>> >>>
>>>>>>>> >>> In the NiFi flow, all I have is the customId, and I want to set
>>>>>>>> >>> a status in the database when I receive it, but the database has a
>>>>>>>> >>> larger set of doc keys/values. I know I could do GetMongo ->
>>>>>>>> >>> JoltTransform for status -> PutMongo, but it seems silly to use 3
>>>>>>>> >>> processors when this PutMongo looks like it can do it...
>>>>>>>> >>>
>>>>>>>> >>> Thanks,
>>>>>>>> >>> Ryan
>>>>>>>> >>>
>>>>>>>> >>> [1]
>>>>>>>> >>> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-mongodb-nar/1.6.0/org.apache.nifi.processors.mongodb.PutMongo/index.html
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> On Thu, Jun 21, 2018 at 4:57 PM Mike Thomsen <
>>>>>>>> >>> [email protected]> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Two things:
>>>>>>>> >>>>
>>>>>>>> >>>> 1. You need to use valid JSON. Your query is not a valid JSON
>>>>>>>> >>>> example because some of the values are not quoted.
>>>>>>>> >>>> 2. You need to make sure the update option is set to use
>>>>>>>> >>>> operators, not use document.
>>>>>>>> >>>>
>>>>>>>> >>>> Let us know if that helps.
>>>>>>>> >>>>
>>>>>>>> >>>> Mike
>>>>>>>> >>>>
>>>>>>>> >>>> On Thu, Jun 21, 2018 at 3:19 PM Ryan Hendrickson <
>>>>>>>> >>>> [email protected]> wrote:
>>>>>>>> >>>>>
>>>>>>>> >>>>> Hi,
>>>>>>>> >>>>> I can't seem to figure out the right combo of parameters
>>>>>>>> >>>>> to get a document to update in Mongo using the PutMongo processor and
>>>>>>>> >>>>> the $set operator.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Try 1:
>>>>>>>> >>>>> The incoming flowfile contains the customId: abc
>>>>>>>> >>>>>
>>>>>>>> >>>>> NiFi Processor Properties:
>>>>>>>> >>>>> Mode: update
>>>>>>>> >>>>> Upsert: false
>>>>>>>> >>>>> Update Query Key: No value set
>>>>>>>> >>>>> Update Query: {"customId":{$customId}},{$set: {"status":"Stage_2"}}
>>>>>>>> >>>>> Update Mode: With operators enabled
>>>>>>>> >>>>>
>>>>>>>> >>>>> This consistently fails, the abbreviated log output:
>>>>>>>> >>>>> PutMongo Failed to insert into MongoDB due to
>>>>>>>> >>>>> com.mongodb.util.JSONParseException:
>>>>>>>> >>>>> abc
>>>>>>>> >>>>> ...
>>>>>>>> >>>>> at com.mongodb.util.JSONParser.parse(JSON.java:230)
>>>>>>>> >>>>> ...
>>>>>>>> >>>>> at org.apache.nifi.processors.mongodb.PutMongo.onTrigger(PutMongo.java:201)
>>>>>>>> >>>>>
>>>>>>>> >>>>> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java#L201
>>>>>>>> >>>>>
>>>>>>>> >>>>> It looks like it's trying to parse the incoming flowfile as a
>>>>>>>> >>>>> JSON document with the above parameters set.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Try 2:
>>>>>>>> >>>>> With that in mind, I changed my input flowfile to be a json
>>>>>>>> >>>>> object, but I don't think it should need to be, because I'm using the
>>>>>>>> >>>>> Update Query with Operators.
>>>>>>>> >>>>> New incoming flow file: {"customId":"abc"}
>>>>>>>> >>>>>
>>>>>>>> >>>>> This allows it to get to line 225, before it fails with:
>>>>>>>> >>>>> PutMongo Failed to insert into MongoDB due to
>>>>>>>> >>>>> java.lang.IllegalArgumentException: Invalid BSON field name customId:
>>>>>>>> >>>>> at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:494)
>>>>>>>> >>>>> at org.apache.nifi.processors.mongodb.PutMongo.onTrigger(PutMongo:225)
>>>>>>>> >>>>>
>>>>>>>> >>>>> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java#L225
>>>>>>>> >>>>>
>>>>>>>> >>>>> Line 225: //collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
>>>>>>>> >>>>> query = {"customId":{$customId}},{$set: {"status":"Stage_2"}}
>>>>>>>> >>>>> update = {"customId":"abc"}
>>>>>>>> >>>>> It looks like the 'update' variable, is my incoming
>>>>>>>> >>>>> flowfile. I'm not sure why it would be, based on my understanding of
>>>>>>>> >>>>> how the processor properties work.
>>>>>>>> >>>>>
>>>>>>>> >>>>> If anyone has any insight on how to set this up for using the
>>>>>>>> >>>>> operators to update a document, I'd really appreciate the insight. I'm
>>>>>>>> >>>>> lost in debugging.
>>>>>>>> >>>>>
>>>>>>>> >>>>> Thanks,
>>>>>>>> >>>>> Best,
>>>>>>>> >>>>> Ryan
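In Try 2 the FlowFile content {"customId":"abc"} ends up as the update argument at line 225, and it contains no update operator. The "Invalid BSON field name customId" error is consistent with the update document being rejected for that reason, since the setup reported as working elsewhere in this thread sends {"$set":{"status":"Stage_2"}} as the FlowFile content instead. A small pre-flight check along these lines could surface the problem earlier. This is only a sketch: the helper is hypothetical, is not part of PutMongo or the Mongo driver, and assumes the document has already been parsed into a Map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class UpdateDocumentCheckSketch {
    // Returns true when the document is non-empty and every top-level key
    // is an operator such as "$set" or "$unset".
    static boolean usesOnlyOperators(Map<String, ?> updateDocument) {
        if (updateDocument.isEmpty()) {
            return false;
        }
        for (String key : updateDocument.keySet()) {
            if (!key.startsWith("$")) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // FlowFile content from Try 2: a plain field, no operator.
        Map<String, Object> tryTwo = new LinkedHashMap<>();
        tryTwo.put("customId", "abc");

        // FlowFile content from the setup reported as working: {"$set":{"status":"Stage_2"}}
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("status", "Stage_2");
        Map<String, Object> working = new LinkedHashMap<>();
        working.put("$set", fields);

        System.out.println(usesOnlyOperators(tryTwo));   // false
        System.out.println(usesOnlyOperators(working));  // true
    }
}
```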
