Re: flink table over 窗口报错
Table table = tableEnv .from("t_yyp_test") .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) .orderBy($("f_time")) .preceding("unbounded_range") .following(CURRENT_RANGE) .as("w")) .select($("f_value"), $("f_h"), $("f_l"), $("f_j"), $("f_value").avg().over($("w")), $("f_value").varPop().over($("w")), $("f_value").stddevPop().over($("w"))); 也是一样的 Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute. 在 2021年8月4日 14:34,Caizhi Weng 写道: Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng 于2021年8月4日周三 下午2:30写道: > 代码如下: > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = '123',\n" + > " 'password' = '123'\n" + > ")"); > tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); > Table table = tableEnv > .from("t_yyp_test") > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time_bak")) > .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > > > 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > > > 请问这是什么原因
Re: flink table over 窗口报错
Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng 于2021年8月4日周三 下午2:30写道: > 代码如下: > EnvironmentSettings bbSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " f_h STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value DOUBLE,\n" + > " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, " + > " PRIMARY KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS f_time \n" + > ") WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = 'jdbc:mysql://***',\n" + > " 'table-name' = '123',\n" + > " 'username' = '123',\n" + > " 'password' = '123'\n" + > ")"); > tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); > Table table = tableEnv > .from("t_yyp_test") > .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > .orderBy($("f_time_bak")) > .preceding("unbounded_range") > .following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), > $("f_j"), > $("f_value").avg().over($("w")), > $("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > > > 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Ordering must be defined on a time attribute. > > > 请问这是什么原因
flink table over 窗口报错
代码如下: EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(bbSettings); tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + " f_id INT,\n" + " f_h STRING,\n" + " f_l STRING,\n" + " f_j STRING,\n" + " f_value DOUBLE,\n" + " f_time TIMESTAMP(3)\n, " + " f_time_bak TIMESTAMP(3)\n, " + " PRIMARY KEY (f_id) NOT ENFORCED,\n" + " WATERMARK FOR f_time AS f_time \n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://***',\n" + " 'table-name' = '123',\n" + " 'username' = '123',\n" + " 'password' = '123'\n" + ")"); tableEnv.registerFunction("GaussianFunction", new GaussianFunction()); Table table = tableEnv .from("t_yyp_test") .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) .orderBy($("f_time_bak")) .preceding("unbounded_range") .following(CURRENT_RANGE) .as("w")) .select($("f_h"), $("f_l"), $("f_j"), $("f_value").avg().over($("w")), $("f_value").varPop().over($("w")), $("f_value").stddevPop().over($("w"))); 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 Exception in thread "main" org.apache.flink.table.api.ValidationException: Ordering must be defined on a time attribute. 请问这是什么原因
Re: Flink sql 维表聚合问题请教
Hi! 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal table join 了。 carlc 于2021年8月4日周三 上午10:41写道: > 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~ > > -- 模拟需求(有点牵强...): > -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist > 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作 > > -- 1. 创建user_blacklist表 > CREATE TABLE `user_blacklist` ( > `user_id` bigint(20) NOT NULL, > `create_time` datetime NOT NULL, > PRIMARY KEY (`user_id`,`create_time`) > ) ENGINE=InnoDB DEFAULT CHARSET=utf8; > INSERT INTO user_blacklist (`user_id`, `create_time`) > VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), > (2,'2021-01-04 00:00:00'); > > -- 2. 模拟kafka数据: > -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 > 00:00:00"} > -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 > 00:00:00"} > > -- 操作步骤: > 当发送第1条kafka数据得到如下输出: > | OP| user_id| event_type | current_ts| bl_count | > | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 | > | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 | > 当再次发送第1条kafka数据得到如下输出: > | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 | > | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 | > > — SQL 如下: > > create table kafka_user_event > ( > `user_id` BIGINT, > `event_type` STRING, > `current_ts` timestamp(3), > `proc_time` AS PROCTIME() > ) WITH ( > 'connector' = 'kafka', > ... > ); > > create table mysql_user_blacklist > ( > user_id BIGINT, > create_time timestamp(3), > primary key (user_id,create_time) not enforced > ) WITH ( > 'connector' = 'jdbc', > … > ); > > create view v2_user_event as ( > select t1.`user_id` > , t1.`event_type` > , t1.`current_ts` > , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS > BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count > from kafka_user_event t1 > left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 > on t1.`user_id` = t2.`user_id` > where t1.`event_type` = 'LOGIN' > ); > > select * from v2_user_event; > >
??????kafka appender????
-- -- ??: "user-zh" <17610775...@163.com>; : 2021??8??4??(??) 12:45 ??: "user-zh@flink.apache.org"
??????kafka appender????
Hi error ? ?? CPU ?? IO ? Best JasonLee ??2021??08??4?? 12:25??datafollower<609326...@qq.com.INVALID> ?? hi allflink??5000+??error??kafka?? log4j2 ??kafkaappender ??error??kafka?? 1.kafka appender??error log10 2.error??kafka
kafka appender????
hi allflink??5000+??error??kafka?? log4j2 ??kafkaappender ??error??kafka?? 1.kafka appender??error log10 2.error??kafka
?????? $internal.yarn.log-config-file
?? -- -- ??: "user-zh"
flink sql统计IP出现次数TopN问题
场景:实时统计用户访问日志数据,求一分钟内访问事件发生次数超过5次的用户,其不同source_ip出现次数最多前3个的事件 源表数据 user_name, source_ip, ts 张三, 100, 00:08 张三, 104, 00:12 张三, 100, 00:15 张三, 101, 00:35 张三, 100, 00:38 张三, 102, 00:40 张三, 102, 00:45 张三, 101, 00:47 张三, 100, 00:55 张三, 100, 01:15 李四, 200, 01:17 李四, 200, 01:19 李四, 200, 01:27 王五, 302, 01:35 目标表数据 user_name, source_ip, occur_times, window_start, window_end 张三, 100, 4, 00:00, 01:00 张三, 101, 2, 00:00, 01:00 张三, 102, 2, 00:00, 01:00 = create TEMPORARY table event_table ( user_name STRING, source_ip STRING, ts TIMESTAMP ) with ('connector' = 'datagen'); create TEMPORARY table alert_table ( user_name STRING, source_ip STRING, occur_times BIGINT, ts TIMESTAMP ) with ('connector' = 'print'); 请问 1. 用flink 1.12 sql要怎么实现? 2. 用flink 1.13 window TopN要如何实现? 谢谢!
Re:几个Flink 1.12. 2超时问题
应该可以从两个层面查一下: 1、调度层面。native application是先启动JM容器,然后由JM容器与K8s交互拉起TM的,可以看一下K8s日志,看看整个流程是否有瓶颈点,比如镜像的拉取,TM容器的启动之类。 2、网络层面。如果调度没有问题,各容器启动的过程和速度都很正常,那就要看网络层面是否存在瓶颈,必要的时候可以tcpdump一下。 在 2021-08-03 14:02:53,"Chenyu Zheng" 写道: 开发者您好, 我正在尝试在Kubernetes上部署Flink 1.12.2,使用的是native application部署模式。但是在测试中发现,当将作业并行度调大之后,各种timeout时有发生。根据监控看,JM和TM容器的cpu和内存都没有使用到k8s给分配的量。 在尝试调大akka.ask.timeout至1分钟,和heartbeat.timeout至2分钟之后,各种超时现象得以缓解。 我的问题是,当设置较大并行度(比如128)时,akka超时和心跳超时的各种现象都是正常的吗?如果不正常,需要用什么方式去troubleshot问题的根源呢?另外单纯一味调大各个组件的超时时间,会带来什么负面作用呢? 附件中有akka超时的jobmanager日志,TaskManager心跳超时日志稍后会发上来。 谢谢!
Re: 几个Flink 1.12. 2超时问题
是因为上游事件源速率比较大,需要提高并行度来匹配速率 谢谢! On 2021/8/3, 2:41 PM, "Ye Chen" wrote: 你好, 请问一下为什么要设置128并行度,这个数值有点太大了,出于什么考虑设置的 在 2021-08-03 14:02:53,"Chenyu Zheng" 写道: 开发者您好, 我正在尝试在Kubernetes上部署Flink 1.12.2,使用的是native application部署模式。但是在测试中发现,当将作业并行度调大之后,各种timeout时有发生。根据监控看,JM和TM容器的cpu和内存都没有使用到k8s给分配的量。 在尝试调大akka.ask.timeout至1分钟,和heartbeat.timeout至2分钟之后,各种超时现象得以缓解。 我的问题是,当设置较大并行度(比如128)时,akka超时和心跳超时的各种现象都是正常的吗?如果不正常,需要用什么方式去troubleshot问题的根源呢?另外单纯一味调大各个组件的超时时间,会带来什么负面作用呢? 附件中有akka超时的jobmanager日志,TaskManager心跳超时日志稍后会发上来。 谢谢!
Flink sql 维表聚合问题请教
请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~ -- 模拟需求(有点牵强...): -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作 -- 1. 创建user_blacklist表 CREATE TABLE `user_blacklist` ( `user_id` bigint(20) NOT NULL, `create_time` datetime NOT NULL, PRIMARY KEY (`user_id`,`create_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO user_blacklist (`user_id`, `create_time`) VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), (2,'2021-01-04 00:00:00'); -- 2. 模拟kafka数据: -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 00:00:00"} -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 00:00:00"} -- 操作步骤: 当发送第1条kafka数据得到如下输出: | OP| user_id| event_type | current_ts| bl_count | | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 | | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 | 当再次发送第1条kafka数据得到如下输出: | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 | | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 | — SQL 如下: create table kafka_user_event ( `user_id` BIGINT, `event_type` STRING, `current_ts` timestamp(3), `proc_time` AS PROCTIME() ) WITH ( 'connector' = 'kafka', ... ); create table mysql_user_blacklist ( user_id BIGINT, create_time timestamp(3), primary key (user_id,create_time) not enforced ) WITH ( 'connector' = 'jdbc', … ); create view v2_user_event as ( select t1.`user_id` , t1.`event_type` , t1.`current_ts` , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count from kafka_user_event t1 left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on t1.`user_id` = t2.`user_id` where t1.`event_type` = 'LOGIN' ); select * from v2_user_event;
Re: $internal.yarn.log-config-file
在我们的生产环境中使用YarnClient和YarnClusterDescriptor等api往yarn提交flink任务,此时我们使用 YarnConfigOptionsInternal里面的APPLICATION_LOG_CONFIG_FILE 来给 每一个任务设置它单独的log4j配置文件路径 Caizhi Weng 于2021年7月30日周五 上午11:02写道: > Hi! > > 实际上 yarn log config file 所在的 config 目录可以通过 FLINK_CONF_DIR > 这个环境变量指定。不过这要求客户端的 FLINK_CONF_DIR 目录和集群上的 FLINK_CONF_DIR 目录一样才可以。 > > comsir <609326...@qq.com.invalid> 于2021年7月30日周五 上午10:21写道: > > > hi all : > > > > > 像$internal.yarn.log-config-file,$internal.yarn.resourcemanager.enable-vcore-matching > > 这种internal的变量,有啥办法能自定义指定他的值? > > 如果不能,$internal.yarn.log-config-file为啥要这么设计,日志路径不让自定义指定? >
Re: 非对齐检查点还能保证exactly once语义吗
Hi! 1.13 的文档对 unaligned checkpoint 的介绍在 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints 和 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/stateful-stream-processing/#unaligned-checkpointing shimin huang 于2021年8月3日周二 下午5:06写道: > Hi! > 这个有相关的文档介绍吗,1.11版本左右简单了解过exactly once非对齐机制这块,1.13版本的exactly > once的非对齐机制貌似没在官方文档上看到 > > Caizhi Weng 于2021年8月2日周一 下午7:28写道: > > > Hi! > > > > shimin huang 说的可能是原本的 at least once 的 checkpoint 机制,这种 checkpoint > 原本就是不对齐的。 > > > > Flink 1.13 完善了 exactly once 条件下的不对齐 checkpoint 机制,因此这是能保证 exactly once > > 的。实现原理简单来说就是把还没处理的数据一起写到 state 里,下次恢复的时候把这些还没处理的数据也恢复出来接着处理。 > > > > 张锴 于2021年8月2日周一 下午7:20写道: > > > > > 这个原理能说明一下吗,咋做到的 > > > > > > 东东 于2021年8月2日周一 下午7:16写道: > > > > > > > 对于每一个subtask,边界仍然是清晰的,所以精确一次可以保证,只不过ck会变大。 > > > > > > > > 在 2021-08-02 18:53:11,"张锴" 写道: > > > > > >flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N > > + > > > > >1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次? > > > > > > > > > >
Re: 非对齐检查点还能保证exactly once语义吗
Hi! 这个有相关的文档介绍吗,1.11版本左右简单了解过exactly once非对齐机制这块,1.13版本的exactly once的非对齐机制貌似没在官方文档上看到 Caizhi Weng 于2021年8月2日周一 下午7:28写道: > Hi! > > shimin huang 说的可能是原本的 at least once 的 checkpoint 机制,这种 checkpoint 原本就是不对齐的。 > > Flink 1.13 完善了 exactly once 条件下的不对齐 checkpoint 机制,因此这是能保证 exactly once > 的。实现原理简单来说就是把还没处理的数据一起写到 state 里,下次恢复的时候把这些还没处理的数据也恢复出来接着处理。 > > 张锴 于2021年8月2日周一 下午7:20写道: > > > 这个原理能说明一下吗,咋做到的 > > > > 东东 于2021年8月2日周一 下午7:16写道: > > > > > 对于每一个subtask,边界仍然是清晰的,所以精确一次可以保证,只不过ck会变大。 > > > > > > 在 2021-08-02 18:53:11,"张锴" 写道: > > > >flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N > + > > > >1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次? > > > > > >
Re:eventTime语义一些设备的数据总是迟到被丢弃
你好, 设备数据迟到多久?maxOutofOrderness设置一个合适的值,让迟到的数据到达窗口后再计算。 best regards 在 2021-08-03 15:15:37,"zwoi" <318666...@qq.com.INVALID> 写道: >hi > 我的设备数据是这样的, 设备id id(设备的唯一标识), 时间戳 time,要处理的指标 value, > 在eventTime语义下watermark 生成方式为new >Watermark(Math.max(time, currentMaxTimestamp) - maxOutofOrderness), > 我需要对设备数据 做 keyby(id) >分组后再计算,但总有几个设备数据迟到,导致这几个设备数据就一直计算不到,请问有什么解决办法吗?
Re: eventTime语义一些设备的数据总是迟到被丢弃
Hi! keyby 之后是做窗口聚合吗?如果是的话,SQL API 有一个配置项 table.exec.emit.late-fire.enabled 和 table.exec.emit.late-fire.delay 可以处理迟到数据。当这个功能 enabled 之后,若 delay = 0 则每来一条迟到数据就会输出一次修正的结果,若 delay > 0 则是窗口结束后每隔 delay 的时间输出一次。能接受的迟到的时长和 state ttl 是一样的。详细说明见 WindowEmitStrategy 这个类。 zwoi <318666...@qq.com.invalid> 于2021年8月3日周二 下午3:15写道: > hi > 我的设备数据是这样的, 设备id id(设备的唯一标识), 时间戳 time,要处理的指标 > value, > 在eventTime语义下watermark 生成方式为new > Watermark(Math.max(time, currentMaxTimestamp) - maxOutofOrderness), > 我需要对设备数据 做 keyby(id) > 分组后再计算,但总有几个设备数据迟到,导致这几个设备数据就一直计算不到,请问有什么解决办法吗?
eventTime????????????????????????????????
hi ?? id id(??), ?? time?? value?? ??eventTime??watermark ??new Watermark(Math.max(time, currentMaxTimestamp) - maxOutofOrderness)?? ?? keyby(id) ??