[ https://issues.apache.org/jira/browse/NIFI-2735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15468824#comment-15468824 ]
Matt Burgess commented on NIFI-2735:
------------------------------------

Good feedback, thanks! Here's what I've thought of so far:

1) Currently there is no approach for that in AggregateValues, because I inferred that partial aggregation means either data loss or that the flow files just haven't arrived yet. I'm not sure about the semantics of expiration, but if the user is comfortable with such a thing, it can definitely be added. Another option I thought of is a RouteOnAttributeWithDecrement, which would be another barrier-style processor (one that expects all splits to come in). It would wait until they have all arrived before transferring to relationships, and would update their fragment.count (aka split count) attributes en masse. Then branches can be taken from the original split and the correct count would be preserved.

2) Adding a capability for an expression would be awesome. Right now it's partially supported for those operations that take a parameter, since the Operation Parameter supports Expression Language. It would be cool to add Janino so the expression could be written in Java, for example.

3) AggregateValues uses State Management with the fragment identifier, so as long as there's no data loss, the processor should be able to stop, start, and resume where it left off.

#1 and #3 together bring up another interesting point, which is how this processor would work in a clustered environment. That will take some thought and would likely not be supported for this go-around; instead we may want a new NiFi concept of a Barrier (or whatever) to synchronize across instances. That would merge the Flow-Based Programming and Bulk Synchronous Parallel paradigms, and could offer a ton of power and flexibility.

For AggregateValues, the functionality was semi-intentionally left simple (using the fragment.* attributes and trusting them implicitly) so that the data flow designer could use UpdateAttribute or whatever and achieve the desired result.
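A minimal sketch of the barrier behaviour described above, written as plain Python rather than against the NiFi processor API. It assumes flow files can be modelled as attribute dictionaries, that the value to aggregate sits in a single attribute (the hypothetical value_attribute), and that the result is reported in hypothetical aggregate.operation / aggregate.value attributes; none of these names come from the actual AggregateValues code. An in-memory dict keyed by fragment.identifier stands in for State Management, and concatenate is left out to keep the example numeric.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional

# Hypothetical stand-in for a flow file: just a dict of string attributes.
FlowFile = Dict[str, str]

# A subset of the operations named in the proposal.
OPERATIONS: Dict[str, Callable[[List[float]], float]] = {
    "count": lambda vs: float(len(vs)),
    "sum": sum,
    "average": lambda vs: sum(vs) / len(vs),
    "min": min,
    "max": max,
}


@dataclass
class AggregateBarrier:
    """Toy model of a barrier-style aggregator keyed by fragment.identifier."""

    operation: str
    value_attribute: str  # hypothetical: attribute that holds the value
    # Stand-in for State Management: values seen so far, per fragment id.
    _state: Dict[str, List[float]] = field(default_factory=dict)

    def on_flow_file(self, ff: FlowFile) -> Optional[FlowFile]:
        """Record one split; return an aggregate only when all have arrived."""
        frag_id = ff["fragment.identifier"]
        expected = int(ff["fragment.count"])  # trusted implicitly

        values = self._state.setdefault(frag_id, [])
        values.append(float(ff[self.value_attribute]))

        if len(values) < expected:
            return None  # barrier still closed: splits are missing

        # All splits arrived: clear the state and emit the aggregate.
        del self._state[frag_id]
        result = OPERATIONS[self.operation](values)
        return {
            "fragment.identifier": frag_id,
            "aggregate.operation": self.operation,  # hypothetical name
            "aggregate.value": str(result),         # hypothetical name
        }


if __name__ == "__main__":
    barrier = AggregateBarrier(operation="sum", value_attribute="value")
    for index, value in enumerate(["1", "2", "3"]):
        out = barrier.on_flow_file({
            "fragment.identifier": "abc",
            "fragment.count": "3",
            "fragment.index": str(index),
            "value": value,
        })
        print(index, out)
```

Only the last of the three splits produces a result, so the returned dict doubles as the "everything has arrived" signal. Note that a partial group simply stays in the state forever here, which is the expiration question raised in 1).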
For example, if you are using ListFile->FetchFile and you somehow know the count of files, you could use AggregateValues as a barrier to "wait" until all flow files have reached that processor after whatever previous processing is complete. The idea was for micro-batches and very simple aggregations/barriers vs. a full EIP Aggregation pattern. I was hoping the inclusion of this first simple processor would get the wheels turning for more capabilities.

> Add processor to perform simple aggregations
> --------------------------------------------
>
> Key: NIFI-2735
> URL: https://issues.apache.org/jira/browse/NIFI-2735
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
>
> This is a proposal for a new processor (AggregateValues, for example) that
> can perform simple aggregation operations such as count, sum, average, min,
> max, and concatenate, over a set of "related" flow files. For example, when a
> JSON file is split on an array (using the SplitJson processor), the total
> count of the splits, the index of each split, and the unique identifier
> (shared by each split) are stored as attributes in each flow file sent to the
> "splits" relationship:
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi.processors.standard.SplitJson/index.html
> These attributes are the "fragment.*" attributes in the documentation for
> SplitText, SplitXml, and SplitJson, for example.
> Such a processor could perform these operations for each flow file split from
> the original document, and when all documents from a split have been
> processed, a flow file could be transferred to an "aggregate" relationship
> containing attributes for the operation, aggregate value, etc.
> An interesting application of this (besides the actual aggregation
> operations) is that you can use the "aggregate" relationship as an event
> trigger.
> For example if you need to wait until all files from a group are
> processed, you can use AggregateValues and the "aggregate" relationship to
> indicate downstream that the entire group has been processed. If there is not
> a Split processor upstream, then the attributes (fragment.*) would have to be
> manipulated by the data flow designer, but this can be accomplished with
> other processors (including the scripting processors if necessary).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)