Hi,

I have two possible options to achieve this. The first option is that you could 
obviously always derive the key again from the input of the aggregate function. 
The second option is combining an AggregateFunction with a 
ProcessWindowFunction. With the second solution you get incremental aggregation 
and the ProcessWindowFunction is only called once in the end with the result.

Best,
Stefan

> Am 03.05.2018 um 19:53 schrieb Ken Krugler <kkrugler_li...@transpac.com>:
> 
> Hi list,
> 
> I was trying different ways to implement a moving average (count based, not 
> time based).
> 
> The blunt instrument approach is to create a custom FlatMapFunction that 
> keeps track of the last N values.
> 
> It seemed like using an AggregateFunction would be most consistent with the 
> Flink API, along the lines of...
> 
>             .keyBy(new MyKeySelector())
>             .window(GlobalWindows.create())
>             .trigger(CountTrigger.of(1))
>             .aggregate(new MovingAverageAggregator(10))
> 
> This works, but the API for the AggregateFunction (MovingAverageAggregator) 
> feels a bit odd.
> 
> Specifically, I want to emit a <key, moving average> result from getResult(), 
> but the key isn’t passed to the createAccumulator() method, nor is it passed 
> to the getResult() method. So in the add() method I check if the accumulator 
> I’ve created has a key set, and if not then I extract the key from the record 
> and set it on the accumulator, so I can use it in the getResult() call.
> 
> Is this expected, or am I miss-using the functionality?
> 
> Thanks,
> 
> — Ken
> 
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 

Reply via email to