Hi Ken,

You can also add a ProcessWindowFunction (PWF) [1] that is applied to the
result of the AggregateFunction and sets the key there.
Since the PWF is only applied to the final result, there is no overhead
(actually, it might even be slightly cheaper because the AggregateFunction
can be simpler).
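
To illustrate the split of responsibilities, here is a rough plain-Java
sketch with no Flink dependency. The class and method names
(MovingAverageSketch, Acc, attachKey) are made up for this example; they only
mirror the shape of createAccumulator()/add()/getResult() and of the step
where the PWF receives the key. Treat it as a sketch under those assumptions,
not as the actual Flink implementation.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Plain-Java sketch, independent of Flink. All names are hypothetical.
public class MovingAverageSketch {

    // Accumulator: the last `size` values plus their running sum.
    // Note that it carries no key field at all.
    public static final class Acc {
        final int size;
        final Deque<Double> values = new ArrayDeque<>();
        double sum = 0.0;

        Acc(int size) {
            this.size = size;
        }
    }

    // Mirrors AggregateFunction#createAccumulator (no key available here).
    public static Acc createAccumulator(int size) {
        return new Acc(size);
    }

    // Mirrors AggregateFunction#add: keep only the most recent `size` values.
    public static Acc add(double value, Acc acc) {
        acc.values.addLast(value);
        acc.sum += value;
        if (acc.values.size() > acc.size) {
            acc.sum -= acc.values.removeFirst();
        }
        return acc;
    }

    // Mirrors AggregateFunction#getResult: returns only the average.
    public static double getResult(Acc acc) {
        return acc.values.isEmpty() ? 0.0 : acc.sum / acc.values.size();
    }

    // Stand-in for the PWF step: it is handed the key and the single
    // pre-aggregated result, and pairs them up.
    public static Map.Entry<String, Double> attachKey(String key, double average) {
        return new AbstractMap.SimpleImmutableEntry<>(key, average);
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator(3);
        for (double v : new double[] {1, 2, 3, 4}) {
            add(v, acc);
            System.out.println(attachKey("some-key", getResult(acc)));
        }
    }
}
```

In Flink itself, the equivalent wiring would be to pass both functions to
aggregate(...), i.e. the AggregateFunction first and the PWF second; the PWF's
process() method receives the key as its first argument.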

If you don't want to use a PWF, your approach is the right one.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation

2018-05-03 19:53 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:

> Hi list,
>
> I was trying different ways to implement a moving average (count based,
> not time based).
>
> The blunt instrument approach is to create a custom FlatMapFunction that
> keeps track of the last N values.
>
> It seemed like using an AggregateFunction would be most consistent with
> the Flink API, along the lines of...
>
>             .keyBy(new MyKeySelector())
>             .window(GlobalWindows.create())
>             .trigger(CountTrigger.of(1))
>             .aggregate(new MovingAverageAggregator(10))
>
> This works, but the API for the AggregateFunction
> (MovingAverageAggregator) feels a bit odd.
>
> Specifically, I want to emit a <key, moving average> result from
> getResult(), but the key isn’t passed to the createAccumulator() method,
> nor is it passed to the getResult() method. So in the add() method I check
> if the accumulator I’ve created has a key set, and if not then I extract
> the key from the record and set it on the accumulator, so I can use it in
> the getResult() call.
>
> Is this expected, or am I misusing the functionality?
>
> Thanks,
>
> — Ken
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>
