Hi,

I already tried TypeInformation.of(initModel.class), and it complained that
the initModel class is unknown (since it is of type M).
I added a function to the model.class that returns the TypeInformation, and
it's working now, though I still don't understand what happened behind the
scenes and what I changed :-)

cheers Martin
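
A minimal sketch of what probably happens behind the scenes, using plain Java reflection instead of Flink itself. Hint, Holder and ErasureDemo are hypothetical names invented for this illustration; Hint only imitates the anonymous-subclass trick that, as far as I understand, TypeHint relies on. Inside a generic class the hint can only record the type variable M, not a concrete class, which would explain the InvalidTypesException. Passing the type in explicitly (here as a Class<M>, in Flink as a TypeInformation<M>) avoids the problem.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for a type hint: anonymous subclasses record their
// type argument in class metadata, where reflection can read it back.
abstract class Hint<T> {
    // Returns the type argument written at the creation site.
    Type captured() {
        ParameterizedType superType =
                (ParameterizedType) getClass().getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }
}

// Hypothetical generic holder, loosely mirroring AnomalyFlatMap<M, V, T>.
class Holder<M> {
    private final Class<M> modelClass;

    // The caller knows the concrete type and hands it over explicitly.
    Holder(Class<M> modelClass) {
        this.modelClass = modelClass;
    }

    // Inside the generic class only the variable "M" is visible to the hint.
    Type hintFromTypeVariable() {
        return new Hint<M>() { }.captured();
    }

    // The explicitly supplied type is still concrete at runtime.
    Type explicitType() {
        return modelClass;
    }
}

class ErasureDemo {
    // Hint created with a concrete type argument.
    static Type concreteHint() {
        return new Hint<String>() { }.captured();
    }

    // Hint created inside a generic class, where M is just a type variable.
    static Type variableHint() {
        return new Holder<>(String.class).hintFromTypeVariable();
    }

    // Type passed in through the constructor.
    static Type explicitType() {
        return new Holder<>(String.class).explicitType();
    }

    public static void main(String[] args) {
        System.out.println("concrete hint is a class: "
                + (concreteHint() instanceof Class));
        System.out.println("hint inside Holder<M> is a type variable: "
                + (variableHint() instanceof TypeVariable));
        System.out.println("explicit type is a class: "
                + (explicitType() instanceof Class));
    }
}
```

In this sketch the hint created with a concrete type argument yields a real class, while the one created inside Holder<M> yields only a type variable. A function on the model that returns its own TypeInformation plays the same role as the explicit Class<M> argument: it supplies the concrete type from a place where it is still known.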


On Thu, Apr 21, 2016 at 2:27 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> you're right, there is very little in the documentation about
> TypeInformation. There is only the description in the JavaDoc:
> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeInformation.html>
>
> Essentially, how it works is that we always use a TypeSerializer<T> when we
> have to serialize values (for sending across the network, storing in state,
> etc.). A TypeSerializer<T> is created from a TypeInformation<T>, and a
> TypeInformation can be obtained in several ways:
>
>  - the TypeExtractor tries to analyze user functions to determine a
> TypeInformation for the input and output type
>  - the TypeExtractor can try and analyze a given Class<T> to determine a
> TypeInformation
>  - the Scala API uses macros and implicit parameters to create
> TypeInformation
>  - a TypeHint can be created to retrieve a TypeInformation
>  - a TypeInformation can be manually constructed
>
> tl;dr In your case you can try TypeInformation.of(initModel.class). If
> that doesn't work you can try and pass in a function that gives you a
> TypeInformation for your model type M.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 14:16 Martin Neumann <mneum...@sics.se> wrote:
>
>> Hi,
>>
>> I pass an instance of M in the constructor of the class; can I use that
>> instead? Maybe give the class a function that returns the right
>> TypeInformation? I'm trying to figure out how TypeInformation works to
>> better understand the issue. Is there any documentation about this? At the
>> moment I don't really understand what TypeInformation does and how it works.
>>
>> cheers Martin
>>
>> On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> I think it doesn't work because the concrete type of M is not available
>>> to create a TypeInformation for M. What you can do is manually pass a
>>> TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use
>>> that when creating the state descriptor.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mneum...@sics.se> wrote:
>>>
>>>> Hey,
>>>>
>>>> I have a FlatMap that uses some generics (appended at the end of the
>>>> mail).
>>>> I have some trouble with the type inference: I run into an
>>>> InvalidTypesException on the first line of the open function.
>>>>
>>>> How can I fix it?
>>>>
>>>> Cheers Martin
>>>>
>>>>
>>>>
>>>>
>>>> public class AnomalyFlatMap<M extends Model, V extends ModelValue, T> extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly, T>> {
>>>>     private transient ValueState<M> microModel;
>>>>     private final double threshold;
>>>>     private boolean updateIfAnomaly;
>>>>     private M initModel;
>>>>
>>>>     public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
>>>>         this.threshold = threshold;
>>>>         this.updateIfAnomaly = updateIfAnomaly;
>>>>         this.initModel = model;
>>>>
>>>>     }
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) throws Exception {
>>>>         ValueStateDescriptor<M> descriptor =
>>>>                 new ValueStateDescriptor<>(
>>>>                         "RollingMicroModel",
>>>>                         TypeInformation.of(new TypeHint<M>() {
>>>>                         }),initModel
>>>>                         );
>>>>         microModel = getRuntimeContext().getState(descriptor);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> collector) throws Exception {
>>>>         M model = microModel.value();
>>>>         Anomaly res  = model.calculateAnomaly(sample.f0);
>>>>
>>>>         if ( res.getScore() <= threshold || updateIfAnomaly){
>>>>             model.addWindow(sample.f0);
>>>>             microModel.update(model);
>>>>         }
>>>>         collector.collect(new Tuple2<>(res,sample.f1));
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>
