一、环境:
    1、版本:1.12.0
    2、flink sql
    3、已经设置了setIdleStateRetention 为1小时
    4、状态后端是rocksDB, 增量模式
    5、源数据没有数据激增情况,任务已经跑了两天

二、详情
    具体sql见第三大点,就是普通的group by统计的
sql,然后设置setIdleStateRetention(3600)。目前观察两天了,checkpoint目录下面的shared文件夹的大小一直在增长,然后看文件夹里的文件是在一直更新,最早的文件也会消失。
   
我sql的groupby维度有加一个具体的分钟字段,所以一小时之后是不可能有一模一样的维度数据,那过期的数据正常是要被清理掉,那/checkpoint/shared/文件夹大小不断增长是否能说明过期的旧数据还没有被清理?
    这种情况应该怎么处理

三、sql具体

CREATE TABLE user_behavior (
   `request_ip` STRING,
   `request_time` BIGINT,
   `header` STRING ,
    //这个操作是将时间戳转为分钟
   `t_min` as cast(`request_time`-(`request_time` + 28800000)%60000 as
BIGINT),
   `ts` as TO_TIMESTAMP(FROM_UNIXTIME(`request_time`/1000-28800,'yyyy-MM-dd
HH:mm:ss')),
   WATERMARK FOR `ts` AS `ts` - INTERVAL '60' MINUTE) 
with (
   'connector' = 'kafka',
   ........ 
);


CREATE TABLE blackhole_table (
   `cnt` BIGINT,
   `lists` STRING
) WITH (
 'connector' = 'blackhole'
);


insert into blackhole_table 
select 
    count(*) as cnt, 
    LISTAGG(concat(`request_ip`, `header`, cast(`request_time` as STRING)))
as lists
from user_behavior 
group by `request_ip`,`header`,`t_min`;





--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复