HI,
I have a class defined :

public class MGroupingWindowAggregate implements AggregateFunction.. {
> private final Map<String, Object> keyHistMap = new TreeMap<>();
> }
>
In the constructor, I initialize it.

> public MGroupingWindowAggregate() {
> Histogram minHist = new Histogram(new
> SlidingTimeWindowReservoir(timeIntervalL, TimeUnit.MINUTES));
> org.apache.flink.metrics.Histogram minHistogram = new
> DropwizardHistogramWrapper(minHist);
> Map<String, org.apache.flink.metrics.Histogram> intervalHistMap = new
> TreeMap<>();
> intervalHistMap.putIfAbsent(interval, minHistogram);
> keyHistMap.putIfAbsent(operationKey, intervalHistMap);
> }
>
When trying to use it in the add() method of AggregateFunction, it fails
saying:
NotSerializableException:
org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper

Tried to wrap DropwizardHistogramWrapper inside a serializable Object with
Composition but that also didn't work.

Looked at using RichFunction open() based on Stephan's advise here.
https://stackoverflow.com/questions/34118469/flink-using-dagger-injections-not-serializable
But cannot use RichFunction with AggrgeateFunction or use
RichAggregateFunction

How can I use the DropwizardHistogramWrapper -a non serializable class
inside my AggregateFunction ? Trying to use DropwizardHistogramWrapper to
get some Histogram percentile stats without re-inventing the wheel.

TIA,
Vijay

Reply via email to