Re: flink table over 窗口报错

2021-08-03 文章 yanyunpeng
Table table = tableEnv
 .from("t_yyp_test")
 .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
 .orderBy($("f_time"))
 .preceding("unbounded_range")
 .following(CURRENT_RANGE)
 .as("w"))
 .select($("f_value"),
 $("f_h"),
 $("f_l"),
 $("f_j"),
 $("f_value").avg().over($("w")),
 $("f_value").varPop().over($("w")),
 $("f_value").stddevPop().over($("w")));
也是一样的
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Ordering must be defined on a time attribute.

在 2021年8月4日 14:34,Caizhi Weng 写道:


Hi! order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。 yanyunpeng 
 于2021年8月4日周三 下午2:30写道: > 代码如下: > 
EnvironmentSettings bbSettings = > 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > 
TableEnvironment tableEnv = TableEnvironment.create(bbSettings); > 
tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" + > " f_id INT,\n" + > " f_h 
STRING,\n" + > " f_l STRING,\n" + > " f_j STRING,\n" + > " f_value DOUBLE,\n" + 
> " f_time TIMESTAMP(3)\n, " + > " f_time_bak TIMESTAMP(3)\n, " + > " PRIMARY 
KEY (f_id) NOT ENFORCED,\n" + > " WATERMARK FOR f_time AS f_time \n" + > ") 
WITH (\n" + > " 'connector' = 'jdbc',\n" + > " 'url' = 'jdbc:mysql://***',\n" + 
> " 'table-name' = '123',\n" + > " 'username' = '123',\n" + > " 'password' = 
'123'\n" + > ")"); > tableEnv.registerFunction("GaussianFunction", new 
GaussianFunction()); > Table table = tableEnv > .from("t_yyp_test") > 
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j")) > 
.orderBy($("f_time_bak")) > .preceding("unbounded_range") > 
.following(CURRENT_RANGE) > .as("w")) > .select($("f_h"), > $("f_l"), > 
$("f_j"), > $("f_value").avg().over($("w")), > 
$("f_value").varPop().over($("w")), > $("f_value").stddevPop().over($("w"))); > 
> > 已经定义了eventTime 使用eventTIme或者别的时间字段排序都报错 > > > Exception in thread "main" 
org.apache.flink.table.api.ValidationException: > Ordering must be defined on a 
time attribute. > > > 请问这是什么原因

Re: flink table over 窗口报错

2021-08-03 文章 Caizhi Weng
Hi!

order by 的字段是 f_time_bak,但是 watermark 的字段是 f_time,这两个不一致。

yanyunpeng  于2021年8月4日周三 下午2:30写道:

> 代码如下:
> EnvironmentSettings bbSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(bbSettings);
> tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" +
> "  f_id INT,\n" +
> "  f_h STRING,\n" +
> "  f_l STRING,\n" +
> "  f_j STRING,\n" +
> "  f_value DOUBLE,\n" +
> "  f_time TIMESTAMP(3)\n, " +
> "  f_time_bak TIMESTAMP(3)\n, " +
> "  PRIMARY KEY (f_id) NOT ENFORCED,\n" +
> "  WATERMARK FOR f_time AS f_time \n" +
> ") WITH (\n" +
> "   'connector' = 'jdbc',\n" +
> "   'url' = 'jdbc:mysql://***',\n" +
> "   'table-name' = '123',\n" +
> "   'username' = '123',\n" +
> "   'password' = '123'\n" +
> ")");
> tableEnv.registerFunction("GaussianFunction", new GaussianFunction());
> Table table = tableEnv
> .from("t_yyp_test")
> .window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
> .orderBy($("f_time_bak"))
> .preceding("unbounded_range")
> .following(CURRENT_RANGE)
> .as("w"))
> .select($("f_h"),
> $("f_l"),
> $("f_j"),
> $("f_value").avg().over($("w")),
> $("f_value").varPop().over($("w")),
> $("f_value").stddevPop().over($("w")));
>
>
> 已经定义了eventTime  使用eventTIme或者别的时间字段排序都报错
>
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Ordering must be defined on a time attribute.
>
>
> 请问这是什么原因


