To give another perspective on the “callback vs. non”, I’d say “heavy” or 
“messy” operations (like encryption, for example) should be contained in 
encapsulated code (other classes which provide a service) and then invoked from 
the callback or TWR. This allows for much more testable business logic, 
separation of concerns (a service which implements the behavior and then a 
component effectively calling the API), and composability/flexibility. If I 
want to build a processor which exposes a property allowing the user to select 
different encryption algorithms, I can either detect which one and delegate 
that to an implementation, or I could have a giant switch statement and the raw 
crypto primitive code all in a giant spaghetti method/callback definition. I 
know I would prefer the former. 

Andy LoPresto
alopre...@apache.org
alopresto.apa...@gmail.com
He/Him
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

> On Jun 11, 2020, at 8:14 AM, Mark Payne <marka...@hotmail.com> wrote:
> 
> Jason,
> 
> Modify vs. clone vs. create new:
> 
> You would clone a FlowFile if you want an exact copy of the FlowFile (with 
> the exception that the clone will have a unique UUID, Entry Date, etc.). Very 
> rare that a Processor will actually do this.
> 
> Modify vs. create a “Child” FlowFiles (i.e., `session.create(original);` ) - 
> This is a judgment call really. Do you think it will be necessary to have a 
> copy of the original FlowFile and a modified version of it? If so, you may 
> want to create a child FlowFile and send the original FlowFile to original. 
> In reality, you shouldn’t need this often. In most cases, if the user wants 
> both the original and the modified version, they can just add two 
> connections, one going to this processor and one going to wherever else they 
> want the FlowFile. This will cause NiFi to implicitly clone the FlowFile. 
> Where the “create a child and send out the original” matters is just when 
> there’s a feasible use case in which the user would want to have a modified 
> version of the FlowFile and the original version of the FlowFile and also not 
> want to process the original version until after the modified version has 
> been created. This is not common. However, over the years, it has become a 
> common practice to create “original” relationships when they are not needed, 
> likely because a few developers saw a pattern of creating an original 
> relationship and duplicated this to many other processors without really 
> understanding the difference.
> 
> “Net New” - there are two ways to create a FlowFile: `session.create()` and 
> `session.create(original);` - the first creates a FlowFile with no parent 
> FlowFile. This should be done only if there is no inbound FlowFile to create 
> it from. I.e., when this is a “source” processor. In 100% of all other cases, 
> it should be done as `session.create(original);` Providing the original 
> FlowFile does 2 important things. Firstly, it creates a linkage in provenance 
> between them. Secondly, it causes the newly created FlowFile to inherit all 
> attributes from the child.
> 
> Call vs. non-callback: It doesn’t matter. The callback was originally the 
> only way to read or write content of FlowFiles. It was done this way because 
> it was a straight-forward way to ensure that the framework was able to 
> properly manage InputStream, OutputStream, etc. But there were use cases that 
> didn’t fit the callback mechanism well so we eventually added ability to get 
> the InputStreams and OutputStreams directly and callers can just use 
> try-with-resources. This is probably preferred now for most cases just 
> because it results in cleaner code.
> 
> Thanks
> -Mark
> 
>> On Jun 11, 2020, at 10:43 AM, Jason Iannone <bread...@gmail.com 
>> <mailto:bread...@gmail.com>> wrote:
>> 
>> I confirmed what you mentioned as well. 
>> 
>> I also looked over many custom processor examples and looking for 
>> clarification on a few things which I didn't see explicitly called out in 
>> the developers guide.
>> Are their guidelines on when one should modify the original flowfile vs when 
>> you should clone vs when you should create net new?
>> Should heavier lifting such as decryption, formatting, etc. be done in a 
>> callback?
>> 
>> Thanks,
>> Jason
>> 
>> On Wed, Jun 10, 2020 at 4:32 PM Mark Payne <marka...@hotmail.com 
>> <mailto:marka...@hotmail.com>> wrote:
>> I don’t think flushing should matter, if you’re writing directly to the 
>> provided OutputStream. If you wrap it in a BufferedOutputStream or something 
>> like that, then of course you’ll want to flush that. Assuming that you are 
>> extending AbstractProcessor, it will call session.commit() for you 
>> automatically when onTrigger() returns.
>> 
>> I did just notice that you said you’re merging 1,000+ FlowFiles. That would 
>> make it kind of difficult to follow the provenance. Would recommend for 
>> debugging purposes, at least, that you try making small batches, maybe 25 
>> FlowFiles or something like that. Would make it a lot easier to find the 
>> culprit
>> 
>>> On Jun 10, 2020, at 4:28 PM, Jason Iannone <bread...@gmail.com 
>>> <mailto:bread...@gmail.com>> wrote:
>>> 
>>> Excellent advice, thank you! When writing via 
>>> ProcessSession.write(FlowFile, OutputStream) is it advised to flush and/or 
>>> session.commit()? I noticed we aren't doing either, but we are invoking 
>>> session.transfer.
>>> 
>>> Thanks,
>>> Jason
>>> 
>>> 
>>> On Wed, Jun 10, 2020 at 2:26 PM Mark Payne <marka...@hotmail.com 
>>> <mailto:marka...@hotmail.com>> wrote:
>>> Jason,
>>> 
>>> Control characters should not cause any problem with MergeContent. 
>>> MergeContent just copies bytes from one stream to another. It’s also worth 
>>> noting that attributes don’t really come into play here. MergeContent is 
>>> combining the FlowFile content, so even if it has some weird attributes, 
>>> those won’t cause a problem in the output content. NiFi stores attributes 
>>> as a mapping of String to String key/value pairs (i.e., Map<String, 
>>> String>). So the processor is assuming that if you want to convert a 
>>> message header to an attribute, that header must be a string.
>>> 
>>> Content in the repository is stored using “slabs” or “blocks.” One 
>>> processor at a time has the opportunity to write to a file in the content 
>>> repository. When the processor finishes writing and transfers the FlowFile 
>>> to the next processor, NiFi keeps track of which file its content was 
>>> written to, the byte offset where its content starts, and the length of the 
>>> content. The next time that a processor needs to write to the content of a 
>>> FlowFile, it may end up appending to that same file on disk, but the 
>>> FlowFile that the content corresponds to will keep track of the byte offset 
>>> into the file where its content begins and how many bytes in that file 
>>> belong to that FlowFile.
>>> 
>>> My recommendation to track this down would be to find a FlowFile that is 
>>> corrupt, and then use the data provenance feature [1] to view its lineage. 
>>> Look at the FlowFiles that were joined together by MergeContent and see if 
>>> any of those is corrupt.
>>> 
>>> Thanks
>>> -Mark
>>> 
>>> [1] 
>>> http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance 
>>> <http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance>
>>> 
>>>> On Jun 10, 2020, at 2:07 PM, Jason Iannone <bread...@gmail.com 
>>>> <mailto:bread...@gmail.com>> wrote:
>>>> 
>>>> Hey Mark,
>>>> 
>>>> I was thinking over this more and despite no complaints from Jackson 
>>>> Objectmapper is it possible that hidden and/or control characters are 
>>>> present in the JSON values which would then cause MergeContent to behave 
>>>> this way? I looked over the code and nothing jumped out, but there is 
>>>> something we had to do because of how the publisher is setting kafka 
>>>> header attributes. Some attributes are bytes and not strings converted to 
>>>> bytes, and ConsumeKafka seems to assume that these can always be converted 
>>>> to a String. We had to change the encoding to be ISO8859 due to running 
>>>> into issues with the bytes getting corrupted.
>>>> 
>>>> I'm also trying to better understand how the content is being stored in 
>>>> the content repository, and whether something is going wrong when writing 
>>>> it out.
>>>> 
>>>> Thanks,
>>>> Jason
>>>> 
>>>> On Tue, Jun 9, 2020 at 8:02 PM Mark Payne <marka...@hotmail.com 
>>>> <mailto:marka...@hotmail.com>> wrote:
>>>> Hey Jason,
>>>> 
>>>> Thanks for reaching out. That is definitely odd and not something that 
>>>> I’ve seen or heard about before.
>>>> 
>>>> Are you certain that the data is not being corrupted upstream of the 
>>>> processor? I ask because the code for the processor that handles writing 
>>>> out the content is pretty straight forward and hasn’t been modified in 
>>>> over 3 years, so I would expect to see it happen often if it were a bug in 
>>>> the MergeContent processor itself. Any chance that you can create a flow 
>>>> template/sample data that recreates the issue? Anything particularly 
>>>> unique about your flow?
>>>> 
>>>> Thanks
>>>> -Mark
>>>> 
>>>> 
>>>> > On Jun 9, 2020, at 6:47 PM, Jason Iannone <bread...@gmail.com 
>>>> > <mailto:bread...@gmail.com>> wrote:
>>>> > 
>>>> > Hi all,
>>>> > 
>>>> > Within Nifi 1.10.0 we're seeing unexpected behavior with mergecontent. 
>>>> > The processor is being fed in many flowfiles with individual JSON 
>>>> > records. The records have various field types including a hex-encoded 
>>>> > byte[]. We are not trying to merge JSON records themselves but rather 
>>>> > consolidate many flowfiles into fewer flowfiles.
>>>> > 
>>>> > What we're seeing is that a random flowfile is split causing the merge 
>>>> > file to be invalid JSON. When running multiple bins we saw the flowfile 
>>>> > split across bins.
>>>> > 
>>>> > Example
>>>> > Flowfile 1: {"name": "1", "hexbytes": A10F15B11D14", timestamp: 
>>>> > "123456789" }
>>>> > Flowfile 2:  {"name": "2", "hexbytes": A10F15D14B11", timestamp: 
>>>> > "123456790" } 
>>>> > Flowfile 3:  {"name": "3", "hexbytes": A10F15D14B11", timestamp: 
>>>> > "123456790" } 
>>>> > 
>>>> > Merged Result:
>>>> > {"name": "1", "hexbyters": A10F15B11D14", timestamp: "123456789" } 
>>>> > xbytes": A10F15D14B11", timestamp: "123456790" }  
>>>> > {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" } 
>>>> > {"name": "3", "h  
>>>> > 
>>>> > Mergecontent Configuration:
>>>> > Concurrent Tasks: 4
>>>> > Merge Strategy: Bin-Packing Algorithm
>>>> > Merge Format: Binary Concatenation
>>>> > Attribute Strategy: Keep Only Common Attributes
>>>> > Min. number of entries 1000
>>>> > Max number of entries: 20000
>>>> > Minimum group size: 10 KB
>>>> > Maximum number of bins: 5
>>>> > Header, Footer, and Demaractor are not set.
>>>> > 
>>>> > We then backed off the below to reduce min and max entries, bin to 1, 
>>>> > and thread to 1 and still see the same issue.
>>>> > 
>>>> > Any insights?
>>>> > 
>>>> > Thanks,
>>>> > Jason
>>>> 
>>> 
>> 
> 

Reply via email to