Ryan, I thought we were headed back down the path of the Mongo shell vs. Mongo driver API discussion; if not, my apologies, and carry on. I'll try to find some time to kick the tires on EL behavior.
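
As a rough illustration of the two-document shape discussed in this thread, the sketch below expands `${...}` placeholders in a filter template and in an operator-update template separately, yielding the two JSON strings that a two-argument update call would take. The class name `TwoDocumentUpdate`, the `expand` helper, and the regex are hypothetical and use only the Java standard library. It is not NiFi's Expression Language implementation, and it does no JSON escaping of attribute values.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TwoDocumentUpdate {
    // Matches ${name}; a deliberately tiny stand-in for NiFi Expression Language (assumption).
    private static final Pattern ATTRIBUTE = Pattern.compile("\\$\\{([A-Za-z0-9_.]+)\\}");

    /** Replaces each ${name} with the matching attribute value, or "" if the attribute is absent. */
    static String expand(String template, Map<String, String> attributes) {
        Matcher matcher = ATTRIBUTE.matcher(template);
        StringBuilder out = new StringBuilder();
        while (matcher.find()) {
            String value = attributes.getOrDefault(matcher.group(1), "");
            // quoteReplacement keeps '$' and '\' in the value from being read as group references.
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> attributes = Map.of("id", "abc", "status", "Stage_2");

        // Two separate documents: a filter and an operator update.
        // "$set" is left alone because it is not followed by '{'.
        String filter = expand("{\"id\":\"${id}\"}", attributes);
        String update = expand("{\"$set\":{\"status\":\"${status}\"}}", attributes);

        System.out.println(filter);
        System.out.println(update);
    }
}
```

Keeping the filter and the update as separate templates mirrors the processor's current split between the Update Query property and the update document, rather than the single shell-style expression.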
On Mon, Jul 9, 2018 at 3:18 PM Ryan Hendrickson <[email protected]> wrote:

> Mike,
> You that works right now if you pass in a flowfile, right?
>
> Ryan
>
> On Mon, Jul 9, 2018 at 3:17 PM Mike Thomsen <[email protected]> wrote:
>
>> Ryan,
>>
>> As I mentioned earlier in the thread, this:
>>
>> > {"id":"${id}", $set: {"field1":"anything"}
>>
>> is not supported by Mongo's Java driver based on my reading of the javadocs. All of the update and replace functions require two separate documents to be passed to them. If you can point me to an official Mongo API that supports that, I'll be happy to take a look at it as a possible feature for 1.8.
>>
>> Thanks,
>>
>> Mike
>>
>> On Mon, Jul 9, 2018 at 3:01 PM Ryan Hendrickson <[email protected]> wrote:
>>
>>> I'm not sure what the right answer here is to implement something, especially thinking about the bigger picture of the Mongo processors. What appears to me to be a clear need, is to set specific fields, without the document being passed in requiring specific MongoDB syntax in it. There must be a more flexible solution possible that can both continue to use Mongo Operators and also use NiFi Expression Language.
>>>
>>> I could imagine a drop-down combo box in the UI that asks which Mongo Operator you'd like to use: $set, $unset, etc., then a text field supporting Expression Language for the JSON values.
>>>
>>> Example of: {"id":"${id}", $set: {"field1":"anything"}, would be expressed in the UI as:
>>>
>>> Mode: update
>>> Update Query: {"id":"${id}"}
>>> Mongo Operator: $set (from DropDown)
>>> Operator Update Values: {"field1":"anything"}
>>> Update Mode: With operators enabled
>>>
>>> As an aside, the team's had some discussions on whether or not the MongoURI should be set as a sensitive value since it contains the password in there, or break out the password to be a separate sensitive field.
>>>
>>> Going back to Andy's suggestion, that sounds great, but it doesn't keep my initial JSON document I need later in the flow. I could split the FlowFiles using an empty UpdateAttribute, and then do that, although it's a lot of hoops to jump through.
>>>
>>> Thanks,
>>> Ryan
>>>
>>> On Thu, Jun 28, 2018 at 11:19 AM Mike Thomsen <[email protected]> wrote:
>>>
>>>> There's also a (can't remember the name) processor or two that generates SQL operations. Something like that for Mongo would be appropriate. The question is how to approach that because you might want to do multiple operations in the same query. Expressing that in the processor UI could be non-intuitive. At first glance, I would suggest a many-to-one mapping with dynamic properties. Something like this:
>>>>
>>>> customID => $set
>>>> logins => $inc
>>>> bad_logins => $inc
>>>> some_field => $unset
>>>>
>>>> We couldn't support EL there because it would break $set because in this case customID should mean "fetch attribute customID and $set its value" so ${customID} just drops the value into the field and sets null(?) as the value.
>>>>
>>>> Doing a JSON builder for the body is probably not necessary just due to the fact that we already have good ways to read the JSON and do it manually (Jolt and ExecuteScript as examples).
>>>>
>>>> Thoughts?
>>>>
>>>> On Thu, Jun 28, 2018 at 10:17 AM Otto Fowler <[email protected]> wrote:
>>>>
>>>>> So you want to set state not data.
>>>>>
>>>>>
>>>>> On June 27, 2018 at 23:32:48, Ryan Hendrickson ([email protected]) wrote:
>>>>>
>>>>> What we've got is a bunch of custom NiFi processors that are processing data. As the data is processed, we currently use Spring to load up a DAO to access Mongo and annotate that the file is complete. Then we send the content to ElasticSearch.
>>>>> As it turns out, we can simplify our process quite a bit by using standard processors instead of our custom ones, so we're trying to pull out the Mongo updates from occurring. We've already got it set up to be flow-scoped with Expression Language to define the collection and query id, but not the actual update itself. We want to be able to dynamically set different fields on the update based on NiFi attributes, like ${progress}, ${priority}, etc.
>>>>>
>>>>> The issue I'm having isn't that I need to extract JSON from the FlowFile Content into an attribute, it's that the FlowFile Attribute values need to be stored in Mongo - which in the current implementation, updates to Mongo appear to only be possible from the FlowFile Content read in.
>>>>>
>>>>>
>>>>> On Wed, Jun 27, 2018 at 11:10 PM Andy LoPresto <[email protected]> wrote:
>>>>>
>>>>>> Ryan,
>>>>>>
>>>>>> I believe what you are describing as the current behavior is accurate. I don’t quite follow your request to allow a property on the processor to define the content that gets used in the query, as that seems like it would be fairly static. However, I am not in the key demographic for the Mongo processors (as in, I only stand it up to review someone’s PR). In general, NiFi processor properties are designed to be “flow-scoped” rather than “single iteration of an operation-scoped” — i.e. flowfiles are flowing through processors and each respective operation is performed given the context at the time, rather than a job scheduling or “one-time activity” tool. Maybe that’s where the disconnect is if your request is more along those lines.
>>>>>>
>>>>>> The Update Query property does support NiFi Expression Language though, so you could set that property value to be “${update_query}” and ensure that the update_query attribute is set on incoming flowfiles. For each flowfile, the operation would occur with that dynamic query. You could use the EvaluateJsonPath processor preceding this to extract JSON components from flowfile content into an attribute.
>>>>>>
>>>>>> If you need an immediate fix, you can use the GenerateFlowFile processor to generate a flowfile which has the static content you’re looking for, and pass that to the PutMongo processor.
>>>>>>
>>>>>>
>>>>>> Andy LoPresto
>>>>>> [email protected]
>>>>>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69
>>>>>>
>>>>>> On Jun 27, 2018, at 7:45 PM, Ryan Hendrickson <[email protected]> wrote:
>>>>>>
>>>>>> I think something must be getting lost in my email. The $set operator does work. I tested it today, however, the NiFi processor requires that input to be passed in as a FlowFile. What I'm recommending is that instead of using the FlowFile Content as the doc that updates the db, alternatively, it could be a NiFi Property that's set.
>>>>>>
>>>>>> Check out the following key lines..
>>>>>>
>>>>>> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java
>>>>>>
>>>>>> Line 197 session.read(flowFile, in -> StreamUtils.fillBuffer(in, content, true));
>>>>>> Line 200 final Object doc = (mode.equals(MODE_INSERT) || (mode.equals(MODE_UPDATE) && updateMode.equals(UPDATE_WITH_DOC.getValue())))
>>>>>> Line 201 ? Document.parse(new String(content, charset)) : JSON.parse(new String(content, charset));
>>>>>> Line 223 BasicDBObject update = (BasicDBObject)doc;
>>>>>> Line 225 collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
>>>>>>
>>>>>> So on Line 223, if I'm reading this right, instead of using the doc, which is the FlowFile content, just grab the update portion of the syntax, expressed as a NiFi Property UPDATE ( {"$set": {"status":"Stage_2"}} ), such as: context.getProperty(UPDATE).getValue(), and cast that to BasicDBObject, instead of doc.
>>>>>>
>>>>>> Does that describe a little better what I mean?
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 27, 2018 at 7:37 PM Mike Thomsen <[email protected]> wrote:
>>>>>>
>>>>>>> Ryan,
>>>>>>>
>>>>>>> FWIW, Mongo's own Java client doesn't work like that. See the update methods it exposes here:
>>>>>>>
>>>>>>>
>>>>>>> http://api.mongodb.com/java/current/com/mongodb/client/MongoCollection.html#findOneAndUpdate-org.bson.conversions.Bson-org.bson.conversions.Bson-
>>>>>>>
>>>>>>> The single parameter update() method doesn't exist on MongoCollection.
>>>>>>>
>>>>>>> It can be pretty tricky going back and forth between the Mongo shell and the client APIs.
>>>>>>>
>>>>>>> On Wed, Jun 27, 2018 at 5:31 PM Ryan Hendrickson <[email protected]> wrote:
>>>>>>>
>>>>>>>> Mike, Matt,
>>>>>>>> Thanks for the help. Mike - the one json thing was a typo.
>>>>>>>>
>>>>>>>> I just got this working as the following:
>>>>>>>>
>>>>>>>> File on disk, testQuery.json: {"$set":{"status":"Stage_2"}}
>>>>>>>> GetFile ----FlowFile---> PutMongo
>>>>>>>>
>>>>>>>> NiFi Processor Properties:
>>>>>>>> Mode: update
>>>>>>>> Upsert: false
>>>>>>>> Update Query Key: No value set
>>>>>>>> Update Query: {"customId":"abc"}
>>>>>>>> Update Mode: With operators enabled
>>>>>>>>
>>>>>>>> This feels like a pretty contrived example here that got it working.
>>>>>>>> I'm not aware of any of the current processors that output with the Mongo Operators in a JSON file, which would mean if I'm operating on straight JSON, I'd have to manipulate it in some way to get that in there. That seems fairly complicated. Is there a simpler way to do this and I'm missing it?
>>>>>>>>
>>>>>>>> Just some background --
>>>>>>>> What I'm trying to do:
>>>>>>>> I have the ID of a record in the database, and I only want to update 1 field. I could take the ID and do a GetMongo -> JoltTransform -> PutMongo and replace the entire document, but that seems like a lot of processors for this use-case.
>>>>>>>>
>>>>>>>> My initial approach/recommendation:
>>>>>>>> I initially thought the Update Query property would take what Mongo's CLI is expecting {"customId":{"$customId"},{"$set": {"status":"Stage_2"}}. Instead, it works by splitting on the comma - (1) Query as Property, (2) Update with Operator as FlowFile. I think supporting an update using operators that's set within the NiFi Processor Properties, vs in the incoming FlowFile could make this processor a lot more flexible.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Ryan
>>>>>>>>
>>>>>>>> On Wed, Jun 27, 2018 at 4:26 PM Matt Burgess <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> If "customId" is a flow file attribute (and the property supports expression language), then you just have your braces and $ swapped, try ${customId} instead of {$customId}
>>>>>>>>> On Wed, Jun 27, 2018 at 4:15 PM Mike Thomsen <[email protected]> wrote:
>>>>>>>>> >
>>>>>>>>> > > can you clarify if you mean the NiFi Processor Property "Update Query", or the FlowFile require proper json?
>>>>>>>>> >
>>>>>>>>> > Both.
>>>>>>>>> >
>>>>>>>>> > > I'm getting an error: MongoDB due to readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is Array.
>>>>>>>>> >
>>>>>>>>> > PutMongo does not support arrays.
>>>>>>>>> >
>>>>>>>>> > > I'm trying to do the latter, "specify a document that contains update operators"... On the mongo command line, the update would be:
>>>>>>>>> > > db.collection.update({"customId":{$customId},{$set: {"status":"Stage_2"}});
>>>>>>>>> >
>>>>>>>>> > I don't know if that's a typo, but it should be {"customId":{$customId}} followed by {$set: {"status":"Stage_2"}}. Your version was one big document. What we would expect is a query like { "customId": "XYZ"}. As mentioned, $customId is not a valid JSON value.
>>>>>>>>> >
>>>>>>>>> > Let me know if that helps.
>>>>>>>>> >
>>>>>>>>> > Mike
>>>>>>>>> >
>>>>>>>>> > On Wed, Jun 27, 2018 at 3:15 PM Ryan Hendrickson <[email protected]> wrote:
>>>>>>>>> >>
>>>>>>>>> >> Hi Mike,
>>>>>>>>> >> Just curious - any other suggestions?
>>>>>>>>> >>
>>>>>>>>> >> Thanks,
>>>>>>>>> >> Ryan
>>>>>>>>> >>
>>>>>>>>> >> On Thu, Jun 21, 2018 at 5:23 PM Ryan Hendrickson <[email protected]> wrote:
>>>>>>>>> >>>
>>>>>>>>> >>> Thanks for the suggestions, can you clarify if you mean the NiFi Processor Property "Update Query", or the FlowFile require proper json? I'm not sure how to get proper json with the $set in there.
>>>>>>>>> >>>
>>>>>>>>> >>> I made the following modifications based on it:
>>>>>>>>> >>>
>>>>>>>>> >>> NiFi Processors Properties:
>>>>>>>>> >>> Update Query: [{"customId":{$customId}},{"$set": {"status":"Stage_2"}}]
>>>>>>>>> >>> Update Mode: With operators enabled --- Confirmed that I've been using this.
>>>>>>>>> >>>
>>>>>>>>> >>> FlowFile Contents: [{"customId":{$customId}},{"$set": {"status":"Stage_2"}}]
>>>>>>>>> >>>
>>>>>>>>> >>> I'm getting an error: MongoDB due to readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is Array.
>>>>>>>>> >>>
>>>>>>>>> >>> The NiFi Docs for Put Mongo say [1]:
>>>>>>>>> >>> Update Query: Specify a full MongoDB query to be used for the lookup query to do an update/upsert. Supports Expression Language: true
>>>>>>>>> >>> Update Mode: Choose an update mode. You can either supply a JSON document to use as a direct replacement or specify a document that contains update operators like $set and $unset.
>>>>>>>>> >>>
>>>>>>>>> >>> I'm trying to do the latter, "specify a document that contains update operators"... On the mongo command line, the update would be:
>>>>>>>>> >>> db.collection.update({"customId":{$customId},{$set: {"status":"Stage_2"}});
>>>>>>>>> >>>
>>>>>>>>> >>> In the NiFi flow, all I have is the customId, and I want to set a status in the database when I receive it, but the database has a larger set of doc keys/values. I know I could do GetMongo -> JoltTransform for status -> PutMongo, but it seems silly to use 3 processors when this PutMongo looks like it can do it...
>>>>>>>>> >>>
>>>>>>>>> >>> Thanks,
>>>>>>>>> >>> Ryan
>>>>>>>>> >>>
>>>>>>>>> >>> [1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-mongodb-nar/1.6.0/org.apache.nifi.processors.mongodb.PutMongo/index.html
>>>>>>>>> >>>
>>>>>>>>> >>>
>>>>>>>>> >>> On Thu, Jun 21, 2018 at 4:57 PM Mike Thomsen <[email protected]> wrote:
>>>>>>>>> >>>>
>>>>>>>>> >>>> Two things:
>>>>>>>>> >>>>
>>>>>>>>> >>>> 1. You need to use valid JSON. Your query is not a valid JSON example because some of the values are not quoted.
>>>>>>>>> >>>> 2. You need to make sure the update option is set to use operators, not use document.
>>>>>>>>> >>>>
>>>>>>>>> >>>> Let us know if that helps.
>>>>>>>>> >>>>
>>>>>>>>> >>>> Mike
>>>>>>>>> >>>>
>>>>>>>>> >>>> On Thu, Jun 21, 2018 at 3:19 PM Ryan Hendrickson <[email protected]> wrote:
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Hi,
>>>>>>>>> >>>>> I can't seem to figure out the right combo of parameters to get a document to update in Mongo using the PutMongo processor and the $set operator.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Try 1:
>>>>>>>>> >>>>> The incoming flowfile contains the customId: abc
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> NiFi Processor Properties:
>>>>>>>>> >>>>> Mode: update
>>>>>>>>> >>>>> Upsert: false
>>>>>>>>> >>>>> Update Query Key: No value set
>>>>>>>>> >>>>> Update Query: {"customId":{$customId}},{$set: {"status":"Stage_2"}}
>>>>>>>>> >>>>> Update Mode: With operators enabled
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> This consistently fails, the abbreviated log output:
>>>>>>>>> >>>>> PutMongo Failed to insert into MongoDB due to com.mongodb.util.JSONParseException:
>>>>>>>>> >>>>> abc
>>>>>>>>> >>>>> ...
>>>>>>>>> >>>>> at com.mongodb.util.JSONParser.parse(JSON.java:230)
>>>>>>>>> >>>>> ...
>>>>>>>>> >>>>> at org.apache.nifi.processors.mongodb.PutMongo.onTrigger(PutMongo.java:201)
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java#L201
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> It looks like it's trying to parse the incoming flowfile as a JSON document with the above parameters set.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Try 2:
>>>>>>>>> >>>>> With that in mind, I changed my input flowfile to be a json object, but I don't think it should need to be, because I'm using the Update Query with Operators.
>>>>>>>>> >>>>> New incoming flow file: {"customId":"abc"}
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> This allows it to get to line 225, before it fails with:
>>>>>>>>> >>>>> PutMongo Failed to insert into MongoDB due to java.lang.IllegalArgumentException: Invalid BSON field name customId:
>>>>>>>>> >>>>> at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:494)
>>>>>>>>> >>>>> at org.apache.nifi.processors.mongodb.PutMongo.onTrigger(PutMongo:225)
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> https://github.com/apache/nifi/blob/rel/nifi-1.6.0/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java#L225
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Line 225: //collection.updateOne(query, update, new UpdateOptions().upsert(upsert));
>>>>>>>>> >>>>> query = {"customId":{$customId}},{$set: {"status":"Stage_2"}}
>>>>>>>>> >>>>> update = {"customId":"abc"}
>>>>>>>>> >>>>> It looks like the 'update' variable is my incoming flowfile. I'm not sure why it would be, based on my understanding of how the processor properties work.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> If anyone has any insight on how to set this up for using the operators to update a document, I'd really appreciate the insight. I'm lost in debugging.
>>>>>>>>> >>>>>
>>>>>>>>> >>>>> Thanks,
>>>>>>>>> >>>>> Best,
>>>>>>>>> >>>>> Ryan
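
A possible reading of the "Try 2" failure, sketched in plain Java: with "With operators enabled", the FlowFile content is what gets passed as the update argument, and as far as I can tell the driver rejects an update document whose top-level keys are not `$`-prefixed operators. That would match "Invalid BSON field name customId". The class `OperatorUpdateCheck` and the helper `nonOperatorKeys` are hypothetical, and the maps are stand-ins for parsed documents. This is not the driver's validation code, only an illustration of the rule.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OperatorUpdateCheck {
    /** Returns the top-level keys that do not look like update operators (no leading '$'). */
    static List<String> nonOperatorKeys(Map<String, ?> updateDocument) {
        return updateDocument.keySet().stream()
                .filter(key -> !key.startsWith("$"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Try 2 from the thread: FlowFile content {"customId":"abc"} used as the update.
        Map<String, Object> tryTwo = new LinkedHashMap<>();
        tryTwo.put("customId", "abc");

        // The setup that worked: FlowFile content {"$set":{"status":"Stage_2"}}.
        Map<String, Object> working = new LinkedHashMap<>();
        working.put("$set", Map.of("status", "Stage_2"));

        System.out.println(nonOperatorKeys(tryTwo));   // prints [customId]
        System.out.println(nonOperatorKeys(working));  // prints []
    }
}
```

Under that reading, the query belongs in the Update Query property and only the operator document belongs in the FlowFile content, which is the split that ended up working in this thread.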
