Why does our snowflake-algorithm UDF return the same result on every execution, and how can it vary per call like UUID?
Environment: flink-1.13.6_scala_2.11, Java 1.8, hive 1.1.0-cdh5.16.2, hbase 1.2.0-cdh5.16.2, running in yarn-session mode. The UDF class is in the attachment.

The use case: join a hive table with a kafka table and sink the result to hbase. Every 3 seconds we query hbase for the latest 10 rows in descending order, so we wanted to use the snowflake-generated ID as the hbase rowkey to support that sort. However, after registering the UDF in Flink, every execution returns the same value, so the ID never auto-increments. Running the following SQL in sql-client:

CREATE FUNCTION IF NOT EXISTS SnowflakeId AS 'com.chinaoly.SnowflakeId' LANGUAGE JAVA;

SELECT SnowflakeId() AS a
UNION ALL SELECT SnowflakeId() AS b
UNION ALL SELECT SnowflakeId() AS c
UNION ALL SELECT SnowflakeId() AS d;

gives:

Table program finished. Page: Last of 1 Updated: 14:44:19.665
a
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344
1606178806577209344

Likewise:

SELECT SnowflakeId() AS a, SnowflakeId() AS b, SnowflakeId() AS c, SnowflakeId() AS d;

gives:

a                   b                   c                   d
1606181622549028864 1606181622549028864 1606181622549028864 1606181622549028864

The built-in UUID(), by contrast, does generate a different value each time:

SELECT UUID() AS a, UUID() AS b, UUID() AS c, UUID() AS d, UUID() AS e, UUID() AS f, UUID() AS g, UUID() AS h;

gives:

a (CHAR(36) NOT NULL): e16cd19c-b636-4a26-bca3-94424fee8313
b (CHAR(36) NOT NULL): bb242ce0-6d73-428a-bc7e-6d35d44e7f1c
c (CHAR(36) NOT NULL): 357e56b6-a4ca-4666-9d09-0addf84db421
d (CHAR(36) NOT NULL): deb70796-dc96-464c-a477-d19416fd0c0d
e (CHAR(36) NOT NULL): 233bf58e-9869-42d0-ad67-c21f653143d3

Executed via UNION ALL, though, UUID() also returns identical values:

SELECT UUID() AS a UNION ALL SELECT UUID() AS b UNION ALL SELECT UUID() AS c;

gives:

a
e6955237-64f3-4b72-bf15-03098~
e6955237-64f3-4b72-bf15-03098~
e6955237-64f3-4b72-bf15-03098~

Because of the requirements, the output fields are not fixed, so we cannot use an output field as the rowkey. When sinking to hbase, how should the rowkey be designed so that the latest 10 rows can be fetched in time-descending order?

谌祥, Hangzhou - Java backend development, big data
799590...@qq.com
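One likely cause, assuming the attached class is a plain ScalarFunction: Flink treats UDFs as deterministic by default, so a zero-argument call such as SnowflakeId() can be constant-folded once at planning time. Overriding isDeterministic() should stop the planner from reducing the call to a constant. A minimal sketch — the field layout below is simplified and not taken from the attached UDF:

package com.chinaoly;

import org.apache.flink.table.functions.ScalarFunction;

// Sketch of a snowflake-style ID UDF that Flink will re-evaluate per row.
public class SnowflakeId extends ScalarFunction {

    private long lastTimestamp = -1L;
    private long sequence = 0L;

    public Long eval() {
        return nextId();
    }

    // Without this override, the planner assumes the function is
    // deterministic and may replace SnowflakeId() with a single constant
    // in the optimized plan.
    @Override
    public boolean isDeterministic() {
        return false;
    }

    private synchronized long nextId() {
        long ts = System.currentTimeMillis();
        if (ts == lastTimestamp) {
            sequence = (sequence + 1) & 0xFFF; // 12-bit in-millisecond counter
        } else {
            sequence = 0L;
            lastTimestamp = ts;
        }
        // Simplified layout: timestamp << 22 | sequence. A real snowflake
        // implementation would subtract an epoch offset and add a worker id.
        return (ts << 22) | sequence;
    }
}

For the rowkey question, a common HBase pattern for newest-first reads is to store Long.MAX_VALUE - id (or - timestamp) as the key prefix, so a plain ascending scan with limit 10 returns the latest 10 rows without a reverse scan.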
Calling a custom function with a column name as the argument fails with "Cannot load user class: com.example.udf.SubStr", but passing a string literal works
Environment: flink-1.13.6_scala_2.11, Java 1.8, standalonesession cluster mode; node01 is the jobmanager, node02 and node03 are taskmanagers.

The custom function:

package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

public class SubStr extends ScalarFunction {
    public String eval(String s, Integer start, Integer end) {
        return s.substring(start.intValue(), end.intValue());
    }
}

The SQL submitted to the cluster:

[
  "DROP TABLE IF EXISTS source_datagen",
  "CREATE TABLE source_datagen(f_sequence INT,f_random INT,f_random_str STRING,ts AS localtimestamp,WATERMARK FOR ts AS ts) WITH ('connector' = 'datagen','rows-per-second'='5','fields.f_sequence.kind'='sequence','fields.f_sequence.start'='1','fields.f_sequence.end'='1000','fields.f_random.min'='1','fields.f_random.max'='1000','fields.f_random_str.length'='10')",
  "DROP TABLE IF EXISTS print_sink",
  "CREATE TABLE print_sink(id STRING,user_id STRING,`status` STRING,`str` STRING) WITH ('connector' = 'print')",
  "INSERT INTO print_sink SELECT CAST(f_sequence AS STRING) AS id, CAST(f_random AS STRING) AS user_id, CAST(ts AS STRING) AS status, mysubstr(f_random_str,1,4) AS str FROM source_datagen"
]

The controller logic:

public String executeDefaultSql(String sql) throws Exception {
    log.info(sql);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port);
    env.setStateBackend(new HashMapStateBackend());
    env.enableCheckpointing(1000);
    env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(6);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
    List<String> jars = new ArrayList<>();
    jars.add("hdfs:///xxx/file/function/flink/d78721345f45422da269fa0411127eda0453523812.jar");
    try {
        for (String jar : jars) {
            log.info(jar);
            EnvUtil.loadJar(URLUtil.url(jar));
        }
        tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
        tableEnv.executeSql("CREATE FUNCTION mysubstr AS 'com.example.udf.SubStr' LANGUAGE JAVA").print();
        log.info("Finished loading UDFs from hdfs");
    } catch (Exception e) {
        e.printStackTrace();
    }
    List<String> list = JSON.parseArray(sql, String.class);
    TableResult result = null;
    for (String s : list) {
        result = tableEnv.executeSql(s);
    }
    String jobId = "";
    log.info(result.getResultKind().name());
    if (result.getJobClient().isPresent()) {
        log.info(JSON.toJSONString(result.getJobClient().get().getJobStatus()));
        jobId = result.getJobClient().get().getJobID().toString();
        log.info("jobId:" + jobId);
    } else {
        result.print();
    }
    return jobId;
}

The error:

2022-04-21 10:12:37
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.udf.SubStr
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.SubStr
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass
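A plausible reading of the trace: the ClassNotFoundException is raised on a TaskManager, but EnvUtil.loadJar only injects the jar into the client JVM's system class loader, and pipeline.classpaths entries must be URLs that every TaskManager can resolve on its own. That would also explain why a string literal works: with constant arguments the expression can be reduced on the client at planning time, while a column argument forces the function to actually run on the TaskManagers. A sketch of one alternative worth trying, assuming the hdfs path is readable from the client — shipping the jar with the job via pipeline.jars so it travels through Flink's blob server (the jar path is the one from the question):

import org.apache.flink.configuration.PipelineOptions;
import java.util.Collections;

// Sketch: register the UDF jar as a pipeline jar so it is uploaded with
// the job and placed on the TaskManagers' user-code class loader, rather
// than being visible only to the client-side system class loader.
tableEnv.getConfig().getConfiguration().set(
        PipelineOptions.JARS,
        Collections.singletonList("hdfs:///xxx/file/function/flink/d78721345f45422da269fa0411127eda0453523812.jar"));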
Re: Re: Python callback server start failed
Is it the class below? It does not import org.apache.flink.table.functions.ScalarFunction — creating the function via Flink SQL did not seem to require importing ScalarFunction.

Here is the logic that uploads the py file:

String originalFilename = file.getOriginalFilename();
destFile = new File(System.getProperty("user.dir") + uploadTempPath, originalFilename);
FileUtils.copyInputStreamToFile(file.getInputStream(), destFile);
String localPath = destFile.getPath();
copyToHdfsFromLocalFile(localPath, dst);
hadoopPath = dst + originalFilename;
String serviceName = configuration.get("fs.defaultFS");
StreamTableEnvironment tableEnv = getTableEnv();
String pyStr = tableEnv.getConfig().getConfiguration().getString("python.files", "");
log.info(pyStr);
String[] split = pyStr.split(",");
List<String> list = new ArrayList<>(Arrays.asList(split));
list.add(serviceName + hadoopPath);
tableEnv.getConfig().getConfiguration().setString("python.files", StrUtil.join(",", list));
tableEnv.executeSql("DROP FUNCTION IF EXISTS " + catalogName + "." + defaultDatabase + "." + functionName).print();
String createSql = "CREATE FUNCTION IF NOT EXISTS " + catalogName + "." + defaultDatabase + "." + functionName
        + " AS '" + className + "' LANGUAGE PYTHON";
tableEnv.executeSql(createSql).print();

799590...@qq.com

From: Dian Fu
Date: 2022-04-19 14:20
To: user-zh; 799590989
Subject: Re: Re: Python callback server start failed

NameError: name 'ScalarFunction' is not defined

Did you import ScalarFunction?

On Tue, Apr 19, 2022 at 2:08 PM 799590...@qq.com.INVALID <799590...@qq.com.invalid> wrote:

Here is the error log from just now; "Python callback server start failed" no longer appears in it. The highlighted part originally resolved the Python environment variable to the path C:\Python310 (a Python installed earlier; python36 was installed later and the PATH environment variable was set to python36). After rebooting the machine it recovered.

2022-04-19 13:58:11 |INFO |http-nio-9532-exec-3 |HiveCatalog.java:257 |org.apache.flink.table.catalog.hive.HiveCatalog |Setting hive conf dir as /home/tetris/conf
2022-04-19 13:58:11 |INFO |http-nio-9532-exec-3 |HiveCatalog.java:219 |org.apache.flink.table.catalog.hive.HiveCatalog |Created HiveCatalog 'myhive'
2022-04-19 13:58:11 |INFO |http-nio-9532-exec-3 |HiveCatalog.java:299 |org.apache.flink.table.catalog.hive.HiveCatalog |Connected to Hive metastore
2022-04-19 13:58:11 |INFO |http-nio-9532-exec-3 |CatalogManager.java:262 |org.apache.flink.table.catalog.CatalogManager |Set the current default catalog as [myhive] and the current default database as [tetris].
2022-04-19 13:58:11 |INFO |http-nio-9532-exec-3 |SqlTaskRunService.java:599 |com.chinaoly.tetris.flink.service.SqlTaskRunService |Starting to load UDFs from hdfs
2022-04-19 13:58:12 |INFO |http-nio-9532-exec-3 |EnvUtil.java:31 |com.chinaoly.tetris.flink.util.EnvUtil |registerFactory : org.apache.hadoop.fs.FsUrlStreamHandlerFactory
2022-04-19 13:58:12 |INFO |http-nio-9532-exec-3 |EnvUtil.java:55 |com.chinaoly.tetris.flink.util.EnvUtil |hdfs://chinaoly/tetris/file/function/flink/myReplace.py
2022-04-19 13:58:12 |INFO |http-nio-9532-exec-3 |SqlTaskRunService.java:621 |com.chinaoly.tetris.flink.service.SqlTaskRunService |Finished loading UDFs from hdfs
13:58:20,956 |-INFO in ReconfigureOnChangeTask(born:1650344954461) - Empty watch file list.
Disabling
2022-04-19 13:58:21 |INFO |http-nio-9532-exec-3 |PythonEnvUtils.java:284 |org.apache.flink.client.python.PythonEnvUtils |Starting Python process with environment variables: {USERDOMAIN_ROAMINGPROFILE=DESKTOP-LBP3EGP, HADOOP_CONF_DIR=/home/tetris/conf, PROCESSOR_LEVEL=6, SESSIONNAME=Console, ALLUSERSPROFILE=C:\ProgramData, PROCESSOR_ARCHITECTURE=AMD64, PSModulePath=C:\Program Files\WindowsPowerShell\Modules;C:\WINDOWS\system32\WindowsPowerShell\v1.0\Modules, SystemDrive=C:, MAVEN_HOME=E:\Program Files\apache-maven-3.8.2, USERNAME=Administrator, ProgramFiles(x86)=C:\Program Files (x86), FPS_BROWSER_USER_PROFILE_STRING=Default, PYTHONPATH=C:/Users/ADMINI~1/AppData/Local/Temp/pyflink/df5887da-8c4d-4807-b891-c313338f1c14/63150f6e-5ea0-4186-be56-ce25e69c4265, PATHEXT=.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC, DriverData=C:\Windows\System32\Drivers\DriverData, PYTHON_HOME=E:\Program Files\python36, ProgramData=C:\ProgramData, ProgramW6432=C:\Program Files, HOMEPATH=\Users\Administrator, PROCESSOR_IDENTIFIER=Intel64 Family 6 Model 94 Stepping 3, GenuineIntel, ProgramFiles=C:\Program Files, PUBLIC=C:\Users\Public, windir=C:\WINDOWS, =::=::\, ZES_ENABLE_SYSMAN=1, LOCALAPPDATA=C:\Users\Administrator\AppData\Local, ChocolateyLastPathUpdate=132792636909481380, IntelliJ IDEA=E:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\bin;, USERDOMAIN=DESKTOP-LBP3EGP, FPS_BROWSER_APP_PROFILE_STRING=Internet Explorer, LOGONSERVER=\\DESKTOP-LBP3EGP, JAVA_HOME=C:\Program Files\Java\jdk1.8.0_261, OneDrive=C:\Users\Administrator\OneDrive, APPDATA=C:\Users\Administrator\AppData\Roaming, ChocolateyInstall=C:\ProgramData\chocolatey, SCALA_HOME=C:\Program Files (x86)\scala, CommonProgramFiles=C:\Program Files\Common Files, Pa
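Since the root cause here turned out to be the client resolving the wrong interpreter from PATH, one way to remove that dependency is to pin the interpreter explicitly; a hedged sketch (the install path is a placeholder for the actual python36 location):

// Sketch: pin the Python interpreter used on the client side when
// compiling Python UDFs, instead of relying on whatever "python"
// resolves to on PATH. The path below is a placeholder.
tableEnv.getConfig().getConfiguration().setString(
        "python.client.executable", "E:\\Program Files\\python36\\python.exe");
// The workers' runtime interpreter can be pinned separately via "python.executable".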
Re: Re: Python callback server start failed
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:493)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:137)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:798)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:806)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1498)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:748)

799590...@qq.com

From: Dian Fu
Date: 2022-04-19 13:49
To: user-zh; 799590989
Subject: Re: Python callback server start failed

Could you send the complete log file?

On Tue, Apr 19, 2022 at 10:53 AM 799590...@qq.com.INVALID <799590...@qq.com.invalid> wrote:
> Software versions
>
> flink-1.13.6
> python3.6.8
> The local win10 machine also has python3.6.8 installed, with the python environment variable added, and "$ python -m pip install apache-flink==1.13.6" ran successfully.
> Deployed in standalonesession mode, one JM and two TMs; all 3 machines have python3.6.8 and pyflink-1.13.6 installed.
>
> Problems:
>
> 1. Calling the python udf raises the following error:
> Servlet.service() for servlet [dispatcherServlet] in context with path []
> threw exception [Request processing failed; nested exception is
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> Cannot instantiate user-defined function 'myhive.tetris.myreplace'.] with
> root cause
> java.lang.RuntimeException: Python callback server start failed!
>
> 2. Are there rules for how to fill in the class path after AS in CREATE FUNCTION for a Python function?
>
> Content of the python udf file myReplace.py:
>
> from pyflink.table.expressions import call
>
> class MyReplace(ScalarFunction):
>     def __init__(self):
>         self.factor = 12
>
>     def eval(self, s):
>         return s.replace('buy','sale')
>
> Getting the remote cluster environment, where catalogName=myhive and defaultDatabase=tetris:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port);
> env.setStateBackend(new HashMapStateBackend());
> env.enableCheckpointing(1000);
> env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setCheckpointTimeout(6);
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
> env.getCheckpointConfig().setExternalizedCheckpointCleanup(
>     CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
> );
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
> tableEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-reader", true);
> tableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
> if (!catalogName.equals(tableEnv.getCurrentCatalog())) {
>     HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
>     tableEnv.registerCatalog(catalogName, hiveCatalog);
> }
> tableEnv.useCatalog(catalogName);
> tableEnv.useDatabase(defaultDatabase);
> List<String> jars = new ArrayList<>();
> List<String> pys = new ArrayList<>();
> log.info("Starting to load UDFs from hdfs");
> String prefix = "/file/function/flink/";
> try {
>     EnvUtil.registerFactory(new FsUrlStreamHandlerFactory());
>     for (String hdf : EnvUtil.listHdfs(prefix, configuration)) {
>         if (hdf.endsWith(".jar")) {
>             jars.add(hdf);
>             EnvUtil.loadJar(URLUtil.url(hdf));
>         } else if (hdf.endsWith(".py")) {
>             pys.add(hdf);
>         }
>     }
>     tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
>     tableEnv.getConfig().getConfiguration().setString("python.files", StringUtils.join(pys, ","));
>     log.info("Finished loading UDFs from hdfs");
> } catch (Exception e) {
>     e.printStackTrace();
> }
>
> The python file is stored under the specified hdfs path.
>
> After uploading the py file, the following was executed via tableEnv.executeSql:
>
> CREATE FUNCTION IF NOT EXISTS myhive.tetris.myReplace AS 'myReplace.MyReplace' LANGUAGE PYTHON
>
> Thanks in advance to the Flink team for your hard work.
>
> 799590...@qq.com
>
Python callback server start failed
Software versions:

flink-1.13.6
python3.6.8

The local win10 machine also has python3.6.8 installed, with the python environment variable added, and "$ python -m pip install apache-flink==1.13.6" ran successfully. Deployed in standalonesession mode, one JM and two TMs; all 3 machines have python3.6.8 and pyflink-1.13.6 installed.

Problems:

1. Calling the python udf raises the following error:

Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot instantiate user-defined function 'myhive.tetris.myreplace'.] with root cause
java.lang.RuntimeException: Python callback server start failed!

2. Are there rules for how to fill in the class path after AS in CREATE FUNCTION for a Python function?

Content of the python udf file myReplace.py:

from pyflink.table.expressions import call

class MyReplace(ScalarFunction):
    def __init__(self):
        self.factor = 12

    def eval(self, s):
        return s.replace('buy','sale')

Getting the remote cluster environment, where catalogName=myhive and defaultDatabase=tetris:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port);
env.setStateBackend(new HashMapStateBackend());
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(6);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
);
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
tableEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-reader", true);
tableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
if (!catalogName.equals(tableEnv.getCurrentCatalog())) {
    HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
    tableEnv.registerCatalog(catalogName, hiveCatalog);
}
tableEnv.useCatalog(catalogName);
tableEnv.useDatabase(defaultDatabase);
List<String> jars = new ArrayList<>();
List<String> pys = new ArrayList<>();
log.info("Starting to load UDFs from hdfs");
String prefix = "/file/function/flink/";
try {
    EnvUtil.registerFactory(new FsUrlStreamHandlerFactory());
    for (String hdf : EnvUtil.listHdfs(prefix, configuration)) {
        if (hdf.endsWith(".jar")) {
            jars.add(hdf);
            EnvUtil.loadJar(URLUtil.url(hdf));
        } else if (hdf.endsWith(".py")) {
            pys.add(hdf);
        }
    }
    tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
    tableEnv.getConfig().getConfiguration().setString("python.files", StringUtils.join(pys, ","));
    log.info("Finished loading UDFs from hdfs");
} catch (Exception e) {
    e.printStackTrace();
}

The python file is stored under the specified hdfs path.

After uploading the py file, the following was executed via tableEnv.executeSql:

CREATE FUNCTION IF NOT EXISTS myhive.tetris.myReplace AS 'myReplace.MyReplace' LANGUAGE PYTHON

Thanks in advance to the Flink team for your hard work.

799590...@qq.com
Custom UDF cannot be loaded when executing Flink SQL
Environment: flink-1.13.6_scala_2.11, Java 1.8, standalonesession cluster mode; node01 is the jobmanager, node02 and node03 are taskmanagers.

UDF code:

package com.example.udf;

import org.apache.flink.table.functions.ScalarFunction;

public class SubStr extends ScalarFunction {
    public String eval(String s, Integer start, Integer end) {
        return s.substring(start, end);
    }
}

The udf jars are stored on hdfs. On every SQL submission the client loads the list of udf jars from hdfs through a class loader and sets pipeline.jars to the list of hdfs udf jar paths. Executing the following SQL fails:

insert into output_2455_5070_model_1649729386269
select tablekeymd5(user_id) as mm, proctime(), MD5(CONCAT_WS(CAST(user_id AS STRING)))
from (select distinct id as id, user_id as user_id, status as status
      from (select id, user_id, status from data_2455_5068_model)
      where status < '4')

2022-04-12 10:26:36
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.udf.TableKeyMd5
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:336)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:656)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:629)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:569)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:551)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.example.udf.TableKeyMd5
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)

The class loader code:

public static void loadJar(URL jarUrl) {
    Method method = null;
    try {
        method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
    } catch (NoSuchMethodException | SecurityException e1) {
        e1.printStackTrace();
    }
    // remember the method's original accessibility
    boolean accessible = method.isAccessible();
    try {
        // make the method accessible
        if (!accessible) {
            method.setAccessible(true);
        }
        // fetch the system class loader
        URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
        // add the jar URL to the system class loader's URL path
        method.invoke(classLoader, jarUrl);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        method.setAccessible(accessible);
    }
}

/**
 * If a URLStreamHandlerFactory already exists, wrap it in a decorator that
 * delegates "hdfs" URLs to the hdfs factory and everything else to the original.
 *
 * @param fsUrlStreamHandlerFactory
 * @throws Exception
 */
public static void registerFactory(final FsUrlStreamHandlerFactory fsUrlStreamHandlerFactory) throws Exception {
    log.info("registerFactory : " + fsUrlStreamHandlerFactory.getClass().getName());
    final Field factoryField = URL.class.getDeclaredField("factory");
    factoryField.setAccessible(true);
    final Field lockField = URL.class.getDeclaredField("streamHandlerLock");
    lockField.setAccessible(true);
    synchronized (lockField.get(null)) {
        final URLStreamHandlerFactory originalUrlStreamHandlerFactory = (URLStreamHandlerFactory) factoryField.get(null);
        factoryField.set(null, null);
        URL.setURLStreamHandlerFactory(protocol -> {
            if ("hdfs".equals(protocol)) {
                return fsUrlStreamHandlerFactory.createURLStreamHandler(protocol);
            } else {
                return originalUrlStreamHandlerFactory.createURLStreamHandler(protocol);
            }
        });
    }
}

799590...@qq.com
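As in the SubStr thread above, the reflective loadJar only modifies the client JVM's system class loader, while the ClassNotFoundException is thrown in a TaskManager that never sees it. A sketch of one alternative, assuming the jar is first downloaded from hdfs to a client-local path (the local path below is a placeholder): createRemoteEnvironment can take jar files that are uploaded to the cluster with every submitted job.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: hand the UDF jar(s) to the remote environment so they are
// shipped to the cluster with each job, making the classes resolvable
// by the TaskManagers' user-code class loader. The local path is a
// placeholder; a jar on hdfs would need to be copied locally first.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "node01", 8081, "/tmp/udf-jars/my-udfs.jar");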
How do I set a hive sink table to upsert mode in pure SQL on flink 1.13.6?
Question: with flink 1.13.6, using pure SQL, how do I set the hive sink table to upsert mode?

flink: 1.13.6
hive: 1.1.1
hadoop: 2.6.0-cdh5.16.2

Pure SQL, with kafka as the source; the transformation involves DISTINCT or GROUP operations, and sinking the result to a hive table fails with:

doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[id, user_id, status, EXPR$3]

Answers found online say the sink table needs to be set to upsert mode. I tried creating the sink table as below; the CREATE succeeds, but the INSERT INTO still fails with the same error.

Source table:

CREATE TABLE data_2432_5074_model(
  id STRING,
  user_id STRING,
  status STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'person',
  'properties.bootstrap.servers' = '192.168.9.116:9092',
  'properties.group.id' = 'chinaoly-group',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.fail-on-missing-field'='false',
  'json.ignore-parse-errors'='true'
)

Sink table:

CREATE TABLE output_2432_5076_model_1649226175146(
  id STRING,
  user_id STRING,
  status STRING,
  my_dt timestamp
) TBLPROPERTIES (
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'all',
  'streaming-source.partition-order' = 'create-time',
  'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  'write.upsert.enable' = 'true',
  'streaming-source.monitor-interval' = '1 min'
)

The query:

INSERT INTO output_2432_5076_model_1649226175146
SELECT DISTINCT id AS id, user_id AS user_id, status AS status, proctime()
FROM (SELECT * FROM data_2432_5074_model)
WHERE status = '1'

Hoping the community can give me an answer — thanks in advance.

799590...@qq.com
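The error says the hive sink cannot consume update changes, so instead of sink table properties, one direction worth trying is making the query itself append-only. A hedged sketch in the style of the other jobs in this digest, using Flink SQL's "keep first row" deduplication ordered by processing time, which to my knowledge emits an insert-only stream (a CAST may still be needed for the my_dt column):

// Sketch: replace DISTINCT with a first-row deduplication over processing
// time; this variant produces inserts only, which an append-only sink
// such as hive can consume.
tableEnv.executeSql(
    "INSERT INTO output_2432_5076_model_1649226175146 " +
    "SELECT id, user_id, status, pt FROM ( " +
    "  SELECT id, user_id, status, pt, " +
    "         ROW_NUMBER() OVER (PARTITION BY id, user_id, status ORDER BY pt ASC) AS rn " +
    "  FROM (SELECT id, user_id, status, PROCTIME() AS pt " +
    "        FROM data_2432_5074_model WHERE status = '1') t " +
    ") WHERE rn = 1");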
Re: Re: flink 1.13.6: writing from kafka to hive with the Table API fails silently (no error), while writing to a file or mysql works
Tested it — it was indeed caused by checkpointing not being enabled. I just enabled checkpoint storage on the cluster and turned on the checkpoint configuration in the code, and the kafka table's data now lands in hive.

Many thanks!

谌祥, Hangzhou - Java backend development, big data
799590...@qq.com

From: Caizhi Weng
Sent: 2022-03-14 10:15
To: flink中文邮件组
Subject: Re: flink 1.13.6: writing from kafka to hive with the Table API fails silently (no error), while writing to a file or mysql works

Hi!

Data written to hive by a streaming job only becomes visible after a checkpoint. I don't see checkpointing enabled in your code; try enabling it.

The jdbc sink is a visible-on-write sink, but it only guarantees eventual consistency. That is, if the job fails over midway, the data written to the jdbc sink between the last checkpoint and the failover is "redundant", and consistency is only restored once it is overwritten by new data after the job restarts.

The filesystem sink should be writing to a temporary file; a filesystem source will not read that file — the temporary file is only renamed after a checkpoint.

On Sat, Mar 12, 2022 at 14:51, 799590...@qq.com.INVALID <799590...@qq.com.invalid> wrote:
>
> Software versions
> flink: 1.13.6
> hive: 1.1.1
> hadoop: 2.6.0-cdh5.16.2
>
> The SQL is submitted to the remote flink cluster via createRemoteEnvironment; the hive connection goes through Kerberos authentication. Code below.
> After running, there is no error; the flink cluster shows the job running, the kafka and hive tables are both created, and kafka keeps producing new messages, yet no new data arrives in the hive table.
>
> I tested changing the output to mysql or a csv file: both keep producing new records, the data in the hive table can be read out, and inserting data from an existing hive table into the newly created table also succeeds. I just can't figure out why the dynamic kafka data is not written into the newly created hive table.
>
> String KRB5_CONF_PATH = "/home/tetris/conf/krb5.ini";
> String keytab = "/home/tetris/conf/company.keytab";
> String principal = "company";
> System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH);
>
> Configuration configuration = new Configuration();
> configuration.set("hadoop.security.authentication", "kerberos");
> configuration.set("keytab.file", keytab);
> configuration.setBoolean("hadoop.security.authorization", true);
> configuration.set("kerberos.principal", principal);
> UserGroupInformation.setConfiguration(configuration);
> UserGroupInformation.loginUserFromKeytab(principal, keytab);
>
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081);
> StreamTableEnvironment flinkTableEnv =
> StreamTableEnvironment.create(env, bsSettings);
>
> HiveCatalog hiveCatalog = new HiveCatalog("myhive", "tetris",
> "/home/tetris/conf", "1.1.1");
> flinkTableEnv.registerCatalog("myhive", hiveCatalog);
> flinkTableEnv.useCatalog("myhive");
> flinkTableEnv.executeSql("DROP TABLE IF EXISTS data_2431_4928").print();
> flinkTableEnv.executeSql("CREATE TABLE data_2431_4928(id STRING,user_id
> STRING,status STRING) WITH (\n" +
>     "'connector' = 'kafka',\n" +
>     "'topic' = 'person',\n" +
>     "'properties.bootstrap.servers' = '121.4.89.228:9092',\n" +
>     "'properties.group.id' = 'testGroup',\n" +
>     "'scan.startup.mode' = 'latest-offset',\n" +
>     "'format' = 'json',\n" +
>     "'json.fail-on-missing-field'='false','json.ignore-parse-errors'='true'\n" +
>     ")").print();
> flinkTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> flinkTableEnv.executeSql("DROP TABLE IF EXISTS output_2431_4930").print();
> flinkTableEnv.executeSql("CREATE TABLE output_2431_4930(id STRING,user_id
> STRING,status STRING)").print();
> flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
> TableResult result = flinkTableEnv.executeSql("INSERT INTO
> output_2431_4930 SELECT id, user_id ,`status` FROM data_2431_4928");
> System.out.println(result.getJobClient().get().getJobID());
>
> 谌祥, Hangzhou - Java backend development, big data
> 799590...@qq.com
>
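For anyone landing on this thread, a minimal sketch of the missing piece per the reply above — enabling checkpointing on the remote environment before submitting the INSERT (the interval is illustrative):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: streaming writes into hive become visible only after a
// successful checkpoint, so checkpointing must be enabled before the
// INSERT job is submitted.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081);
env.enableCheckpointing(60_000); // illustrative: checkpoint every 60s
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);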
flink 1.13.6: writing from kafka to hive with the Table API fails silently (no error), while writing to a file or mysql works
Software versions:

flink: 1.13.6
hive: 1.1.1
hadoop: 2.6.0-cdh5.16.2

The SQL is submitted to the remote flink cluster via createRemoteEnvironment; the hive connection goes through Kerberos authentication. Code below. After running, there is no error; the flink cluster shows the job running, the kafka and hive tables are both created, and kafka keeps producing new messages, yet no new data arrives in the hive table.

I tested changing the output to mysql or a csv file: both keep producing new records, the data in the hive table can be read out, and inserting data from an existing hive table into the newly created table also succeeds. I just can't figure out why the dynamic kafka data is not written into the newly created hive table.

String KRB5_CONF_PATH = "/home/tetris/conf/krb5.ini";
String keytab = "/home/tetris/conf/company.keytab";
String principal = "company";
System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH);

Configuration configuration = new Configuration();
configuration.set("hadoop.security.authentication", "kerberos");
configuration.set("keytab.file", keytab);
configuration.setBoolean("hadoop.security.authorization", true);
configuration.set("kerberos.principal", principal);
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(principal, keytab);

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081);
StreamTableEnvironment flinkTableEnv = StreamTableEnvironment.create(env, bsSettings);

HiveCatalog hiveCatalog = new HiveCatalog("myhive", "tetris", "/home/tetris/conf", "1.1.1");
flinkTableEnv.registerCatalog("myhive", hiveCatalog);
flinkTableEnv.useCatalog("myhive");
flinkTableEnv.executeSql("DROP TABLE IF EXISTS data_2431_4928").print();
flinkTableEnv.executeSql("CREATE TABLE data_2431_4928(id STRING,user_id STRING,status STRING) WITH (\n" +
        "'connector' = 'kafka',\n" +
        "'topic' = 'person',\n" +
        "'properties.bootstrap.servers' = '121.4.89.228:9092',\n" +
        "'properties.group.id' = 'testGroup',\n" +
        "'scan.startup.mode' = 'latest-offset',\n" +
        "'format' = 'json',\n" +
        "'json.fail-on-missing-field'='false','json.ignore-parse-errors'='true'\n" +
        ")").print();
flinkTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
flinkTableEnv.executeSql("DROP TABLE IF EXISTS output_2431_4930").print();
flinkTableEnv.executeSql("CREATE TABLE output_2431_4930(id STRING,user_id STRING,status STRING)").print();
flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
TableResult result = flinkTableEnv.executeSql("INSERT INTO output_2431_4930 SELECT id, user_id ,`status` FROM data_2431_4928");
System.out.println(result.getJobClient().get().getJobID());

谌祥, Hangzhou - Java backend development, big data
799590...@qq.com