1使用系统类加载器的时候,本身作业包开放给外部UDF jar包实现的接口会报ClassNotFound异常


在 2020-10-15 19:47:20,"amen...@163.com" <amen...@163.com> 写道:
>发件人: amen...@163.com
>发送时间: 2020-10-15 19:22
>收件人: user-zh
>主题: Re: Re: flink1.11加载外部jar包进行UDF注册
>发件人: cxydeve...@163.com
>发送时间: 2020-10-15 17:46
>收件人: user-zh
>主题: Re: flink1.11加载外部jar包进行UDF注册
>public static void main(final String[] args) throws Exception {
>        StreamExecutionEnvironment env =
>        EnvironmentSettings settings =
>        StreamTableEnvironment tableEnvironment =
>StreamTableEnvironment.create(env, settings);
>//        String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
>        String path = "https://...xxx.jar";;
>        loadJar(new URL(path));
>        Field configuration =
>        configuration.setAccessible(true);
>        Configuration o = (Configuration)configuration.get(env);
>        Field confData = Configuration.class.getDeclaredField("confData");
>        confData.setAccessible(true);
>        Map<String,Object> temp = (Map<String,Object>)confData.get(o);
>        List<String> jarList = new ArrayList<>();
>        jarList.add(path);
>        temp.put("pipeline.classpaths",jarList);
>        tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS
>        tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
>                " f_sequence INT,\n" +
>                " f_random INT,\n" +
>                " f_random_str STRING,\n" +
>                " ts AS localtimestamp,\n" +
>                " WATERMARK FOR ts AS ts\n" +
>                ") WITH (\n" +
>                " 'connector' = 'datagen',\n" +
>                " 'rows-per-second'='5',\n" +
>                "\n" +
>                " 'fields.f_sequence.kind'='sequence',\n" +
>                " 'fields.f_sequence.start'='1',\n" +
>                " 'fields.f_sequence.end'='1000',\n" +
>                "\n" +
>                " 'fields.f_random.min'='1',\n" +
>                " 'fields.f_random.max'='1000',\n" +
>                "\n" +
>                " 'fields.f_random_str.length'='10'\n" +
>                ")");
>        tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
>                "    f_random_str STRING" +
>                ") WITH (\n" +
>                "    'connector' = 'print'\n" +
>                ")");
>        tableEnvironment.executeSql(
>                "insert into sinktable " +
>                    "select CxyTestReturnSelf(f_random_str) " +
>                    "from sourceTable");
>    }
>    //动态加载Jar
>    public static void loadJar(URL jarUrl) {
>        //从URLClassLoader类加载器中获取类的addURL方法
>        Method method = null;
>        try {
>            method = URLClassLoader.class.getDeclaredMethod("addURL",
>        } catch (NoSuchMethodException | SecurityException e1) {
>            e1.printStackTrace();
>        }
>        // 获取方法的访问权限
>        boolean accessible = method.isAccessible();
>        try {
>            //修改访问权限为可写
>            if (accessible == false) {
>                method.setAccessible(true);
>            }
>            // 获取系统类加载器
>            URLClassLoader classLoader = (URLClassLoader)
>            //jar路径加入到系统url路径里
>            method.invoke(classLoader, jarUrl);
>        } catch (Exception e) {
>            e.printStackTrace();
>        } finally {
>            method.setAccessible(accessible);
>        }
>    }
>Sent from: http://apache-flink.147419.n8.nabble.com/

Reply via email to