To give another perspective on the “callback vs. non-callback” question, I’d say “heavy” or “messy” operations (like encryption, for example) should be contained in encapsulated code (other classes which provide a service) and then invoked from the callback or the try-with-resources (TWR) block. This allows for much more testable business logic, separation of concerns (a service which implements the behavior and a component which simply calls its API), and composability/flexibility. Say I want to build a processor which exposes a property allowing the user to select different encryption algorithms. I can either detect which one was selected and delegate to the matching implementation, or I can put a giant switch statement and the raw crypto primitive code all in one spaghetti method/callback definition. I know I would prefer the former.
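A minimal sketch of the delegation described above, using only JDK classes. The `Encryptor` interface, the two implementation classes, `lookup`, and `roundTrips` are hypothetical names for illustration, not NiFi APIs. In a real processor, the algorithm name would come from a property value, and the `encrypt` call would sit inside the callback or try-with-resources block. The sketch assumes JDK 11+ (ChaCha20-Poly1305 and `Map.of`).

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.IvParameterSpec;

public class EncryptorRegistry {

    /** Hypothetical service contract: the processor code depends only on this. */
    interface Encryptor {
        SecretKey newKey() throws GeneralSecurityException;
        byte[] encrypt(SecretKey key, byte[] plaintext) throws GeneralSecurityException;
        byte[] decrypt(SecretKey key, byte[] nonceAndCiphertext) throws GeneralSecurityException;
    }

    private static final SecureRandom RANDOM = new SecureRandom();
    private static final int NONCE_LENGTH = 12;

    /** AES-256-GCM; output layout is nonce || ciphertext || tag. */
    static final class AesGcmEncryptor implements Encryptor {
        public SecretKey newKey() throws GeneralSecurityException {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(256);
            return generator.generateKey();
        }

        public byte[] encrypt(SecretKey key, byte[] plaintext) throws GeneralSecurityException {
            byte[] nonce = newNonce();
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, nonce));
            return concat(nonce, cipher.doFinal(plaintext));
        }

        public byte[] decrypt(SecretKey key, byte[] input) throws GeneralSecurityException {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(128, input, 0, NONCE_LENGTH));
            return cipher.doFinal(input, NONCE_LENGTH, input.length - NONCE_LENGTH);
        }
    }

    /** ChaCha20-Poly1305 (JDK 11+); same nonce || ciphertext || tag layout. */
    static final class ChaChaEncryptor implements Encryptor {
        public SecretKey newKey() throws GeneralSecurityException {
            KeyGenerator generator = KeyGenerator.getInstance("ChaCha20");
            generator.init(256);
            return generator.generateKey();
        }

        public byte[] encrypt(SecretKey key, byte[] plaintext) throws GeneralSecurityException {
            byte[] nonce = newNonce();
            Cipher cipher = Cipher.getInstance("ChaCha20-Poly1305");
            cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(nonce));
            return concat(nonce, cipher.doFinal(plaintext));
        }

        public byte[] decrypt(SecretKey key, byte[] input) throws GeneralSecurityException {
            Cipher cipher = Cipher.getInstance("ChaCha20-Poly1305");
            cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(input, 0, NONCE_LENGTH));
            return cipher.doFinal(input, NONCE_LENGTH, input.length - NONCE_LENGTH);
        }
    }

    // The user-selected property value picks an implementation; no switch over crypto code.
    static final Map<String, Encryptor> ENCRYPTORS = Map.of(
            "AES-GCM", new AesGcmEncryptor(),
            "ChaCha20-Poly1305", new ChaChaEncryptor());

    static Encryptor lookup(String algorithm) {
        Encryptor encryptor = ENCRYPTORS.get(algorithm);
        if (encryptor == null) {
            throw new IllegalArgumentException("Unsupported algorithm: " + algorithm);
        }
        return encryptor;
    }

    /** Encrypts then decrypts with the selected service; true if the bytes survive. */
    static boolean roundTrips(String algorithm, byte[] content) {
        try {
            Encryptor encryptor = lookup(algorithm);
            SecretKey key = encryptor.newKey();
            return Arrays.equals(content, encryptor.decrypt(key, encryptor.encrypt(key, content)));
        } catch (GeneralSecurityException e) {
            return false;
        }
    }

    private static byte[] newNonce() {
        byte[] nonce = new byte[NONCE_LENGTH];
        RANDOM.nextBytes(nonce);
        return nonce;
    }

    private static byte[] concat(byte[] a, byte[] b) {
        byte[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] content = "{\"name\": \"1\"}".getBytes(StandardCharsets.UTF_8);
        for (String algorithm : ENCRYPTORS.keySet()) {
            System.out.println(algorithm + " round trip: " + roundTrips(algorithm, content));
        }
    }
}
```

Each implementation can be unit-tested on its own, without a ProcessSession. Adding an algorithm means adding one class and one map entry.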
Andy LoPresto
alopre...@apache.org
alopresto.apa...@gmail.com
He/Him
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4 BACE 3C6E F65B 2F7D EF69

> On Jun 11, 2020, at 8:14 AM, Mark Payne <marka...@hotmail.com> wrote:
>
> Jason,
>
> Modify vs. clone vs. create new:
>
> You would clone a FlowFile if you want an exact copy of the FlowFile (with the exception that the clone will have a unique UUID, Entry Date, etc.). It is very rare that a Processor will actually do this.
>
> Modify vs. create a “Child” FlowFile (i.e., `session.create(original);`): This is a judgment call, really. Do you think it will be necessary to have a copy of the original FlowFile and a modified version of it? If so, you may want to create a child FlowFile and send the original FlowFile to the “original” relationship. In reality, you shouldn’t need this often. In most cases, if the user wants both the original and the modified version, they can just add two connections: one going to this processor and one going to wherever else they want the FlowFile. This will cause NiFi to implicitly clone the FlowFile. The “create a child and send out the original” pattern matters only when there’s a feasible use case in which the user would want both a modified version and the original version of the FlowFile, and would also not want to process the original version until after the modified version has been created. This is not common. However, over the years it has become a common practice to create “original” relationships when they are not needed. This is likely because a few developers saw a pattern of creating an original relationship and copied it into many other processors without really understanding the difference.
>
> “Net New”: there are two ways to create a FlowFile, `session.create()` and `session.create(original);`. The first creates a FlowFile with no parent FlowFile. This should be done only if there is no inbound FlowFile to create it from.
> I.e., when this is a “source” processor. In 100% of all other cases, it should be done as `session.create(original);`. Providing the original FlowFile does 2 important things. Firstly, it creates a linkage in provenance between them. Secondly, it causes the newly created FlowFile to inherit all attributes from the parent.
>
> Callback vs. non-callback: It doesn’t matter. The callback was originally the only way to read or write the content of FlowFiles. It was done this way because it was a straightforward way to ensure that the framework was able to properly manage the InputStream, OutputStream, etc. But there were use cases that didn’t fit the callback mechanism well, so we eventually added the ability to get the InputStreams and OutputStreams directly, and callers can just use try-with-resources. This is probably preferred now for most cases just because it results in cleaner code.
>
> Thanks
> -Mark
>
>> On Jun 11, 2020, at 10:43 AM, Jason Iannone <bread...@gmail.com> wrote:
>>
>> I confirmed what you mentioned as well.
>>
>> I also looked over many custom processor examples and am looking for clarification on a few things which I didn't see explicitly called out in the Developer's Guide.
>> Are there guidelines on when one should modify the original FlowFile vs. clone it vs. create a net new one?
>> Should heavier lifting such as decryption, formatting, etc. be done in a callback?
>>
>> Thanks,
>> Jason
>>
>> On Wed, Jun 10, 2020 at 4:32 PM Mark Payne <marka...@hotmail.com> wrote:
>> I don’t think flushing should matter if you’re writing directly to the provided OutputStream. If you wrap it in a BufferedOutputStream or something like that, then of course you’ll want to flush that. Assuming that you are extending AbstractProcessor, it will call session.commit() for you automatically when onTrigger() returns.
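Mark's point about flushing a wrapper stream can be seen with plain `java.io`. Bytes written to a `BufferedOutputStream` stay in its buffer until `flush()` (or `close()`), so the stream underneath receives nothing until then. In this sketch a `ByteArrayOutputStream` stands in for the stream handed to the processor. `FlushDemo` and `writeThroughBuffer` are hypothetical names, and nothing here is NiFi API.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class FlushDemo {

    /**
     * Writes one record through a BufferedOutputStream wrapped around the "provided" stream
     * and reports how many bytes the provided stream had received {before, after} flush().
     */
    static int[] writeThroughBuffer(ByteArrayOutputStream provided, byte[] record) {
        // Deliberately not closed: closing the wrapper would also close 'provided',
        // which in this sketch stands in for a stream owned by the caller.
        BufferedOutputStream buffered = new BufferedOutputStream(provided, 8192);
        try {
            buffered.write(record);             // small write: held in the 8 KiB buffer
            int beforeFlush = provided.size();  // nothing has reached 'provided' yet
            buffered.flush();                   // pushes the buffered bytes to 'provided'
            int afterFlush = provided.size();
            return new int[] {beforeFlush, afterFlush};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] record = "{\"name\": \"1\"}\n".getBytes(StandardCharsets.UTF_8);
        int[] sizes = writeThroughBuffer(new ByteArrayOutputStream(), record);
        // prints: before flush: 0 bytes, after flush: 14 bytes
        System.out.println("before flush: " + sizes[0] + " bytes, after flush: " + sizes[1] + " bytes");
    }
}
```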
>>
>> I did just notice that you said you’re merging 1,000+ FlowFiles. That would make it kind of difficult to follow the provenance. I would recommend, for debugging purposes at least, that you try making small batches, maybe 25 FlowFiles or something like that. That would make it a lot easier to find the culprit.
>>
>>> On Jun 10, 2020, at 4:28 PM, Jason Iannone <bread...@gmail.com> wrote:
>>>
>>> Excellent advice, thank you! When writing via ProcessSession.write(FlowFile, OutputStream), is it advised to flush and/or call session.commit()? I noticed we aren't doing either, but we are invoking session.transfer.
>>>
>>> Thanks,
>>> Jason
>>>
>>>
>>> On Wed, Jun 10, 2020 at 2:26 PM Mark Payne <marka...@hotmail.com> wrote:
>>> Jason,
>>>
>>> Control characters should not cause any problem with MergeContent. MergeContent just copies bytes from one stream to another. It’s also worth noting that attributes don’t really come into play here. MergeContent is combining the FlowFile content, so even if it has some weird attributes, those won’t cause a problem in the output content. NiFi stores attributes as a mapping of String to String key/value pairs (i.e., Map<String, String>). So the processor is assuming that if you want to convert a message header to an attribute, that header must be a string.
>>>
>>> Content in the repository is stored using “slabs” or “blocks.” One processor at a time has the opportunity to write to a file in the content repository. When the processor finishes writing and transfers the FlowFile to the next processor, NiFi keeps track of which file its content was written to, the byte offset where its content starts, and the length of the content.
>>> The next time that a processor needs to write to the content of a FlowFile, it may end up appending to that same file on disk, but the FlowFile that the content corresponds to will keep track of the byte offset into the file where its content begins and how many bytes in that file belong to that FlowFile.
>>>
>>> My recommendation to track this down would be to find a FlowFile that is corrupt, and then use the data provenance feature [1] to view its lineage. Look at the FlowFiles that were joined together by MergeContent and see if any of those are corrupt.
>>>
>>> Thanks
>>> -Mark
>>>
>>> [1] http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance
>>>
>>>> On Jun 10, 2020, at 2:07 PM, Jason Iannone <bread...@gmail.com> wrote:
>>>>
>>>> Hey Mark,
>>>>
>>>> I was thinking this over more. Despite no complaints from Jackson's ObjectMapper, is it possible that hidden and/or control characters are present in the JSON values, which would then cause MergeContent to behave this way? I looked over the code and nothing jumped out, but there is something we had to do because of how the publisher is setting Kafka header attributes. Some attributes are raw bytes rather than strings converted to bytes, and ConsumeKafka seems to assume that these can always be converted to a String. We had to change the encoding to ISO8859 after running into issues with the bytes getting corrupted.
>>>>
>>>> I'm also trying to better understand how the content is being stored in the content repository, and whether something is going wrong when writing it out.
>>>>
>>>> Thanks,
>>>> Jason
>>>>
>>>> On Tue, Jun 9, 2020 at 8:02 PM Mark Payne <marka...@hotmail.com> wrote:
>>>> Hey Jason,
>>>>
>>>> Thanks for reaching out.
>>>> That is definitely odd and not something that I’ve seen or heard about before.
>>>>
>>>> Are you certain that the data is not being corrupted upstream of the processor? I ask because the code for the processor that handles writing out the content is pretty straightforward and hasn’t been modified in over 3 years, so I would expect to see this happen often if it were a bug in the MergeContent processor itself. Any chance that you can create a flow template/sample data that recreates the issue? Anything particularly unique about your flow?
>>>>
>>>> Thanks
>>>> -Mark
>>>>
>>>>
>>>> > On Jun 9, 2020, at 6:47 PM, Jason Iannone <bread...@gmail.com> wrote:
>>>> >
>>>> > Hi all,
>>>> >
>>>> > Within NiFi 1.10.0 we're seeing unexpected behavior with MergeContent. The processor is being fed many flowfiles with individual JSON records. The records have various field types, including a hex-encoded byte[]. We are not trying to merge the JSON records themselves, but rather to consolidate many flowfiles into fewer flowfiles.
>>>> >
>>>> > What we're seeing is that a random flowfile is split, causing the merged file to be invalid JSON. When running multiple bins, we saw the flowfile split across bins.
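Jason's later note about switching the Kafka header decoding to ISO8859 (assumed here to mean ISO-8859-1) has a simple explanation. ISO-8859-1 maps each of the 256 byte values to exactly one character, so arbitrary bytes survive a bytes to String to bytes round trip. UTF-8, by contrast, replaces malformed sequences with U+FFFD and the original bytes are lost. This JDK-only sketch uses the bytes of the sample `hexbytes` value `A10F15B11D14` as a stand-in for a binary header. It does not claim anything about how ConsumeKafka itself decodes headers.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class HeaderRoundTrip {

    /** Decodes raw header bytes into a String (as an attribute would hold them), then re-encodes. */
    static byte[] roundTrip(byte[] headerBytes, Charset charset) {
        String attributeValue = new String(headerBytes, charset);
        return attributeValue.getBytes(charset);
    }

    public static void main(String[] args) {
        // 0xA1 and 0xB1 are UTF-8 continuation bytes with no lead byte, i.e. malformed UTF-8.
        byte[] header = {(byte) 0xA1, 0x0F, 0x15, (byte) 0xB1, 0x1D, 0x14};

        byte[] viaUtf8 = roundTrip(header, StandardCharsets.UTF_8);
        byte[] viaLatin1 = roundTrip(header, StandardCharsets.ISO_8859_1);

        System.out.println("UTF-8 preserved bytes: " + Arrays.equals(header, viaUtf8));         // false
        System.out.println("ISO-8859-1 preserved bytes: " + Arrays.equals(header, viaLatin1));  // true
    }
}
```

Hex- or Base64-encoding binary header values before they become attributes is another way to keep them lossless. This is the same idea as the hex-encoded `byte[]` field already used in the JSON records.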
>>>> >
>>>> > Example
>>>> > Flowfile 1: {"name": "1", "hexbytes": A10F15B11D14", timestamp: "123456789" }
>>>> > Flowfile 2: {"name": "2", "hexbytes": A10F15D14B11", timestamp: "123456790" }
>>>> > Flowfile 3: {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" }
>>>> >
>>>> > Merged Result:
>>>> > {"name": "1", "hexbyters": A10F15B11D14", timestamp: "123456789" }
>>>> > xbytes": A10F15D14B11", timestamp: "123456790" }
>>>> > {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" }
>>>> > {"name": "3", "h
>>>> >
>>>> > MergeContent Configuration:
>>>> > Concurrent Tasks: 4
>>>> > Merge Strategy: Bin-Packing Algorithm
>>>> > Merge Format: Binary Concatenation
>>>> > Attribute Strategy: Keep Only Common Attributes
>>>> > Min. number of entries: 1000
>>>> > Max number of entries: 20000
>>>> > Minimum group size: 10 KB
>>>> > Maximum number of bins: 5
>>>> > Header, Footer, and Demarcator are not set.
>>>> >
>>>> > We then backed off the settings above, reducing the min and max entries and setting bins to 1 and threads to 1, and still see the same issue.
>>>> >
>>>> > Any insights?
>>>> >
>>>> > Thanks,
>>>> > Jason
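To make Mark's description of the content repository concrete, here is a deliberately simplified model. Several FlowFiles append to one shared "slab", and each FlowFile keeps only an (offset, length) pointer into it. A merge in Binary Concatenation format then just copies each FlowFile's bytes in order. `Slab`, `ContentPointer`, and `merge` are hypothetical names, not NiFi classes. The model uses a single in-memory slab, so it omits the file identifier, reference counting, and on-disk storage.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SlabModel {

    /** Hypothetical per-FlowFile pointer: where its content starts in the slab and how long it is. */
    static final class ContentPointer {
        final int offset;
        final int length;

        ContentPointer(int offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    /** Hypothetical append-only slab shared by many FlowFiles (one in-memory "file"). */
    static final class Slab {
        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        /** Appends content and returns the pointer the FlowFile would keep. */
        ContentPointer append(byte[] content) {
            int offset = bytes.size();
            bytes.write(content, 0, content.length);
            return new ContentPointer(offset, content.length);
        }

        /** Reads back exactly the bytes a pointer claims. */
        byte[] read(ContentPointer pointer) {
            byte[] all = bytes.toByteArray();
            return Arrays.copyOfRange(all, pointer.offset, pointer.offset + pointer.length);
        }
    }

    /** Binary concatenation: copy each FlowFile's bytes in order, no header/footer/demarcator. */
    static String merge(Slab slab, List<ContentPointer> pointers) {
        ByteArrayOutputStream merged = new ByteArrayOutputStream();
        for (ContentPointer pointer : pointers) {
            byte[] content = slab.read(pointer);
            merged.write(content, 0, content.length);
        }
        return new String(merged.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Slab slab = new Slab();
        List<ContentPointer> pointers = new ArrayList<>();
        for (String name : new String[] {"1", "2", "3"}) {
            String record = "{\"name\": \"" + name + "\"}\n";
            pointers.add(slab.append(record.getBytes(StandardCharsets.UTF_8)));
        }

        // Correct pointers: the three records come out whole and in order.
        System.out.print(merge(slab, pointers));

        // Illustration only: a pointer whose offset is 5 bytes too far reads the tail of
        // record 2 plus the head of record 3, producing partial-record fragments.
        List<ContentPointer> skewed = new ArrayList<>(pointers);
        ContentPointer second = pointers.get(1);
        skewed.set(1, new ContentPointer(second.offset + 5, second.length));
        System.out.print(merge(slab, skewed));
    }
}
```

With the skewed pointer, the second merge contains a line starting `e": "2"}` followed by `{"nam{"name": "3"}`. This resembles the fragments in the merged result above. Nothing in the thread establishes that a bad offset is what actually happened, though, and Mark's suggestion to check provenance and upstream data still applies.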