dong created FLINK-12114:
----------------------------

             Summary: Try to perform UDF By Reflection
                 Key: FLINK-12114
                 URL: https://issues.apache.org/jira/browse/FLINK-12114
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / API
    Affects Versions: 1.7.2
            Reporter: dong


Hi Team:

     I recently worked on a Flink SQL-based project, and one of the 
requirements was to execute Flink SQL UDFs dynamically by loading a 
user-defined UDF class. I first use cglib to dynamically add the eval methods 
to a ScalarFunction subclass and then dynamically create the ScalarFunction 
instance. The user-defined UDF is then called by reflection at runtime.

This is my code:
{code:java}
package com.ximalaya.flink.dsl.stream.udf

import java.lang.reflect.Method

import com.ximalaya.flink.dsl.stream.`type`.FieldType
import com.ximalaya.flink.dsl.stream.api.udf.\{AbstractUserUdf, UserUdfContext}
import net.sf.cglib.core.Signature
import net.sf.cglib.proxy.\{Enhancer, InterfaceMaker, MethodInterceptor, 
MethodProxy}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.shaded.org.apache.commons.lang.ArrayUtils
import net.sf.cglib.asm.Type

import scala.collection.mutable.\{Map ⇒ IMap}
/**
 * Concrete [[ScalarFunction]] that declares no `eval` method of its own.
 *
 * `UserUdfFactory.createUserUdf` passes this class to cglib as the superclass of the
 * generated function.
 *
 * @author martin.dong
 */
private class UserUdfFunction extends ScalarFunction {

  /** Always reports the function as non-deterministic. */
  override def isDeterministic: Boolean = false
}
/**
 * cglib callback that forwards `open`, `eval` and `close` calls made on the generated
 * function to a reflectively loaded [[AbstractUserUdf]]. Every other method is passed
 * through to the enhanced superclass.
 *
 * @param name                    identifier supplied by the caller; not read inside this class
 * @param fullyQualifiedClassName class instantiated through its no-arg constructor when
 *                                `open` is intercepted
 */
private class UdfMethodInterceptor(val name: String,
                                   val fullyQualifiedClassName: String)
  extends MethodInterceptor with Serializable {

  // All state below is (re)built by the "open" branch. java.lang.reflect.Method is not
  // java.io.Serializable, so the fields are @transient: an interceptor that has already
  // been opened can still be serialized, and is simply re-opened on the receiving side.
  @transient private var userUdf: AbstractUserUdf = _
  @transient private var evalMethods: IMap[MethodSignature, Method] = IMap()
  @transient private var closeMethod: Method = _
  @transient private var openMethod: Method = _

  /**
   * Dispatches on the intercepted method's name.
   *
   * @param o           the enhanced instance the call was made on
   * @param method      the intercepted method
   * @param objects     the arguments of the intercepted call
   * @param methodProxy handle used to invoke the superclass implementation
   * @return the result of the delegated call (`null` for `close` before `open`)
   * @throws NoSuchElementException if `eval` is intercepted and the user UDF has no `eval`
   *                                overload with a matching signature, or was never opened
   */
  override def intercept(o: scala.Any,
                         method: Method,
                         objects: Array[AnyRef],
                         methodProxy: MethodProxy): AnyRef =
    method.getName match {
      case "open" =>
        val udf = Class.forName(fullyQualifiedClassName)
          .newInstance()
          .asInstanceOf[AbstractUserUdf]
        // Build a fresh lookup table: after deserialization the transient field is null,
        // so it must not be assumed to exist already.
        val evals = IMap[MethodSignature, Method]()
        udf.getClass.getDeclaredMethods
          .filter(_.getName == "eval")
          .foreach(m => evals.put(MethodSignature.createMethodSignature(m), m))
        this.userUdf = udf
        this.evalMethods = evals
        this.closeMethod = classOf[AbstractUserUdf].getDeclaredMethod("close")
        this.openMethod =
          classOf[AbstractUserUdf].getDeclaredMethod("open", classOf[UserUdfContext])
        // The user UDF is opened with null in place of a UserUdfContext.
        openMethod.invoke(userUdf, null)

      case "eval" =>
        val signature = MethodSignature.createMethodSignature(method)
        val target = Option(evalMethods).flatMap(_.get(signature)).getOrElse(
          throw new NoSuchElementException(
            s"No eval($signature) available on $fullyQualifiedClassName; " +
              "either the overload does not exist or open() has not been called"))
        target.invoke(userUdf, objects: _*)

      case "close" =>
        // close() without a preceding successful open() has nothing to release; treat it
        // as a no-op instead of failing on the uninitialised closeMethod.
        if (userUdf != null && closeMethod != null) closeMethod.invoke(userUdf) else null

      case _ =>
        methodProxy.invokeSuper(o, objects)
    }
}

