Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-22 文章 amen...@163.com
是的,正如@chenxuying 和@zhisheng 所说,

我这边采用的方案是通过pipeline.classpaths参数将需要的udf jar添加到类路径中,但是当task被分配到tm去执行时仍需要找到所需udf 
jar才行,所以在1.11版本中我采用-yt参数将/plugins插件目录上传至hdfs,即可解决这个问题~

best,
amenhub



 
发件人: zhisheng
发送时间: 2020-10-22 23:28
收件人: user-zh
主题: Re: flink1.11加载外部jar包进行UDF注册
hi
 
flink  1.11 如果是要管理 udf jar 的话应该是可以通过 yarn-provided-lib-dirs [1] 这个参数去控制 udf
jar 的路径地址,ps,这个参数只在 1.11 才支持
 
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs
 
Best
zhisheng
 
Husky Zeng <568793...@qq.com> 于2020年10月22日周四 上午11:31写道:
 
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html
>
>
>
> https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927
>
>
>
> 我们也在搞一个从hdfs路径加载udf的功能,你看下是不是同一个问题?可以交流一下。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.11加载外部jar包进行UDF注册

2020-10-22 文章 zhisheng
hi

flink  1.11 如果是要管理 udf jar 的话应该是可以通过 yarn-provided-lib-dirs [1] 这个参数去控制 udf
jar 的路径地址,ps,这个参数只在 1.11 才支持

 [1]
https://ci.apache.org/projects/flink/flink-docs-stable/zh/ops/config.html#yarn-provided-lib-dirs

Best
zhisheng

Husky Zeng <568793...@qq.com> 于2020年10月22日周四 上午11:31写道:

>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Is-there-a-way-to-avoid-submit-hive-udf-s-resources-when-we-submit-a-job-td38204.html
>
>
>
> https://issues.apache.org/jira/browse/FLINK-19335?focusedCommentId=17199927=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17199927
>
>
>
> 我们也在搞一个从hdfs路径加载udf的功能,你看下是不是同一个问题?可以交流一下。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re:Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-16 文章 chenxuying
1使用系统类加载器的时候,本身作业包开放给外部UDF jar包实现的接口会报ClassNotFound异常
2线程上下文类加载器是什么

不太明白这两点,可以写个代码例子看看吗


在 2020-10-15 19:47:20,"amen...@163.com"  写道:
>追加问题,在使用线程上下文类加载器的时候,数据会重复发送三条,这是因为添加pipeline.classpaths的缘故吗?
>那这种设置env的方式有可能还会造成其他什么问题?
>
>best,
>amenhub
> 
>发件人: amen...@163.com
>发送时间: 2020-10-15 19:22
>收件人: user-zh
>主题: Re: Re: flink1.11加载外部jar包进行UDF注册
>非常感谢您的回复!
> 
>对于我来说,这是一个非常好的办法,但是我有一个疑问:类加载器只能使用系统类加载器进行加载吗?
>因为我在尝试使用系统类加载器的时候,本身作业包开放给外部UDF 
>jar包实现的接口会报ClassNotFound异常,而将类加载器指向主类(这种方式的话这里应该是使用默认的线程上下文加载器),则可避免这个问题。
> 
>期待您的回复,谢谢~
> 
>best, 
>amenhub
>发件人: cxydeve...@163.com
>发送时间: 2020-10-15 17:46
>收件人: user-zh
>主题: Re: flink1.11加载外部jar包进行UDF注册
>我们用方法是通过反射设置env的配置,增加pipeline.classpaths
>具体代码如下
>public static void main(final String[] args) throws Exception {
>StreamExecutionEnvironment env =
>StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings settings =
>EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment tableEnvironment =
>StreamTableEnvironment.create(env, settings);
>//String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
>String path = "https://...xxx.jar;;
>loadJar(new URL(path));
>Field configuration =
>StreamExecutionEnvironment.class.getDeclaredField("configuration");
>configuration.setAccessible(true);
>Configuration o = (Configuration)configuration.get(env);
>Field confData = Configuration.class.getDeclaredField("confData");
>confData.setAccessible(true);
>Map temp = (Map)confData.get(o);
>List jarList = new ArrayList<>();
>jarList.add(path);
>temp.put("pipeline.classpaths",jarList);
>tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
>'flinksql.function.udf.CxyTestReturnSelf'");
>tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
>" f_sequence INT,\n" +
>" f_random INT,\n" +
>" f_random_str STRING,\n" +
>" ts AS localtimestamp,\n" +
>" WATERMARK FOR ts AS ts\n" +
>") WITH (\n" +
>" 'connector' = 'datagen',\n" +
>" 'rows-per-second'='5',\n" +
>"\n" +
>" 'fields.f_sequence.kind'='sequence',\n" +
>" 'fields.f_sequence.start'='1',\n" +
>" 'fields.f_sequence.end'='1000',\n" +
>"\n" +
>" 'fields.f_random.min'='1',\n" +
>" 'fields.f_random.max'='1000',\n" +
>"\n" +
>" 'fields.f_random_str.length'='10'\n" +
>")");
>tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
>"f_random_str STRING" +
>") WITH (\n" +
>"'connector' = 'print'\n" +
>")");
>tableEnvironment.executeSql(
>"insert into sinktable " +
>"select CxyTestReturnSelf(f_random_str) " +
>"from sourceTable");
>}
>//动态加载Jar
>public static void loadJar(URL jarUrl) {
>//从URLClassLoader类加载器中获取类的addURL方法
>Method method = null;
>try {
>method = URLClassLoader.class.getDeclaredMethod("addURL",
>URL.class);
>} catch (NoSuchMethodException | SecurityException e1) {
>e1.printStackTrace();
>}
>// 获取方法的访问权限
>boolean accessible = method.isAccessible();
>try {
>//修改访问权限为可写
>if (accessible == false) {
>method.setAccessible(true);
>}
>// 获取系统类加载器
>URLClassLoader classLoader = (URLClassLoader)
>ClassLoader.getSystemClassLoader();
>//jar路径加入到系统url路径里
>method.invoke(classLoader, jarUrl);
>} catch (Exception e) {
>e.printStackTrace();
>} finally {
>method.setAccessible(accessible);
>}
>}
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-15 文章 amen...@163.com
追加问题,在使用线程上下文类加载器的时候,数据会重复发送三条,这是因为添加pipeline.classpaths的缘故吗?
那这种设置env的方式有可能还会造成其他什么问题?

