Hey,
I have a FlatMap that uses some generics (appended at the end of the mail).
I have some trouble with the type inference running into
InvalidTypesException on the first line in the open function.
How can I fix it?
Cheers Martin
public class AnomalyFlatMap<M extends Model,V extends ModelValue, T>
extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly,T>> {
private transient ValueState<M> microModel;
private final double threshold;
private boolean updateIfAnomaly;
private M initModel;
public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
this.threshold = threshold;
this.updateIfAnomaly = updateIfAnomaly;
this.initModel = model;
}
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<M> descriptor =
new ValueStateDescriptor<>(
"RollingMicroModel",
TypeInformation.of(new TypeHint<M>() {
}),initModel
);
microModel = getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly,
T>> collector) throws Exception {
M model = microModel.value();
Anomaly res = model.calculateAnomaly(sample.f0);
if ( res.getScore() <= threshold || updateIfAnomaly){
model.addWindow(sample.f0);
microModel.update(model);
}
collector.collect(new Tuple2<>(res,sample.f1));
}
}