Hey,

I have a FlatMap function that uses generics (code appended at the end of this mail).
Type inference fails with an InvalidTypesException on the first statement of the
open() function, where the ValueStateDescriptor is created.

How can I fix it?

Cheers, Martin




public class AnomalyFlatMap<M extends Model, V extends ModelValue, T>
        extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly, T>> {
    private transient ValueState<M> microModel;
    private final double threshold;
    private boolean updateIfAnomaly;
    private M initModel;

    public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
        this.threshold = threshold;
        this.updateIfAnomaly = updateIfAnomaly;
        this.initModel = model;

    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<M> descriptor =
                new ValueStateDescriptor<>(
                        "RollingMicroModel",
                        TypeInformation.of(new TypeHint<M>() {}),
                        initModel);
        microModel = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> collector)
            throws Exception {
        M model = microModel.value();
        Anomaly res = model.calculateAnomaly(sample.f0);

        if (res.getScore() <= threshold || updateIfAnomaly) {
            model.addWindow(sample.f0);
            microModel.update(model);
        }
        collector.collect(new Tuple2<>(res, sample.f1));
    }
}