best,
amenhub
 
发件人: amen...@163.com
发送时间: 2020-10-15 19:22
收件人: user-zh
主题: Re: Re: flink1.11加载外部jar包进行UDF注册
非常感谢您的回复!
 
对于我来说,这是一个非常好的办法,但是我有一个疑问:类加载器只能使用系统类加载器进行加载吗?
因为我在尝试使用系统类加载器的时候,本身作业包开放给外部UDF 
jar包实现的接口会报ClassNotFound异常,而将类加载器指向主类(这种方式的话这里应该是使用默认的线程上下文加载器),则可避免这个问题。
 
期待您的回复,谢谢~
 
best, 
amenhub
发件人: cxydeve...@163.com
发送时间: 2020-10-15 17:46
收件人: user-zh
主题: Re: flink1.11加载外部jar包进行UDF注册
我们用方法是通过反射设置env的配置,增加pipeline.classpaths
具体代码如下
public static void main(final String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, settings);
//String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
String path = "https://...xxx.jar;;
loadJar(new URL(path));
Field configuration =
StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration)configuration.get(env);
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map temp = (Map)confData.get(o);
List jarList = new ArrayList<>();
jarList.add(path);
temp.put("pipeline.classpaths",jarList);
tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
'flinksql.function.udf.CxyTestReturnSelf'");
tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='5',\n" +
"\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
"\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
"\n" +
" 'fields.f_random_str.length'='10'\n" +
")");
tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
"f_random_str STRING" +
") WITH (\n" +
"'connector' = 'print'\n" +
")");
tableEnvironment.executeSql(
"insert into sinktable " +
"select CxyTestReturnSelf(f_random_str) " +
"from sourceTable");
}
//动态加载Jar
public static void loadJar(URL jarUrl) {
//从URLClassLoader类加载器中获取类的addURL方法
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL",
URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// 获取方法的访问权限
boolean accessible = method.isAccessible();
try {
//修改访问权限为可写
if (accessible == false) {
method.setAccessible(true);
}
// 获取系统类加载器
URLClassLoader classLoader = (URLClassLoader)
ClassLoader.getSystemClassLoader();
//jar路径加入到系统url路径里
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: flink1.11加载外部jar包进行UDF注册

2020-10-15 文章 amen...@163.com
非常感谢您的回复!

对于我来说,这是一个非常好的办法,但是我有一个疑问:类加载器只能使用系统类加载器进行加载吗?
因为我在尝试使用系统类加载器的时候,本身作业包开放给外部UDF 
jar包实现的接口会报ClassNotFound异常,而将类加载器指向主类(这种方式的话这里应该是使用默认的线程上下文加载器),则可避免这个问题。

期待您的回复,谢谢~

best, 
amenhub
 
发件人: cxydeve...@163.com
发送时间: 2020-10-15 17:46
收件人: user-zh
主题: Re: flink1.11加载外部jar包进行UDF注册
我们用方法是通过反射设置env的配置,增加pipeline.classpaths
具体代码如下
public static void main(final String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, settings);
//String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
String path = "https://...xxx.jar;;
loadJar(new URL(path));
 
Field configuration =
StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration)configuration.get(env);
 
Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map temp = (Map)confData.get(o);
List jarList = new ArrayList<>();
jarList.add(path);
temp.put("pipeline.classpaths",jarList);
 
tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
'flinksql.function.udf.CxyTestReturnSelf'");
tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='5',\n" +
"\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
"\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
"\n" +
" 'fields.f_random_str.length'='10'\n" +
")");
tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
"f_random_str STRING" +
") WITH (\n" +
"'connector' = 'print'\n" +
")");
tableEnvironment.executeSql(
"insert into sinktable " +
"select CxyTestReturnSelf(f_random_str) " +
"from sourceTable");
}
//动态加载Jar
public static void loadJar(URL jarUrl) {
//从URLClassLoader类加载器中获取类的addURL方法
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL",
URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// 获取方法的访问权限
boolean accessible = method.isAccessible();
try {
//修改访问权限为可写
if (accessible == false) {
method.setAccessible(true);
}
// 获取系统类加载器
URLClassLoader classLoader = (URLClassLoader)
ClassLoader.getSystemClassLoader();
//jar路径加入到系统url路径里
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11加载外部jar包进行UDF注册

2020-10-15 文章 cxydeve...@163.com
我们用方法是通过反射设置env的配置,增加pipeline.classpaths
具体代码如下
public static void main(final String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(env, settings);
//String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
String path = "https://...xxx.jar;;
loadJar(new URL(path));

Field configuration =
StreamExecutionEnvironment.class.getDeclaredField("configuration");
configuration.setAccessible(true);
Configuration o = (Configuration)configuration.get(env);

Field confData = Configuration.class.getDeclaredField("confData");
confData.setAccessible(true);
Map temp = (Map)confData.get(o);
List jarList = new ArrayList<>();
jarList.add(path);
temp.put("pipeline.classpaths",jarList);

tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
'flinksql.function.udf.CxyTestReturnSelf'");
tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
" f_sequence INT,\n" +
" f_random INT,\n" +
" f_random_str STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='5',\n" +
"\n" +
" 'fields.f_sequence.kind'='sequence',\n" +
" 'fields.f_sequence.start'='1',\n" +
" 'fields.f_sequence.end'='1000',\n" +
"\n" +
" 'fields.f_random.min'='1',\n" +
" 'fields.f_random.max'='1000',\n" +
"\n" +
" 'fields.f_random_str.length'='10'\n" +
")");
tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
"f_random_str STRING" +
") WITH (\n" +
"'connector' = 'print'\n" +
")");
tableEnvironment.executeSql(
"insert into sinktable " +
"select CxyTestReturnSelf(f_random_str) " +
"from sourceTable");
}

