Hi,
I'm sorry, I meant TypeInformation.of(initModel.getClass()).

On Thu, 21 Apr 2016 at 15:17 Martin Neumann <mneum...@sics.se> wrote:
> Hej,
>
> I already tried TypeInformation.of(initModel.class) and it complained
> that the initModel class is unknown (since it's of type M).
> I added a function to the model class that returns the TypeInformation, and
> it's working now, though I still don't understand what happened behind the
> scenes and what I changed :-)
>
> cheers Martin
>
>
> On Thu, Apr 21, 2016 at 2:27 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> you're right, there is very little in the documentation about
>> TypeInformation. There is only the description in the JavaDoc:
>> TypeInformation
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeInformation.html>
>> Essentially, how it works is that we always use a TypeSerializer<T> when
>> we have to serialize values (for sending across the network, storing in
>> state, etc.). A TypeSerializer<T> is created from a TypeInformation<T>,
>> and a TypeInformation can be obtained in several ways:
>>
>> - the TypeExtractor tries to analyze user functions to determine a
>> TypeInformation for the input and output type
>> - the TypeExtractor can try and analyze a given Class<T> to determine a
>> TypeInformation
>> - the Scala API uses macros and implicit parameters to create
>> TypeInformation
>> - a TypeHint can be created to retrieve a TypeInformation
>> - a TypeInformation can be manually constructed
>>
>> tl;dr In your case you can try TypeInformation.of(initModel.class). If
>> that doesn't work you can try and pass in a function that gives you a
>> TypeInformation for your model type M.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 14:16 Martin Neumann <mneum...@sics.se> wrote:
>>
>>> Hej,
>>>
>>> I pass an instance of M in the constructor of the class, can I use that
>>> instead? Maybe give the class a function that returns the right
>>> TypeInformation?
>>> I'm trying to figure out how TypeInformation works to better
>>> understand the issue. Is there any documentation about this? At the
>>> moment I don't really understand what TypeInformation does and how it
>>> works.
>>>
>>> cheers Martin
>>>
>>> On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> I think it doesn't work because the concrete type of M is not available
>>>> to create a TypeInformation for M. What you can do is manually pass a
>>>> TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use
>>>> that when creating the state descriptor.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mneum...@sics.se> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> I have a FlatMap that uses some generics (appended at the end of the
>>>>> mail).
>>>>> I have some trouble with the type inference, running into an
>>>>> InvalidTypesException on the first line of the open function.
>>>>>
>>>>> How can I fix it?
>>>>>
>>>>> Cheers Martin
>>>>>
>>>>>
>>>>> public class AnomalyFlatMap<M extends Model, V extends ModelValue, T>
>>>>>         extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly, T>> {
>>>>>     private transient ValueState<M> microModel;
>>>>>     private final double threshold;
>>>>>     private boolean updateIfAnomaly;
>>>>>     private M initModel;
>>>>>
>>>>>     public AnomalyFlatMap(double threshold, M model,
>>>>>             boolean updateIfAnomaly) {
>>>>>         this.threshold = threshold;
>>>>>         this.updateIfAnomaly = updateIfAnomaly;
>>>>>         this.initModel = model;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>         ValueStateDescriptor<M> descriptor =
>>>>>                 new ValueStateDescriptor<>(
>>>>>                         "RollingMicroModel",
>>>>>                         TypeInformation.of(new TypeHint<M>() {
>>>>>                         }), initModel
>>>>>                 );
>>>>>         microModel = getRuntimeContext().getState(descriptor);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void flatMap(Tuple2<V, T> sample,
>>>>>             Collector<Tuple2<Anomaly, T>> collector) throws Exception {
>>>>>         M model = microModel.value();
>>>>>         Anomaly res = model.calculateAnomaly(sample.f0);
>>>>>
>>>>>         if (res.getScore() <= threshold || updateIfAnomaly) {
>>>>>             model.addWindow(sample.f0);
>>>>>             microModel.update(model);
>>>>>         }
>>>>>         collector.collect(new Tuple2<>(res, sample.f1));
>>>>>     }
>>>>> }
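
For anyone reading this thread later: the point that "the concrete type of M is not available" can be reproduced in plain Java, without Flink. The sketch below is only an illustration under assumptions. Hint<T> is a hypothetical stand-in for the idea of capturing a type through an anonymous subclass, and Holder<M> plays the role of a generic function class. Neither is Flink code, and this is not how Flink implements TypeHint internally. It only shows that a hint created inside a generic class records the type variable M rather than a concrete class, while getClass() on an instance still yields the runtime class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureDemo {

    /** Hypothetical stand-in for a type hint: captures T via an anonymous subclass. */
    public abstract static class Hint<T> {
        public Type captured() {
            // The generic superclass of the anonymous subclass records the type argument.
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    /** Generic holder, analogous to a function class parameterized by M. */
    public static class Holder<M> {
        private final M instance;

        public Holder(M instance) {
            this.instance = instance;
        }

        /** Inside the generic class, only the type variable M can be recorded. */
        public Type fromHint() {
            return new Hint<M>() { }.captured();
        }

        /** The runtime class of the instance is still available. */
        public Class<?> fromInstance() {
            return instance.getClass();
        }
    }

    public static boolean isTypeVariable(Type t) {
        return t instanceof TypeVariable;
    }

    public static void main(String[] args) {
        Holder<String> holder = new Holder<>("model");

        // Hint created with a concrete type argument: a real class is captured.
        System.out.println(new Hint<String>() { }.captured());

        // Hint created inside Holder<M>: only a type variable is captured.
        System.out.println(isTypeVariable(holder.fromHint()));

        // Runtime class taken from the instance.
        System.out.println(holder.fromInstance());
    }
}
```

This is consistent with the two suggestions in the thread: either take the class from the instance that is passed to the constructor, or hand a TypeInformation<M> to the function from the call site, where the concrete type is known.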