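A follow-up question: when I use the thread context class loader, every record is sent three times. Is that caused by adding pipeline.classpaths?
Also, what other problems might this way of setting the env configuration cause?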

best,
amenhub
 
From: amen...@163.com
Sent: 2020-10-15 19:22
To: user-zh
Subject: Re: Re: flink1.11: loading an external jar to register a UDF
Thank you very much for your reply!
 
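This is a very good approach for me, but I have one question: does the jar have to be loaded with the system class loader?
When I tried the system class loader, the interface that my job jar exposes for external UDF jars to implement failed with a ClassNotFound exception.
Pointing the class loader at the main class instead (which, I assume, means using the default thread context class loader) avoids the problem.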
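
One possible explanation, sketched under assumptions: class loaders delegate upward to their parent, never downward. If the job jar is loaded by a child (user-code) class loader while the UDF jar is appended to the system class loader, the UDF classes cannot see an interface that only the child knows about. The self-contained sketch below illustrates that one-way visibility with a resource file instead of real classes. The class name `LoaderVisibilityDemo` and the marker file are made up for illustration and are not part of Flink.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class LoaderVisibilityDemo {

    /** Returns {visibleFromChild, visibleFromParent} for a resource only the child knows. */
    public static boolean[] check() throws IOException {
        // Hypothetical stand-in for "the job jar": a temp directory with one marker file.
        Path dir = Files.createTempDirectory("job-jar-demo");
        dir.toFile().deleteOnExit();
        Path marker = Files.createTempFile(dir, "marker", ".txt");
        marker.toFile().deleteOnExit();
        String name = marker.getFileName().toString();

        ClassLoader parent = ClassLoader.getSystemClassLoader();
        // The child sees its own URL plus everything the parent sees.
        try (URLClassLoader child = new URLClassLoader(new URL[] {dir.toUri().toURL()}, parent)) {
            boolean fromChild = child.getResource(name) != null;
            // The parent never asks the child, so it cannot find the marker.
            boolean fromParent = parent.getResource(name) != null;
            return new boolean[] {fromChild, fromParent};
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] r = check();
        System.out.println("child sees marker: " + r[0] + ", parent sees marker: " + r[1]);
    }
}
```

If the same pattern applies here, loading the UDF jar through a loader that sits at or below the job's own class loader (such as the thread context class loader) would explain why the ClassNotFound exception goes away.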
 
I look forward to your reply. Thanks!
 
best, 
amenhub
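From: cxydeve...@163.com
Sent: 2020-10-15 17:46
To: user-zh
Subject: Re: flink1.11: loading an external jar to register a UDF
Our approach is to set the env configuration through reflection and add pipeline.classpaths.
The code is as follows: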
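import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public static void main(final String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);
//        String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
        String path = "https://...xxx.jar";
        // Make the jar visible to the client JVM before registering the UDF
        loadJar(new URL(path));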
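        // Read the non-public "configuration" field of the environment via reflection
        Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration");
        configuration.setAccessible(true);
        Configuration o = (Configuration) configuration.get(env);
        // Read the non-public map that backs the Configuration object
        Field confData = Configuration.class.getDeclaredField("confData");
        confData.setAccessible(true);
        @SuppressWarnings("unchecked")
        Map<String, Object> temp = (Map<String, Object>) confData.get(o);
        // Register the jar under pipeline.classpaths
        List<String> jarList = new ArrayList<>();
        jarList.add(path);
        temp.put("pipeline.classpaths", jarList);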
        tableEnvironment.executeSql(
                "CREATE FUNCTION CxyTestReturnSelf AS 'flinksql.function.udf.CxyTestReturnSelf'");
        tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
                " f_sequence INT,\n" +
                " f_random INT,\n" +
                " f_random_str STRING,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                "\n" +
                " 'fields.f_sequence.kind'='sequence',\n" +
                " 'fields.f_sequence.start'='1',\n" +
                " 'fields.f_sequence.end'='1000',\n" +
                "\n" +
                " 'fields.f_random.min'='1',\n" +
                " 'fields.f_random.max'='1000',\n" +
                "\n" +
                " 'fields.f_random_str.length'='10'\n" +
                ")");
        tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
                "    f_random_str STRING" +
                ") WITH (\n" +
                "    'connector' = 'print'\n" +
                ")");
        tableEnvironment.executeSql(
                "insert into sinktable " +
                    "select CxyTestReturnSelf(f_random_str) " +
                    "from sourceTable");
    }
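    // Dynamically load a jar
    public static void loadJar(URL jarUrl) {
        // Look up the protected addURL method on URLClassLoader
        Method method;
        try {
            method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
        } catch (NoSuchMethodException | SecurityException e) {
            // Fail here instead of hitting a NullPointerException further down
            throw new IllegalStateException("URLClassLoader.addURL is not available", e);
        }
        // Remember the current accessibility so it can be restored afterwards
        boolean accessible = method.isAccessible();
        try {
            // Make the method callable via reflection
            if (!accessible) {
                method.setAccessible(true);
            }
            // Get the system class loader. This cast only works on Java 8;
            // from Java 9 on, the system class loader is no longer a URLClassLoader.
            URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
            // Append the jar URL to the system class loader's search path
            method.invoke(classLoader, jarUrl);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            method.setAccessible(accessible);
        }
    }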
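
For comparison with loadJar above, here is a minimal sketch that avoids both the reflection on `addURL` and the cast of the system class loader. It wraps the current thread context class loader in a child `URLClassLoader` and installs that as the new context class loader. Because the child delegates to its parent first, classes from the jar can still resolve interfaces that the job itself provides. The class name `ContextClassLoaderJars` and the method `addJar` are hypothetical. Whether Flink resolves the UDF through the context class loader in a given setup is an assumption to verify, and pipeline.classpaths would presumably still be needed so that the cluster side can load the jar.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ContextClassLoaderJars {

    /**
     * Makes the given jar visible to the current thread by installing a child
     * class loader on top of the existing context class loader.
     */
    public static URLClassLoader addJar(URL jarUrl) {
        Thread current = Thread.currentThread();
        ClassLoader parent = current.getContextClassLoader();
        // The child looks in its parent first, then in jarUrl.
        URLClassLoader child = new URLClassLoader(new URL[] {jarUrl}, parent);
        current.setContextClassLoader(child);
        return child;
    }
}
```

Under these assumptions, a call such as `ContextClassLoaderJars.addJar(new URL(path))` before the `CREATE FUNCTION` statement would take the place of `loadJar(new URL(path))`.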
--
Sent from: http://apache-flink.147419.n8.nabble.com/