flink table over 窗口报错

2021-08-03 文章 yanyunpeng
代码如下:
EnvironmentSettings bbSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(bbSettings);
tableEnv.executeSql("CREATE TABLE t_yyp_test (\n" +
"  f_id INT,\n" +
"  f_h STRING,\n" +
"  f_l STRING,\n" +
"  f_j STRING,\n" +
"  f_value DOUBLE,\n" +
"  f_time TIMESTAMP(3)\n, " +
"  f_time_bak TIMESTAMP(3)\n, " +
"  PRIMARY KEY (f_id) NOT ENFORCED,\n" +
"  WATERMARK FOR f_time AS f_time \n" +
") WITH (\n" +
"   'connector' = 'jdbc',\n" +
"   'url' = 'jdbc:mysql://***',\n" +
"   'table-name' = '123',\n" +
"   'username' = '123',\n" +
"   'password' = '123'\n" +
")");
tableEnv.registerFunction("GaussianFunction", new GaussianFunction());
Table table = tableEnv
.from("t_yyp_test")
.window(Over.partitionBy($("f_h"), $("f_l"), $("f_j"))
.orderBy($("f_time_bak"))
.preceding("unbounded_range")
.following(CURRENT_RANGE)
.as("w"))
.select($("f_h"),
$("f_l"),
$("f_j"),
$("f_value").avg().over($("w")),
$("f_value").varPop().over($("w")),
$("f_value").stddevPop().over($("w")));


已经定义了eventTime  使用eventTIme或者别的时间字段排序都报错


Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Ordering must be defined on a time attribute.  


请问这是什么原因

Re: Flink sql 维表聚合问题请教

2021-08-03 文章 Caizhi Weng
Hi!

这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。

为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
table join 了。

carlc  于2021年8月4日周三 上午10:41写道:

