Hi, I changed the WordCount example as below and I am wondering, why this does not work. Looking at the Flink internal implemenation, it make sense that it fails. However, from an API point of view it should work. Is this a bug or do I miss understand the semantic of .returns(...)?
The change is about type inference and generics. I removed the generic
type information in tokenizer and provide the output type via
returns(...) methods. Whoever, I get an exception (see below).
Tokenizer is change to this (removed generics and added cast to String):
> public static final class Tokenizer implements FlatMapFunction {
> public void flatMap(Object value, Collector out) {
> String[] tokens = ((String) value).toLowerCase().split("\\W+");
> for (String token : tokens) {
> if (token.length() > 0) {
> out.collect(new Tuple2<String, Integer>(token,
> 1));
> }
> }
> }
> }
I added call to "returns(....)" here:
> DataSet<Tuple2<String, Integer>> counts =
> text.flatMap(new Tokenizer()).returns("Tuple2<String,Integer>")
> .groupBy(0).sum(1);
The exception is:
> Exception in thread "main" java.lang.IllegalArgumentException: The types of
> the interface org.apache.flink.api.common.functions.FlatMapFunction could not
> be inferred. Support for synthetic interfaces, lambdas, and generic types is
> limited at this point.
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:686)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getParameterTypeFromGenericType(TypeExtractor.java:710)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:673)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:365)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:279)
> at
> org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:120)
> at org.apache.flink.api.java.DataSet.flatMap(DataSet.java:262)
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:69)
-Matthias
signature.asc
Description: OpenPGP digital signature
