Hey, I have a FlatMap that uses some generics (appended at the end of the mail). I have some trouble with the type inference running into InvalidTypesException on the first line in the open function.
How can I fix it? Cheers Martin public class AnomalyFlatMap<M extends Model,V extends ModelValue, T> extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly,T>> { private transient ValueState<M> microModel; private final double threshold; private boolean updateIfAnomaly; private M initModel; public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) { this.threshold = threshold; this.updateIfAnomaly = updateIfAnomaly; this.initModel = model; } @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<M> descriptor = new ValueStateDescriptor<>( "RollingMicroModel", TypeInformation.of(new TypeHint<M>() { }),initModel ); microModel = getRuntimeContext().getState(descriptor); } @Override public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> collector) throws Exception { M model = microModel.value(); Anomaly res = model.calculateAnomaly(sample.f0); if ( res.getScore() <= threshold || updateIfAnomaly){ model.addWindow(sample.f0); microModel.update(model); } collector.collect(new Tuple2<>(res,sample.f1)); } }