[jira] [Updated] (FLINK-12114) Try to perform UDF By Reflection
[ https://issues.apache.org/jira/browse/FLINK-12114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-12114: --- Labels: auto-deprioritized-major (was: stale-major) > Try to perform UDF By Reflection > > > Key: FLINK-12114 > URL: https://issues.apache.org/jira/browse/FLINK-12114 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.6.5 >Reporter: dong >Priority: Major > Labels: auto-deprioritized-major > > Hi Team: > I recently worked on a Flink SQL-based project and then one of the > requirements was to dynamically execute the Flink SQL UDF by loading a > user-defined UDF. I first used cglib to dynamically add the eval method to > the ScalarFunction and then dynamically create the ScalarFunction instance. > And call the user-defined UDF by reflection on the line. > this my code > {code:java} > package com.ximalaya.flink.dsl.stream.udf > import java.lang.reflect.Method > import com.ximalaya.flink.dsl.stream.`type`.FieldType > import com.ximalaya.flink.dsl.stream.api.udf.\{AbstractUserUdf, > UserUdfContext} > import net.sf.cglib.core.Signature > import net.sf.cglib.proxy.\{Enhancer, InterfaceMaker, MethodInterceptor, > MethodProxy} > import org.apache.flink.table.functions.ScalarFunction > import org.apache.flink.table.shaded.org.apache.commons.lang.ArrayUtils > import net.sf.cglib.asm.Type > import scala.collection.mutable.\{Map ⇒ IMap} > /** > * > * @author martin.dong > * > **/ > private class UserUdfFunction extends ScalarFunction{ > override def isDeterministic: Boolean = false > } > private class UdfMethodInterceptor(val name:String, > val fullyQualifiedClassName:String) extends MethodInterceptor with > Serializable { > private var userUdf: AbstractUserUdf = _ > private var evalMethods:IMap[MethodSignature,Method]=IMap() > private var closeMethod:Method = _ > private var openMethod:Method = _ > override def intercept(o: scala.Any, > method: Method, > objects: Array[AnyRef], methodProxy: MethodProxy): AnyRef = { > val methodName=method.getName > methodName match { > case "open"⇒ > this.userUdf = > Class.forName(fullyQualifiedClassName).newInstance().asInstanceOf[AbstractUserUdf] > this.userUdf.getClass.getDeclaredMethods.filter(_.getName=="eval"). > foreach(method ⇒ > evalMethods.put(MethodSignature.createMethodSignature(method), method)) > this.closeMethod = classOf[AbstractUserUdf].getDeclaredMethod("close") > this.openMethod = > classOf[AbstractUserUdf].getDeclaredMethod("open",classOf[UserUdfContext]) > openMethod.invoke(userUdf,null) > case "eval"⇒ > val methodSignature = MethodSignature.createMethodSignature(method) > evalMethods(methodSignature).invoke(userUdf,objects:_*) > case "close"⇒ > closeMethod.invoke(userUdf) > case _⇒ > methodProxy.invokeSuper(o,objects) > } > } > } > private class MethodSignature (val fieldTypes:Array[FieldType]){ > def this(clazzArray:Array[Class[_]]){ > this(clazzArray.map(clazz⇒FieldType.get(clazz))) > } > override def hashCode(): Int = fieldTypes.map(_.hashCode()).sum > override def equals(obj: scala.Any): Boolean = { > if(this.eq(obj.asInstanceOf[AnyRef])){ > return true > } > obj match { > case _: MethodSignature⇒ > ArrayUtils.isEquals(this.fieldTypes,obj.asInstanceOf[MethodSignature].fieldTypes) > case _ ⇒ false > } > } > override def toString: String =fieldTypes.map(_.toString).mkString(",") > } > private object MethodSignature{ > def createMethodSignature(method:Method):MethodSignature={ > new MethodSignature(method.getParameterTypes) > } > } > case class > EvalMethod(returnType:FieldType,parameters:Array[FieldType],exceptions:List[Class[Throwable]]) > object UserUdfFactory { > def > createUserUdf(name:String,fullyQualifiedClassName:String,evalMethods:List[EvalMethod]):ScalarFunction={ > val enhancer = new Enhancer > enhancer.setSuperclass(classOf[UserUdfFunction]) > enhancer.setCallback(new UdfMethodInterceptor(name,fullyQualifiedClassName)) > enhancer.setInterfaces(evalMethods.map(method⇒{ > val returnType=Type.getType(method.returnType.getClazz) > val parameters=method.parameters.map(p⇒Type.getType(p.getClazz)) > (new Signature("eval",returnType,parameters),method.exceptions) > }).map{ case(signature,exceptions)⇒ > val im = new InterfaceMaker > im.add(signature,exceptions.map(exception⇒Type.getType(exception)).toArray) > im.create() > }.toArray) > enhancer.create().asInstanceOf[ScalarFunction] > } > } > {code} > Can be executed in local mode but cannot be executed in yarn mode, the > following error will occur > {code:java} > Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 10: > Cannot determine simple type name "com" > at >
[jira] [Updated] (FLINK-12114) Try to perform UDF By Reflection
[ https://issues.apache.org/jira/browse/FLINK-12114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-12114: --- Priority: Minor (was: Major) > Try to perform UDF By Reflection > > > Key: FLINK-12114 > URL: https://issues.apache.org/jira/browse/FLINK-12114 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.6.5 >Reporter: dong >Priority: Minor > Labels: auto-deprioritized-major > > Hi Team: > I recently worked on a Flink SQL-based project and then one of the > requirements was to dynamically execute the Flink SQL UDF by loading a > user-defined UDF. I first used cglib to dynamically add the eval method to > the ScalarFunction and then dynamically create the ScalarFunction instance. > And call the user-defined UDF by reflection on the line. > this my code > {code:java} > package com.ximalaya.flink.dsl.stream.udf > import java.lang.reflect.Method > import com.ximalaya.flink.dsl.stream.`type`.FieldType > import com.ximalaya.flink.dsl.stream.api.udf.\{AbstractUserUdf, > UserUdfContext} > import net.sf.cglib.core.Signature > import net.sf.cglib.proxy.\{Enhancer, InterfaceMaker, MethodInterceptor, > MethodProxy} > import org.apache.flink.table.functions.ScalarFunction > import org.apache.flink.table.shaded.org.apache.commons.lang.ArrayUtils > import net.sf.cglib.asm.Type > import scala.collection.mutable.\{Map ⇒ IMap} > /** > * > * @author martin.dong > * > **/ > private class UserUdfFunction extends ScalarFunction{ > override def isDeterministic: Boolean = false > } > private class UdfMethodInterceptor(val name:String, > val fullyQualifiedClassName:String) extends MethodInterceptor with > Serializable { > private var userUdf: AbstractUserUdf = _ > private var evalMethods:IMap[MethodSignature,Method]=IMap() > private var closeMethod:Method = _ > private var openMethod:Method = _ > override def intercept(o: scala.Any, > method: Method, > objects: Array[AnyRef], methodProxy: MethodProxy): AnyRef = { > val methodName=method.getName > methodName match { > case "open"⇒ > this.userUdf = > Class.forName(fullyQualifiedClassName).newInstance().asInstanceOf[AbstractUserUdf] > this.userUdf.getClass.getDeclaredMethods.filter(_.getName=="eval"). > foreach(method ⇒ > evalMethods.put(MethodSignature.createMethodSignature(method), method)) > this.closeMethod = classOf[AbstractUserUdf].getDeclaredMethod("close") > this.openMethod = > classOf[AbstractUserUdf].getDeclaredMethod("open",classOf[UserUdfContext]) > openMethod.invoke(userUdf,null) > case "eval"⇒ > val methodSignature = MethodSignature.createMethodSignature(method) > evalMethods(methodSignature).invoke(userUdf,objects:_*) > case "close"⇒ > closeMethod.invoke(userUdf) > case _⇒ > methodProxy.invokeSuper(o,objects) > } > } > } > private class MethodSignature (val fieldTypes:Array[FieldType]){ > def this(clazzArray:Array[Class[_]]){ > this(clazzArray.map(clazz⇒FieldType.get(clazz))) > } > override def hashCode(): Int = fieldTypes.map(_.hashCode()).sum > override def equals(obj: scala.Any): Boolean = { > if(this.eq(obj.asInstanceOf[AnyRef])){ > return true > } > obj match { > case _: MethodSignature⇒ > ArrayUtils.isEquals(this.fieldTypes,obj.asInstanceOf[MethodSignature].fieldTypes) > case _ ⇒ false > } > } > override def toString: String =fieldTypes.map(_.toString).mkString(",") > } > private object MethodSignature{ > def createMethodSignature(method:Method):MethodSignature={ > new MethodSignature(method.getParameterTypes) > } > } > case class > EvalMethod(returnType:FieldType,parameters:Array[FieldType],exceptions:List[Class[Throwable]]) > object UserUdfFactory { > def > createUserUdf(name:String,fullyQualifiedClassName:String,evalMethods:List[EvalMethod]):ScalarFunction={ > val enhancer = new Enhancer > enhancer.setSuperclass(classOf[UserUdfFunction]) > enhancer.setCallback(new UdfMethodInterceptor(name,fullyQualifiedClassName)) > enhancer.setInterfaces(evalMethods.map(method⇒{ > val returnType=Type.getType(method.returnType.getClazz) > val parameters=method.parameters.map(p⇒Type.getType(p.getClazz)) > (new Signature("eval",returnType,parameters),method.exceptions) > }).map{ case(signature,exceptions)⇒ > val im = new InterfaceMaker > im.add(signature,exceptions.map(exception⇒Type.getType(exception)).toArray) > im.create() > }.toArray) > enhancer.create().asInstanceOf[ScalarFunction] > } > } > {code} > Can be executed in local mode but cannot be executed in yarn mode, the > following error will occur > {code:java} > Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 10: > Cannot determine simple type name "com" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) >
[jira] [Updated] (FLINK-12114) Try to perform UDF By Reflection
[ https://issues.apache.org/jira/browse/FLINK-12114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flink Jira Bot updated FLINK-12114: --- Labels: stale-major (was: ) > Try to perform UDF By Reflection > > > Key: FLINK-12114 > URL: https://issues.apache.org/jira/browse/FLINK-12114 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.6.5 >Reporter: dong >Priority: Major > Labels: stale-major > > Hi Team: > I recently worked on a Flink SQL-based project and then one of the > requirements was to dynamically execute the Flink SQL UDF by loading a > user-defined UDF. I first used cglib to dynamically add the eval method to > the ScalarFunction and then dynamically create the ScalarFunction instance. > And call the user-defined UDF by reflection on the line. > this my code > {code:java} > package com.ximalaya.flink.dsl.stream.udf > import java.lang.reflect.Method > import com.ximalaya.flink.dsl.stream.`type`.FieldType > import com.ximalaya.flink.dsl.stream.api.udf.\{AbstractUserUdf, > UserUdfContext} > import net.sf.cglib.core.Signature > import net.sf.cglib.proxy.\{Enhancer, InterfaceMaker, MethodInterceptor, > MethodProxy} > import org.apache.flink.table.functions.ScalarFunction > import org.apache.flink.table.shaded.org.apache.commons.lang.ArrayUtils > import net.sf.cglib.asm.Type > import scala.collection.mutable.\{Map ⇒ IMap} > /** > * > * @author martin.dong > * > **/ > private class UserUdfFunction extends ScalarFunction{ > override def isDeterministic: Boolean = false > } > private class UdfMethodInterceptor(val name:String, > val fullyQualifiedClassName:String) extends MethodInterceptor with > Serializable { > private var userUdf: AbstractUserUdf = _ > private var evalMethods:IMap[MethodSignature,Method]=IMap() > private var closeMethod:Method = _ > private var openMethod:Method = _ > override def intercept(o: scala.Any, > method: Method, > objects: Array[AnyRef], methodProxy: MethodProxy): AnyRef = { > val methodName=method.getName > methodName match { > case "open"⇒ > this.userUdf = > Class.forName(fullyQualifiedClassName).newInstance().asInstanceOf[AbstractUserUdf] > this.userUdf.getClass.getDeclaredMethods.filter(_.getName=="eval"). > foreach(method ⇒ > evalMethods.put(MethodSignature.createMethodSignature(method), method)) > this.closeMethod = classOf[AbstractUserUdf].getDeclaredMethod("close") > this.openMethod = > classOf[AbstractUserUdf].getDeclaredMethod("open",classOf[UserUdfContext]) > openMethod.invoke(userUdf,null) > case "eval"⇒ > val methodSignature = MethodSignature.createMethodSignature(method) > evalMethods(methodSignature).invoke(userUdf,objects:_*) > case "close"⇒ > closeMethod.invoke(userUdf) > case _⇒ > methodProxy.invokeSuper(o,objects) > } > } > } > private class MethodSignature (val fieldTypes:Array[FieldType]){ > def this(clazzArray:Array[Class[_]]){ > this(clazzArray.map(clazz⇒FieldType.get(clazz))) > } > override def hashCode(): Int = fieldTypes.map(_.hashCode()).sum > override def equals(obj: scala.Any): Boolean = { > if(this.eq(obj.asInstanceOf[AnyRef])){ > return true > } > obj match { > case _: MethodSignature⇒ > ArrayUtils.isEquals(this.fieldTypes,obj.asInstanceOf[MethodSignature].fieldTypes) > case _ ⇒ false > } > } > override def toString: String =fieldTypes.map(_.toString).mkString(",") > } > private object MethodSignature{ > def createMethodSignature(method:Method):MethodSignature={ > new MethodSignature(method.getParameterTypes) > } > } > case class > EvalMethod(returnType:FieldType,parameters:Array[FieldType],exceptions:List[Class[Throwable]]) > object UserUdfFactory { > def > createUserUdf(name:String,fullyQualifiedClassName:String,evalMethods:List[EvalMethod]):ScalarFunction={ > val enhancer = new Enhancer > enhancer.setSuperclass(classOf[UserUdfFunction]) > enhancer.setCallback(new UdfMethodInterceptor(name,fullyQualifiedClassName)) > enhancer.setInterfaces(evalMethods.map(method⇒{ > val returnType=Type.getType(method.returnType.getClazz) > val parameters=method.parameters.map(p⇒Type.getType(p.getClazz)) > (new Signature("eval",returnType,parameters),method.exceptions) > }).map{ case(signature,exceptions)⇒ > val im = new InterfaceMaker > im.add(signature,exceptions.map(exception⇒Type.getType(exception)).toArray) > im.create() > }.toArray) > enhancer.create().asInstanceOf[ScalarFunction] > } > } > {code} > Can be executed in local mode but cannot be executed in yarn mode, the > following error will occur > {code:java} > Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 10: > Cannot determine simple type name "com" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) > at >
[jira] [Updated] (FLINK-12114) Try to perform UDF By Reflection
[ https://issues.apache.org/jira/browse/FLINK-12114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dong updated FLINK-12114: - Affects Version/s: (was: 1.7.2) 1.6.5 > Try to perform UDF By Reflection > > > Key: FLINK-12114 > URL: https://issues.apache.org/jira/browse/FLINK-12114 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.6.5 >Reporter: dong >Priority: Major > > Hi Team: > I recently worked on a Flink SQL-based project and then one of the > requirements was to dynamically execute the Flink SQL UDF by loading a > user-defined UDF. I first used cglib to dynamically add the eval method to > the ScalarFunction and then dynamically create the ScalarFunction instance. > And call the user-defined UDF by reflection on the line. > this my code > {code:java} > package com.ximalaya.flink.dsl.stream.udf > import java.lang.reflect.Method > import com.ximalaya.flink.dsl.stream.`type`.FieldType > import com.ximalaya.flink.dsl.stream.api.udf.\{AbstractUserUdf, > UserUdfContext} > import net.sf.cglib.core.Signature > import net.sf.cglib.proxy.\{Enhancer, InterfaceMaker, MethodInterceptor, > MethodProxy} > import org.apache.flink.table.functions.ScalarFunction > import org.apache.flink.table.shaded.org.apache.commons.lang.ArrayUtils > import net.sf.cglib.asm.Type > import scala.collection.mutable.\{Map ⇒ IMap} > /** > * > * @author martin.dong > * > **/ > private class UserUdfFunction extends ScalarFunction{ > override def isDeterministic: Boolean = false > } > private class UdfMethodInterceptor(val name:String, > val fullyQualifiedClassName:String) extends MethodInterceptor with > Serializable { > private var userUdf: AbstractUserUdf = _ > private var evalMethods:IMap[MethodSignature,Method]=IMap() > private var closeMethod:Method = _ > private var openMethod:Method = _ > override def intercept(o: scala.Any, > method: Method, > objects: Array[AnyRef], methodProxy: MethodProxy): AnyRef = { > val methodName=method.getName > methodName match { > case "open"⇒ > this.userUdf = > Class.forName(fullyQualifiedClassName).newInstance().asInstanceOf[AbstractUserUdf] > this.userUdf.getClass.getDeclaredMethods.filter(_.getName=="eval"). > foreach(method ⇒ > evalMethods.put(MethodSignature.createMethodSignature(method), method)) > this.closeMethod = classOf[AbstractUserUdf].getDeclaredMethod("close") > this.openMethod = > classOf[AbstractUserUdf].getDeclaredMethod("open",classOf[UserUdfContext]) > openMethod.invoke(userUdf,null) > case "eval"⇒ > val methodSignature = MethodSignature.createMethodSignature(method) > evalMethods(methodSignature).invoke(userUdf,objects:_*) > case "close"⇒ > closeMethod.invoke(userUdf) > case _⇒ > methodProxy.invokeSuper(o,objects) > } > } > } > private class MethodSignature (val fieldTypes:Array[FieldType]){ > def this(clazzArray:Array[Class[_]]){ > this(clazzArray.map(clazz⇒FieldType.get(clazz))) > } > override def hashCode(): Int = fieldTypes.map(_.hashCode()).sum > override def equals(obj: scala.Any): Boolean = { > if(this.eq(obj.asInstanceOf[AnyRef])){ > return true > } > obj match { > case _: MethodSignature⇒ > ArrayUtils.isEquals(this.fieldTypes,obj.asInstanceOf[MethodSignature].fieldTypes) > case _ ⇒ false > } > } > override def toString: String =fieldTypes.map(_.toString).mkString(",") > } > private object MethodSignature{ > def createMethodSignature(method:Method):MethodSignature={ > new MethodSignature(method.getParameterTypes) > } > } > case class > EvalMethod(returnType:FieldType,parameters:Array[FieldType],exceptions:List[Class[Throwable]]) > object UserUdfFactory { > def > createUserUdf(name:String,fullyQualifiedClassName:String,evalMethods:List[EvalMethod]):ScalarFunction={ > val enhancer = new Enhancer > enhancer.setSuperclass(classOf[UserUdfFunction]) > enhancer.setCallback(new UdfMethodInterceptor(name,fullyQualifiedClassName)) > enhancer.setInterfaces(evalMethods.map(method⇒{ > val returnType=Type.getType(method.returnType.getClazz) > val parameters=method.parameters.map(p⇒Type.getType(p.getClazz)) > (new Signature("eval",returnType,parameters),method.exceptions) > }).map{ case(signature,exceptions)⇒ > val im = new InterfaceMaker > im.add(signature,exceptions.map(exception⇒Type.getType(exception)).toArray) > im.create() > }.toArray) > enhancer.create().asInstanceOf[ScalarFunction] > } > } > {code} > Can be executed in local mode but cannot be executed in yarn mode, the > following error will occur > {code:java} > Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 10: > Cannot determine simple type name "com" > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) > at >