Repository: flink Updated Branches: refs/heads/master 6731ec1e4 -> 1dda3ad00
[FLINK-4793] [types] Improve lambda constructor reference handling This closes #2621. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dda3ad0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dda3ad0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dda3ad0 Branch: refs/heads/master Commit: 1dda3ad009667697a620359e997e83a5ba2447dd Parents: 6731ec1 Author: twalthr <twal...@apache.org> Authored: Tue Oct 11 15:33:20 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Wed Oct 12 11:02:45 2016 +0200 ---------------------------------------------------------------------- flink-core/pom.xml | 7 + .../common/functions/util/FunctionUtils.java | 69 -------- .../java/typeutils/TypeExtractionException.java | 57 +++++++ .../api/java/typeutils/TypeExtractionUtils.java | 167 +++++++++++++++++++ .../flink/api/java/typeutils/TypeExtractor.java | 80 ++++----- .../java/type/lambdas/LambdaExtractionTest.java | 27 +-- .../javaApiOperators/lambdas/MapITCase.java | 20 ++- 7 files changed, 306 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 865a253..cfa2cbb 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -80,6 +80,13 @@ under the License. </exclusions> </dependency> + <!-- ASM is needed for type extraction --> + <dependency> + <groupId>org.ow2.asm</groupId> + <artifactId>asm-all</artifactId> + <version>${asm.version}</version> + </dependency> + <!-- test dependencies --> <dependency> <groupId>org.apache.flink</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java index ffd885b..2bb1cb3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java @@ -18,8 +18,6 @@ package org.apache.flink.api.common.functions.util; -import java.lang.reflect.Method; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RichFunction; @@ -62,73 +60,6 @@ public final class FunctionUtils { return defaultContext; } } - - public static Method checkAndExtractLambdaMethod(Function function) { - try { - // get serialized lambda - Object serializedLambda = null; - for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) { - try { - Method replaceMethod = clazz.getDeclaredMethod("writeReplace"); - replaceMethod.setAccessible(true); - Object serialVersion = replaceMethod.invoke(function); - - // check if class is a lambda function - if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) { - - // check if SerializedLambda class is present - try { - Class.forName("java.lang.invoke.SerializedLambda"); - } - catch (Exception e) { - throw new UnsupportedOperationException("User code tries to use lambdas, but framework is running with a Java version < 8"); - } - serializedLambda = serialVersion; - break; - } - } - catch (NoSuchMethodException e) { - // thrown if the method is not there. fall through the loop - } - } - - // not a lambda method -> return null - if (serializedLambda == null) { - return null; - } - - // find lambda method - Method implClassMethod = serializedLambda.getClass().getDeclaredMethod("getImplClass"); - Method implMethodNameMethod = serializedLambda.getClass().getDeclaredMethod("getImplMethodName"); - - String className = (String) implClassMethod.invoke(serializedLambda); - String methodName = (String) implMethodNameMethod.invoke(serializedLambda); - - Class<?> implClass = Class.forName(className.replace('/', '.'), true, Thread.currentThread().getContextClassLoader()); - - Method[] methods = implClass.getDeclaredMethods(); - Method parameterizedMethod = null; - for (Method method : methods) { - if(method.getName().equals(methodName)) { - if(parameterizedMethod != null) { - // It is very unlikely that a class contains multiple e.g. "lambda$2()" but its possible - // Actually, the signature need to be checked, but this is very complex - throw new Exception("Lambda method name is not unique."); - } - else { - parameterizedMethod = method; - } - } - } - if (parameterizedMethod == null) { - throw new Exception("No lambda method found."); - } - return parameterizedMethod; - } - catch (Exception e) { - throw new RuntimeException("Could not extract lambda method out of function: " + e.getClass().getSimpleName() + " - " + e.getMessage(), e); - } - } /** * Private constructor to prevent instantiation. http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java new file mode 100644 index 0000000..0dad55d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionException.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.annotation.Internal; + +/** + * Type extraction always contains some uncertainty due to unpredictable JVM differences + * between vendors or versions. This exception is thrown if an assumption failed during extraction. + */ +@Internal +public class TypeExtractionException extends Exception { + + private static final long serialVersionUID = 1L; + + /** + * Creates a new exception with no message. + */ + public TypeExtractionException() { + super(); + } + + /** + * Creates a new exception with the given message. + * + * @param message The exception message. + */ + public TypeExtractionException(String message) { + super(message); + } + + /** + * Creates a new exception with the given message and cause. + * + * @param message The exception message. + * @param e cause + */ + public TypeExtractionException(String message, Throwable e) { + super(message, e); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java new file mode 100644 index 0000000..4439612 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.functions.Function; +import static org.objectweb.asm.Type.getConstructorDescriptor; +import static org.objectweb.asm.Type.getMethodDescriptor; + +@Internal +public class TypeExtractionUtils { + + private TypeExtractionUtils() { + // do not allow instantiation + } + + /** + * Similar to a Java 8 Executable but with a return type. + */ + public static class LambdaExecutable { + + private Type[] parameterTypes; + private Type returnType; + private String name; + private Object executable; + + public LambdaExecutable(Constructor<?> constructor) { + this.parameterTypes = constructor.getGenericParameterTypes(); + this.returnType = constructor.getDeclaringClass(); + this.name = constructor.getName(); + this.executable = constructor; + } + + public LambdaExecutable(Method method) { + this.parameterTypes = method.getGenericParameterTypes(); + this.returnType = method.getGenericReturnType(); + this.name = method.getName(); + this.executable = method; + } + + public Type[] getParameterTypes() { + return parameterTypes; + } + + public Type getReturnType() { + return returnType; + } + + public String getName() { + return name; + } + + public boolean executablesEquals(Method m) { + return executable.equals(m); + } + + public boolean executablesEquals(Constructor<?> c) { + return executable.equals(c); + } + } + + public static LambdaExecutable checkAndExtractLambda(Function function) throws TypeExtractionException { + try { + // get serialized lambda + Object serializedLambda = null; + for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) { + try { + Method replaceMethod = clazz.getDeclaredMethod("writeReplace"); + replaceMethod.setAccessible(true); + Object serialVersion = replaceMethod.invoke(function); + + // check if class is a lambda function + if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) { + + // check if SerializedLambda class is present + try { + Class.forName("java.lang.invoke.SerializedLambda"); + } + catch (Exception e) { + throw new TypeExtractionException("User code tries to use lambdas, but framework is running with a Java version < 8"); + } + serializedLambda = serialVersion; + break; + } + } + catch (NoSuchMethodException e) { + // thrown if the method is not there. fall through the loop + } + } + + // not a lambda method -> return null + if (serializedLambda == null) { + return null; + } + + // find lambda method + Method implClassMethod = serializedLambda.getClass().getDeclaredMethod("getImplClass"); + Method implMethodNameMethod = serializedLambda.getClass().getDeclaredMethod("getImplMethodName"); + Method implMethodSig = serializedLambda.getClass().getDeclaredMethod("getImplMethodSignature"); + + String className = (String) implClassMethod.invoke(serializedLambda); + String methodName = (String) implMethodNameMethod.invoke(serializedLambda); + String methodSig = (String) implMethodSig.invoke(serializedLambda); + + Class<?> implClass = Class.forName(className.replace('/', '.'), true, Thread.currentThread().getContextClassLoader()); + + // find constructor + if (methodName.equals("<init>")) { + Constructor<?>[] constructors = implClass.getDeclaredConstructors(); + for (Constructor<?> constructor : constructors) { + if(getConstructorDescriptor(constructor).equals(methodSig)) { + return new LambdaExecutable(constructor); + } + } + } + // find method + else { + List<Method> methods = getAllDeclaredMethods(implClass); + for (Method method : methods) { + if(method.getName().equals(methodName) && getMethodDescriptor(method).equals(methodSig)) { + return new LambdaExecutable(method); + } + } + } + throw new TypeExtractionException("No lambda method found."); + } + catch (Exception e) { + throw new TypeExtractionException("Could not extract lambda method out of function: " + + e.getClass().getSimpleName() + " - " + e.getMessage(), e); + } + } + + /** + * Returns all declared methods of a class including methods of superclasses. + */ + public static List<Method> getAllDeclaredMethods(Class<?> clazz) { + List<Method> result = new ArrayList<>(); + while (clazz != null) { + Method[] methods = clazz.getDeclaredMethods(); + Collections.addAll(result, methods); + clazz = clazz.getSuperclass(); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index a0b09f5..c1febea 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -53,7 +53,6 @@ import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.Partitioner; -import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -66,6 +65,9 @@ import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable; +import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda; +import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods; import org.apache.flink.types.Either; import org.apache.flink.types.Value; import org.apache.flink.util.InstantiationUtil; @@ -380,22 +382,27 @@ public class TypeExtractor { String functionName, boolean allowMissing) { try { - final Method m = FunctionUtils.checkAndExtractLambdaMethod(function); - if (m != null) { + final LambdaExecutable exec; + try { + exec = checkAndExtractLambda(function); + } catch (TypeExtractionException e) { + throw new InvalidTypesException("Internal error occurred.", e); + } + if (exec != null) { // check for lambda type erasure - validateLambdaGenericParameters(m); + validateLambdaGenericParameters(exec); // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function - final int paramLen = m.getGenericParameterTypes().length - 1; + final int paramLen = exec.getParameterTypes().length - 1; - // method references "this" implicitly + // executable references "this" implicitly if (paramLen < 0) { - // methods declaring class can also be a super class of the input type - // we only validate if the method exists in input type - validateInputContainsMethod(m, inType); + // executable declaring class can also be a super class of the input type + // we only validate if the executable exists in input type + validateInputContainsExecutable(exec, inType); } else { - final Type input = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen]; + final Type input = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen]; validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input, inputTypeArgumentIndex) : input, inType); } @@ -403,7 +410,7 @@ public class TypeExtractor { return ((ResultTypeQueryable<OUT>) function).getProducedType(); } return new TypeExtractor().privateCreateTypeInfo( - (outputTypeArgumentIndex >= 0) ? extractTypeArgument(m.getGenericParameterTypes()[paramLen], outputTypeArgumentIndex) : m.getGenericReturnType(), + (outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(), inType, null); } @@ -496,22 +503,27 @@ public class TypeExtractor { String functionName, boolean allowMissing) { try { - final Method m = FunctionUtils.checkAndExtractLambdaMethod(function); - if (m != null) { + final LambdaExecutable exec; + try { + exec = checkAndExtractLambda(function); + } catch (TypeExtractionException e) { + throw new InvalidTypesException("Internal error occurred.", e); + } + if (exec != null) { // check for lambda type erasure - validateLambdaGenericParameters(m); + validateLambdaGenericParameters(exec); // parameters must be accessed from behind, since JVM can add additional parameters e.g. when using local variables inside lambda function - final int paramLen = m.getGenericParameterTypes().length - 1; - final Type input1 = (outputTypeArgumentIndex >= 0) ? m.getGenericParameterTypes()[paramLen - 2] : m.getGenericParameterTypes()[paramLen - 1]; - final Type input2 = (outputTypeArgumentIndex >= 0 ) ? m.getGenericParameterTypes()[paramLen - 1] : m.getGenericParameterTypes()[paramLen]; + final int paramLen = exec.getParameterTypes().length - 1; + final Type input1 = (outputTypeArgumentIndex >= 0) ? exec.getParameterTypes()[paramLen - 2] : exec.getParameterTypes()[paramLen - 1]; + final Type input2 = (outputTypeArgumentIndex >= 0 ) ? exec.getParameterTypes()[paramLen - 1] : exec.getParameterTypes()[paramLen]; validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input1, inputTypeArgumentIndex) : input1, in1Type); validateInputType((inputTypeArgumentIndex >= 0) ? extractTypeArgument(input2, inputTypeArgumentIndex) : input2, in2Type); if(function instanceof ResultTypeQueryable) { return ((ResultTypeQueryable<OUT>) function).getProducedType(); } return new TypeExtractor().privateCreateTypeInfo( - (outputTypeArgumentIndex >= 0) ? extractTypeArgument(m.getGenericParameterTypes()[paramLen], outputTypeArgumentIndex) : m.getGenericReturnType(), + (outputTypeArgumentIndex >= 0) ? extractTypeArgument(exec.getParameterTypes()[paramLen], outputTypeArgumentIndex) : exec.getReturnType(), in1Type, in2Type); } @@ -1358,14 +1370,20 @@ public class TypeExtractor { } } - private static void validateInputContainsMethod(Method m, TypeInformation<?> typeInfo) { + private static void validateInputContainsExecutable(LambdaExecutable exec, TypeInformation<?> typeInfo) { List<Method> methods = getAllDeclaredMethods(typeInfo.getTypeClass()); for (Method method : methods) { - if (method.equals(m)) { + if (exec.executablesEquals(method)) { return; } } - throw new InvalidTypesException("Type contains no method '" + m.getName() + "'."); + Constructor<?>[] constructors = typeInfo.getTypeClass().getDeclaredConstructors(); + for (Constructor<?> constructor : constructors) { + if (exec.executablesEquals(constructor)) { + return; + } + } + throw new InvalidTypesException("Type contains no executable '" + exec.getName() + "'."); } // -------------------------------------------------------------------------------------------- @@ -1488,14 +1506,14 @@ public class TypeExtractor { } } - private static void validateLambdaGenericParameters(Method m) { + private static void validateLambdaGenericParameters(LambdaExecutable exec) { // check the arguments - for (Type t : m.getGenericParameterTypes()) { + for (Type t : exec.getParameterTypes()) { validateLambdaGenericParameter(t); } // check the return type - validateLambdaGenericParameter(m.getGenericReturnType()); + validateLambdaGenericParameter(exec.getReturnType()); } private static void validateLambdaGenericParameter(Type t) { @@ -1974,20 +1992,6 @@ public class TypeExtractor { return false; } - - // recursively determine all declared methods - private static List<Method> getAllDeclaredMethods(Class<?> clazz) { - List<Method> result = new ArrayList<Method>(); - while (clazz != null) { - Method[] methods = clazz.getDeclaredMethods(); - for (Method method : methods) { - result.add(method); - } - clazz = clazz.getSuperclass(); - } - return result; - } - @Internal public static Class<?> typeToClass(Type t) { if (t instanceof Class) { http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java index 64b7ae7..0d7415a 100644 --- a/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java +++ b/flink-java8/src/test/java/org/apache/flink/api/java/type/lambdas/LambdaExtractionTest.java @@ -18,6 +18,8 @@ package org.apache.flink.api.java.type.lambdas; +import org.apache.flink.api.java.typeutils.TypeExtractionUtils; +import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; @@ -33,7 +35,6 @@ import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.MapPartitionFunction; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; @@ -74,14 +75,14 @@ public class LambdaExtractionTest { MapFunction<Integer, String> instanceLambda = Object::toString; MapFunction<String, Integer> constructorLambda = Integer::new; - assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromInterface)); - assertNull(FunctionUtils.checkAndExtractLambdaMethod(anonymousFromClass)); - assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromProperClass)); - assertNull(FunctionUtils.checkAndExtractLambdaMethod(fromDerived)); - assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(staticLambda)); - assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(instanceLambda)); - assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(constructorLambda)); - assertNotNull(FunctionUtils.checkAndExtractLambdaMethod(STATIC_LAMBDA)); + assertNull(checkAndExtractLambda(anonymousFromInterface)); + assertNull(checkAndExtractLambda(anonymousFromClass)); + assertNull(checkAndExtractLambda(fromProperClass)); + assertNull(checkAndExtractLambda(fromDerived)); + assertNotNull(checkAndExtractLambda(staticLambda)); + assertNotNull(checkAndExtractLambda(instanceLambda)); + assertNotNull(checkAndExtractLambda(constructorLambda)); + assertNotNull(checkAndExtractLambda(STATIC_LAMBDA)); } catch (Exception e) { e.printStackTrace(); @@ -272,14 +273,14 @@ public class LambdaExtractionTest { public void testInstanceMethodRefSameType() { MapFunction<MyType, Integer> f = MyType::getKey; TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MyType.class)); - Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti); } @Test public void testInstanceMethodRefSuperType() { MapFunction<Integer, String> f = Object::toString; TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.INT_TYPE_INFO); - Assert.assertEquals(ti, BasicTypeInfo.STRING_TYPE_INFO); + Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti); } public static class MySubtype extends MyType { @@ -290,14 +291,14 @@ public class LambdaExtractionTest { public void testInstanceMethodRefSuperTypeProtected() { MapFunction<MySubtype, Integer> f = MyType::getKey2; TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, TypeExtractor.createTypeInfo(MySubtype.class)); - Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti); } @Test public void testConstructorMethodRef() { MapFunction<String, Integer> f = Integer::new; TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(f, BasicTypeInfo.STRING_TYPE_INFO); - Assert.assertEquals(ti, BasicTypeInfo.INT_TYPE_INFO); + Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti); } } http://git-wip-us.apache.org/repos/asf/flink/blob/1dda3ad0/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java ---------------------------------------------------------------------- diff --git a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java index cda1f1c..87c1fa5 100644 --- a/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java +++ b/flink-java8/src/test/java/org/apache/flink/test/javaApiOperators/lambdas/MapITCase.java @@ -24,6 +24,20 @@ import org.apache.flink.test.util.JavaProgramTestBase; public class MapITCase extends JavaProgramTestBase { + public static class Trade { + + public String v; + + public Trade(String v) { + this.v = v; + } + + @Override + public String toString() { + return v; + } + } + private static final String EXPECTED_RESULT = "22\n" + "22\n" + "23\n" + @@ -41,7 +55,11 @@ public class MapITCase extends JavaProgramTestBase { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Integer> stringDs = env.fromElements(11, 12, 13, 14); - DataSet<String> mappedDs = stringDs.map(Object::toString).map (s -> s.replace("1", "2")); + DataSet<String> mappedDs = stringDs + .map(Object::toString) + .map (s -> s.replace("1", "2")) + .map(Trade::new) + .map(Trade::toString); mappedDs.writeAsText(resultPath); env.execute(); }