We host the UDFs we develop on a remote server, so the jar has to be loaded dynamically. We are on Flink 1.10, and the code is as follows:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings envSet = EnvironmentSettings
            .newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamTableEnvironment stEnv = StreamTableEnvironment.create(exeEnv, envSet);
    String udfPath = "http://www.xxx.com/udf.jar";
    loadClassPath(udfPath);
    stEnv.sqlUpdate("create function test as 'com.xx.TestUdf' ");
    stEnv.sqlUpdate("create table ....");
    exeEnv.execute("remote_udf_sql_job");
}


/** Loads the classes through a class loader; code omitted here **/
public static void loadClassPath(String jar) {
}
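The body of loadClassPath is omitted above, so what it really does is unknown. Purely as an assumption, a helper of this kind often wraps the current context class loader in a URLClassLoader that can also see the jar. The sketch below shows that shape; the class name ClassPathLoader and the return value are hypothetical and are not taken from the original code.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassPathLoader {

    /**
     * Hypothetical stand-in for the omitted loadClassPath helper.
     * It creates a URLClassLoader over the given jar URL, with the current
     * context class loader as parent, and installs it on the calling thread.
     * Only the JVM (and thread) that calls this method is affected.
     */
    public static URLClassLoader loadClassPath(String jar) throws MalformedURLException {
        URL jarUrl = URI.create(jar).toURL();
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        // The jar is not fetched here; URLClassLoader opens it lazily on first lookup.
        URLClassLoader loader = new URLClassLoader(new URL[] {jarUrl}, parent);
        Thread.currentThread().setContextClassLoader(loader);
        return loader;
    }
}
```

Whatever the real implementation is, a helper like this changes class visibility only inside the process that runs main; it does not ship the jar anywhere else.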


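Generating the job's execution graph works fine, but once the job is submitted it fails at runtime because the class com.xx.TestUdf cannot be found. My guess is that when the JobManager generates the execution graph, loadClassPath loads the class successfully in the local JVM, but the TaskManagers only run the operator logic and never execute the class-loading step in main, hence the ClassNotFoundException.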
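The suspected mechanism can be reproduced with the JDK alone. The sketch below is not Flink code: ClassLoaderDemo, FakeUdf and LoaderAwareInputStream are hypothetical names, and the class loader with a null parent merely stands in for a worker-side loader that was never given the UDF jar. It serializes an object in one place and deserializes it through an explicitly chosen class loader, which is broadly the pattern of resolving a class by name during deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderDemo {

    /** Stand-in for a UDF instance embedded in a serialized operator. */
    public static class FakeUdf implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    /** Resolves classes by name through an explicitly supplied class loader. */
    static class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    public static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Returns true if the payload can be deserialized with the given loader. */
    public static boolean canDeserialize(byte[] payload, ClassLoader loader) throws IOException {
        try (ObjectInputStream in =
                new LoaderAwareInputStream(new ByteArrayInputStream(payload), loader)) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = serialize(new FakeUdf());
        // "Client" side: the loader that defined FakeUdf can resolve it.
        ClassLoader client = ClassLoaderDemo.class.getClassLoader();
        // "Worker" side: a loader with no URLs and no application parent cannot.
        ClassLoader worker = new URLClassLoader(new URL[0], null);
        System.out.println("client can deserialize: " + canDeserialize(payload, client));
        System.out.println("worker can deserialize: " + canDeserialize(payload, worker));
    }
}
```

The serialized bytes carry only the class name, not the class itself, so the side that deserializes needs the jar on its own class loader regardless of what was loaded where the bytes were produced.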
So far I have tried the following:
1. Adding the dependent jar to the jobGraph. However, exeEnv.execute(streamGraph) regenerates the jobGraph, so the added jars are lost.
StreamGraph streamGraph = exeEnv.getStreamGraph("remote_udf_sql_job", false);
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.addJars(jar);
exeEnv.execute(streamGraph);
2. Following an approach found online (https://blog.csdn.net/weixin_28893597/article/details/112467465), I set pipeline.classpaths and pipeline.jars; neither had any effect.


Flink error message:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.xx.TestUdf
ClassLoader info: URL ClassLoader:
    file: '/flink-1.10.2/lib/slf4j-api-1.7.30.jar' (valid JAR)
    file: '/flink-1.10.2/lib/logback-core-1.2.3.jar' (valid JAR)
    file: '/flink-1.10.2/lib/logback-classic-1.2.3.jar' (valid JAR)
    file: '/flink-1.10.2/lib/log4j-over-slf4j-1.7.30.jar' (valid JAR)
    file: '/flink-1.10.2/lib/jcl-over-slf4j-1.7.30.jar' (valid JAR)
    file: '/tmp/io_tmp_dirs/blobStore-b767d164-c9df-4dd6-a04f-c53982967372/job_35484161537386467514242760035484/blob_p-635c9dc04420693880fbf0f9505979d4e1a9c976-e1d64094d470046aec598c075fde0c30' (valid JAR)
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:419)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:419)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:442)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:789)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:594)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.xx.TestUdf
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:85)
at org.apache.flink.util.ParentFirstClassLoader.loadClassWithoutExceptionHandling(ParentFirstClassLoader.java:64)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:71)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:242)
... 12 more




