Hi,

From the attachment, I can see that there are IntervalJoin and
GroupWindowAggregate operators. AFAIK the state retention for such
operators is not controlled by `table.exec.state.ttl`; their state is
cleaned up by timers derived from the join interval and the window size
instead.
Could you share the operator-level state metrics to help identify the
issue?
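
In case it helps: if you are on the RocksDB state backend, one way to get
per-operator numbers is to expose RocksDB's native metrics. A minimal
sketch (the option names come from RocksDBNativeMetricOptions; please
verify them against your Flink version):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    // Report RocksDB's estimated key count and live data size per state.
    conf.setString("state.backend.rocksdb.metrics.estimate-num-keys", "true");
    conf.setString("state.backend.rocksdb.metrics.estimate-live-data-size", "true");
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);

These metrics are reported per operator subtask, which makes it easier to
see which operator's state keeps growing.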

Best,
Jane

On Sun, Jun 25, 2023 at 10:38 AM Hangxiang Yu <master...@gmail.com> wrote:

> Hi, neha.
>
> Could you share more information:
>
>    1. Which state backend are you using? If it's RocksDB, is incremental
>    checkpointing enabled? (See the sketch below this list for one way to
>    enable it.)
>    2. Which specific operator is experiencing an increase in Checkpoint
>    data size? (You can check the Checkpoint size changes of different subtasks
>    from the Checkpoint History in the Flink UI)
>    3. Has there been any change in data flow and input data during this
>    time?
>
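> Regarding 1.: if incremental checkpointing turns out to be disabled, it
> can be enabled when constructing the state backend. A minimal sketch,
> assuming Flink 1.13+ where EmbeddedRocksDBStateBackend is available:
>
>     import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
>
>     // "true" enables incremental checkpoints, so each checkpoint only
>     // uploads the SST files that changed since the previous one.
>     env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
>
> or equivalently via `state.backend.incremental: true` in flink-conf.yaml.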
>
> On Fri, Jun 23, 2023 at 2:01 PM neha goyal <nehagoy...@gmail.com> wrote:
>
>> Hello,
>>
>> I have applied env.getConfig().set("table.exec.state.ttl", "180 s") to
>> my table environment. Even after that, I can see continuous growth in
>> the savepoint size.
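>>
>> For reference, the TTL is set roughly like this (a sketch; the exact
>> Table API call depends on the Flink version, e.g. on 1.15+):
>>
>>     import org.apache.flink.table.api.EnvironmentSettings;
>>     import org.apache.flink.table.api.TableEnvironment;
>>
>>     TableEnvironment tEnv = TableEnvironment.create(
>>         EnvironmentSettings.inStreamingMode());
>>     // Idle state retention for the SQL operators that honor it.
>>     tEnv.getConfig().set("table.exec.state.ttl", "180 s");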
>>
>> I am attaching a screenshot of the job graph and the savepoint metrics.
>> I am also including the query that I am running on Kafka streams; it is
>> a lengthy query. Any help would be highly appreciated.
>>
>> SELECT
>>   *
>> from
>>   (
>>     With Actuals as (
>>       SELECT
>>         a1.orderId,
>>         a1.zoneId,
>>         a3.cityId,
>>         case
>>           when a2.status = 'delivered' then round(
>>             CAST(
>>               (
>>                 Cast(a2.server_time_stamp AS BIGINT) - Cast(a1.server_time_stamp AS BIGINT)
>>               ) AS DOUBLE
>>             ) / CAST(60000 AS DOUBLE),
>>             4
>>           )
>>           when CAST(
>>             CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT)) AS DOUBLE
>>           ) / CAST(60 * 1000 AS DOUBLE) > cast(lmTime as DOUBLE) then CAST(
>>             CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT)) AS DOUBLE
>>           ) / CAST(60 * 1000 AS DOUBLE)
>>           else null
>>         end AS P2D_inclusive,
>>         case
>>           when a2.status = 'delivered' then round(
>>             CAST(
>>               (
>>                 Cast(a2.server_time_stamp AS BIGINT) - Cast(a1.server_time_stamp AS BIGINT)
>>               ) AS DOUBLE
>>             ) / CAST(60000 AS DOUBLE),
>>             4
>>           )
>>           else null
>>         end as P2D_exclusive,
>>         cast(lmTime as DOUBLE) as PP2D,
>>         case
>>           when a2.status = 'delivered' then exp(
>>             (
>>               (
>>                 Cast(a2.server_time_stamp AS BIGINT) - CURRENT_TIMESTAMP
>>               ) /(60 * 1000)
>>             ) / 100
>>           )
>>           else 1
>>         end as recency_wt,
>>         case
>>           when a2.status = 'delivered' then 1
>>           else 0
>>         end as delivered_flag,
>>         case
>>           when a2.status = 'delivered' then a2.proctime
>>           else a1.proctime
>>         end as proctime
>>       FROM
>>         (
>>           select
>>             distinct orderId,
>>             zoneId,
>>             server_time_stamp,
>>             proctime
>>           from
>>             my_streamtable
>>           where
>>             status = 'pickedup'
>>         ) a1
>>         LEFT JOIN (
>>           select
>>             distinct orderId,
>>             zoneId,
>>             status,
>>             server_time_stamp,
>>             proctime
>>           from
>>             my_streamtable
>>           where
>>             status = 'delivered'
>>         ) a2 ON a1.orderId = a2.orderId
>>         AND a2.proctime BETWEEN a1.proctime - interval '60' minute
>>         AND a1.proctime + interval '60' minute
>>         INNER JOIN (
>>           select
>>             distinct orderId,
>>             cityId,
>>             lmTime,
>>             proctime
>>           from
>>             my_streamtable2
>>           where
>>             orderId is not null
>>         ) a3 ON cast(a1.orderId as VARCHAR) = cast(a3.orderId as VARCHAR)
>>         AND a3.proctime BETWEEN a1.proctime - interval '60' minute
>>         AND a1.proctime + interval '60' minute
>>     ),
>>     zone_count as(
>>       select
>>         zoneId,
>>         proctime() as proctime,
>>         COUNT(orderId) as counts_inclusive,
>>         sum(delivered_flag) as counts_exclusive,
>>         AVG(cityId) as cityId
>>       from
>>         Actuals
>>       where
>>         P2D_inclusive is not null
>>       group by
>>         HOP(
>>           proctime(),
>>           interval '5' minute,
>>           interval '60' minute
>>         ),
>>         zoneId
>>     ),
>>     zone_agg as (
>>       select
>>         zoneId,
>>         sum(recency_wt *(P2D_inclusive - PP2D)) / sum(recency_wt) as zone_quotient_inclusive,
>>         sum(recency_wt *(P2D_exclusive - PP2D)) / sum(recency_wt) as zone_quotient_exclusive,
>>         avg(cityId) as cityId,
>>         proctime() as proctime
>>       from
>>         Actuals
>>       where
>>         P2D_inclusive is not null
>>       group by
>>         HOP(
>>           proctime(),
>>           interval '5' minute,
>>           interval '60' minute
>>         ),
>>         zoneId
>>     ),
>>     city_agg as(
>>       select
>>         cityId,
>>         sum(recency_wt *(P2D_inclusive - PP2D)) / sum(recency_wt) as city_quotient_inclusive,
>>         sum(recency_wt *(P2D_inclusive - PP2D)) / sum(recency_wt) as city_quotient_exclusive,
>>         proctime() as proctime
>>       from
>>         Actuals
>>       where
>>         P2D_inclusive is not null
>>       group by
>>         HOP(
>>           proctime(),
>>           interval '5' minute,
>>           interval '60' minute
>>         ),
>>         cityId
>>     ),
>>     final as (
>>       select
>>         zone_count.zoneId,
>>         zone_count.cityId,
>>         avg(zone_count.counts_inclusive) as counts_inclusive,
>>         avg(zone_count.counts_exclusive) as counts_exclusive,
>>         avg(zone_agg.zone_quotient_inclusive) as zone_quotient_inclusive,
>>         avg(city_agg.city_quotient_inclusive) as city_quotient_inclusive,
>>         avg(zone_agg.zone_quotient_exclusive) as zone_quotient_exclusive,
>>         avg(city_agg.city_quotient_exclusive) as city_quotient_exclusive
>>       from
>>         zone_count
>>         INNER join zone_agg on zone_count.zoneId = zone_agg.zoneId
>>         AND zone_agg.proctime BETWEEN zone_count.proctime - interval '60' minute
>>         AND zone_count.proctime
>>         INNER join city_agg on zone_count.cityId = city_agg.cityId
>>         AND city_agg.proctime BETWEEN zone_count.proctime - interval '60' minute
>>         AND zone_count.proctime
>>       group by
>>         HOP(
>>           proctime(),
>>           interval '5' minute,
>>           interval '60' minute
>>         ),
>>         zone_count.zoneId,
>>         zone_count.cityId
>>     ),
>>     new_final as (
>>       select
>>         'lm_feedback#' || cast(zoneId as varchar) as key,
>>         case
>>           when counts_inclusive > 5
>>           and zone_quotient_inclusive > zone_quotient_exclusive then zone_quotient_inclusive
>>           when counts_exclusive > 5 then zone_quotient_exclusive
>>           when city_quotient_inclusive > city_quotient_exclusive then city_quotient_inclusive
>>           else city_quotient_exclusive
>>         end as value
>>       from
>>         final
>>     )
>>     select
>>       key,
>>       case
>>         when new_final.value > 30 then 30
>>         else new_final.value
>>       end as value,
>>       CURRENT_TIMESTAMP AS win_end,
>>       CURRENT_TIMESTAMP - 1800000 AS win_start,
>>       CURRENT_TIMESTAMP AS time_stamp
>>     from
>>       new_final
>>   )
>>
>
>
> --
> Best,
> Hangxiang.
>
