Hi,

From the attachment, I saw there are IntervalJoin and GroupWindowAggregate operators. AFAIK the state retention for such operators is not controlled by `table.exec.state.ttl`. Could you share the operator-level state metrics to help identify the issue?
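For reference, the state an interval join keeps is bounded by the time interval declared in the join condition itself, so it is those bounds (not `table.exec.state.ttl`) that determine its footprint; a minimal sketch with hypothetical table names:

```sql
-- Each side buffers rows only as long as the declared bounds require
-- (here roughly 60 minutes either way); that state is then cleaned up
-- automatically, independently of table.exec.state.ttl.
SELECT p.orderId, d.server_time_stamp
FROM pickedup p
JOIN delivered d
  ON p.orderId = d.orderId
 AND d.proctime BETWEEN p.proctime - INTERVAL '60' MINUTE
                    AND p.proctime + INTERVAL '60' MINUTE;
```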
Best,
Jane

On Sun, Jun 25, 2023 at 10:38 AM Hangxiang Yu <master...@gmail.com> wrote:

> Hi, neha.
>
> Could you share more information:
>
> 1. Which state backend are you using? If it is RocksDB, is incremental
>    checkpointing enabled?
> 2. Which specific operator is experiencing an increase in checkpoint data
>    size? (You can check the checkpoint size changes of the different
>    subtasks in the Checkpoint History in the Flink UI.)
> 3. Has there been any change in the data flow or the input data during
>    this time?
>
> On Fri, Jun 23, 2023 at 2:01 PM neha goyal <nehagoy...@gmail.com> wrote:
>
>> Hello,
>>
>> I have assigned env.getConfig().set("table.exec.state.ttl", "180 s") to
>> my table environment. Even after that, I can see continuous growth in the
>> savepoint size.
>>
>> I am attaching a screenshot of the job graph and the savepoint metric.
>> I am also adding the query that I am running on Kafka streams; it is a
>> lengthy query. Any help would be highly appreciated.
>>
>> SELECT
>>   *
>> from
>>   (
>>     With Actuals as (
>>       SELECT
>>         a1.orderId,
>>         a1.zoneId,
>>         a3.cityId,
>>         case
>>           when a2.status = 'delivered' then round(
>>             CAST(
>>               (
>>                 Cast(a2.server_time_stamp AS BIGINT) -
>>                 Cast(a1.server_time_stamp AS BIGINT)
>>               ) AS DOUBLE
>>             ) / CAST(60000 AS DOUBLE),
>>             4
>>           )
>>           when CAST(
>>             CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT)) AS DOUBLE
>>           ) / CAST(60 * 1000 AS DOUBLE) > cast(lmTime as DOUBLE) then CAST(
>>             CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT)) AS DOUBLE
>>           ) / CAST(60 * 1000 AS DOUBLE)
>>           else null
>>         end AS P2D_inclusive,
>>         case
>>           when a2.status = 'delivered' then round(
>>             CAST(
>>               (
>>                 Cast(a2.server_time_stamp AS BIGINT) -
>>                 Cast(a1.server_time_stamp AS BIGINT)
>>               ) AS DOUBLE
>>             ) / CAST(60000 AS DOUBLE),
>>             4
>>           )
>>           else null
>>         end as P2D_exclusive,
>>         cast(lmTime as DOUBLE) as PP2D,
>>         case
>>           when a2.status = 'delivered' then exp(
>>             (
>>               (
>>                 Cast(a2.server_time_stamp AS BIGINT) - CURRENT_TIMESTAMP
>>               ) / (60 * 1000)
>>             ) / 100
>>           )
>>           else 1
>>         end as recency_wt,
>>         case
>>           when a2.status = 'delivered' then 1
>>           else 0
>>         end as delivered_flag,
>>         case
>>           when a2.status = 'delivered' then a2.proctime
>>           else a1.proctime
>>         end as proctime
>>       FROM
>>         (
>>           select distinct
>>             orderId,
>>             zoneId,
>>             server_time_stamp,
>>             proctime
>>           from my_streamtable
>>           where status = 'pickedup'
>>         ) a1
>>         LEFT JOIN (
>>           select distinct
>>             orderId,
>>             zoneId,
>>             status,
>>             server_time_stamp,
>>             proctime
>>           from my_streamtable
>>           where status = 'delivered'
>>         ) a2 ON a1.orderId = a2.orderId
>>           AND a2.proctime BETWEEN a1.proctime - interval '60' minute
>>           AND a1.proctime + interval '60' minute
>>         INNER JOIN (
>>           select distinct
>>             orderId,
>>             cityId,
>>             lmTime,
>>             proctime
>>           from my_streamtable2
>>           where orderId is not null
>>         ) a3 ON cast(a1.orderId as VARCHAR) = cast(a3.orderId as VARCHAR)
>>           AND a3.proctime BETWEEN a1.proctime - interval '60' minute
>>           AND a1.proctime + interval '60' minute
>>     ),
>>     zone_count as (
>>       select
>>         zoneId,
>>         proctime() as proctime,
>>         COUNT(orderId) as counts_inclusive,
>>         sum(delivered_flag) as counts_exclusive,
>>         AVG(cityId) as cityId
>>       from Actuals
>>       where P2D_inclusive is not null
>>       group by
>>         HOP(proctime(), interval '5' minute, interval '60' minute),
>>         zoneId
>>     ),
>>     zone_agg as (
>>       select
>>         zoneId,
>>         sum(recency_wt * (P2D_inclusive - PP2D)) / sum(recency_wt) as zone_quotient_inclusive,
>>         sum(recency_wt * (P2D_exclusive - PP2D)) / sum(recency_wt) as zone_quotient_exclusive,
>>         avg(cityId) as cityId,
>>         proctime() as proctime
>>       from Actuals
>>       where P2D_inclusive is not null
>>       group by
>>         HOP(proctime(), interval '5' minute, interval '60' minute),
>>         zoneId
>>     ),
>>     city_agg as (
>>       select
>>         cityId,
>>         sum(recency_wt * (P2D_inclusive - PP2D)) / sum(recency_wt) as city_quotient_inclusive,
>>         sum(recency_wt * (P2D_inclusive - PP2D)) / sum(recency_wt) as city_quotient_exclusive,
>>         proctime() as proctime
>>       from Actuals
>>       where P2D_inclusive is not null
>>       group by
>>         HOP(proctime(), interval '5' minute, interval '60' minute),
>>         cityId
>>     ),
>>     final as (
>>       select
>>         zone_count.zoneId,
>>         zone_count.cityId,
>>         avg(zone_count.counts_inclusive) as counts_inclusive,
>>         avg(zone_count.counts_exclusive) as counts_exclusive,
>>         avg(zone_agg.zone_quotient_inclusive) as zone_quotient_inclusive,
>>         avg(city_agg.city_quotient_inclusive) as city_quotient_inclusive,
>>         avg(zone_agg.zone_quotient_exclusive) as zone_quotient_exclusive,
>>         avg(city_agg.city_quotient_exclusive) as city_quotient_exclusive
>>       from zone_count
>>         INNER join zone_agg on zone_count.zoneId = zone_agg.zoneId
>>           AND zone_agg.proctime BETWEEN zone_count.proctime - interval '60' minute
>>           AND zone_count.proctime
>>         INNER join city_agg on zone_count.cityId = city_agg.cityId
>>           AND city_agg.proctime BETWEEN zone_count.proctime - interval '60' minute
>>           AND zone_count.proctime
>>       group by
>>         HOP(proctime(), interval '5' minute, interval '60' minute),
>>         zone_count.zoneId,
>>         zone_count.cityId
>>     ),
>>     new_final as (
>>       select
>>         'lm_feedback#' || cast(zoneId as varchar) as key,
>>         case
>>           when counts_inclusive > 5
>>           and zone_quotient_inclusive > zone_quotient_exclusive then zone_quotient_inclusive
>>           when counts_exclusive > 5 then zone_quotient_exclusive
>>           when city_quotient_inclusive > city_quotient_exclusive then city_quotient_inclusive
>>           else city_quotient_exclusive
>>         end as value
>>       from final
>>     )
>>     select
>>       key,
>>       case
>>         when new_final.value > 30 then 30
>>         else new_final.value
>>       end as value,
>>       CURRENT_TIMESTAMP AS win_end,
>>       CURRENT_TIMESTAMP - 1800000 AS win_start,
>>       CURRENT_TIMESTAMP AS time_stamp
>>     from new_final
>>   )
>
> --
> Best,
> Hangxiang.
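As a side note on the TTL option itself: where it does apply (operators with unbounded keyed state, such as regular non-interval joins and non-windowed group aggregations), it can also be set directly in SQL; a minimal sketch, assuming the Flink SQL Client:

```sql
-- Sets idle-state retention for operators that honor it; interval joins
-- and group window aggregates manage their own state lifetime instead.
SET 'table.exec.state.ttl' = '180 s';
```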