Environment: Flink 1.13.6 (Scala 2.11), Java 1.8, standalone session cluster mode; node01 is the JobManager, node02 and node03 are TaskManagers.
The UDF code:

    package com.example.udf;

    import org.apache.flink.table.functions.ScalarFunction;

    public class SubStr extends ScalarFunction {
        public String eval(String s, Integer start, Integer end) {
            return s.substring(start.intValue(), end.intValue());
        }
    }

The SQL submitted to the cluster:

    [
      "DROP TABLE IF EXISTS source_datagen",
      "CREATE TABLE source_datagen(f_sequence INT,f_random INT,f_random_str STRING,ts AS localtimestamp,WATERMARK FOR ts AS ts) WITH ('connector' = 'datagen','rows-per-second'='5','fields.f_sequence.kind'='sequence','fields.f_sequence.start'='1','fields.f_sequence.end'='1000','fields.f_random.min'='1','fields.f_random.max'='1000','fields.f_random_str.length'='10')",
      "DROP TABLE IF EXISTS print_sink",
      "CREATE TABLE print_sink(id STRING,user_id STRING,`status` STRING,`str` STRING) WITH ('connector' = 'print')",
      "INSERT INTO print_sink SELECT CAST(f_sequence AS STRING) AS id, CAST(f_random AS STRING) AS user_id, CAST(ts AS STRING) AS status, mysubstr(f_random_str,1,4) AS str FROM source_datagen"
    ]

The controller logic:

    public String executeDefaultSql(String sql) throws Exception {
        log.info(sql);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port);
        env.setStateBackend(new HashMapStateBackend());
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

        List<String> jars = new ArrayList<>();
        jars.add("hdfs:///xxx/file/function/flink/d78721345f45422da269fa0411127eda0453523812.jar");
        try {
            for (String jar : jars) {
                log.info(jar);
                EnvUtil.loadJar(URLUtil.url(jar));
            }
            tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
            tableEnv.executeSql("CREATE FUNCTION mysubstr AS 'com.example.udf.SubStr' LANGUAGE JAVA").print();
            log.info("finished loading the UDF from HDFS");
        } catch (Exception e) {
            e.printStackTrace();
        }

        List<String> list = JSON.parseArray(sql, String.class);
        TableResult result = null;
        for (String s : list) {
            result = tableEnv.executeSql(s);
        }

        String jobId = "";
        log.info(result.getResultKind().name());
        if (result.getJobClient().isPresent()) {
            log.info(JSON.toJSONString(result.getJobClient().get().getJobStatus()));
            jobId = result.getJobClient().get().getJobID().toString();
            log.info("jobId:" + jobId);
        } else {
            result.print();
        }
        return jobId;
    }
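EnvUtil.loadJar is a project-local helper whose implementation is not shown above. The sketch below is only a guess at what it presumably does, namely injecting the jar URL into the client JVM's classloader via reflection, which would explain why the CREATE FUNCTION statement succeeds on the submitting side; the class body and approach are assumptions, not the actual project code:

    import java.lang.reflect.Method;
    import java.net.URL;
    import java.net.URLClassLoader;

    // Hypothetical sketch of EnvUtil.loadJar (assumption, not the real helper):
    // add a jar URL to the client JVM's system classloader. On Java 8 the system
    // classloader is a URLClassLoader, so its protected addURL method can be
    // invoked reflectively. Note this only changes the client JVM; the JobManager
    // and TaskManager JVMs are unaffected. Also, for an hdfs:// URL to be readable
    // by a URLClassLoader, a URL stream handler for the hdfs scheme must be
    // registered in that JVM.
    public final class EnvUtil {
        public static void loadJar(URL jarUrl) throws Exception {
            URLClassLoader sysLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
            Method addURL = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
            addURL.setAccessible(true);
            addURL.invoke(sysLoader, jarUrl);
        }
    }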
The error message:

    2022-04-21 10:12:37
    org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.udf.SubStr
    ClassLoader info: URL ClassLoader: Class not resolvable through given classloader.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ClassNotFoundException: com.example.udf.SubStr
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
        ... 10 more
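The trace shows the ClassNotFoundException being thrown inside the TaskManager's FlinkUserCodeClassLoader while the serialized operator is deserialized, i.e. the class resolves on the client but not on the cluster side. For comparison, one way to make a class available to every TaskManager is to ship the jar with the job itself rather than referencing it through pipeline.classpaths; a minimal sketch, assuming the HDFS jar has first been copied to a local path on the client (the /tmp path and port 8081, the default REST port, are placeholders):

    // Sketch: pass the UDF jar to createRemoteEnvironment so it is uploaded with
    // the job and added to the user-code classloader of every task.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
            "node01", 8081, "/tmp/d78721345f45422da269fa0411127eda0453523812.jar");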
If the first argument of mysubstr in the final INSERT INTO statement is changed from the column f_random_str to a string literal, the job prints normally:

    [
      "DROP TABLE IF EXISTS source_datagen",
      "CREATE TABLE source_datagen(f_sequence INT,f_random INT,f_random_str STRING,ts AS localtimestamp,WATERMARK FOR ts AS ts) WITH ('connector' = 'datagen','rows-per-second'='5','fields.f_sequence.kind'='sequence','fields.f_sequence.start'='1','fields.f_sequence.end'='1000','fields.f_random.min'='1','fields.f_random.max'='1000','fields.f_random_str.length'='10')",
      "DROP TABLE IF EXISTS print_sink",
      "CREATE TABLE print_sink(id STRING,user_id STRING,`status` STRING,`str` STRING) WITH ('connector' = 'print')",
      "INSERT INTO print_sink SELECT CAST(f_sequence AS STRING) AS id, CAST(f_random AS STRING) AS user_id, CAST(ts AS STRING) AS status, mysubstr('ABCDEFGHIJK',1,4) AS str FROM source_datagen"
    ]

The printed output is:

    +I[1, 248, 2022-04-21 10:24:46.748, EDF]
    +I[2, 718, 2022-04-21 10:24:46.749, EDF]
    +I[3, 841, 2022-04-21 10:24:46.750, EDF]
    +I[4, 788, 2022-04-21 10:24:46.750, EDF]
    +I[5, 633, 2022-04-21 10:24:46.750, EDF]
    +I[6, 524, 2022-04-21 10:24:47.742, EDF]
    +I[7, 343, 2022-04-21 10:24:47.742, EDF]
    +I[8, 391, 2022-04-21 10:24:47.742, EDF]
    +I[9, 442, 2022-04-21 10:24:47.742, EDF]
    +I[10, 992, 2022-04-21 10:24:47.742, EDF]
    +I[11, 311, 2022-04-21 10:24:48.743, EDF]
    +I[12, 567, 2022-04-21 10:24:48.743, EDF]
    +I[13, 474, 2022-04-21 10:24:48.743, EDF]
    +I[14, 796, 2022-04-21 10:24:48.743, EDF]
    +I[15, 465, 2022-04-21 10:24:48.743, EDF]
    +I[16, 945, 2022-04-21 10:24:49.742, EDF]
    +I[17, 671, 2022-04-21 10:24:49.742, EDF]
    +I[18, 154, 2022-04-21 10:24:49.743, EDF]
    +I[19, 951, 2022-04-21 10:24:49.743, EDF]
    +I[20, 619, 2022-04-21 10:24:49.743, EDF]
    +I[21, 45, 2022-04-21 10:24:50.742, EDF]
    +I[22, 577, 2022-04-21 10:24:50.742, EDF]
    +I[23, 203, 2022-04-21 10:24:50.742, EDF]
    +I[24, 472, 2022-04-21 10:24:50.742, EDF]
    +I[25, 464, 2022-04-21 10:24:50.742, EDF]
    +I[26, 36, 2022-04-21 10:24:51.743, EDF]
    ...

On the Apache Flink Dashboard, the TaskManager that handles the work is node02 every time.
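As a client-side sanity check, the jar can also be loaded locally with a plain URLClassLoader to confirm it really contains com.example.udf.SubStr under that exact name; the local path below is a placeholder for a downloaded copy of the HDFS jar:

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;

    // Sanity-check sketch: resolve the UDF class from a local copy of the jar.
    // The default (application) parent classloader is kept so that the
    // ScalarFunction superclass from flink-table resolves while the class is
    // being defined.
    public class CheckJar {
        public static void main(String[] args) throws Exception {
            URL jar = new File("/tmp/udf.jar").toURI().toURL();
            try (URLClassLoader cl = new URLClassLoader(new URL[]{jar})) {
                Class<?> c = cl.loadClass("com.example.udf.SubStr");
                System.out.println("resolved: " + c.getName());
            }
        }
    }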