Steve,

In a flow-based system like NiFi it can be difficult to tell when there are "no more events with the same key". However, if you have a notion of the maximum amount of time over which events with the same key can arrive, you might be able to use MergeRecord or MergeContent with a Correlation Attribute Name of "eventUuid" and a Min and Max Bin Age of that time interval (and make sure Max Records is set higher than the number of events you'd ever expect in the system over that time). When MergeRecord emits a flow file, you are assured that all the records in it have the same eventUuid, and you can then use QueryRecord to execute SQL against those records, which lets you do some aggregation of the measures.
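As a rough illustration of the merge-then-query idea, here is a small Python sketch of the semantics (not NiFi code — in NiFi this is all processor configuration; the field names "myEventMeasure" and "timestamp" are placeholders for whatever is actually in your records):

```python
# Illustrative only: models MergeRecord's Correlation Attribute binning
# followed by a QueryRecord-style aggregation per merged flow file.
from collections import defaultdict

def bin_by_correlation(records, correlation_attr="eventUuid"):
    """One bin per distinct value of the correlation attribute."""
    bins = defaultdict(list)
    for record in records:
        bins[record[correlation_attr]].append(record)
    return bins

def aggregate_bin(bin_records, measure="myEventMeasure", ts="timestamp"):
    """Min/max timestamp and average measure over one bin of records."""
    values = [r[measure] for r in bin_records]
    stamps = [r[ts] for r in bin_records]
    return {
        "min_timestamp": min(stamps),
        "max_timestamp": max(stamps),
        "window_width": max(stamps) - min(stamps),
        "avg_measure": sum(values) / len(values),
    }

events = [
    {"eventUuid": "uuid-a", "timestamp": 100, "myEventMeasure": 10},
    {"eventUuid": "uuid-b", "timestamp": 130, "myEventMeasure": 5},
    {"eventUuid": "uuid-a", "timestamp": 160, "myEventMeasure": 20},
]
for uuid, bin_records in bin_by_correlation(events).items():
    print(uuid, aggregate_bin(bin_records))
```

The "window_width" here is the "max_timestamp - min_timestamp" trick: not a fixed window size, but the actual spread of the records that happened to land in the same bin.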
This won't be a perfect system, as MergeRecord/MergeContent only maintains the number of bins you specify (one per unique eventUuid, e.g.), so it's possible that a single flow file does not contain all the events for a given UUID over that time period. You may be able to use an incremental pipeline like MergeRecord -> PartitionRecord -> MergeRecord -> QueryRecord.

If there's a timestamp field in each record, you could use something like "SELECT eventUuid, MIN(timestamp) AS min_timestamp, MAX(timestamp) AS max_timestamp, AVG(myEventMeasure) AS avg_measure FROM FLOWFILE GROUP BY eventUuid" to do the aggregates. It might not be an exact window size, but "max_timestamp - min_timestamp" would give you the width of the window for the records in each flow file.

Queries over streams are an active research area. The folks working on Apache Calcite are doing some really cool things, and we've got Calcite powering QueryRecord under the hood, so I'm hoping we can continue to enjoy the fruits of their labor :)

Regards,
Matt

On Fri, Aug 23, 2019 at 4:03 PM Steve Robert <contact.steverob...@gmail.com> wrote:
>
> Hi Bryan,
>
> Thank you for your reply.
> To be precise, I studied the possibility of opening a window on an
> attribute value and closing it after a user-defined time, once there are
> no more events with the same key.
> On the Flink side, many operators send events with a SUCCESS or FAIL
> status, and the operators run in parallel.
> I wanted to be able to do an aggregation to determine whether or not an
> error occurred during a life-cycle iteration. Each life cycle is unique
> and associated with a UUID.
>
> On Fri, Aug 23, 2019 at 9:00 PM, Bryan Bende <bbe...@gmail.com> wrote:
>>
>> I think AttributeRollingWindow was made to aggregate a numeric
>> attribute. So, for example, if all flow files had an attribute called
>> "amount" that was an integer, then you could say Value to Track =
>> ${amount} and it would aggregate those values over the window.
>>
>> In your case, the attribute you have is not the value to aggregate;
>> it's a key.
>> The value in your case is a constant of 1, where you want to
>> increment the aggregated value by 1 whenever you see the key.
>>
>> I guess it depends on what you want to do with the aggregated value,
>> but your situation seems closer to a counter, which could be tracked
>> with UpdateCounter by setting the Counter Name to ${eventUuid} and the
>> Delta to 1. But then I don't know exactly how you would use this
>> counter later.
>>
>> On Fri, Aug 23, 2019 at 1:59 PM Steve Robert
>> <contact.steverob...@gmail.com> wrote:
>> >
>> > Hi Guys,
>> >
>> > I apologize in advance if my question seems trivial, but I am new to
>> > NiFi. I'm studying NiFi for an integration with Flink, which I'm
>> > used to.
>> >
>> > From Flink I send events to NiFi using Site-to-Site. The flow files
>> > have the attribute
>> > "eventUuid":"97f82c90-0782-4aab-8850-56ee60b0b73d"
>> >
>> > I would like to do an aggregation on events based on the value of
>> > this attribute, so I looked at the documentation of
>> > AttributeRollingWindow.
>> >
>> > I understood from the errors that this processor does not accept
>> > strings and therefore cannot track a String value:
>> >
>> > Value to Track: ${eventUuid} will throw a NumberFormatException.
>> >
>> > Value to Track: ${eventUuid:toNumber()} will not work either,
>> > because a 128-bit UUID can't be converted to a 64-bit number.
>> >
>> > Is this a limitation, or a bad approach on my part? I cannot window
>> > on a timestamp here, because this UUID is generated at the beginning
>> > of the workflow.
>> >
>> > Thanks a lot for your help,
>> > Steve
>> >
>>
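[As a footnote to the thread: the UpdateCounter approach Bryan describes boils down to keeping one running counter per key. A minimal Python sketch of that behavior, assuming all you need is a per-UUID occurrence count — in NiFi the counters live in the framework and are visible in the UI, not in your flow logic:]

```python
# Illustrative only: models UpdateCounter with Counter Name = ${eventUuid}
# and Delta = 1 -- each flow file bumps the counter for its own key.
from collections import Counter

counters = Counter()

def update_counter(counter_name, delta=1):
    """Increment the named counter by delta and return its new value."""
    counters[counter_name] += delta
    return counters[counter_name]

# One call per flow file seen, keyed by its eventUuid attribute.
for uuid in ["uuid-a", "uuid-b", "uuid-a", "uuid-a"]:
    update_counter(uuid)

print(counters)  # uuid-a seen 3 times, uuid-b seen once
```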