追加问题,在使用线程上下文类加载器的时候,数据会重复发送三条,这是因为添加pipeline.classpaths的缘故吗? 那这种设置env的方式有可能还会造成其他什么问题?
best, amenhub 发件人: amen...@163.com 发送时间: 2020-10-15 19:22 收件人: user-zh 主题: Re: Re: flink1.11加载外部jar包进行UDF注册 非常感谢您的回复! 对于我来说,这是一个非常好的办法,但是我有一个疑问:类加载器只能使用系统类加载器进行加载吗? 因为我在尝试使用系统类加载器的时候,本身作业包开放给外部UDF jar包实现的接口会报ClassNotFound异常,而将类加载器指向主类(这种方式的话这里应该是使用默认的线程上下文加载器),则可避免这个问题。 期待您的回复,谢谢~ best, amenhub 发件人: cxydeve...@163.com 发送时间: 2020-10-15 17:46 收件人: user-zh 主题: Re: flink1.11加载外部jar包进行UDF注册 我们用方法是通过反射设置env的配置,增加pipeline.classpaths 具体代码如下 public static void main(final String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings); // String path = "file:///D:/cxy/idea_workspace/...xxx.jar"; String path = "https://...xxx.jar"; loadJar(new URL(path)); Field configuration = StreamExecutionEnvironment.class.getDeclaredField("configuration"); configuration.setAccessible(true); Configuration o = (Configuration)configuration.get(env); Field confData = Configuration.class.getDeclaredField("confData"); confData.setAccessible(true); Map<String,Object> temp = (Map<String,Object>)confData.get(o); List<String> jarList = new ArrayList<>(); jarList.add(path); temp.put("pipeline.classpaths",jarList); tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS 'flinksql.function.udf.CxyTestReturnSelf'"); tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" + " f_sequence INT,\n" + " f_random INT,\n" + " f_random_str STRING,\n" + " ts AS localtimestamp,\n" + " WATERMARK FOR ts AS ts\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second'='5',\n" + "\n" + " 'fields.f_sequence.kind'='sequence',\n" + " 'fields.f_sequence.start'='1',\n" + " 'fields.f_sequence.end'='1000',\n" + "\n" + " 'fields.f_random.min'='1',\n" + " 'fields.f_random.max'='1000',\n" + "\n" + " 'fields.f_random_str.length'='10'\n" + ")"); tableEnvironment.executeSql("CREATE TABLE sinktable (\n" + " f_random_str STRING" + ") WITH (\n" + " 'connector' = 'print'\n" + ")"); tableEnvironment.executeSql( "insert into sinktable " + "select CxyTestReturnSelf(f_random_str) " + "from sourceTable"); } //动态加载Jar public static void loadJar(URL jarUrl) { //从URLClassLoader类加载器中获取类的addURL方法 Method method = null; try { method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); } catch (NoSuchMethodException | SecurityException e1) { e1.printStackTrace(); } // 获取方法的访问权限 boolean accessible = method.isAccessible(); try { //修改访问权限为可写 if (accessible == false) { method.setAccessible(true); } // 获取系统类加载器 URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader(); //jar路径加入到系统url路径里 method.invoke(classLoader, jarUrl); } catch (Exception e) { e.printStackTrace(); } finally { method.setAccessible(accessible); } } -- Sent from: http://apache-flink.147419.n8.nabble.com/