hive streaning 问题请教
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.disableOperatorChaining(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); tEnv.getConfig().addConfiguration( new Configuration() .set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(30))); tEnv.executeSql("CREATE TEMPORARY FUNCTION TestFunca AS 'org.example.flink.TestFunca' LANGUAGE JAVA"); tEnv.executeSql("CREATE TABLE datagen (\n" + " name STRING,\n" + " pass STRING,\n" + " type1 INT,\n" + " t1 STRING,\n" + " t2 STRING,\n" + " ts AS localtimestamp,\n" + " WATERMARK FOR ts AS ts\n" + ") WITH (\n" + " 'connector' = 'datagen',\n" + " 'rows-per-second'='1',\n" + " 'fields.type1.min'='1',\n" + " 'fields.type1.max'='10'\n" + ")"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); tEnv.executeSql("CREATE TABLE hive_table (\n" + " user_id STRING,\n" + " order_amount STRING\n" + ") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (\n" + " 'sink.partition-commit.trigger'='partition-time',\n" + " 'sink.partition-commit.delay'='1 h',\n" + " 'sink.partition-commit.policy.kind'='metastore,success-file'\n" + ")"); tEnv.executeSql("insert into hive_table select t1,t2,TestFunca(type1),TestFunca(type1) from datagen"); Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector. at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) ... 18 more 发送自 Windows 10 版邮件应用
Flink SQL 1.11如何获取执行计划 & StreamGraph
1.11好像改了接口,用StreamExecutionEnvironment.getExecutionPlan()会报"No operators defined in streaming topology. Cannot execute." 1.10是可以正常执行的
Re:Re: Re: flink1.11加载外部jar包进行UDF注册
1使用系统类加载器的时候,本身作业包开放给外部UDF jar包实现的接口会报ClassNotFound异常 2线程上下文类加载器是什么 不太明白这两点,可以写个代码例子看看吗 在 2020-10-15 19:47:20,"amen...@163.com" 写道: >追加问题,在使用线程上下文类加载器的时候,数据会重复发送三条,这是因为添加pipeline.classpaths的缘故吗? >那这种设置env的方式有可能还会造成其他什么问题? > >best, >amenhub > >发件人: amen...@163.com >发送时间: 2020-10-15 19:22 >收件人: user-zh >主题: Re: Re: flink1.11加载外部jar包进行UDF注册 >非常感谢您的回复! > >对于我来说,这是一个非常好的办法,但是我有一个疑问:类加载器只能使用系统类加载器进行加载吗? >因为我在尝试使用系统类加载器的时候,本身作业包开放给外部UDF >jar包实现的接口会报ClassNotFound异常,而将类加载器指向主类(这种方式的话这里应该是使用默认的线程上下文加载器),则可避免这个问题。 > >期待您的回复,谢谢~ > >best, >amenhub >发件人: cxydeve...@163.com >发送时间: 2020-10-15 17:46 >收件人: user-zh >主题: Re: flink1.11加载外部jar包进行UDF注册 >我们用方法是通过反射设置env的配置,增加pipeline.classpaths >具体代码如下 >public static void main(final String[] args) throws Exception { >StreamExecutionEnvironment env = >StreamExecutionEnvironment.getExecutionEnvironment(); >EnvironmentSettings settings = >EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >StreamTableEnvironment tableEnvironment = >StreamTableEnvironment.create(env, settings); >//String path = "file:///D:/cxy/idea_workspace/...xxx.jar"; >String path = "https://...xxx.jar;; >loadJar(new URL(path)); >Field configuration = >StreamExecutionEnvironment.class.getDeclaredField("configuration"); >configuration.setAccessible(true); >Configuration o = (Configuration)configuration.get(env); >Field confData = Configuration.class.getDeclaredField("confData"); >confData.setAccessible(true); >Map temp = (Map)confData.get(o); >List jarList = new ArrayList<>(); >jarList.add(path); >temp.put("pipeline.classpaths",jarList); >tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS >'flinksql.function.udf.CxyTestReturnSelf'"); >tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" + >" f_sequence INT,\n" + >" f_random INT,\n" + >" f_random_str STRING,\n" + >" ts AS localtimestamp,\n" + >" WATERMARK FOR ts AS ts\n" + >") WITH (\n" + >" 'connector' = 'datagen',\n" + >" 'rows-per-second'='5',\n" + >"\n" + >" 'fields.f_sequence.kind'='sequence',\n" + >" 'fields.f_sequence.start'='1',\n" + >" 'fields.f_sequence.end'='1000',\n" + >"\n" + >" 'fields.f_random.min'='1',\n" + >" 'fields.f_random.max'='1000',\n" + >"\n" + >" 'fields.f_random_str.length'='10'\n" + >")"); >tableEnvironment.executeSql("CREATE TABLE sinktable (\n" + >"f_random_str STRING" + >") WITH (\n" + >"'connector' = 'print'\n" + >")"); >tableEnvironment.executeSql( >"insert into sinktable " + >"select CxyTestReturnSelf(f_random_str) " + >"from sourceTable"); >} >//动态加载Jar >public static void loadJar(URL jarUrl) { >//从URLClassLoader类加载器中获取类的addURL方法 >Method method = null; >try { >method = URLClassLoader.class.getDeclaredMethod("addURL", >URL.class); >} catch (NoSuchMethodException | SecurityException e1) { >e1.printStackTrace(); >} >// 获取方法的访问权限 >boolean accessible = method.isAccessible(); >try { >//修改访问权限为可写 >if (accessible == false) { >method.setAccessible(true); >} >// 获取系统类加载器 >URLClassLoader classLoader = (URLClassLoader) >ClassLoader.getSystemClassLoader(); >//jar路径加入到系统url路径里 >method.invoke(classLoader, jarUrl); >} catch (Exception e) { >e.printStackTrace(); >} finally { >method.setAccessible(accessible); >} >} >-- >Sent from: http://apache-flink.147419.n8.nabble.com/
flink sql 窗口函数对分区的这个列进行过滤
因为列会有默认值,也有真实的,我想取到真实的那个列,这个功能如何实现一下。想到了窗口函数,发现不能进行过滤,还有一种骚操作是求max min。之后if来操作。
关于内存大小设置以及预测
Hi all, 最近也是遇到比较常见的内存溢出的错误OutOfMemoryError: Java heap space,JM:1g TM:2g,简单粗暴的设置成2g、4g就可以运行了, INFO [] - Loading configuration property: cluster.termination-message-path, /flink/log/termination.log INFO [] - Final TaskExecutor Memory configuration: INFO [] - Total Process Memory: 3.906gb (4194304000 bytes) INFO [] - Total Flink Memory: 3.266gb (3506438138 bytes) INFO [] - Total JVM Heap Memory: 1.508gb (1619001315 bytes) INFO [] - Framework: 128.000mb (134217728 bytes) INFO [] - Task:1.383gb (1484783587 bytes) INFO [] - Total Off-heap Memory: 1.758gb (1887436823 bytes) INFO [] - Managed: 1.306gb (1402575276 bytes) INFO [] - Total JVM Direct Memory: 462.400mb (484861547 bytes) INFO [] - Framework: 128.000mb (134217728 bytes) INFO [] - Task: 0 bytes INFO [] - Network: 334.400mb (350643819 bytes) INFO [] - JVM Metaspace: 256.000mb (268435456 bytes) INFO [] - JVM Overhead:400.000mb (419430406 bytes) 请问有没有指标能够事前估算JM、TM需要的内存大小? Best
回复:回复: 回复: flink 自定义udf注册后不能使用
我想问一下,这种udf方式,只能写成一个jar上传到集群中解释执行,还是说还可以直接在sql-client中,直接提交sql代码 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年10月16日 15:45,amen...@163.com 写道: 是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug best, amenhub 发件人: 史 正超 发送时间: 2020-10-16 15:26 收件人: user-zh@flink.apache.org 主题: 回复: 回复:回复: flink 自定义udf注册后不能使用 你这样创建试一下,或者换个名字试试 CREATE TEMPORARY SYSTEM FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA; 我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以 发件人: 奔跑的小飞袁 发送时间: 2020年10月16日 6:47 收件人: user-zh@flink.apache.org 主题: Re: 回复:回复: flink 自定义udf注册后不能使用 是的,是我传参有问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:回复: 回复: flink 自定义udf注册后不能使用
我想问一下,这种udf方式,只能写成一个jar上传到集群中解释执行,还是说还可以直接在sql-client中,直接提交sql代码 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由网易邮箱大师定制 在2020年10月16日 15:45,amen...@163.com 写道: 是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug best, amenhub 发件人: 史 正超 发送时间: 2020-10-16 15:26 收件人: user-zh@flink.apache.org 主题: 回复: 回复:回复: flink 自定义udf注册后不能使用 你这样创建试一下,或者换个名字试试 CREATE TEMPORARY SYSTEM FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA; 我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以 发件人: 奔跑的小飞袁 发送时间: 2020年10月16日 6:47 收件人: user-zh@flink.apache.org 主题: Re: 回复:回复: flink 自定义udf注册后不能使用 是的,是我传参有问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复: 回复: flink 自定义udf注册后不能使用
是的,同款TEMPORARY FUNCTION错误,但是使用SYSTEMTEMPORARY就没有问题,不知是否是flink的bug best, amenhub 发件人: 史 正超 发送时间: 2020-10-16 15:26 收件人: user-zh@flink.apache.org 主题: 回复: 回复:回复: flink 自定义udf注册后不能使用 你这样创建试一下,或者换个名字试试 CREATE TEMPORARY SYSTEM FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA; 我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以 发件人: 奔跑的小飞袁 发送时间: 2020年10月16日 6:47 收件人: user-zh@flink.apache.org 主题: Re: 回复:回复: flink 自定义udf注册后不能使用 是的,是我传参有问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复: 回复:回复: flink 自定义udf注册后不能使用
你这样创建试一下,或者换个名字试试 CREATE TEMPORARY SYSTEM FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA; 我刚才创建了一个 UpperCase的function,也是一样的错误,用TEMPORARY SYSTEM覆盖系统的函数(有可能存在)后,就可以了,换个字也可以 发件人: 奔跑的小飞袁 发送时间: 2020年10月16日 6:47 收件人: user-zh@flink.apache.org 主题: Re: 回复:回复: flink 自定义udf注册后不能使用 是的,是我传参有问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 回复:回复: flink 自定义udf注册后不能使用
是的,是我传参有问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 回复:回复: flink 自定义udf注册后不能使用
是的,我这个函数只需要一个参数 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:回复: flink 自定义udf注册后不能使用
你好,没看错的话,只有一个参? -- 发件人:奔跑的小飞袁 发送时间:2020年10月16日(星期五) 14:18 收件人:user-zh 主 题:Re: 回复: flink 自定义udf注册后不能使用 完整的sql执行文件 SET stream.enableCheckpointing=1000*60; SET stream.setParallelism=4; CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA; -- Kafka cdbp zdao source 表 create TABLE cloud_behavior_source( operation VARCHAR, operation_channel VARCHAR, `time` VARCHAR, ip VARCHAR, lat VARCHAR, lng VARCHAR, user_id VARCHAR, device_id VARCHAR, imei VARCHAR, targets ARRAY>, product_name VARCHAR, product_version VARCHAR, product_vendor VARCHAR, platform VARCHAR, platform_version VARCHAR, `languaage` VARCHAR, locale VARCHAR, other_para MAP ) with ( 'connector'='kafka', 'topic'='cloud_behavior', 'properties.bootstrap.servers'='', 'properties.group.id'='testGroup', 'format'='avro', 'scan.startup.mode'='earliest-offset' ); -- Hbase zdao uv 统计 Sink 表 create TABLE cloud_behavior_sink( operation VARCHAR, operation_channel VARCHAR, `time` VARCHAR, ip VARCHAR, lat VARCHAR, lng VARCHAR, user_id VARCHAR, device_id VARCHAR, imei VARCHAR, product_name VARCHAR, product_version VARCHAR, product_vendor VARCHAR, platform VARCHAR, platform_version VARCHAR, `languaage` VARCHAR, locale VARCHAR )with ( 'connector'='filesystem', 'path'='hdfs:///data_test/hongliang_song/working_sql_test_parquet.db', 'format'='parquet', 'sink.rolling-policy.file-size'='128MB', 'sink.rolling-policy.rollover-interval'='10min' ); -- 业务过程 insert into cloud_behavior_sink select operation, operation_channel, `time`, ip, lat, lng, user_id, device_id, imei_encrypt(imei) AS imei, product_name, product_version, product_vendor, platform, platform_version, `languaage`, locale FROM cloud_behavior_source; -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 回复: flink 自定义udf注册后不能使用
完整的sql执行文件 SET stream.enableCheckpointing=1000*60; SET stream.setParallelism=4; CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA; -- Kafka cdbp zdao source 表 create TABLE cloud_behavior_source( operation VARCHAR, operation_channel VARCHAR, `time` VARCHAR, ip VARCHAR, lat VARCHAR, lng VARCHAR, user_id VARCHAR, device_id VARCHAR, imei VARCHAR, targets ARRAY>, product_name VARCHAR, product_version VARCHAR, product_vendor VARCHAR, platform VARCHAR, platform_version VARCHAR, `languaage` VARCHAR, locale VARCHAR, other_para MAP ) with ( 'connector'='kafka', 'topic'='cloud_behavior', 'properties.bootstrap.servers'='', 'properties.group.id'='testGroup', 'format'='avro', 'scan.startup.mode'='earliest-offset' ); -- Hbase zdao uv 统计 Sink 表 create TABLE cloud_behavior_sink( operation VARCHAR, operation_channel VARCHAR, `time` VARCHAR, ip VARCHAR, lat VARCHAR, lng VARCHAR, user_id VARCHAR, device_id VARCHAR, imei VARCHAR, product_name VARCHAR, product_version VARCHAR, product_vendor VARCHAR, platform VARCHAR, platform_version VARCHAR, `languaage` VARCHAR, locale VARCHAR )with ( 'connector'='filesystem', 'path'='hdfs:///data_test/hongliang_song/working_sql_test_parquet.db', 'format'='parquet', 'sink.rolling-policy.file-size'='128MB', 'sink.rolling-policy.rollover-interval'='10min' ); -- 业务过程 insert into cloud_behavior_sink select operation, operation_channel, `time`, ip, lat, lng, user_id, device_id, imei_encrypt(imei) AS imei, product_name, product_version, product_vendor, platform, platform_version, `languaage`, locale FROM cloud_behavior_source; -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink作业运行失败
我的是hadoop-2.10,flink-sql-connector-hive-1.2.2_2.11-1.11.1.jar。 > 2020年10月16日 下午12:01,Jeff Zhang 写道: > > 你是hadoop2 吗?我记得这个情况只有hadoop3才会出现 > > > gangzi <1139872...@qq.com> 于2020年10月16日周五 上午11:22写道: > >> TM >> 的CLASSPATH确实没有hadoop-mapreduce-client-core.jar。这个难道是hadoop集群的问题吗?还是一定要shade-hadoop包,官方不推荐shade-hadoop包了。 >> >>> 2020年10月16日 上午10:50,Jeff Zhang 写道: >>> >>> 你看看TM的log,里面有CLASSPATH的 >>> >>> gangzi <1139872...@qq.com> 于2020年10月16日周五 上午10:11写道: >>> 我按照flink官方文档的做法,在hadoop集群每个节点上都:export HADOOP_CLASSPATH =`hadoop classpath`,但是报:java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf >> 不知道这个是不是flink的bug,按照这个报错,是缺少:hadoop-mapreduce-client-core.jar这个jar包,但是这个包是在/usr/local/hadoop-2.10.0/share/hadoop/mapreduce/*:这个目录下的,这个目录是包含在HADOOP_CLASSPATH上的,按理说是会加载到的。 > 2020年10月16日 上午9:59,Shubin Ruan 写道: > > export HADOOP_CLASSPATH= >>> >>> -- >>> Best Regards >>> >>> Jeff Zhang >> >> > > -- > Best Regards > > Jeff Zhang
Re: 回复: flink 自定义udf注册后不能使用
这是我的udf声明 CREATE FUNCTION imei_encrypt AS 'com.intsig.flink.udf.IMEIEncrypt' LANGUAGE JAVA; 以下是udf实现 public class IMEIEncrypt extends ScalarFunction { public String eval(String column_type,String value) { EncryptUtils encryptUtils = new EncryptUtils(); return encryptUtils.encrypt(column_type,value); } } -- Sent from: http://apache-flink.147419.n8.nabble.com/