[ 
https://issues.apache.org/jira/browse/NIFI-2735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15468824#comment-15468824
 ] 

Matt Burgess commented on NIFI-2735:
------------------------------------

Good feedback, thanks! Here's what I've thought of so far:

1) Currently AggregateValues has no approach for that, because I assumed that 
a partial aggregation means either data loss or that the flow files just 
haven't arrived yet. I'm not sure about the semantics of expiration, but if 
users are comfortable with such a thing, it can definitely be added. 
Another option I thought of is a RouteOnAttributeWithDecrement, another 
barrier-style processor that would expect all splits to come in, wait until 
they have arrived before transferring them to relationships, and update their 
fragment.count (aka split count) attributes en masse. Branches could then be 
taken from the original split, and the correct count would be preserved on 
each branch.

2) Adding a capability for an expression would be awesome. Right now it's 
partially supported for those operations that take a parameter, since the 
Operation Parameter supports Expression Language. It would be cool to add 
Janino so the expression could be written in Java, for example.

3) AggregateValues uses State Management keyed on the fragment identifier, so 
as long as there's no data loss, the processor should be able to stop, 
restart, and resume where it left off.
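
As a rough illustration of the RouteOnAttributeWithDecrement idea from (1), 
here is a minimal sketch in plain Python. It is not NiFi code and does not use 
the NiFi API: flow files are modeled as attribute dicts, the function name and 
the route_for callable are hypothetical, and all flow files passed in are 
assumed to share one fragment identifier.

```python
from collections import defaultdict


def route_with_decrement(flow_files, route_for):
    """Hold all splits of one fragment, then route them and fix up counts.

    flow_files: list of attribute dicts that each carry fragment.count
                (assumed to all belong to the same fragment).
    route_for:  hypothetical callable mapping attributes -> relationship name.
    Returns None while splits are still missing, otherwise a dict of
    relationship name -> list of updated attribute dicts.
    """
    if not flow_files:
        return None
    expected = int(flow_files[0]["fragment.count"])
    if len(flow_files) < expected:
        return None  # barrier not satisfied yet, keep waiting

    # Group copies of the splits by the relationship they are routed to.
    branches = defaultdict(list)
    for attrs in flow_files:
        branches[route_for(attrs)].append(dict(attrs))

    # Rewrite fragment.count en masse so each branch carries its own size.
    for members in branches.values():
        for attrs in members:
            attrs["fragment.count"] = str(len(members))
    return dict(branches)
```

With three splits where two route to "a" and one to "b", the splits on "a" 
would leave with fragment.count set to 2 and the split on "b" with 1, so a 
downstream barrier on either branch still sees a consistent count.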

#1 and #3 together bring up another interesting point: how this processor 
would work in a clustered environment. That will take some thought and would 
likely not be supported for this go-around. Instead, we may want a new NiFi 
concept of a Barrier (or whatever) to synchronize across instances. That would 
merge the Flow-Based Programming and Bulk Synchronous Parallel paradigms, and 
could offer a ton of power and flexibility.

AggregateValues was semi-intentionally left simple (it uses the fragment.* 
attributes and trusts them implicitly) so that the data flow designer can use 
UpdateAttribute or whatever to achieve the desired result. For example, if you 
are using ListFile->FetchFile and somehow know the count of files, you could 
use AggregateValues as a barrier to "wait" until all flow files have reached 
that processor after whatever previous processing is complete. The idea was to 
support micro-batches and very simple aggregations/barriers rather than a full 
EIP Aggregation pattern. I was hoping the inclusion of this first simple 
processor would get the wheels turning for more capabilities.
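
To make the barrier behavior concrete, here is a minimal sketch in plain 
Python of a sum aggregation keyed on fragment.identifier. It is an 
illustration only, not the processor's actual implementation: the class name, 
the value_attribute parameter, and the aggregate.* output attribute names are 
all assumptions, and the in-memory dict merely stands in for State Management.

```python
class AggregateBarrier:
    """Sum one numeric attribute per fragment; release when all splits arrive."""

    def __init__(self, value_attribute):
        # Hypothetical setting: name of the attribute holding the value to sum.
        self.value_attribute = value_attribute
        # fragment.identifier -> running state (stand-in for State Management).
        self.state = {}

    def on_flow_file(self, attrs):
        """Record one flow file; return aggregate attributes or None."""
        frag_id = attrs["fragment.identifier"]
        expected = int(attrs["fragment.count"])  # trusted implicitly

        entry = self.state.setdefault(frag_id, {"seen": 0, "sum": 0.0})
        entry["seen"] += 1
        entry["sum"] += float(attrs[self.value_attribute])

        if entry["seen"] < expected:
            return None  # still waiting on the rest of the group

        # All splits have arrived: clear the state and emit the aggregate.
        del self.state[frag_id]
        return {
            "fragment.identifier": frag_id,
            "aggregate.operation": "sum",   # hypothetical attribute names
            "aggregate.count": entry["seen"],
            "aggregate.value": entry["sum"],
        }
```

Feeding it two flow files that share a fragment identifier, with 
fragment.count set to 2, returns None for the first and the aggregate 
attributes for the second, which is the point where an "aggregate" 
relationship would fire. Setting fragment.count by hand with UpdateAttribute 
would work the same way, since the sketch never checks where the attributes 
came from.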

> Add processor to perform simple aggregations
> --------------------------------------------
>
>                 Key: NIFI-2735
>                 URL: https://issues.apache.org/jira/browse/NIFI-2735
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>
> This is a proposal for a new processor (AggregateValues, for example) that 
> can perform simple aggregation operations such as count, sum, average, min, 
> max, and concatenate, over a set of "related" flow files. For example, when a 
> JSON file is split on an array (using the SplitJson processor), the total 
> count of the splits, the index of each split, and the unique identifier 
> (shared by each split) are stored as attributes in each flow file sent to the 
> "splits" relationship:
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.SplitJson/index.html
> These attributes are the "fragment.*" attributes in the documentation for 
> SplitText, SplitXml, and SplitJson, for example.
> Such a processor could perform these operations for each flow file split from 
> the original document, and when all documents from a split have been 
> processed, a flow file could be transferred to an "aggregate" relationship 
> containing attributes for the operation, aggregate value, etc.
> An interesting application of this (besides the actual aggregation 
> operations) is that you can use the "aggregate" relationship as an event 
> trigger. For example if you need to wait until all files from a group are 
> processed, you can use AggregateValues and the "aggregate" relationship to 
> indicate downstream that the entire group has been processed. If there is not 
> a Split processor upstream, then the attributes (fragment.*) would have to be 
> manipulated by the data flow designer, but this can be accomplished with 
> other processors (including the scripting processors if necessary). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