//动态加载Jar
public static void loadJar(URL jarUrl) {
//从URLClassLoader类加载器中获取类的addURL方法
Method method = null;
try {
method = URLClassLoader.class.getDeclaredMethod("addURL",
URL.class);
} catch (NoSuchMethodException | SecurityException e1) {
e1.printStackTrace();
}
// 获取方法的访问权限
boolean accessible = method.isAccessible();
try {
//修改访问权限为可写
if (accessible == false) {
method.setAccessible(true);
}
// 获取系统类加载器
URLClassLoader classLoader = (URLClassLoader)
ClassLoader.getSystemClassLoader();
//jar路径加入到系统url路径里
method.invoke(classLoader, jarUrl);
} catch (Exception e) {
e.printStackTrace();
} finally {
method.setAccessible(accessible);
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink1.11加载外部jar包进行UDF注册

2020-10-13 文章 amen...@163.com
hi, everyone

近期有做一个关于从外部路径加载UDF的开发,但报了如下异常:(截取主要的异常信息)

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user 
class: com.xxx.xxx.udf.Uppercase
ClassLoader info: URL ClassLoader:
file: 
'/tmp/blobStore-1318a525-1b5a-4c07-808e-f62083c3fb11/job_a5501605ff554915a81ae12e3018e77d/blob_p-b0411adc6fb3d602ed03076ddc3d1bf3e6a63319-48d1e8f3c1b25d4e2b78242429834e31'
 (valid JAR)
Class not resolvable through given classloader.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:288)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:155)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
 ~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) 
~[flink-streaming-java_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) 
[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) 
[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171]
Caused by: java.lang.ClassNotFoundException: com.xxx.xxx.udf.Uppercase
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_171]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_171]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_171]
at java.lang.Class.forName0(Native Method) ~[?:1.8.0_171]
at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_171]

我这边猜测的原因是进行外部jar包加载进行createTemporarySystemFunction的时候,在flink运行环境中没有将外部jar加载进来,但对这个猜测也还是很迷惑,菜鸟操作,希望有大佬帮忙提供解决方案或思路,谢谢~

best,
amenhub