Hi Robert,
On 9 July 2016 at 00:25, Robert Metzger <[email protected]> wrote:
> Hi Yukun,
>
> can you also post the code how you are invoking the GenericFlatMapper on
> the mailing list?
>
Here is the code defining the topology:
DataStream<String> stream = ...;
stream
.keyBy(new KeySelector<String, Integer>() {
@Override
public Integer getKey(String x) throws Exception {
return x.hashCode() % 10;
}
})
.timeWindow(Time.seconds(10))
.fold(new TreeMap<String, Long>(), new FoldFunction<String,
SortedMap<String, Long>>() {
@Override
public SortedMap<String, Long> fold(SortedMap<String, Long> map,
String x) {
Long current = map.get(x);
Long updated = current != null ? current + 1 : 1;
map.put(x, updated);
return map;
}
})
.flatMap(new GenericFlatMapper<String>())
.returns(new TypeHint<Tuple2<String, Long>>(){}.getTypeInfo())
// throws InvalidTypesException if you comment out this line
.print();
>
> The Java compiler is usually dropping the generic types during compilation
> ("type erasure"), that's why we can not infer the types.
>
>
The error message implies type extraction should be possible when "all
variables in the return type can be deduced from the input type(s)". This
is true for flatMap(Tuple2<Long, T>, Collector<Tuple2<T, String>>), but if
the signature is changed to void flatMap(SortedMap<T, Long>,
Collector<Tuple2<T, Long>>), type inference fails.
>
> On Fri, Jul 8, 2016 at 12:27 PM, Yukun Guo <[email protected]> wrote:
>
>> Hi,
>> When I run the code implementing a generic FlatMapFunction, Flink
>> complained about InvalidTypesException:
>>
>> public class GenericFlatMapper<T> implements FlatMapFunction<SortedMap<T,
>> Long>, Tuple2<T, Long>> {
>> @Override
>> public void flatMap(SortedMap<T, Long> m, Collector<Tuple2<T, Long>>
>> out) throws Exception {
>> for (Map.Entry<T, Long> entry : m.entrySet()) {
>> out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
>> }
>> }
>> }
>>
>>
>> *Exception in thread "main"
>> org.apache.flink.api.common.functions.InvalidTypesException: The return
>> type of function could not be determined automatically, due to type
>> erasure. You can give type information hints by using the returns(...)
>> method on the result of the transformation call, or by letting your
>> function implement the 'ResultTypeQueryable' interface.*
>>
>> *...*
>> *Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
>> Type of TypeVariable 'T' in 'class GenericFlatMapper' could not be
>> determined. This is most likely a type erasure problem. The type extraction
>> currently supports types with generic variables only in cases where all
>> variables in the return type can be deduced from the input type(s).*
>>
>> This puzzles me as Flink should be able to infer the type from arguments.
>> I know returns(...) or other workarounds to give type hint, but they are
>> kind of verbose. Any suggestions?
>>
>>
>