[ https://issues.apache.org/jira/browse/FLINK-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14135341#comment-14135341 ]

ASF GitHub Bot commented on FLINK-1062:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/113#discussion_r17596867
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java ---
    @@ -57,31 +57,71 @@ public static RuntimeContext getFunctionRuntimeContext(Function function, Runti
                        return defaultContext;
                }
        }
    +   
    +   public static Method checkAndExtractLambdaMethod(Function function) {
    +           try {
    +                   // get serialized lambda
    +                   Object serializedLambda = null;
    +                   for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
    +                           try {
    +                                   Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
    +                                   replaceMethod.setAccessible(true);
    +                                   Object serialVersion = replaceMethod.invoke(function);
     
    -   public static boolean isLambdaFunction(Function function) {
    -           if (function == null) {
    -                   throw new IllegalArgumentException();
    -           }
    -           
    -           for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
    -                   try {
    -                           Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
    -                           replaceMethod.setAccessible(true);
    -                           Object serialVersion = replaceMethod.invoke(function);
    -                           
    -                           if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
    -                                   return true;
    +                                   // check if class is a lambda function
    +                                   if (serialVersion.getClass().getName().equals("java.lang.invoke.SerializedLambda")) {
    +
    +                                           // check if SerializedLambda class is present
    +                                           try {
    +                                                   Class.forName("java.lang.invoke.SerializedLambda");
    +                                           }
    +                                           catch (Exception e) {
    +                                                   throw new UnsupportedOperationException("User code tries to use lambdas, but framework is running with a Java version < 8");
    +                                           }
    +                                           serializedLambda = serialVersion;
    +                                           break;
    +                                   }
    +                           }
    +                           catch (NoSuchMethodException e) {
    +                                   // thrown if the method is not there. fall through the loop
                                }
                        }
    -                   catch (NoSuchMethodException e) {
    -                           // thrown if the method is not there. fall through the loop
    +
    +                   // not a lambda method -> return null
    +                   if (serializedLambda == null) {
    +                           return null;
                        }
    -                   catch (Throwable t) {
    -                           // this should not happen, we are not executing any method code.
    -                           throw new RuntimeException("Error while checking whether function is a lambda.", t);
    +
    +                   // find lambda method
    +                   Method implClassMethod = serializedLambda.getClass().getDeclaredMethod("getImplClass");
    +                   Method implMethodNameMethod = serializedLambda.getClass().getDeclaredMethod("getImplMethodName");
    +
    +                   String className = (String) implClassMethod.invoke(serializedLambda);
    +                   String methodName = (String) implMethodNameMethod.invoke(serializedLambda);
    +
    +                   Class<?> implClass = Class.forName(className.replace('/', '.'));
    +
    +                   Method[] methods = implClass.getDeclaredMethods();
    +                   Method parameterizedMethod = null;
    +                   for(Method method : methods) {
    +                           if(method.getName().equals(methodName)) {
    +                                   if(parameterizedMethod != null) {
    +                                           // It is very unlikely that a class contains multiple e.g. "lambda$2()" but its possible
    +                                           // Actually, the signature need to be checked, but this is very complex
    +                                           throw new Exception("Lambda method name is not unique.");
    --- End diff --
    
    Good catch!


> Type Extraction for Lambdas
> ---------------------------
>
>                 Key: FLINK-1062
>                 URL: https://issues.apache.org/jira/browse/FLINK-1062
>             Project: Flink
>          Issue Type: Improvement
>          Components: Java API
>    Affects Versions: 0.7-incubating
>            Reporter: Stephan Ewen
>            Assignee: Timo Walther
>             Fix For: 0.7-incubating
>
>
> Lambdas currently work only for {{filter}} and {{reduce(a,b)}}, because 
> Lambda type extraction is not in place right now.
> We need to extend the type extraction for lambdas to support the other 
> functions.
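
For readers who want to try the technique used in the diff above, here is a minimal, self-contained sketch. It is not the code from the pull request. The names LambdaInspection, MapFn and extractSerializedLambda are made up for illustration, and the sketch assumes Java 8 or later, so it can reference java.lang.invoke.SerializedLambda directly instead of going through reflection on the class name. It walks the class hierarchy looking for a writeReplace method, which the Java runtime generates for serializable lambdas, and returns the SerializedLambda that method produces.

```java
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;

public class LambdaInspection {

    // Hypothetical stand-in for a serializable user function interface.
    public interface MapFn<I, O> extends Serializable {
        O map(I value);
    }

    // Returns the SerializedLambda behind a serializable lambda,
    // or null if the object is not a lambda.
    public static SerializedLambda extractSerializedLambda(Object function) {
        for (Class<?> clazz = function.getClass(); clazz != null; clazz = clazz.getSuperclass()) {
            try {
                Method replaceMethod = clazz.getDeclaredMethod("writeReplace");
                replaceMethod.setAccessible(true);
                Object replacement = replaceMethod.invoke(function);
                if (replacement instanceof SerializedLambda) {
                    return (SerializedLambda) replacement;
                }
            } catch (NoSuchMethodException e) {
                // No writeReplace on this class; continue with the superclass.
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not inspect function object", e);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        MapFn<String, Integer> fn = s -> s.length();
        SerializedLambda lambda = extractSerializedLambda(fn);

        // Class and synthetic method that hold the lambda body.
        System.out.println(lambda.getImplClass());
        System.out.println(lambda.getImplMethodName());
        // JVM descriptor of that method, including parameter and return types.
        System.out.println(lambda.getImplMethodSignature());
    }
}
```

SerializedLambda also exposes getImplMethodSignature(), which returns the JVM descriptor of the implementation method. Comparing that descriptor in addition to the name is one possible way to tell apart methods that share a name. Whether that fits the pull request is a question for its authors.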



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
