一、环境: 1、版本:1.12.0 2、flink sql 3、已经设置了setIdleStateRetention 为1小时 4、状态后端是rocksDB, 增量模式 5、源数据没有数据激增情况,任务已经跑了两天
二、详情 具体sql见第三大点,就是普通的group by统计的 sql,然后设置setIdleStateRetention(3600)。目前观察两天了,checkpoint目录下面的shared文件夹的大小一直在增长,然后看文件夹里的文件是在一直更新,最早的文件也会消失。 我sql的groupby维度有加一个具体的分钟字段,所以一小时之后是不可能有一模一样的维度数据,那过期的数据正常是要被清理掉,那/checkpoint/shared/文件夹大小不断增长是否能说明过期的旧数据还没有被清理? 这种情况应该怎么处理 三、sql具体 CREATE TABLE user_behavior ( `request_ip` STRING, `request_time` BIGINT, `header` STRING , //这个操作是将时间戳转为分钟 `t_min` as cast(`request_time`-(`request_time` + 28800000)%60000 as BIGINT), `ts` as TO_TIMESTAMP(FROM_UNIXTIME(`request_time`/1000-28800,'yyyy-MM-dd HH:mm:ss')), WATERMARK FOR `ts` AS `ts` - INTERVAL '60' MINUTE) with ( 'connector' = 'kafka', ........ ); CREATE TABLE blackhole_table ( `cnt` BIGINT, `lists` STRING ) WITH ( 'connector' = 'blackhole' ); insert into blackhole_table select count(*) as cnt, LISTAGG(concat(`request_ip`, `header`, cast(`request_time` as STRING))) as lists from user_behavior group by `request_ip`,`header`,`t_min`; -- Sent from: http://apache-flink.147419.n8.nabble.com/