Hey, I am not sure exactly what is going wrong in your case, but I put together an example to show you how I would do it:
@Test
public void testClassloader() throws IOException, ClassNotFoundException {
    URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
        folder.newFolder(),
        "BoolToInt.java",
        ""
            + "import org.apache.flink.table.functions.ScalarFunction;" + "\n"
            + "public class BoolToInt extends ScalarFunction {\n"
            + "\tpublic int eval(boolean b) {\n"
            + "\t\treturn b ? 1 : 0;\n"
            + "\t}\n"
            + "}"
    );

    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .build());

    Class<ScalarFunction> boolToInt =
        (Class<ScalarFunction>) functionClassloader.loadClass("BoolToInt");

    try (TemporaryClassLoaderContext ignored =
            TemporaryClassLoaderContext.of(functionClassloader)) {
        tEnv.createFunction("BoolToInt", boolToInt);
        TableResult tableResult = tEnv.executeSql("SELECT BoolToInt(TRUE)");
        tableResult.print();
    }
}
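As an aside on the try-with-resources block in the test: the sketch below shows the general pattern, assuming such a context object simply swaps the thread context classloader and restores the previous one on close. `ContextClassLoaderSwap` is a hypothetical name used only for illustration; it is not Flink's TemporaryClassLoaderContext, whose implementation may differ.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical helper (not a Flink class): swaps the thread context classloader until closed. */
final class ContextClassLoaderSwap implements AutoCloseable {
    private final Thread thread;
    private final ClassLoader previous;

    private ContextClassLoaderSwap(Thread thread, ClassLoader previous) {
        this.thread = thread;
        this.previous = previous;
    }

    /** Installs the given loader as the context classloader of the current thread. */
    static ContextClassLoaderSwap of(ClassLoader replacement) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(replacement);
        return new ContextClassLoaderSwap(current, previous);
    }

    /** Restores the classloader that was active before of(...) was called. */
    @Override
    public void close() {
        thread.setContextClassLoader(previous);
    }
}

final class ContextClassLoaderSwapDemo {
    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        // An empty URLClassLoader stands in for the loader that holds the UDF.
        ClassLoader udfLoader = new URLClassLoader(new URL[0], before);

        try (ContextClassLoaderSwap ignored = ContextClassLoaderSwap.of(udfLoader)) {
            // Code in this block sees udfLoader as the context classloader.
            System.out.println(Thread.currentThread().getContextClassLoader() == udfLoader);
        }
        // The original loader is back once the block exits, even after an exception.
        System.out.println(Thread.currentThread().getContextClassLoader() == before);
    }
}
```

Both checks in `main` print `true`. This sketch is separate from the test and does not touch Flink; it only illustrates why the registration and the query have to happen inside the block.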
I verified that the test above (testClassloader) runs on the current
master. ClassLoaderUtils is a Flink utility which writes out the provided
code, compiles it, and loads it into a classloader. As far as I can tell
it mimics your situation pretty well.
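In case it helps to see what such a utility does, here is a rough sketch using only the JDK. It is not the actual ClassLoaderUtils implementation; `CompileAndLoadSketch`, `compileAndLoad` and `runExample` are hypothetical names, and the sketch assumes a full JDK (not just a JRE) so that the system Java compiler is available. A plain class with an `eval` method stands in for the UDF, so there is no Flink dependency.

```java
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

final class CompileAndLoadSketch {

    /** Writes the source to dir/fileName, compiles it, and returns a loader rooted at dir. */
    static URLClassLoader compileAndLoad(Path dir, String fileName, String source)
            throws IOException {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("No system Java compiler found; a JDK is required.");
        }
        Path sourceFile = dir.resolve(fileName);
        Files.write(sourceFile, source.getBytes(StandardCharsets.UTF_8));
        // run(...) returns 0 on success; the .class file is written next to the source.
        int exitCode = compiler.run(null, null, null, sourceFile.toString());
        if (exitCode != 0) {
            throw new IllegalStateException("Compilation failed with exit code " + exitCode);
        }
        return new URLClassLoader(
            new URL[] {dir.toUri().toURL()},
            Thread.currentThread().getContextClassLoader());
    }

    /** Compiles a stand-in class, loads it, and calls eval(true) reflectively. */
    static Object runExample() {
        String source =
            "public class BoolToInt {\n"
                + "\tpublic int eval(boolean b) {\n"
                + "\t\treturn b ? 1 : 0;\n"
                + "\t}\n"
                + "}";
        try {
            Path dir = Files.createTempDirectory("udf-sketch");
            try (URLClassLoader loader = compileAndLoad(dir, "BoolToInt.java", source)) {
                Class<?> cls = loader.loadClass("BoolToInt");
                Object instance = cls.getDeclaredConstructor().newInstance();
                Method eval = cls.getMethod("eval", boolean.class);
                return eval.invoke(instance, true);
            }
        } catch (IOException | ReflectiveOperationException e) {
            throw new IllegalStateException("Example failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runExample());
    }
}
```

The returned loader uses the current context classloader as its parent, so the compiled class can still see everything the application already has on its classpath.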
Best,
Dawid
On 10/12/2020 20:16, Jakub N wrote:
> Hi Dawid,
>
> Following your suggestion, and given that I spawn a LocalEnvironment,
> I tried the following:
>
> val root = new File("custom")
> val classLoader: URLClassLoader = new URLClassLoader(
>   Array[URL](root.toURI.toURL),
>   Thread.currentThread().getContextClassLoader)
> val cls = classLoader.loadClass("myFunction")
> val instance = cls.newInstance()
> val udf = instance.asInstanceOf[ScalarFunction]
>
> val ignored = TemporaryClassLoaderContext.of(classLoader)
> try {
>   fsTableEnv.createTemporaryFunction("myFunction", udf)
>   fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
> }
>
> Unfortunately this still results in a ClassNotFoundException when
> executing the environment. (The class is located outside of the
> classpath and is loaded successfully; instances of it behave as expected.)
> Did I possibly misunderstand what you were proposing?
>
> Kind regards,
>
> Jakub
>
>
>
> ------------------------------------------------------------------------
> *From:* Dawid Wysakowicz
> *Sent:* Thursday, 10 December 2020 09:59
> *To:* Guowei Ma; Jakub N
> *Cc:* [email protected]
> *Subject:* Re: Flink UDF registration from jar at runtime
>
> Hi Jakub,
>
> As Guowei said the UDF must be present in the user classloader. It
> must be there when compiling the program and when executing on the
> cluster. As of now the TableEnvironment uses the Thread context
> classloader as the "user classloader" when compiling the query.
> Therefore you can do the trick via:
>
> ClassLoader yourClassloader = ... // create your classloader with the UDF
>
> try (TemporaryClassLoaderContext ignored =
>         TemporaryClassLoaderContext.of(yourClassloader)) {
>     fsTableEnv.createTemporaryFunction("myFunction", udf);
>     fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic");
> }
>
> Take a look at the TemporaryClassLoaderContext[1] for a convenient way
> to do this with cleanup at the end.
>
> As for the second problem, having the UDF on the classpath at execution
> time: if you are just spawning a LocalEnvironment, the above should do
> the trick, as it will use the context classloader. If you are
> submitting to a cluster, you can submit multiple jars as part of a
> single job, either via the RemoteEnvironment or the flink run command.
>
> That's how we submit UDFs from separate jars in the sql-client. You
> can try to go through a few classes there and see how it is done. I am
> afraid it's not the easiest task as there are quite a few classes to
> navigate through. You could start from e.g.
> org.apache.flink.table.client.gateway.local.LocalExecutor#executeSql[2]
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java
>
> [2]
> https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L305
>
>
>
> On 10/12/2020 09:15, Guowei Ma wrote:
>> Hi, Jakub
>> If I understand correctly, you want the job you submitted to be able
>> to load a table function which is not in the job jar.
>> I don't think Flink supports this natively (maybe others know better).
>> But I think this requirement is similar to code generation: you need to
>> submit the "code" to the job. I think you could refer to [1].
>>
>> [1]
>> https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java#L29
>>
>> Best,
>> Guowei
>>
>>
>> On Tue, Dec 8, 2020 at 8:40 PM Jakub N <[email protected]> wrote:
>>
>> Hi Guowei,
>>
>> 1. Unfortunately the UDF and the job are not in the same fatjar.
>> Essentially there is only one "fatjar" containing the Flink
>> environment + the job, the UDF is separate.
>> 2. Yes, that is correct.
>> 3. As explained in 1. I don't submit job jars to the Flink
>> environment, instead the job is created and submitted within
>> the "fatjar"
>>
>>
>> Codewise nothing changed except for where the location of the UDF
>> was specified.
>> "Submitting to the environment" works as follows:
>>
>> 1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
>> 2. (Register UDF's)
>> 3. Create tables
>> 4. Query on the tables
>> 5. Execute the environment
>>
>> The overall process is executed as one program.
>> Apologies if any of these explanations are unclear or too vague.
>>
>> Kind regards,
>>
>> Jakub
>>
>> ------------------------------------------------------------------------
>> *From:* Guowei Ma <[email protected]>
>> *Sent:* Tuesday, 8 December 2020 06:34
>> *To:* Jakub N <[email protected]>
>> *Cc:* [email protected]
>> *Subject:* Re: Flink UDF registration from jar at runtime
>>
>> Hi, Jakub
>> I am not familiar with `sbt pack`, but I assume you are doing the
>> following (correct me if I misunderstand you):
>> 1. The UDF and job jar are in the same "fatjar".
>> 2. You "new" a UDF object in the job().
>> 3. You submit the "fatjar" to the local Flink environment.
>>
>> In theory there should not be any problem. Could you share how you
>> changed the code and how you submit your job to the local environment?
>>
>> Best,
>> Guowei
>>
>>
>> On Tue, Dec 8, 2020 at 2:53 AM Jakub N <[email protected]> wrote:
>>
>> Hi Guowei,
>>
>> It turned out that for my application I unfortunately can't have
>> the UDF in the "job's" classpath. I am using a local Flink
>> environment and `sbt pack` (similar to a fatjar) to create launch
>> scripts; therefore, to my understanding, I can't access the
>> classpath once the project is packed.
>> Are there any ways to add these UDFs from outside the classpath?
>>
>> Kind regards,
>>
>> Jakub
>>
>>
>> ------------------------------------------------------------------------
>> *From:* Jakub N <[email protected]>
>> *Sent:* Monday, 7 December 2020 12:59
>> *To:* Guowei Ma <[email protected]>
>> *Cc:* [email protected]
>> *Subject:* Re: Flink UDF registration from jar at runtime
>>
>> Hi Guowei,
>>
>> Many thanks for your help. Your suggestion indeed solved the
>> issue: I moved `myFunction` to the classpath where execution
>> starts.
>>
>> Kind regards,
>>
>> Jakub
>>
>>
>> ------------------------------------------------------------------------
>> *From:* Guowei Ma <[email protected]>
>> *Sent:* Monday, 7 December 2020 12:16
>> *To:* Jakub N <[email protected]>
>> *Cc:* [email protected]
>> *Subject:* Re: Flink UDF registration from jar at runtime
>>
>> Hi Jakub,
>> I think the problem is that the `cls`, which you load at
>> runtime, is not in the thread context classloader.
>> Flink deserializes the `myFunction` object with the context
>> classloader.
>> So maybe you could put the myFunction in the job's class path.
>> Best,
>> Guowei
>>
>>
>> On Mon, Dec 7, 2020 at 5:54 PM Jakub N <[email protected]> wrote:
>>
>> Hi Guowei,
>>
>> Thanks for your help,
>> here is the relevant code (QueryCommand class):
>>
>> val fsSettings: EnvironmentSettings = EnvironmentSettings
>>   .newInstance()
>>   .useBlinkPlanner()
>>   .inStreamingMode()
>>   .build()
>>
>> val fsEnv: StreamExecutionEnvironment =
>>   StreamExecutionEnvironment.getExecutionEnvironment
>> fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> fsEnv.getConfig.enableObjectReuse()
>>
>> val fsTableEnv: StreamTableEnvironment =
>>   StreamTableEnvironment.create(fsEnv, fsSettings)
>>
>> val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
>> compiler.run(null, null, null,
>>   new File("target/custom/myFunction.java").getPath)
>>
>> val root = new File("target/custom")
>> val classLoader: URLClassLoader = new URLClassLoader(
>>   Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
>> val cls = classLoader.loadClass("myFunction")
>> val instance = cls.newInstance()
>> val udf = instance.asInstanceOf[ScalarFunction]
>> fsTableEnv.createTemporaryFunction("myFunction", udf)
>>
>> // creating Table...
>> fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
>>
>> def execute(): Unit = fsEnv.execute()
>>
>> myFunction.java
>>
>> import org.apache.flink.table.functions.ScalarFunction;
>>
>> public class myFunction extends ScalarFunction {
>>     public String eval(String s) {
>>         return "myFunction - " + s;
>>     }
>> }
>>
>> Execution works as follows: a QueryCommand instance is created, some
>> properties are set, and then /execute()/ is invoked.
>>
>> Let me know if any other relevant information is missing,
>> alternatively you can also have a look at the source code
>> here (https://github.com/codefeedr/kafkaquery).
>>
>> Kind regards,
>>
>> Jakub
>>
>>
>>
>> ------------------------------------------------------------------------
>> *From:* Guowei Ma <[email protected]>
>> *Sent:* Monday, 7 December 2020 02:55
>> *To:* Jakub N <[email protected]>
>> *Cc:* [email protected]
>> *Subject:* Re: Flink UDF registration from jar at runtime
>>
>> Hi, Jakub
>> In theory there should not be any problem because you
>> can register the function object.
>> Could you share your code and the shell command with which
>> you submit your job?
>> Best,
>> Guowei
>>
>>
>> On Mon, Dec 7, 2020 at 3:19 AM Jakub N <[email protected]> wrote:
>>
>> The current setup is: Data in Kafka -> Kafka
>> Connector -> StreamTableEnvironment -> execute Flink
>> SQL queries
>>
>> I would like to register Flink's User-defined
>> Functions from a jar or java class file during
>> runtime. What I have tried so far is using Java's
>> Classloader getting an instance of a ScalarFunction
>> (UDF) and registering it in the
>> StreamTableEnvironment. When I try executing a query
>> making use of the UDF I get the following exception:
>>
>>
>> Exception in thread "main" java.lang.ClassNotFoundException: myFunction
>>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>     at java.base/java.lang.Class.forName0(Native Method)
>>     at java.base/java.lang.Class.forName(Class.java:398)
>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
>>     at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
>>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
>>     at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>>     at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
>>     at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
>>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>>     at scala.collection.TraversableLike.map(TraversableLike.scala:285)
>>     at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
>>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>>     ...
>>
>>
>> I have verified that the generated instance of the
>> UDF behaves as expected when invoking any of its methods.
>>
>> Do you have any ideas on why this is failing?
>>
