Hi Jakub,

As Guowei said the UDF must be present in the user classloader. It must
be there when compiling the program and when executing on the cluster.
As of now the TableEnvironment uses the Thread context classloader as
the "user classloader" when compiling the query. Therefore you can do
the trick via:

|ClassLoader yourClassloader = ... // create your classloader with the UDF|

|try (TemporaryClassLoaderContext ignored =
TemporaryClassLoaderContext.of(|||yourClassloader|)) {|

|    fsTableEnv.createTemporaryFunction("myFunction", udf)||
||
||    fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")|

|}
|

Take a look at the TemporaryClassLoaderContext[1] for a nice way how to
do it with a cleanup at the end.

To solve the second problem of having the UDF on the classpath when
executing. If you are just spawning a LocalEnvironment the above should
do the trick as it will use the context classloader. If you are
submitting to a cluster, you can submit multiple jars as part of a
single job either via the RemoteEnvironment or the flink run command.

That's how we submit UDFs from separate jars in the sql-client. You can
try to go through a few classes there and see how it is done. I am
afraid it's not the easiest task as there are quite a few classes to
navigate through. You could start from e.g.
org.apache.flink.table.client.gateway.local.LocalExecutor#executeSql[2]

[1]
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java

[2]
https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L305



On 10/12/2020 09:15, Guowei Ma wrote:
> Hi,  Jakub
> If I understand correctly you want the job, which you submitted, could
> load some table function which does not in the job jar.
> I don't think Flink could support this natively.(Maybe other guys know).
> But I think this requirement is like some code generated. You need to
> submit the "code" to the job. I think you could refer to the [1].
>
> [1]
> https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java#L29
>
> Best,
> Guowei
>
>
> On Tue, Dec 8, 2020 at 8:40 PM Jakub N <jakub1...@hotmail.de
> <mailto:jakub1...@hotmail.de>> wrote:
>
>     Hi Guowei,
>
>      1. Unfortunately the UDF and the job are not in the same fatjar.
>         Essentially there is only one "fatjar" containing the Flink
>         environment + the job, the UDF is separate. 
>      2. Yes,  that is correct.
>      3. As explained in 1.  I don't submit job jars to the Flink
>         environment, instead the job is created and submitted within
>         the "fatjar"
>
>
>     Codewise nothing changed except for where the location of the UDF
>     was specified. 
>     "Submitting to the environment" works as follows:
>
>      1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
>      2. (Register UDF's)
>      3. Create tables
>      4. Query on the tables
>      5. Execute the environment
>
>     The overall process is executed as one program.
>     Apologies if any of these explanations are unclear or too vague.
>
>     Kind regards,
>
>     Jakub
>
>     ------------------------------------------------------------------------
>     *Von:* Guowei Ma <guowei....@gmail.com <mailto:guowei....@gmail.com>>
>     *Gesendet:* Dienstag, 8. Dezember 2020 06:34
>     *An:* Jakub N <jakub1...@hotmail.de <mailto:jakub1...@hotmail.de>>
>     *Cc:* user@flink.apache.org <mailto:user@flink.apache.org>
>     <user@flink.apache.org <mailto:user@flink.apache.org>>
>     *Betreff:* Re: Flink UDF registration from jar at runtime
>      
>     Hi, Jakub
>     I am not familiar with the `sbt pack`. But I assume you are doing
>     following (correct me if I misunderstand you)
>     1. The UDF and Job jar are in the same "fatjar" 
>     2. You "new" a UDF object in the job(). 
>     3. You submit the  "fatjar" to the local Flink environment. 
>
>     In theory there should not be any problem. Could share how you
>     change the code and how you submit your job to the local environment.
>
>     Best,
>     Guowei
>
>
>     On Tue, Dec 8, 2020 at 2:53 AM Jakub N <jakub1...@hotmail.de
>     <mailto:jakub1...@hotmail.de>> wrote:
>
>         Hi Guowei,
>
>         It turned out for my application I unfortunately can't have
>         the UDF in the "job's"  classpath. As I am using a local Flink
>         environment and `sbt pack` (similar to a fatjar) to create
>         launch scripts therefore, to my understanding, I can't access
>         the classpath (when the project is packed). 
>         Are there any ways to add these UDF's from outside the classpath?
>
>         Kind regards,
>
>         Jakub
>
>         
> ------------------------------------------------------------------------
>         *Von:* Jakub N <jakub1...@hotmail.de
>         <mailto:jakub1...@hotmail.de>>
>         *Gesendet:* Montag, 7. Dezember 2020 12:59
>         *An:* Guowei Ma <guowei....@gmail.com
>         <mailto:guowei....@gmail.com>>
>         *Cc:* user@flink.apache.org <mailto:user@flink.apache.org>
>         <user@flink.apache.org <mailto:user@flink.apache.org>>
>         *Betreff:* Re: Flink UDF registration from jar at runtime
>          
>         Hi Guowei,
>
>         Great thanks for your help. Your suggestion indeed solved the
>         issue. I moved `myFunction` to the class path where execution
>         starts.
>
>         Kind regards,
>
>         Jakub 
>
>         
> ------------------------------------------------------------------------
>         *Von:* Guowei Ma <guowei....@gmail.com
>         <mailto:guowei....@gmail.com>>
>         *Gesendet:* Montag, 7. Dezember 2020 12:16
>         *An:* Jakub N <jakub1...@hotmail.de <mailto:jakub1...@hotmail.de>>
>         *Cc:* user@flink.apache.org <mailto:user@flink.apache.org>
>         <user@flink.apache.org <mailto:user@flink.apache.org>>
>         *Betreff:* Re: Flink UDF registration from jar at runtime
>          
>         Hi Jakub,
>         I think the problem is that the `cls`, which you load at
>         runtime, is not in the thread context classloader.
>         Flink deserializes the `myFunction` object with the context
>         classloader.
>         So maybe you could put the myFunction in the job's class path.
>         Best,
>         Guowei
>
>
>         On Mon, Dec 7, 2020 at 5:54 PM Jakub N <jakub1...@hotmail.de
>         <mailto:jakub1...@hotmail.de>> wrote:
>
>             Hi Guowei,
>
>             Thanks for your help,
>             here is the relevant code (QueryCommand class):
>
>             val fsSettings: EnvironmentSettings = EnvironmentSettings
>               .newInstance()
>               .useBlinkPlanner()
>               .inStreamingMode()
>               .build()
>
>             val fsEnv: StreamExecutionEnvironment =
>               StreamExecutionEnvironment.getExecutionEnvironment 
> fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>             fsEnv.getConfig.enableObjectReuse()
>
>             val fsTableEnv: StreamTableEnvironment =
>               StreamTableEnvironment.create(fsEnv, fsSettings)
>
>             val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler 
> compiler.run(null, null, null, new 
> File("target/custom/myFunction.java").getPath)
>
>             val root = new File("target/custom")
>             val classLoader: URLClassLoader = new 
> URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
>             val cls = classLoader.loadClass("myFunction")
>             val instance = cls.newInstance(); val udf = 
> instance.asInstanceOf[ScalarFunction]
>             fsTableEnv.createTemporaryFunction("myFunction", udf)
>
>             //creating Table... fsTableEnv.executeSql("SELECT a, 
> myFunction(a) FROM myTopic")
>
>             def execute(): Unit = fsEnv.execute()
>
>             myFunction.java
>
>             import org.apache.flink.table.functions.ScalarFunction; public 
> class myFunction extends ScalarFunction {
>
>                 public String eval(String s) {
>                     return "myFunction - " + s;     }
>
>             }
>
>             Execution works as follows: A QueryCommand instance is
>             created, some properties are being set, /execute()/ will
>             be invoked
>
>             Let me know if any other relevant information is missing,
>             alternatively you can also have a look at the source code
>             here (https://github.com/codefeedr/kafkaquery).
>
>             Kind regards,
>
>             Jakub
>
>
>             
> ------------------------------------------------------------------------
>             *Von:* Guowei Ma <guowei....@gmail.com
>             <mailto:guowei....@gmail.com>>
>             *Gesendet:* Montag, 7. Dezember 2020 02:55
>             *An:* Jakub N <jakub1...@hotmail.de
>             <mailto:jakub1...@hotmail.de>>
>             *Cc:* user@flink.apache.org <mailto:user@flink.apache.org>
>             <user@flink.apache.org <mailto:user@flink.apache.org>>
>             *Betreff:* Re: Flink UDF registration from jar at runtime
>              
>             Hi, Jakub
>             In theory there should not be any problem because you
>             could register the function object.
>             So would you like to share your code and the shell command
>             that you submit your job? 
>             Best,
>             Guowei
>
>
>             On Mon, Dec 7, 2020 at 3:19 AM Jakub N
>             <jakub1...@hotmail.de <mailto:jakub1...@hotmail.de>> wrote:
>
>                 The current setup is: Data in Kafka -> Kafka Connector
>                 -> StreamTableEnvironment -> execute Flink SQL queries
>
>                 I would like to register Flink's User-defined
>                 Functions from a jar or java class file during
>                 runtime. What I have tried so far is using Java's
>                 Classloader getting an instance of a ScalarFunction
>                 (UDF) and registering it in the
>                 StreamTableEnvironment. When I try executing a query
>                 making use of the UDF I get the following exception:
>
>
>                         Exception in thread "main"
>                         java.lang.ClassNotFoundException: myFunction
>
>                         at
>                         
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>
>                         at
>                         
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>
>                         at
>                         
> java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>
>                         at java.base/java.lang.Class.forName0(Native
>                         Method)
>
>                         at
>                         java.base/java.lang.Class.forName(Class.java:398)
>
>                         at
>                         
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>
>                         at
>                         
> java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
>
>                         at
>                         
> java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
>
>                         at
>                         
> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
>
>                         at
>                         
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
>
>                         at
>                         
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
>
>                         at
>                         
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>
>                         at
>                         
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>
>                         at
>                         
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>
>                         at
>                         
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>
>                         at
>                         
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
>
>                         at
>                         
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
>
>                         at
>                         
> org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
>
>                         at
>                         
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
>
>                         at
>                         
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
>
>                         at
>                         
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>
>                         at
>                         
> org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>
>                         at
>                         
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
>
>                         at
>                         
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
>
>                         at
>                         
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>
>                         at
>                         
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>
>                         at
>                         
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>
>                         at
>                         
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>
>                         at
>                         
> scala.collection.TraversableLike.map(TraversableLike.scala:285)
>
>                         at
>                         
> scala.collection.TraversableLike.map$(TraversableLike.scala:278)
>
>                         at
>                         
> scala.collection.AbstractTraversable.map(Traversable.scala:108)
>                         ...
>
>
>                 I have verified that the generated instance of the UDF
>                 behaves as expected when invoking any of its methods.
>
>                 Do you have any ideas on why this is failing?
>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to