Hi Tran Nam-Luc! Java 8 lambdas are a bit tricky right now, because of Java's generic type erasure. Flink needs the type information of all Functions before sending programs to the execution engine. (for more details, see here: https://github.com/apache/flink/blob/master/docs/internal_types_serialization.md )
The type analysis is part of how we make program execution very robust, even on really large data. By not just passing Java Objects and using ad-hoc serialization frameworks, but by understanding the types up front and parameterizing the runtime accordingly, we get efficiency and reliability in the execution. We can also do a better job in finding errors before the parallel execution. The JDT compiler accidentally dropped generic types on lambdas a while back, but we submitted a patch to reintroduce them (see [1]) and it will be part of the next major Eclipse release. Until then, to use Java 8 lambdas you need to follow these points: - If the function returns a simple type (like int, long, String) or any non-generic class, it should work out of the box. - If the function returns a generic type (like tuple), you need to use a type hint to tell the system the return type. Below is an example for that. DataSet<Long> data = ... DataSet<Tuple2<Long, Double>> = data.map( value -> new Tuple2<>(value, value * 0.5) ).returns("Tuple2<Long,Double>"); The type hints are a workaround that will be obsolete once Java compiler support is available. Greetings, Stephan [1] https://bugs.eclipse.org/bugs/show_bug.cgi?id=449063 On Fri, Feb 6, 2015 at 12:27 PM, Nam-Luc Tran <namluc.t...@euranova.eu> wrote: > Hello, > > I am trying to use Java 8 lambdas in my project and hit the following > error: > > Exception in thread "main" > org.apache.flink.api.common.functions.InvalidTypesException: The > generic type parameters of 'Tuple2' are missing. > It seems that your compiler has not stored them into the .class > file. > Currently, only the Eclipse JDT compiler preserves the type > information necessary to use the lambdas feature type-safely. > See the documentation for more information about how to compile jobs > containing lambda expressions. > at > > org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:779) > at > > org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:765) > at > > org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:135) > at > > org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:78) > at org.apache.flink.api.java.DataSet.map(DataSet.java:160) > at eu.euranova.flink.Axa.main(Axa.java:62) > > My very simple code is the following: > > File directory = new File( > "PATH TO A DIRECTORY WITH CSV FILES"); > DataSet set = env.fromElements(new Tuple3(0, 0.0, 0.0)); > for (File file : directory.listFiles()) { > int pathID = 0; > String filePath = "file://" + file.getAbsolutePath(); > DataSet set2 = > > env.readCsvFile(filePath).ignoreFirstLine().includeFields("11").types(Double.class,Double.class); > DataSet set3 = set2.map(tuple -> new Tuple3(pathID, tuple.f0, > tuple.f1)); > set = set.union(set3); > } > > I followed the steps in the Java 8 documentation section > ( > http://flink.apache.org/docs/0.8/java8_programming_guide.html#compiler-limitations > ) > and have applied the following to the pom.xml file created using the > flink archetype: > - Modified java 1.6 reference to 1.8 > - Uncommented the section related to Java 8 lambdas > - Installed Eclipse Java developer tools (JDT) > - Installed m2e-jdt connector > > The pom.xml does not show any error and builds fine. > > Am I missing something? Do I need to explicity set up Eclipse JDT? The > only installed environment shown in my preferences is the > /usr/java/jdk-1.8.0_31 from oracle. > > Thanks and best regards, > > Tran Nam-Luc >