A question about State in a flink-sql job

2021-01-05 Post
A Flink job running on flink-on-yarn is configured with env.setStateBackend(new MemoryStateBackend()); the question is about the job's State when current_date is used in the query. The sink uses the upsert-kafka connector; the SQL that reads the dwd-layer table dwd_order_info and writes to upsert-kafka is:
|  select
|   TO_DATE(cast(doi.DeliveryTime as String), 'yyyy-MM-dd') as days,
|   doi.UserId,
|   count(doi.Code) as SendTime,
|   sum(doi.PayAmount / 100) as SendCashcharge,
|   sum(doi.PayAmount / 100 - ChargeAmount / 100 + UseBalance / 100) as SendCashuse,
|   sum(doi.CashMoney / 100) as SendCash
|from dwd_order_info doi
|where doi.DeliveryTime >= cast(current_date AS TIMESTAMP)
|and doi.OrderType = 29 and doi.Status >= 50 and doi.Status < 60
|group by TO_DATE(cast(doi.DeliveryTime as String), 'yyyy-MM-dd'), doi.UserId
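The job above runs with env.setStateBackend(new MemoryStateBackend()), which keeps all state and checkpoint snapshots on the JVM heap and is intended for local testing. As a minimal sketch (Flink 1.11-era API; the checkpoint path is a placeholder, not from the original mail), a filesystem-backed backend would look like:

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// State snapshots go to a durable filesystem instead of the JobManager heap.
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))
```

With a heap-based backend, a continuously growing aggregation state eventually exhausts memory, which is why the choice of backend matters for a job like this.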

flink-sql cdc job: state keeps growing

2020-12-17 Post
When a flink sql job consumes cdc data, its state keeps growing. The idle state retention can be set to expire it:
import java.time.Duration
val config: TableConfig = tabEnv.getConfig
config.setIdleStateRetention(Duration.ofHours(1)) // expire keys idle for over an hour

flink sql state queryable ?

2020-10-15 Post by kandy.wang
I'd like to know what is stored in flink sql state. Can it be queried with the datastream queryable state api? How would that work, and do you need to know the key to query it?
What I really want is to see what the state actually contains.
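A hedged sketch of what the asker is describing (not from the thread): the DataStream queryable-state client does need the exact key, and it can only read state that the job explicitly registered as queryable (e.g. via asQueryableState(...)); Flink SQL does not register its internal state that way, so SQL state is not reachable through this API out of the box. The host, port, job id, and state names below are all placeholders:

```scala
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.queryablestate.client.QueryableStateClient

// Connect to the queryable-state proxy running on a TaskManager (default port 9069).
val client = new QueryableStateClient("taskmanager-host", 9069)
val descriptor = new ValueStateDescriptor[java.lang.Long]("cnt", classOf[java.lang.Long])

// The job id, the registered state name, and the key must all be known.
val future = client.getKvState(
  JobID.fromHexString("00000000000000000000000000000000"),
  "my-queryable-state",
  "some-key",
  BasicTypeInfo.STRING_TYPE_INFO,
  descriptor)
// future.get().value() yields the ValueState contents for that key.
```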

Re: Flink SQL State with tumbling windows keeps growing

2020-05-26 Post
Hi,

----
From: "Benchao Li"
https://issues.apache.org/jira/browse/FLINK-17942

Re: Flink SQL State with tumbling windows keeps growing

2020-05-26 Post by Benchao Li
Hi,

This looks like a bug. Someone mentioned it to me before and I did not pay attention; now it does appear to be a bug, and I reproduced it locally. I created an issue [1] to track and fix it.

[1] https://issues.apache.org/jira/browse/FLINK-17942



-- 

Best,
Benchao Li


Re: Flink SQL State with tumbling windows keeps growing

2020-05-26 Post by LakeShen
Hi,

Check whether there is a hot-key problem: I see you group by the two fields server and reason.

Best,
LakeShen



Re: Flink SQL State with tumbling windows keeps growing

2020-05-26 Post by Benchao Li
Hi,

Your SQL looks mostly fine. Two things to confirm:
1. Is your watermark generated correctly, i.e. are window results emitted normally? If the watermark lags a lot, multiple windows can be alive at the same time.
2. How do you conclude that state is growing: from checkpoint sizes, or from heap usage rising?



-- 

Best,
Benchao Li


Flink SQL State with tumbling windows keeps growing

2020-05-26 Post by 瓜牛
hi, everyone!

Symptom: when running a tumbling-window job in pure Flink SQL, the state size keeps growing.

SQL: source and sink are both kafka tables; the job uses event time and a tumbling window, grouping every 5 minutes by server and reason and computing the row count and the distinct count of role_id.

Question: with tumbling windows, old windows should be cleared, so the state size should stay roughly stable (the source volume is steady), but the state keeps growing. Is there a problem with the SQL?

Please help me take a look. Thanks!



CREATE TABLE source_kafka (
  dtime string,
  wm as cast(dtime as TIMESTAMP(3)),
  server string,
  reason string,
  role_id string,
  WATERMARK FOR wm AS wm - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.group.id' = 'xxx',
  'format.type' = 'json'
)
-

CREATE TABLE sink_kafka (
  window_time string,
  server string,
  reason string,
  role_id_distinct_cnt BIGINT,
  log_cnt BIGINT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'format.type' = 'json'
)
-

INSERT INTO sink_kafka 
SELECT 
 DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), 'yyyy-MM-dd HH:mm:ss') AS 
window_time,
 server,
 reason,
 COUNT(DISTINCT role_id) AS role_id_distinct_cnt,
 COUNT(1) AS log_cnt
FROM source_kafka 
GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE), server, reason
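Regarding the first question in the replies (whether the watermark advances correctly): newer Flink versions offer a built-in function to observe it. A sketch, assuming Flink 1.14+ (the function does not exist in the 1.10-era connector syntax used above):

```sql
-- Hypothetical check: emit each row's event time next to the current watermark;
-- a watermark lagging far behind wm means many windows stay open at once.
SELECT wm, CURRENT_WATERMARK(wm) FROM source_kafka;
```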

Re: Frequent checkpoint failures: could they keep the flink sql state from being cleared?

2020-01-16 Post by Congxian Qiu
Hi,

AFAIK, whether a timer fires is unrelated to checkpoint success.

Best,
Congxian




Frequent checkpoint failures: could they keep the flink sql state from being cleared?

2020-01-16 Post by LakeShen
Hi community, I am using Flink SQL and I have set the retention time. As far as I
know, Flink sets a timer per key to clear that key's state. If the Flink task's
checkpoints keep failing, is the key state still cleared by those timers?
Thanks for your reply.
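The retention mentioned above is typically configured as below (a sketch against the Flink 1.9/1.10 TableConfig API; tableEnv is assumed to be an existing table environment). Cleanup is driven by per-key timers, which, per the reply in this thread, fire independently of checkpoint success:

```scala
import org.apache.flink.api.common.time.Time
import org.apache.flink.table.api.TableConfig

val tConfig: TableConfig = tableEnv.getConfig
// Keys idle longer than the retention window are cleared by a registered timer.
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))
```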