> 请教下如何在维表上做聚合操作?  如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
>
> -- 模拟需求(有点牵强...):
> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist
> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
>
> -- 1. 创建user_blacklist表
> CREATE TABLE `user_blacklist` (
> `user_id` bigint(20) NOT NULL,
> `create_time` datetime NOT NULL,
> PRIMARY KEY (`user_id`,`create_time`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> INSERT INTO user_blacklist (`user_id`, `create_time`)
> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
> (2,'2021-01-04 00:00:00');
>
> -- 2. 模拟kafka数据:
> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
> 00:00:00"}
> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
> 00:00:00"}
>
> -- 操作步骤:
> 当发送第1条kafka数据得到如下输出:
> | OP| user_id| event_type | current_ts| bl_count |
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
> 当再次发送第1条kafka数据得到如下输出:
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
>
> — SQL 如下:
>
> create table kafka_user_event
> (
> `user_id` BIGINT,
> `event_type` STRING,
> `current_ts` timestamp(3),
> `proc_time` AS PROCTIME()
> ) WITH (
> 'connector' = 'kafka',
> ...
> );
>
> create table mysql_user_blacklist
> (
> user_id BIGINT,
> create_time timestamp(3),
> primary key (user_id,create_time) not enforced
> ) WITH (
> 'connector' = 'jdbc',
> …
> );
>
> create view v2_user_event as (
> select t1.`user_id`
> , t1.`event_type`
> , t1.`current_ts`
> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
> from kafka_user_event t1
> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2
> on t1.`user_id` = t2.`user_id`
> where t1.`event_type` = 'LOGIN'
> );
>
> select * from v2_user_event;
>
>


??????kafka appender????

2021-08-03 文章 datafollower





--  --
??: 
   "user-zh"

<17610775...@163.com>;
: 2021??8??4??(??) 12:45
??: "user-zh@flink.apache.org"

??????kafka appender????

2021-08-03 文章 JasonLee
Hi


 error ? ?? CPU ??  IO 
?


Best
JasonLee


??2021??08??4?? 12:25??datafollower<609326...@qq.com.INVALID> ??
hi allflink??5000+??error??kafka?? 
log4j2 ??kafkaappender
??error??kafka??

1.kafka appender??error log10
2.error??kafka

kafka appender????

2021-08-03 文章 datafollower
hi allflink??5000+??error??kafka?? 
log4j2 ??kafkaappender
??error??kafka??

1.kafka appender??error log10
2.error??kafka

?????? $internal.yarn.log-config-file

2021-08-03 文章 comsir
??




--  --
??: 
   "user-zh"



flink sql统计IP出现次数TopN问题

2021-08-03 文章 casel.chen
场景:实时统计用户访问日志数据,求一分钟内访问事件发生次数超过5次的用户,其不同source_ip出现次数最多前3个的事件


源表数据
user_name, source_ip, ts
张三, 100, 00:08
张三, 104, 00:12
张三, 100, 00:15
张三, 101, 00:35
张三, 100, 00:38
张三, 102, 00:40
张三, 102, 00:45
张三, 101, 00:47
张三, 100, 00:55


张三, 100, 01:15
李四, 200, 01:17
李四, 200, 01:19
李四, 200, 01:27
王五, 302, 01:35


目标表数据
user_name, source_ip, occur_times, window_start, window_end
张三, 100, 4, 00:00, 01:00
张三, 101, 2, 00:00, 01:00
张三, 102, 2, 00:00, 01:00


=
create TEMPORARY table event_table (
user_name STRING, source_ip STRING, ts TIMESTAMP
  )
with ('connector' = 'datagen');


create TEMPORARY table alert_table (
user_name STRING,
source_ip STRING,
occur_times BIGINT,
ts TIMESTAMP
  )
with ('connector' = 'print');


请问
1. 用flink 1.12 sql要怎么实现? 
2. 用flink 1.13 window TopN要如何实现?


谢谢!

Re:几个Flink 1.12. 2超时问题

2021-08-03 文章 东东


应该可以从两个层面查一下:
1、调度层面。native 
application是先启动JM容器,然后由JM容器与K8s交互拉起TM的,可以看一下K8s日志,看看整个流程是否有瓶颈点,比如镜像的拉取,TM容器的启动之类。

2、网络层面。如果调度没有问题,各容器启动的过程和速度都很正常,那就要看网络层面是否存在瓶颈,必要的时候可以tcpdump一下。







在 2021-08-03 14:02:53,"Chenyu Zheng"  写道:

开发者您好,

 

我正在尝试在Kubernetes上部署Flink 1.12.2,使用的是native 
application部署模式。但是在测试中发现,当将作业并行度调大之后,各种timeout时有发生。根据监控看,JM和TM容器的cpu和内存都没有使用到k8s给分配的量。

 

在尝试调大akka.ask.timeout至1分钟,和heartbeat.timeout至2分钟之后,各种超时现象得以缓解。

 

我的问题是,当设置较大并行度(比如128)时,akka超时和心跳超时的各种现象都是正常的吗?如果不正常,需要用什么方式去troubleshot问题的根源呢?另外单纯一味调大各个组件的超时时间,会带来什么负面作用呢?

 

附件中有akka超时的jobmanager日志,TaskManager心跳超时日志稍后会发上来。

 

谢谢!

 

Re: 几个Flink 1.12. 2超时问题

2021-08-03 文章 Chenyu Zheng
是因为上游事件源速率比较大,需要提高并行度来匹配速率

谢谢!

On 2021/8/3, 2:41 PM, "Ye Chen"  wrote:

你好,
请问一下为什么要设置128并行度,这个数值有点太大了,出于什么考虑设置的






在 2021-08-03 14:02:53,"Chenyu Zheng"  写道:

开发者您好,



我正在尝试在Kubernetes上部署Flink 1.12.2,使用的是native 
application部署模式。但是在测试中发现,当将作业并行度调大之后,各种timeout时有发生。根据监控看,JM和TM容器的cpu和内存都没有使用到k8s给分配的量。



在尝试调大akka.ask.timeout至1分钟,和heartbeat.timeout至2分钟之后,各种超时现象得以缓解。




我的问题是,当设置较大并行度(比如128)时,akka超时和心跳超时的各种现象都是正常的吗?如果不正常,需要用什么方式去troubleshot问题的根源呢?另外单纯一味调大各个组件的超时时间,会带来什么负面作用呢?



附件中有akka超时的jobmanager日志,TaskManager心跳超时日志稍后会发上来。



谢谢!




Flink sql 维表聚合问题请教

2021-08-03 文章 carlc
请教下如何在维表上做聚合操作?  如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~

-- 模拟需求(有点牵强...):
-- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist 统计对应 
user_id 在维表中的次数 -> 即: 在维表上做聚合操作

-- 1. 创建user_blacklist表
CREATE TABLE `user_blacklist` (
`user_id` bigint(20) NOT NULL,
`create_time` datetime NOT NULL,
PRIMARY KEY (`user_id`,`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO user_blacklist (`user_id`, `create_time`) 
VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), (2,'2021-01-04 
00:00:00');

-- 2. 模拟kafka数据:
-- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 00:00:00"}
-- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 00:00:00"}

-- 操作步骤:
当发送第1条kafka数据得到如下输出:
| OP| user_id| event_type | current_ts| bl_count |
| +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
| +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
当再次发送第1条kafka数据得到如下输出:
| +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
| +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |

— SQL 如下: 

create table kafka_user_event
(
`user_id` BIGINT,
`event_type` STRING,
`current_ts` timestamp(3),
`proc_time` AS PROCTIME()
) WITH (
'connector' = 'kafka',
...
);

create table mysql_user_blacklist
(
user_id BIGINT,
create_time timestamp(3),
primary key (user_id,create_time) not enforced
) WITH (
'connector' = 'jdbc',
…
);

create view v2_user_event as (
select t1.`user_id`
, t1.`event_type`
, t1.`current_ts`
, count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
from kafka_user_event t1
left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on 
t1.`user_id` = t2.`user_id`
where t1.`event_type` = 'LOGIN'
);

select * from v2_user_event;



Re: $internal.yarn.log-config-file

2021-08-03 文章 liwei li
在我们的生产环境中使用YarnClient和YarnClusterDescriptor等api往yarn提交flink任务,此时我们使用
YarnConfigOptionsInternal里面的APPLICATION_LOG_CONFIG_FILE 来给
每一个任务设置它单独的log4j配置文件路径


Caizhi Weng  于2021年7月30日周五 上午11:02写道:

> Hi!
>
> 实际上 yarn log config file 所在的 config 目录可以通过 FLINK_CONF_DIR
> 这个环境变量指定。不过这要求客户端的 FLINK_CONF_DIR 目录和集群上的 FLINK_CONF_DIR 目录一样才可以。
>
> comsir <609326...@qq.com.invalid> 于2021年7月30日周五 上午10:21写道:
>
> > hi all :
> >
> >
> 像$internal.yarn.log-config-file,$internal.yarn.resourcemanager.enable-vcore-matching
> > 这种internal的变量,有啥办法能自定义指定他的值?
> > 如果不能,$internal.yarn.log-config-file为啥要这么设计,日志路径不让自定义指定?
>


Re: 非对齐检查点还能保证exactly once语义吗

2021-08-03 文章 Caizhi Weng
Hi!

1.13 的文档对 unaligned checkpoint 的介绍在
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints
和
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/stateful-stream-processing/#unaligned-checkpointing

shimin huang  于2021年8月3日周二 下午5:06写道:

> Hi!
> 这个有相关的文档介绍吗,1.11版本左右简单了解过exactly once非对齐机制这块,1.13版本的exactly
> once的非对齐机制貌似没在官方文档上看到
>
> Caizhi Weng  于2021年8月2日周一 下午7:28写道:
>
> > Hi!
> >
> > shimin huang 说的可能是原本的 at least once 的 checkpoint 机制,这种 checkpoint
> 原本就是不对齐的。
> >
> > Flink 1.13 完善了 exactly once 条件下的不对齐 checkpoint 机制,因此这是能保证 exactly once
> > 的。实现原理简单来说就是把还没处理的数据一起写到 state 里,下次恢复的时候把这些还没处理的数据也恢复出来接着处理。
> >
> > 张锴  于2021年8月2日周一 下午7:20写道:
> >
> > > 这个原理能说明一下吗,咋做到的
> > >
> > > 东东  于2021年8月2日周一 下午7:16写道:
> > >
> > > > 对于每一个subtask,边界仍然是清晰的,所以精确一次可以保证,只不过ck会变大。
> > > >
> > > > 在 2021-08-02 18:53:11,"张锴"  写道:
> > > >
> >flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N
> > +
> > > > >1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次?
> > > >
> > >
> >
>


Re: 非对齐检查点还能保证exactly once语义吗

2021-08-03 文章 shimin huang
Hi!
这个有相关的文档介绍吗,1.11版本左右简单了解过exactly once非对齐机制这块,1.13版本的exactly
once的非对齐机制貌似没在官方文档上看到

Caizhi Weng  于2021年8月2日周一 下午7:28写道:

> Hi!
>
> shimin huang 说的可能是原本的 at least once 的 checkpoint 机制,这种 checkpoint 原本就是不对齐的。
>
> Flink 1.13 完善了 exactly once 条件下的不对齐 checkpoint 机制,因此这是能保证 exactly once
> 的。实现原理简单来说就是把还没处理的数据一起写到 state 里,下次恢复的时候把这些还没处理的数据也恢复出来接着处理。
>
> 张锴  于2021年8月2日周一 下午7:20写道:
>
> > 这个原理能说明一下吗,咋做到的
> >
> > 东东  于2021年8月2日周一 下午7:16写道:
> >
> > > 对于每一个subtask,边界仍然是清晰的,所以精确一次可以保证,只不过ck会变大。
> > >
> > > 在 2021-08-02 18:53:11,"张锴"  写道:
> > > >flink最新特性中有非对齐检查点的特性,可以用来解决一些反压下的任务,但如果用了这个,还能保证精确一次吗?对齐的检查点有清晰的快照N~N
> +
> > > >1之间的边界,这个会将数据混在一起,如何在恢复的时候保证精确一次?
> > >
> >
>


Re:eventTime语义一些设备的数据总是迟到被丢弃

2021-08-03 文章 Ye Chen
你好,
设备数据迟到多久?maxOutofOrderness设置一个合适的值,让迟到的数据到达窗口后再计算。



best regards







在 2021-08-03 15:15:37,"zwoi" <318666...@qq.com.INVALID> 写道:
>hi
>       我的设备数据是这样的, 设备id id(设备的唯一标识), 时间戳 time,要处理的指标 value,
>       在eventTime语义下watermark 生成方式为new 
>Watermark(Math.max(time, currentMaxTimestamp) - maxOutofOrderness),
>       我需要对设备数据 做 keyby(id) 
>分组后再计算,但总有几个设备数据迟到,导致这几个设备数据就一直计算不到,请问有什么解决办法吗? 


Re: eventTime语义一些设备的数据总是迟到被丢弃

2021-08-03 文章 Caizhi Weng
Hi!

keyby 之后是做窗口聚合吗?如果是的话,SQL API 有一个配置项 table.exec.emit.late-fire.enabled
和 table.exec.emit.late-fire.delay 可以处理迟到数据。当这个功能 enabled 之后,若 delay = 0
则每来一条迟到数据就会输出一次修正的结果,若 delay > 0 则是窗口结束后每隔 delay 的时间输出一次。能接受的迟到的时长和 state
ttl 是一样的。详细说明见 WindowEmitStrategy 这个类。

zwoi <318666...@qq.com.invalid> 于2021年8月3日周二 下午3:15写道:

> hi
>        我的设备数据是这样的, 设备id id(设备的唯一标识), 时间戳 time,要处理的指标
> value,
>        在eventTime语义下watermark 生成方式为new
> Watermark(Math.max(time, currentMaxTimestamp) - maxOutofOrderness),
>        我需要对设备数据 做 keyby(id)
> 分组后再计算,但总有几个设备数据迟到,导致这几个设备数据就一直计算不到,请问有什么解决办法吗? 


eventTime????????????????????????????????

2021-08-03 文章 zwoi
hi
       ?? id id(??), 
?? time?? value??
       ??eventTime??watermark ??new 
Watermark(Math.max(time, currentMaxTimestamp) - maxOutofOrderness)??
        ?? keyby(id) 
??