Re: Re: flink1.11: loading an external jar for UDF registration
Yes, as @chenxuying and @zhisheng said: my approach is to add the required UDF jars to the classpath via the pipeline.classpaths parameter. However, when a task is scheduled to a TaskManager it still has to be able to find the UDF jar there, so on 1.11 I additionally use the -yt parameter to upload the /plugins directory to HDFS, which solves the problem~

best,
amenhub

From: zhisheng
Sent: 2020-10-22 23:28
To: user-zh
Subject: Re: flink1.11: loading an external jar for UDF registration
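For reference, the submit command I have in mind looks roughly like the following. This is a sketch only; the local paths, main class, and jar names are placeholders, not the real job.

```shell
# Placeholders throughout; adjust paths and class names to the actual job.
# -yt ships the local plugins/ directory (containing the UDF jars) to HDFS
# alongside the job, so the TaskManagers can resolve the UDF classes.
flink run -m yarn-cluster \
  -yt /opt/flink/plugins \
  -c com.example.MyJob \
  /opt/jobs/myjob.jar
```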
Re: flink1.11: loading an external jar for UDF registration
hi

In Flink 1.11, if you want to manage UDF jars, you should be able to use the yarn-provided-lib-dirs [1] parameter to control where the UDF jars are picked up from. PS: this parameter is only supported from 1.11 on.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs

Best
zhisheng

Husky Zeng <568793...@qq.com> wrote on Thu, Oct 22, 2020, 11:31 AM:
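Note the actual configuration key is yarn.provided.lib.dirs (the anchor in [1] spells it with dashes). A sketch of how it might be passed at submit time; the HDFS path and job coordinates are placeholders:

```shell
# Placeholder paths. Jars under the provided lib dirs are not re-uploaded per
# job; YARN localizes them from HDFS onto each node, so UDF jars staged there
# are visible to every TaskManager.
flink run -m yarn-cluster \
  -yD yarn.provided.lib.dirs="hdfs:///flink/provided-libs" \
  -c com.example.MyJob \
  /opt/jobs/myjob.jar
```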
Re: flink1.11: loading an external jar for UDF registration
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html

https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927

We are also building a feature that loads UDFs from an HDFS path; could you take a look and see whether it is the same problem? We'd be happy to compare notes.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: Re: flink1.11: loading an external jar for UDF registration
1. When using the system classloader, the interfaces that the job jar itself exposes for the external UDF jar to implement throw a ClassNotFound exception.
2. What is the thread context classloader?

I don't quite follow these two points. Could you post a code example?

On 2020-10-15 19:47:20, "amen...@163.com" wrote:
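Since the question asks for an example: below is a minimal, pure-JDK sketch of what the thread context classloader is. No Flink classes are involved; the anonymous ClassLoader is an empty placeholder standing in for a framework's user-code classloader.

```java
public class ContextClassLoaderDemo {
    public static void main(String[] args) {
        // Every thread carries its own context classloader, settable per thread.
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        System.out.println("context: " + original);
        System.out.println("system : " + ClassLoader.getSystemClassLoader());

        // Frameworks (Flink included) can install their own user-code
        // classloader as the context classloader before invoking user code.
        // Here an empty placeholder loader delegating to the original:
        ClassLoader custom = new ClassLoader(original) {};
        Thread.currentThread().setContextClassLoader(custom);

        // Library code that resolves classes via
        // Thread.currentThread().getContextClassLoader() now goes through
        // `custom` instead of the system classloader.
        System.out.println(Thread.currentThread().getContextClassLoader() == custom); // prints "true"

        // Restore the original loader when done.
        Thread.currentThread().setContextClassLoader(original);
    }
}
```

This is why "pointing the classloader at the main class" behaves differently from the system classloader: the context classloader can be (and in Flink tasks is) a child loader that also sees user-code jars, while the system classloader only sees the original JVM classpath.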
Re: Re: flink1.11: loading an external jar for UDF registration
A follow-up question: when using the thread context classloader, each record is emitted three times. Is that caused by adding pipeline.classpaths? And could configuring the env this way cause any other problems?

best,
amenhub

From: amen...@163.com
Sent: 2020-10-15 19:22
To: user-zh
Subject: Re: Re: flink1.11: loading an external jar for UDF registration
Re: Re: flink1.11: loading an external jar for UDF registration
Thank you very much for your reply!

This is a very good approach for me, but I have one question: does loading have to go through the system classloader? When I tried the system classloader, the interfaces that the job jar itself exposes for the external UDF jar to implement threw a ClassNotFound exception, whereas pointing the classloader at the main class (which here should mean using the default thread context classloader) avoids the problem.

Looking forward to your reply, thanks~

best,
amenhub

From: cxydeve...@163.com
Sent: 2020-10-15 17:46
To: user-zh
Subject: Re: flink1.11: loading an external jar for UDF registration
Re: flink1.11: loading an external jar for UDF registration
Our approach is to set the env configuration via reflection, adding pipeline.classpaths. The code is as follows (note the addURL trick only works on Java 8, where the system classloader is a URLClassLoader):

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public static void main(final String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings settings =
            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

    // String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
    String path = "https://...xxx.jar";
    loadJar(new URL(path));

    // Reach the env's internal Configuration via reflection and add pipeline.classpaths
    Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
    configuration.setAccessible(true);
    Configuration o = (Configuration) configuration.get(env);
    Field confData = Configuration.class.getDeclaredField("confData");
    confData.setAccessible(true);
    Map<String, Object> temp = (Map<String, Object>) confData.get(o);
    List<String> jarList = new ArrayList<>();
    jarList.add(path);
    temp.put("pipeline.classpaths", jarList);

    tableEnvironment.executeSql(
            "CREATE FUNCTION CxyTestReturnSelf AS 'flinksql.function.udf.CxyTestReturnSelf'");
    tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
            "  f_sequence INT,\n" +
            "  f_random INT,\n" +
            "  f_random_str STRING,\n" +
            "  ts AS localtimestamp,\n" +
            "  WATERMARK FOR ts AS ts\n" +
            ") WITH (\n" +
            "  'connector' = 'datagen',\n" +
            "  'rows-per-second'='5',\n" +
            "  'fields.f_sequence.kind'='sequence',\n" +
            "  'fields.f_sequence.start'='1',\n" +
            "  'fields.f_sequence.end'='1000',\n" +
            "  'fields.f_random.min'='1',\n" +
            "  'fields.f_random.max'='1000',\n" +
            "  'fields.f_random_str.length'='10'\n" +
            ")");
    tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
            "  f_random_str STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'print'\n" +
            ")");
    tableEnvironment.executeSql(
            "insert into sinktable " +
            "select CxyTestReturnSelf(f_random_str) " +
            "from sourceTable");
}

// Dynamically load a jar into the system classloader (Java 8 only)
public static void loadJar(URL jarUrl) {
    Method method;
    try {
        // URLClassLoader.addURL is protected; obtain it reflectively
        method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    } catch (NoSuchMethodException | SecurityException e1) {
        e1.printStackTrace();
        return;
    }
    // Remember the method's accessibility so it can be restored afterwards
    boolean accessible = method.isAccessible();
    try {
        if (!accessible) {
            method.setAccessible(true);
        }
        // On Java 8 the system classloader is a URLClassLoader
        URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
        // Append the jar URL to the system classloader's search path
        method.invoke(classLoader, jarUrl);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        method.setAccessible(accessible);
    }
}
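The addURL reflection above breaks on JDK 9+, where the system classloader is no longer a URLClassLoader and the cast throws ClassCastException. A sketch of a reflection-free alternative: wrap the UDF jars in a child URLClassLoader and install it as the thread context classloader. The class name UdfJarLoader and the jar path are hypothetical, for illustration only.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class UdfJarLoader {
    /** Create a child classloader that sees the given jar URLs in addition
     *  to everything visible to the current context classloader. */
    public static URLClassLoader forJars(URL... jarUrls) {
        return new URLClassLoader(jarUrls, Thread.currentThread().getContextClassLoader());
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical jar path; the file need not exist to build the loader.
        URLClassLoader loader = forJars(new URL("file:///tmp/udf.jar"));

        // Classes visible to the parent resolve through the child too
        // (standard parent-delegation), yielding the identical Class object:
        Class<?> c = Class.forName("java.lang.String", true, loader);
        System.out.println(c == String.class); // prints "true"

        // Install the loader as the thread context classloader so framework
        // code that resolves classes via the context classloader sees the jar.
        Thread.currentThread().setContextClassLoader(loader);
    }
}
```

Unlike the addURL hack, this does not mutate the system classloader, so it also avoids polluting the JVM-wide classpath; the trade-off is that only code resolving through this loader (or the context classloader, once set) can see the UDF classes.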
flink1.11: loading an external jar for UDF registration
hi, everyone

I've recently been working on loading UDFs from an external path, but hit the following exception (main parts excerpted):

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.xxx.xxx.udf.Uppercase
ClassLoader info: URL ClassLoader:
    file: '/tmp/blobStore-1318a525-1b5a-4c07-808e-f62083c3fb11/job_a5501605ff554915a81ae12e3018e77d/blob_p-b0411adc6fb3d602ed03076ddc3d1bf3e6a63319-48d1e8f3c1b25d4e2b78242429834e31' (valid JAR)
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:288) ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471) ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393) ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155) ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453) ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassNotFoundException: com.xxx.xxx.udf.Uppercase
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_171]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_171]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_171]
    at java.lang.Class.forName0(Native Method) ~[?:1.8.0_171]
    at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_171]

My guess is that when the external jar is loaded and createTemporarySystemFunction is called, the jar has not actually been loaded into the Flink runtime environment, but I'm still unsure about this guess. Novice here; I'd be grateful for any solution or ideas, thanks~

best,
amenhub