/**
 * Value-style key describing a method by the [[FieldType]]s of its parameters, so that
 * it can be used for map lookups.
 *
 * @param fieldTypes parameter types, in declaration order
 */
private class MethodSignature(val fieldTypes: Array[FieldType]) {

  /** Builds the signature from raw parameter classes, converting each with `FieldType.get`. */
  def this(clazzArray: Array[Class[_]]) = this(clazzArray.map(FieldType.get(_)))

  /** Sum of the element hash codes; independent of parameter order. */
  override def hashCode(): Int = fieldTypes.foldLeft(0)(_ + _.hashCode())

  /**
   * Two signatures are equal when they are the same instance or when
   * `ArrayUtils.isEquals` reports their `fieldTypes` arrays as equal.
   */
  override def equals(obj: scala.Any): Boolean = obj match {
    case that: MethodSignature =>
      this.eq(that) || ArrayUtils.isEquals(this.fieldTypes, that.fieldTypes)
    case _ => false
  }

  /** Comma-separated rendering of the parameter types. */
  override def toString: String = fieldTypes.iterator.map(_.toString).mkString(",")
}
/** Factory for [[MethodSignature]] keys. */
private object MethodSignature {

  /** Derives the signature of `method` from its declared parameter types. */
  def createMethodSignature(method: Method): MethodSignature = {
    val parameterTypes = method.getParameterTypes
    new MethodSignature(parameterTypes)
  }
}
/**
 * Description of one `eval` overload to be exposed on a generated function.
 *
 * Because `parameters` is an `Array`, the generated `equals`/`hashCode` compare it by
 * reference rather than by content.
 *
 * NOTE(review): `Class[Throwable]` is invariant, so only `classOf[Throwable]` itself fits
 * without a cast; `Class[_ <: Throwable]` may have been intended - confirm with callers.
 *
 * @param returnType result type of the overload
 * @param parameters parameter types, in declaration order
 * @param exceptions exception classes the overload declares
 */
case class EvalMethod(
  returnType: FieldType,
  parameters: Array[FieldType],
  exceptions: List[Class[Throwable]]
)
/** Creates cglib-enhanced [[ScalarFunction]] instances backed by a user UDF class. */
object UserUdfFactory {

  /**
   * Generates a subclass of `UserUdfFunction` that implements one generated interface per
   * entry of `evalMethods` (each declaring a single `eval` method) and routes all calls
   * through a `UdfMethodInterceptor`.
   *
   * NOTE(review): the enhanced class is generated at runtime in the JVM that calls this
   * method; presumably it cannot be resolved by name in other JVMs - verify before
   * shipping the returned function to a cluster.
   *
   * @param name                    forwarded to the interceptor
   * @param fullyQualifiedClassName user UDF class, forwarded to the interceptor
   * @param evalMethods             the `eval` overloads the generated function must expose
   * @return the enhanced instance, typed as [[ScalarFunction]]
   */
  def createUserUdf(name: String,
                    fullyQualifiedClassName: String,
                    evalMethods: List[EvalMethod]): ScalarFunction = {
    val proxyBuilder = new Enhancer
    proxyBuilder.setSuperclass(classOf[UserUdfFunction])
    proxyBuilder.setCallback(new UdfMethodInterceptor(name, fullyQualifiedClassName))

    // Stage 1: describe every overload as an asm signature plus its declared exceptions.
    val described = evalMethods.map { evalMethod =>
      val asmReturn = Type.getType(evalMethod.returnType.getClazz)
      val asmParams = evalMethod.parameters.map(p => Type.getType(p.getClazz))
      (new Signature("eval", asmReturn, asmParams), evalMethod.exceptions)
    }

    // Stage 2: generate one single-method interface per described overload.
    val evalInterfaces = described.map { case (signature, thrown) =>
      val maker = new InterfaceMaker
      maker.add(signature, thrown.map(t => Type.getType(t)).toArray)
      maker.create()
    }

    proxyBuilder.setInterfaces(evalInterfaces.toArray)
    proxyBuilder.create().asInstanceOf[ScalarFunction]
  }
}
{code}
This can be executed in local mode but not in YARN mode, where the 
following error occurs:
{code:java}
Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 10: 
Cannot determine simple type name "com"
        at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672)
        at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416)
        at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177)
        at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
        at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
        at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
        at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190)
        at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156)
        at org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212)
        at 
org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064)
        at 
org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059)
        at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754)
        at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059)
        at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052)
        at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753)
        at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
        at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212)
        at org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844)
        at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6456)
        at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:212)
        at 
org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6082)
        at 
org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6077)
        at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136)
        at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077)
        at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073)
        at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
        at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073)
        at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052)
        at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942)
        at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052)
        at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438)
        at org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212)
        at 
org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080)
        at 
org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077)
        at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050)
        at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077)
        at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073)
        at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974)
        at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073)
        at 
org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052)
        at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942)
{code}
Can anyone help me?





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to