Will a Flink SQL job's State keep growing?
Flink is deployed as flink-on-yarn. The job's SQL uses current_date as the job timestamp, and the state backend is set with env.setStateBackend(new MemoryStateBackend()). I am worried about the job's State growth. The connector is upsert-kafka; the SQL below aggregates the dwd layer and writes the result to upsert-kafka:

select
  TO_DATE(cast(doi.DeliveryTime as String), 'yyyy-MM-dd') as days,
  doi.UserId,
  count(doi.Code) as SendTime,
  sum(doi.PayAmount / 100) as SendCashcharge,
  sum(doi.PayAmount / 100 - ChargeAmount / 100 + UseBalance / 100) as SendCashuse,
  sum(doi.CashMoney / 100) as SendCash
from dwd_order_info doi
where doi.DeliveryTime >= cast(current_date AS TIMESTAMP)
  and doi.OrderType = 29
  and doi.Status >= 50
  and doi.Status < 60
group by
  TO_DATE(cast(doi.DeliveryTime as String), 'yyyy-MM-dd'),
  doi.UserId
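A non-windowed GROUP BY like the one above keeps one accumulator per (days, UserId) key indefinitely unless a retention is configured, so its state grows by one group per user per day. A minimal sketch of bounding it, assuming a Flink version with the `table.exec.state.ttl` option (1.13+); the 25-hour value is an illustrative assumption, not from the thread:

```sql
-- Hedged sketch: expire aggregate state that has been idle for 25 hours,
-- so yesterday's (days, UserId) groups, which the current_date filter no
-- longer updates, are eventually dropped. The TTL value is an assumption.
SET 'table.exec.state.ttl' = '25 h';
```

With a TTL in place the job trades exactness for bounded state: a group that receives no updates for the TTL period is discarded and would restart from zero if a late row arrived.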
Cleaning up state in a Flink SQL CDC job
For a Flink SQL CDC job, can the accumulated state be cleaned up? Idle state can be expired by configuring an idle state retention on the table config, for example:

import java.time.Duration

val config: TableConfig = tabEnv.getConfig
config.setIdleStateRetention(Duration.ofHours(1))
Is Flink SQL state queryable?
I'd like to understand what is inside Flink SQL state. Can it be queried with the DataStream queryable state API? If so, how is it queried, and do I need to know the key to query it? The underlying need is simply to find out what the state actually stores.
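For context on what that query would look like: the queryable state API only serves state that a job explicitly registered via StateDescriptor.setQueryable(...), and Flink SQL's internal operator state is not registered that way, so in practice it is not reachable through this client. A minimal sketch for state you register yourself in a DataStream job; the host, port, job id, state name and key below are placeholders, not real values:

```scala
// Hedged sketch of the DataStream queryable-state client. It can only read
// state that a job registered as queryable; Flink SQL internal state is not.
// Host, port, job id, state name and key are placeholder assumptions.
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.queryablestate.client.QueryableStateClient

val client = new QueryableStateClient("taskmanager-host", 9069)

val descriptor =
  new ValueStateDescriptor[java.lang.Long]("my-counter", classOf[java.lang.Long])

// Queries are point lookups: you must already know the key you want to read.
val future = client.getKvState(
  JobID.fromHexString("00000000000000000000000000000000"), // placeholder job id
  "my-queryable-state-name",                               // name set via setQueryable
  "some-key",
  BasicTypeInfo.STRING_TYPE_INFO,
  descriptor)

future.thenAccept(state => println(state.value()))
```

So the answer to "do I need to know the key" is yes: the client does per-key lookups and has no way to enumerate keys or dump the whole state.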
Reply: Flink SQL state keeps growing with tumbling windows
Hi,

---- Original message ----
From: "Benchao Li"
https://issues.apache.org/jira/browse/FLINK-17942

LakeShen
Re: Flink SQL state keeps growing with tumbling windows
Hi,

This looks like a bug. Someone mentioned it to me before and I did not pay attention at the time; now it does appear to be a genuine bug, and I have reproduced it locally. I opened an issue [1] to track and fix it.

[1] https://issues.apache.org/jira/browse/FLINK-17942

LakeShen wrote on Tue, May 26, 2020 at 8:14 PM:
> Hi,
>
> Check whether there is a hot-key problem; I see you group by the two fields server and reason.
>
> Best,
> LakeShen

--
Best,
Benchao Li
Re: Flink SQL state keeps growing with tumbling windows
Hi,

Check whether there is a hot-key (data skew) problem; I see you group by the two fields server and reason.

Best,
LakeShen

Benchao Li wrote on Tue, May 26, 2020 at 6:50 PM:
> Hi,
>
> Your SQL looks largely fine. Two things to confirm:
> 1. Is the watermark generated correctly? That is, are window results actually being emitted? If the watermark lags far behind, multiple windows will be alive at the same time.
> 2. How do you conclude that state keeps rising: from checkpoint sizes, or from watching the heap grow?
Re: Flink SQL state keeps growing with tumbling windows
Hi,

Your SQL looks largely fine. Two things to confirm:
1. Is the watermark generated correctly? That is, are window results actually being emitted? If the watermark lags far behind, multiple windows will be alive at the same time.
2. How do you conclude that state keeps rising: from checkpoint sizes, or from watching the heap grow?

瓜牛 wrote on Tue, May 26, 2020 at 6:07 PM:
> hi, everyone!
>
> Symptom: when running a tumbling-window job in pure Flink SQL, the state size keeps increasing.
>
> SQL: source and sink are both Kafka tables; using event time and a 5-minute tumbling window, the job groups by server and reason and computes the row count and the distinct count of role_id.

--
Best,
Benchao Li
Flink SQL state keeps growing with tumbling windows
hi, everyone!

Symptom: when running a tumbling-window job in pure Flink SQL, the state size keeps increasing.

SQL: source and sink are both Kafka tables; using event time and a 5-minute tumbling window, the job groups by server and reason and computes the row count and the distinct count of role_id.

Confusion: in theory, with tumbling windows the old windows should be cleared and the state size should stay roughly stable (the source volume is steady), yet the state size keeps increasing. Is something wrong with the SQL?

Please take a look. Thanks!

CREATE TABLE source_kafka (
  dtime string,
  wm as cast(dtime as TIMESTAMP(3)),
  server string,
  reason string,
  role_id string,
  WATERMARK FOR wm AS wm - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.group.id' = 'xxx',
  'format.type' = 'json'
)

CREATE TABLE sink_kafka (
  window_time string,
  server string,
  reason string,
  role_id_distinct_cnt BIGINT,
  log_cnt BIGINT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'format.type' = 'json'
)

INSERT INTO sink_kafka
SELECT
  DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss') AS window_time,
  server,
  reason,
  COUNT(DISTINCT role_id) AS role_id_distinct_cnt,
  COUNT(1) AS log_cnt
FROM source_kafka
GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE), server, reason
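The first diagnostic suggested in this thread, checking whether the watermark is actually advancing, can be probed directly in SQL on newer Flink versions. A minimal sketch, assuming Flink 1.13+ (newer than this thread), where the CURRENT_WATERMARK function is available:

```sql
-- Hedged sketch, assumes Flink 1.13+; the thread predates this function.
-- If current_wm stays far behind dtime, windows are not being closed and
-- window state accumulates, which would explain the observed growth.
SELECT
  dtime,
  CURRENT_WATERMARK(wm) AS current_wm
FROM source_kafka;
```

A large and growing gap between dtime and current_wm points at the watermark (diagnostic 1) rather than at window cleanup itself.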
Re: Frequent checkpoint failures: could they keep Flink SQL state from being cleared?
Hi,

AFAIK, whether a timer fires is unrelated to whether checkpoints succeed or fail.

Best,
Congxian

LakeShen wrote on Thu, Jan 16, 2020 at 8:53 PM:
> Hi community, I am using Flink SQL and I have set a state retention time. As far as I know, Flink sets a timer per key to clear that key's state. If the job's checkpoints keep failing, will the keyed state still be cleared by those timers?
> Thanks for your reply.
Frequent checkpoint failures: could they keep Flink SQL state from being cleared?
Hi community, I am using Flink SQL and I have set a state retention time. As far as I know, Flink sets a timer per key to clear that key's state. If the job's checkpoints keep failing, will the keyed state still be cleared by those timers?
Thanks for your reply.
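For comparison with the timer-based retention discussed above: in the DataStream API the same per-key expiry is configured explicitly through StateTtlConfig, and that cleanup is likewise independent of checkpoint success. A minimal sketch; the one-hour TTL and the "cnt" state name are illustrative assumptions, not values from this thread:

```scala
// Hedged sketch: per-key state TTL in the DataStream API. The 1-hour TTL
// and the "cnt" descriptor name are assumptions for illustration only.
import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
  .newBuilder(Time.hours(1))
  // refresh the TTL whenever the state entry is created or written
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  // never hand back entries that have already expired
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build()

val descriptor =
  new ValueStateDescriptor[java.lang.Long]("cnt", classOf[java.lang.Long])
descriptor.enableTimeToLive(ttlConfig)
```

Expired entries are removed lazily on access and by the backend's background cleanup, not by the checkpoint mechanism, which is consistent with the reply above that timer firing is unrelated to checkpoint success.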