Re: Idle state retention time is set for a SQL job, but state keeps growing
Flink 1.10.1 here, running into the same problem: the TTL is set but never takes effect. Did you ever solve it?

SQL:

select * from xx group by TUMBLE(monitor_processtime, INTERVAL '60' SECOND), topic_identity

The window is 60 seconds and the retention time is set to 2 minutes, yet the state in checkpoints still keeps growing:

tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(2), Time.minutes(5));

-- Sent from: http://apache-flink.147419.n8.nabble.com/
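For context, the retention shown above is configured on the TableConfig before the query is submitted. A minimal sketch of that setup against the Flink 1.10 Table API follows; the environment wiring and class name are illustrative, not taken from the original job, and note that the documentation recommends the maximum retention exceed the minimum by at least 5 minutes:

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RetentionSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Keep idle state for at least 10 minutes after its last access,
        // and clean it up no later than 15 minutes after that access.
        // Must be set before the query is registered.
        tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(10), Time.minutes(15));

        // ... register sources/sinks and submit the query here ...
    }
}
```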
Re: Reply: Idle state retention time is set for a SQL job, but state keeps growing
Hi,

Could you share your Flink version? For the exact meaning of the idle state retention time, please see [1]:

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html#idle-state-retention-time

Best,
LakeShen

claylin <1012539...@qq.com> wrote on Sun, May 17, 2020 at 10:24 PM:

> The retention time is 10 to 15 minutes. Since I group with the minute as part of the key, the state should expire quickly. Kafka traffic is about 2-5 MB per second.
>
> ---- Original message ----
> From: 刘大龙
> Sent: Sunday, May 17, 2020, 10:14 PM
> To: user-zh
> Subject: Re: Reply: Idle state retention time is set for a SQL job, but state keeps growing
>
> Hi,
> How long did you set the state retention time to? For plain GROUP BY aggregation operators, expired state is currently cleaned up with timers: the exact expiry time is the state's last update time plus the minimum idle retention you configured, so state that keeps being updated never expires. Also, how much data is in your Kafka topic, say, how many records per second? You could try a shorter retention time and watch whether the state size stabilizes.
>
> > ---- Original message ----
> > From: claylin <1012539...@qq.com>
> > Sent: 2020-05-17 17:41:13 (Sunday)
> > To: user-zh
> > Subject: Reply: Idle state retention time is set for a SQL job, but state keeps growing
> >
> > Link here: https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
> >
> > ---- Original message ----
> > From: tison
> > Sent: Sunday, May 17, 2020, 5:34 PM
> > To: user-zh
> > Subject: Re: Idle state retention time is set for a SQL job, but state keeps growing
> >
> > Consider posting the SQL as a gist link?
> >
> > Best,
> > tison.
> claylin <1012539...@qq.com> wrote on Sun, May 17, 2020 at 5:32 PM:
>
> > The SQL job is defined as follows. I also set the minimum and maximum idle time through TableConfig, but after the job runs for a long time, the state under the RocksDB working directory flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9 keeps growing; reading and writing state costs the job threads a lot of time, and the job ends up constantly backpressured. Any advice is appreciated.
> >
> > CREATE TABLE yy_yapmnetwork_original (
> >     happenAt BIGINT,
> >     uid BIGINT,
> >     appId STRING,
> >     deviceId STRING,
> >     appVer STRING,
> >     dnsDur BIGINT,
> >     useGlb INT,
> >     hitCache INT,
> >     requestSize DOUBLE,
> >     responseSize DOUBLE,
> >     totalDur BIGINT,
> >     url STRING,
> >     statusCode INT,
> >     prototype STRING,
> >     netType STRING,
> >     traceId STRING,
> >     ts AS CAST(FROM_UNIXTIME(happenAt/1000) AS TIMESTAMP(3)),
> >     WATERMARK FOR ts AS ts - INTERVAL '20' SECOND
> > ) with (
> >     'connector.type' = 'kafka',
> >     'connector.version' = 'universal',
> >     'connector.topic' = 'yapm_metrics',
> >     'connector.properties.zookeeper.connect' = 'localhost:2181',
> >     'connector.properties.bootstrap.servers' = 'kafkawx007-core001.yy.com:8101,kafkawx007-core002.yy.com:8101,kafkawx007-core003.yy.com:8101',
> >     'connector.properties.group.id' = 'interface_success_rate_consumer',
> >     'connector.startup-mode' = 'latest-offset',
> >     'format.type' = 'json'
> > );
> >
> > create table request_latency_tbl (
> >     app_id string,
> >     app_ver string,
> >     net_type string,
> >     prototype string,
> >     url string,
> >     status_code int,
> >     w_start string,
> >     success_cnt BIGINT,
> >     failure_cnt BIGINT,
> >     total_cnt BIGINT
> > ) with (
> >     'connector.type' = 'jdbc',
> >     'connector.url' = 'jdbc:mysql://localhost:3315/yapm_metrics?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&autoReconnect=true',
> >     'connector.table' = 'request_latency_statistics',
> >     'connector.username' = 'yapm_metrics',
> >     'connector.password' = '1234456',
> >     'connector.write.flush.max-rows' = '1000',
> >     'connector.write.flush.interval' = '5s',
> >     'connector.write.max-retries' = '2'
> > );
> >
> > create view request_1minutes_latency as
> >     select appId, appVer, netType, prototype, url, statusCode,
> >         DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm') w_start,
> >         count(distinct traceId) filter (where statusCode in (200)) as successCnt,
> >         count(distinct traceId) filter (where statusCode not in (200)) as failureCnt,
> >         count(distinct traceId) as total_cnt
> >     from yy_yapmnetwork_original
> >     group by appId, appVer, netType, prototype, url, statusCode, DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm');
> >
> > insert into request_latency_tbl select * from request_1minutes_latency;
>
> --
> 刘大龙
> Zhejiang University, Department of Control Science and Engineering, Institute of Cyber-Systems and Control, Industrial Control New Building, Room 217
> Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
> Tel: 18867547281
Re: Reply: Idle state retention time is set for a SQL job, but state keeps growing
Hi,

How long did you set the state retention time to? For plain GROUP BY aggregation operators, expired state is currently cleaned up with timers: the exact expiry time is the state's last update time plus the minimum idle retention you configured, so state that keeps being updated never expires. Also, how much data is in your Kafka topic, say, how many records per second? You could try a shorter retention time and watch whether the state size stabilizes.

> ---- Original message ----
> From: claylin <1012539...@qq.com>
> Sent: 2020-05-17 17:41:13 (Sunday)
> To: user-zh
> Subject: Reply: Idle state retention time is set for a SQL job, but state keeps growing
>
> Link here: https://gist.github.com/fairytalelin/e6be097a897d38b816bfe91f7ca8c0d4
>
> ---- Original message ----
> From: tison
> Sent: Sunday, May 17, 2020, 5:34 PM
> To: user-zh
> Subject: Re: Idle state retention time is set for a SQL job, but state keeps growing
>
> Consider posting the SQL as a gist link?
>
> Best,
> tison.
>
> claylin <1012539...@qq.com> wrote on Sun, May 17, 2020 at 5:32 PM:
>
> > The SQL job is defined as follows. I also set the minimum and maximum idle time through TableConfig, but after the job runs for a long time, the state under the RocksDB working directory flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9 keeps growing; reading and writing state costs the job threads a lot of time, and the job ends up constantly backpressured. Any advice is appreciated.
> >
> > [SQL job definition quoted in full earlier in the thread; omitted here]

--
刘大龙
Zhejiang University, Department of Control Science and Engineering, Institute of Cyber-Systems and Control, Industrial Control New Building, Room 217
Address: Yuquan Campus, Zhejiang University, 38 Zheda Road, Hangzhou, Zhejiang
Tel: 18867547281
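The cleanup rule described above (state expires at its last update time plus the minimum idle retention, and every update pushes that deadline back) can be sketched as a toy model in plain Java. This only illustrates the semantics from the reply; it is not Flink's actual timer-based implementation, and the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of idle-state expiry: a key's state is kept alive as long as it
// was updated within the last minRetentionMs milliseconds.
public class IdleStateModel {
    private final long minRetentionMs;
    private final Map<String, Long> lastUpdate = new HashMap<>();

    public IdleStateModel(long minRetentionMs) {
        this.minRetentionMs = minRetentionMs;
    }

    // Every incoming record for a key resets its cleanup deadline.
    public void onRecord(String key, long nowMs) {
        lastUpdate.put(key, nowMs);
    }

    // State survives iff the key was updated within the retention window.
    public boolean isAlive(String key, long nowMs) {
        Long t = lastUpdate.get(key);
        return t != null && nowMs - t < minRetentionMs;
    }
}
```

Under this rule, with a 10-minute minimum retention a key that receives a record every 9 minutes never expires, which matches the observation in the reply that continuously updated state is never cleaned up.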
Re: Idle state retention time is set for a SQL job, but state keeps growing
Consider posting the SQL as a gist link?

Best,
tison.

claylin <1012539...@qq.com> wrote on Sun, May 17, 2020 at 5:32 PM:

> The SQL job is defined as follows. I also set the minimum and maximum idle time through TableConfig, but after the job runs for a long time, the state under the RocksDB working directory flink-io-8a4ac39d-b8c3-4baa-bd08-a063e6e964e9 keeps growing; reading and writing state costs the job threads a lot of time, and the job ends up constantly backpressured. Any advice is appreciated.
>
> [SQL job definition quoted in full earlier in the thread; omitted here]
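One detail worth noting in the quoted job: the view groups by a formatted minute string rather than a time window, so every wall-clock minute introduces a fresh set of grouping keys whose state then goes idle and can only be freed by the idle-state cleanup discussed above. A plain-Java sketch of that grouping key follows; the UTC time zone is an assumption, and the pattern's leading 'yyyy' is assumed too, since the archived post shows it truncated to '-MM-dd HH:mm':

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class MinuteKey {
    // Mirrors the view's grouping expression DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm'):
    // each new minute yields a brand-new key, so keys from past minutes stop
    // being updated and accumulate until idle-state retention removes them.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm").withZone(ZoneId.of("UTC"));

    public static String minuteKey(long epochMillis) {
        return FMT.format(Instant.ofEpochMilli(epochMillis));
    }
}
```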