Re: Flink UDF registration from jar at runtime

2020-12-08 Thread Jakub N
Hi Guowei,


  1.  Unfortunately the UDF and the job are not in the same fatjar. Essentially 
there is only one "fatjar", containing the Flink environment + the job; the UDF 
is separate.
  2.  Yes, that is correct.
  3.  As explained in 1., I don't submit job jars to the Flink environment; 
instead, the job is created and submitted within the "fatjar".

Code-wise nothing changed, except where the location of the UDF is specified.
"Submitting to the environment" works as follows (a sketch follows the list):

  1.  Create a StreamExecutionEnvironment -> StreamTableEnvironment
  2.  (Register UDFs)
  3.  Create tables
  4.  Query on the tables
  5.  Execute the environment

The overall process is executed as one program.
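
For illustration, here is a minimal, self-contained sketch of these five steps. 
It is a sketch only: it uses the datagen connector and an inline UDF so that it 
runs standalone, whereas the actual project reads from Kafka and loads the UDF 
dynamically (see the code further down in this thread).

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction

// Inline stand-in for the dynamically loaded UDF.
class MyFunction extends ScalarFunction {
  def eval(s: String): String = "myFunction - " + s
}

object Sketch extends App {
  // 1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tableEnv = StreamTableEnvironment.create(env, settings)

  // 2. Register UDFs
  tableEnv.createTemporaryFunction("myFunction", new MyFunction)

  // 3. Create tables (datagen instead of Kafka keeps the sketch self-contained)
  tableEnv.executeSql(
    "CREATE TABLE myTopic (a STRING) WITH ('connector' = 'datagen', 'number-of-rows' = '10')")

  // 4. Query on the tables; executeSql submits the SQL job directly
  tableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic").print()

  // 5. Execute the environment (only needed once DataStream operators are added)
  // env.execute()
}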
Apologies if any of these explanations are unclear or too vague.

Kind regards,

Jakub


From: Guowei Ma
Sent: Tuesday, December 8, 2020 06:34
To: Jakub N
Cc: user@flink.apache.org
Subject: Re: Flink UDF registration from jar at runtime

Hi, Jakub
I am not familiar with `sbt pack`, but I assume you are doing the following 
(correct me if I misunderstand you):
1. The UDF and the job jar are in the same "fatjar".
2. You "new" a UDF object in the job.
3. You submit the "fatjar" to the local Flink environment.

In theory there should not be any problem. Could you share how you changed the 
code and how you submit your job to the local environment?

Best,
Guowei


On Tue, Dec 8, 2020 at 2:53 AM Jakub N <jakub1...@hotmail.de> wrote:
Hi Guowei,

It turned out that for my application I unfortunately can't have the UDF on the 
"job's" classpath: as I am using a local Flink environment and `sbt pack` 
(similar to a fatjar) to create launch scripts, to my understanding I can't 
access the classpath once the project is packed.
Are there any ways to add these UDFs from outside the classpath?

Kind regards,

Jakub


From: Jakub N <jakub1...@hotmail.de>
Sent: Monday, December 7, 2020 12:59
To: Guowei Ma <guowei@gmail.com>
Cc: user@flink.apache.org
Subject: Re: Flink UDF registration from jar at runtime

Hi Guowei,

Thanks a lot for your help! Your suggestion indeed solved the issue: I moved 
`myFunction` to the classpath where execution starts.

Kind regards,

Jakub


From: Guowei Ma <guowei@gmail.com>
Sent: Monday, December 7, 2020 12:16
To: Jakub N <jakub1...@hotmail.de>
Cc: user@flink.apache.org
Subject: Re: Flink UDF registration from jar at runtime

Hi Jakub,
I think the problem is that the class `cls`, which you load at runtime, is not 
visible to the thread context classloader. Flink deserializes the `myFunction` 
object with the context classloader, so maybe you could put `myFunction` on the 
job's classpath. (An alternative idea is sketched below.)
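
As an illustration of that classloader point, here is a hypothetical, untested 
sketch of an alternative: making the dynamically loaded UDF visible through the 
thread context classloader instead of moving the class. The `target/custom` 
directory is taken from the code further down in this thread; everything else 
is assumption.

import java.io.File
import java.net.URLClassLoader

object ContextClassLoaderSketch extends App {
  // Untested idea: wrap the directory containing the compiled UDF in a
  // URLClassLoader and install it as the thread context classloader before
  // the table environment is created, so that Flink's deserialization can
  // resolve `myFunction`.
  val root = new File("target/custom")
  val udfClassLoader = new URLClassLoader(
    Array(root.toURI.toURL),
    Thread.currentThread().getContextClassLoader)
  Thread.currentThread().setContextClassLoader(udfClassLoader)

  // ... create the environments, register the UDF and run the query here ...
}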
Best,
Guowei


On Mon, Dec 7, 2020 at 5:54 PM Jakub N <jakub1...@hotmail.de> wrote:
Hi Guowei,

Thanks for your help; here is the relevant code (the QueryCommand class):

import java.io.File
import java.net.{URL, URLClassLoader}
import javax.tools.{JavaCompiler, ToolProvider}

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction

// Set up the Blink streaming table environment.
val fsSettings: EnvironmentSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val fsEnv: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.enableObjectReuse()

val fsTableEnv: StreamTableEnvironment =
  StreamTableEnvironment.create(fsEnv, fsSettings)

// Compile the UDF source at runtime with the system Java compiler.
val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)

// Load the compiled class from disk and register it as a temporary function.
val root = new File("target/custom")
val classLoader: URLClassLoader =
  new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
val cls = classLoader.loadClass("myFunction")
val instance = cls.newInstance()
val udf = instance.asInstanceOf[ScalarFunction]
fsTableEnv.createTemporaryFunction("myFunction", udf)

// creating Table...

fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")

def execute(): Unit = fsEnv.execute()

myFunction.java

import org.apache.flink.table.functions.ScalarFunction;

public class myFunction extends ScalarFunction {

    public String eval(String s) {
        return "myFunction - " + s;
    }
}

Execution works as follows: a QueryCommand instance is created, some properties 
are set, and execute() is invoked (see the sketch below).
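
A hypothetical sketch of that sequence (the property setter is an illustrative 
placeholder; only the instance creation and `execute()` come from the 
description above):

// Hypothetical driver code; `setQuery` is illustrative, not the actual
// QueryCommand API.
val command = new QueryCommand()
// command.setQuery("SELECT a, myFunction(a) FROM myTopic")
command.execute()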

Let me know if any other relevant information is missing; alternatively, you can 
also have a look at the source code here (https://github.com/codefeedr/kafkaquery).

Kind regards,

Jakub



From: Guowei Ma <guowei@gmail.com>
Sent: Monday, December 7, 2020 02:55
To: Jakub N <jakub1...@hotmail.de>
Cc: user@flink.apache.org
Subject: Re: Flink UDF registration from jar at runtime

Hi, Jakub
In theory there should not be any problem, because you can register the 
function object. So would you like to share your code and the shell command 
with which you submit your job?
Best,
Guowei


On Mon, Dec 7, 2020 at 3:19 AM Jakub N <jakub1...@hotmail.de> wrote:
The current setup is: Data in Kafka -> Kafka connector -> StreamTableEnvironment 
-> execute Flink SQL queries (a sketch of this pipeline follows below).
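
A minimal sketch of such a pipeline, assuming illustrative topic, field, and 
server names (none of them are taken from the actual project):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object KafkaPipelineSketch extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tableEnv = StreamTableEnvironment.create(env, settings)

  // Map a Kafka topic onto a table; topic, servers, and format are illustrative.
  tableEnv.executeSql(
    """CREATE TABLE myTopic (
      |  a STRING
      |) WITH (
      |  'connector' = 'kafka',
      |  'topic' = 'myTopic',
      |  'properties.bootstrap.servers' = 'localhost:9092',
      |  'properties.group.id' = 'sketch',
      |  'scan.startup.mode' = 'earliest-offset',
      |  'format' = 'json'
      |)""".stripMargin)

  // Execute a Flink SQL query against the Kafka-backed table.
  tableEnv.executeSql("SELECT a FROM myTopic").print()
}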

I would like to register Flink's user-defined functions (UDFs) from a jar or a 
Java class file at runtime. What I have tried so far: using a Java ClassLoader, 
I obtain an instance of a ScalarFunction (UDF) and register it in the 
StreamTableEnvironment. When I try executing a query that makes use of the UDF, 
I get the following exception:


Exception in thread "main" java.lang.ClassNotFoundException: myFunction
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at