这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。 这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下, state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。
鱼子酱 <384939...@qq.com> 于2020年7月29日周三 上午9:47写道: > 您好: > > 我按照您说的试了看了一下watermark, > 发现可以 正常更新,相关的计算结果也没发现问题。 > 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了 > <http://apache-flink.147419.n8.nabble.com/file/t793/111.png> > 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。 > <http://apache-flink.147419.n8.nabble.com/file/t793/333.png> > <http://apache-flink.147419.n8.nabble.com/file/t793/222.png> > > > > Congxian Qiu wrote > > Hi > > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state > 越来越大的情况,或许可以检查下 > > watermark[1] > > > > [1] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html > > > > Best, > > Congxian > > > > > > 鱼子酱 < > > > 384939718@ > > >> 于2020年7月28日周二 下午2:45写道: > > > >> Hi,社区的各位大家好: > >> 我目前生产上面使用的是1.8.2版本,相对稳定 > >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究, > >> 截止目前先后研究了1.10.1 1.11.1共2个大版本 > >> > >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加 > >> 状态后端使用的是rocksdb 的增量模式 > >> StateBackend backend =new > >> RocksDBStateBackend("hdfs:///checkpoints-data/",true); > >> 设置了官网文档中找到的删除策略: > >> TableConfig tableConfig = streamTableEnvironment.getConfig(); > >> tableConfig.setIdleStateRetentionTime(Time.minutes(2), > >> Time.minutes(7)); > >> > >> 请问是我使用的方式不对吗? > >> > >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator > >> > >> > >> > >> 版本影响:flink1.10.1 flink1.11.1 > >> planner:blink planner > >> source : kafka source > >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > >> > >> > >> > >> > >> > >> sql: > >> insert into result > >> select request_time ,request_id ,request_cnt ,avg_resptime > >> ,stddev_resptime ,terminal_cnt > >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) > >> from > >> ( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' > >> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time > >> ,commandId as request_id > >> ,count(*) as request_cnt > >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime > >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as > >> stddev_resptime > >> from log > >> where > >> commandId in (104005 ,204005 ,404005) > >> and errCode=0 and attr=0 > >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId > >> > >> union all > >> > >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' > >> MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time > >> ,99999999 > >> ,count(*) as request_cnt > >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime > >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as > >> stddev_resptime > >> from log > >> where > >> commandId in (104005 ,204005 ,404005) > >> and errCode=0 and attr=0 > >> group by TUMBLE(times, INTERVAL '1' MINUTE) > >> ) > >> > >> > >> source: > >> > >> create table log ( > >> eventTime bigint > >> ,times timestamp(3) > >> …………………… > >> ,commandId integer > >> ,watermark for times as times - interval '5' second > >> ) > >> with( > >> 'connector' = 'kafka-0.10', > >> 'topic' = '……', > >> 'properties.bootstrap.servers' = '……', > >> 'properties.group.id' = '……', > >> 'scan.startup.mode' = 'latest-offset', > >> 'format' = 'json' > >> ) > >> > >> sink1: > >> create table result ( > >> request_time varchar > >> ,request_id integer > >> ,request_cnt bigint > >> ,avg_resptime double > >> ,stddev_resptime double > >> ,insert_time varchar > >> ) with ( > >> 'connector' = 'kafka-0.10', > >> 'topic' = '……', > >> 'properties.bootstrap.servers' = '……', > >> 'properties.group.id' = '……', > >> 'scan.startup.mode' = 'latest-offset', > >> 'format' = 'json' > >> ) > >> > >> > >> > >> > >> > >> -- > >> Sent from: http://apache-flink.147419.n8.nabble.com/ > >> > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li