我们将开发的udf放在远程服务器,需要动态地加载jar包。Flink版本1.10,代码如下: public static void main(String[] args) throws Exception { StreamExecutionEnvironment exeEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSet = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment stEnv = StreamTableEnvironment.create(exeEnv, envSet); String udfPath = "http://www.xxx.com/udf.jar"; loadClassPath(udfPath); stEnv.sqlUpdate("create function test as 'com.xx.TestUdf' "); stEnv.sqlUpdate("create table ...."); exeEnv.execute("remote_udf_sql_job"); }
/** 通过类加载器加载class,此处省略代码 **/ public static void loadClassPath(String jar) { } 目前生成任务执行拓扑图是没有问题的,提交任务后运行会报找不到com.xx.TestUdf类。猜测是JobManager生成执行拓扑图的时候本地通过loadClassPath方法成功加载了class,但是到了TM执行的是算子中的逻辑,不会执行main方法里面的加载类的步骤,所以出现ClassNotFoundException。 目前试了几种方法: 1. 在jobGraph中增加依赖的jar包,但是exeEnv.execute(streamGraph)会重新生成jobGraph,导致增加的jars被清空。 StreamGraph streamGraph = exeEnv.getStreamGraph("remote_udf_sql_job", false); JobGraph jobGraph = streamGraph.getJobGraph(); jobGraph.addJars(jar); exeEnv.execute(streamGraph); 2. 按照网上的一个方法https://blog.csdn.net/weixin_28893597/article/details/112467465 ,增加配置pipeline.classpaths和pipeline.jars均无效。 flink报错信息: Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.xx.TestUdf ClassLoader info: URL ClassLoader: file: '/flink-1.10.2/lib/slf4j-api-1.7.30.jar' (valid JAR) file: '/flink-1.10.2/lib/logback-core-1.2.3.jar' (valid JAR) file: '/flink-1.10.2/lib/logback-classic-1.2.3.jar' (valid JAR) file: '/flink-1.10.2/lib/log4j-over-slf4j-1.7.30.jar' (valid JAR) file: '/flink-1.10.2/lib/jcl-over-slf4j-1.7.30.jar' (valid JAR) file: '/tmp/io_tmp_dirs/blobStore-b767d164-c9df-4dd6-a04f-c53982967372/job_35484161537386467514242760035484/blob_p-635c9dc04420693880fbf0f9505979d4e1a9c976-e1d64094d470046aec598c075fde0c30' (valid JAR) Class not resolvable through given classloader. 
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:419) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:419) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353) at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:442) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:789) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:594) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.ClassNotFoundException: com.xx.TestUdf at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:85) at org.apache.flink.util.ParentFirstClassLoader.loadClassWithoutExceptionHandling(ParentFirstClassLoader.java:64) at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:71) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868) at 
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:242) ... 12 more