Re: table.exec.state.ttl not working as expected

2023-06-25 Thread Jane Chan
Hi,

From the attachment, I can see that there are IntervalJoin and
GroupWindowAggregate operators. AFAIK, the state retention for such
operators is not controlled by `table.exec.state.ttl`; they clean up
their state based on the interval/window they declare instead.
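
For reference, a minimal sketch of where this option does apply (assuming
the Table API on Flink 1.15+; the class name is only illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StateTtlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // TTL for "unbounded" stateful operators such as regular joins and
        // group aggregations; IntervalJoin and GroupWindowAggregate instead
        // drop their state once the declared interval/window has passed.
        tEnv.getConfig().set("table.exec.state.ttl", "180 s");
    }
}
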
Could you share the operator-level state metrics to help identify the
issue?
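
If you are on RocksDB, one way to obtain them (a sketch; these are optional
flink-conf.yaml settings and add some metrics overhead) is to enable the
native metrics and watch them per operator/subtask:

# Expose RocksDB's own size estimates as Flink metrics.
state.backend.rocksdb.metrics.estimate-live-data-size: true
state.backend.rocksdb.metrics.total-sst-files-size: true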

Best,
Jane

On Sun, Jun 25, 2023 at 10:38 AM Hangxiang Yu wrote:

> Hi, neha.
>
> Could you share more information:
>
>1. Which State Backend are you using? If it's RocksDB, is incremental
>checkpointing enabled?
>2. Which specific operator is experiencing an increase in Checkpoint
>data size? (You can check the Checkpoint size changes of different subtasks
>from the Checkpoint History in the Flink UI)
>3. Has there been any change in data flow and input data during this
>time?
>
>
> On Fri, Jun 23, 2023 at 2:01 PM neha goyal wrote:
>
>> Hello,
>>
>> I have applied env.getConfig().set("table.exec.state.ttl", "180 s") to
>> my table environment. Even after that, I can see continuous growth in
>> the savepoint size.
>>
>> I am attaching a screenshot of the job graph and the savepoint metrics.
>> I am also adding the query that I am running on Kafka streams; it is a
>> lengthy query. Any help would be highly appreciated.
>>
>> SELECT
>>   *
>> from
>>   (
>> With Actuals as (
>>   SELECT
>> a1.orderId,
>> a1.zoneId,
>> a3.cityId,
>> case
>>   when a2.status = 'delivered' then round(
>> CAST(
>>   (
>> Cast(a2.server_time_stamp AS BIGINT) -
>> Cast(a1.server_time_stamp AS BIGINT)
>>   ) AS DOUBLE
>> ) / CAST(6 AS DOUBLE),
>> 4
>>   )
>>   when CAST(
>> CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT)) AS
>> DOUBLE
>>   ) / CAST(60 * 1000 AS DOUBLE) > cast(lmTime as DOUBLE) then
>> CAST(
>> CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT)) AS
>> DOUBLE
>>   ) / CAST(60 * 1000 AS DOUBLE)
>>   else null
>> end AS P2D_inclusive,
>> case
>>   when a2.status = 'delivered' then round(
>> CAST(
>>   (
>> Cast(a2.server_time_stamp AS BIGINT) -
>> Cast(a1.server_time_stamp AS BIGINT)
>>   ) AS DOUBLE
>> ) / CAST(6 AS DOUBLE),
>> 4
>>   )
>>   else null
>> end as P2D_exclusive,
>> cast(lmTime as DOUBLE) as PP2D,
>> case
>>   when a2.status = 'delivered' then exp(
>> (
>>   (
>> Cast(a2.server_time_stamp AS BIGINT) - CURRENT_TIMESTAMP
>>   ) /(60 * 1000)
>> ) / 100
>>   )
>>   else 1
>> end as recency_wt,
>> case
>>   when a2.status = 'delivered' then 1
>>   else 0
>> end as delivered_flag,
>> case
>>   when a2.status = 'delivered' then a2.proctime
>>   else a1.proctime
>> end as proctime
>>   FROM
>> (
>>   select
>> distinct orderId,
>> zoneId,
>> server_time_stamp,
>> proctime
>>   from
>> my_streamtable
>>   where
>> status = 'pickedup'
>> ) a1
>> LEFT JOIN (
>>   select
>> distinct orderId,
>> zoneId,
>> status,
>> server_time_stamp,
>> proctime
>>   from
>> my_streamtable
>>   where
>> status = 'delivered'
>> ) a2 ON a1.orderId = a2.orderId
>> AND a2.proctime BETWEEN a1.proctime - interval '60' minute
>> AND a1.proctime + interval '60' minute
>> INNER JOIN (
>>   select
>> distinct orderId,
>> cityId,
>> lmTime,
>> proctime
>>   from
>> my_streamtable2
>>   where
>> orderId is not null
>> ) a3 ON cast(a1.orderId as VARCHAR) = cast(a3.orderId as VARCHAR)
>> AND a3.proctime BETWEEN a1.proctime - interval '60' minute
>> AND a1.proctime + interval '60' minute
>> ),
>> zone_count as(
>>   select
>> zoneId,
>> proctime() as proctime,
>> COUNT(orderId) as counts_inclusive,
>> sum(delivered_flag) as counts_exclusive,
>> AVG(cityId) as cityId
>>   from
>> Actuals
>>   where
>> P2D_inclusive is not null
>>   group by
>> HOP(
>>   proctime(),
>>   interval '5' minute,
>>   interval '60' minute
>> ),
>> zoneId
>> ),
>> zone_agg as (
>>   select
>> zoneId,
>> sum(recency_wt *(P2D_inclusive - PP2D)) / sum(recency_wt) as
>> zone_quotient_inclusive,
>> sum(recency_wt *(P2D_exclusive - PP2D)) / sum(recency_wt) as
>> zone_quotient_exclusive,
>> avg(cityId) as cityId,
>> proctime() as proctime
>>   

Re: table.exec.state.ttl not working as expected

2023-06-24 Thread Hangxiang Yu
Hi, neha.

Could you share more information:

   1. Which State Backend are you using? If it's RocksDB, is incremental
   checkpointing enabled? (See the sketch after this list for how that is
   typically set.)
   2. Which specific operator is experiencing an increase in Checkpoint
   data size? (You can check the Checkpoint size changes of different subtasks
   from the Checkpoint History in the Flink UI)
   3. Has there been any change in data flow and input data during this
   time?
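
For question 1, this is roughly how the RocksDB backend with incremental
checkpoints is enabled in code (a sketch, assuming Flink 1.13+; it can
equally be set via state.backend: rocksdb and state.backend.incremental:
true in flink-conf.yaml):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // 'true' enables incremental checkpoints: each checkpoint uploads
        // only newly created SST files, so the reported checkpointed data
        // size can differ a lot from the actual live state size.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
    }
}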


On Fri, Jun 23, 2023 at 2:01 PM neha goyal wrote:

> Hello,
>
> I have applied env.getConfig().set("table.exec.state.ttl", "180 s") to my
> table environment. Even after that, I can see continuous growth in the
> savepoint size.
>
> I am attaching a screenshot of the job graph and the savepoint metrics.
> I am also adding the query that I am running on Kafka streams; it is a
> lengthy query. Any help would be highly appreciated.
>
> SELECT
>   *
> from
>   (
> With Actuals as (
>   SELECT
> a1.orderId,
> a1.zoneId,
> a3.cityId,
> case
>   when a2.status = 'delivered' then round(
> CAST(
>   (
> Cast(a2.server_time_stamp AS BIGINT) -
> Cast(a1.server_time_stamp AS BIGINT)
>   ) AS DOUBLE
> ) / CAST(6 AS DOUBLE),
> 4
>   )
>   when CAST(
> CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT)) AS
> DOUBLE
>   ) / CAST(60 * 1000 AS DOUBLE) > cast(lmTime as DOUBLE) then CAST(
> CURRENT_TIMESTAMP - (Cast(a1.server_time_stamp AS BIGINT)) AS
> DOUBLE
>   ) / CAST(60 * 1000 AS DOUBLE)
>   else null
> end AS P2D_inclusive,
> case
>   when a2.status = 'delivered' then round(
> CAST(
>   (
> Cast(a2.server_time_stamp AS BIGINT) -
> Cast(a1.server_time_stamp AS BIGINT)
>   ) AS DOUBLE
> ) / CAST(6 AS DOUBLE),
> 4
>   )
>   else null
> end as P2D_exclusive,
> cast(lmTime as DOUBLE) as PP2D,
> case
>   when a2.status = 'delivered' then exp(
> (
>   (
> Cast(a2.server_time_stamp AS BIGINT) - CURRENT_TIMESTAMP
>   ) /(60 * 1000)
> ) / 100
>   )
>   else 1
> end as recency_wt,
> case
>   when a2.status = 'delivered' then 1
>   else 0
> end as delivered_flag,
> case
>   when a2.status = 'delivered' then a2.proctime
>   else a1.proctime
> end as proctime
>   FROM
> (
>   select
> distinct orderId,
> zoneId,
> server_time_stamp,
> proctime
>   from
> my_streamtable
>   where
> status = 'pickedup'
> ) a1
> LEFT JOIN (
>   select
> distinct orderId,
> zoneId,
> status,
> server_time_stamp,
> proctime
>   from
> my_streamtable
>   where
> status = 'delivered'
> ) a2 ON a1.orderId = a2.orderId
> AND a2.proctime BETWEEN a1.proctime - interval '60' minute
> AND a1.proctime + interval '60' minute
> INNER JOIN (
>   select
> distinct orderId,
> cityId,
> lmTime,
> proctime
>   from
> my_streamtable2
>   where
> orderId is not null
> ) a3 ON cast(a1.orderId as VARCHAR) = cast(a3.orderId as VARCHAR)
> AND a3.proctime BETWEEN a1.proctime - interval '60' minute
> AND a1.proctime + interval '60' minute
> ),
> zone_count as(
>   select
> zoneId,
> proctime() as proctime,
> COUNT(orderId) as counts_inclusive,
> sum(delivered_flag) as counts_exclusive,
> AVG(cityId) as cityId
>   from
> Actuals
>   where
> P2D_inclusive is not null
>   group by
> HOP(
>   proctime(),
>   interval '5' minute,
>   interval '60' minute
> ),
> zoneId
> ),
> zone_agg as (
>   select
> zoneId,
> sum(recency_wt *(P2D_inclusive - PP2D)) / sum(recency_wt) as
> zone_quotient_inclusive,
> sum(recency_wt *(P2D_exclusive - PP2D)) / sum(recency_wt) as
> zone_quotient_exclusive,
> avg(cityId) as cityId,
> proctime() as proctime
>   from
> Actuals
>   where
> P2D_inclusive is not null
>   group by
> HOP(
>   proctime(),
>   interval '5' minute,
>   interval '60' minute
> ),
> zoneId
> ),
> city_agg as(
>   select
> cityId,
> sum(recency_wt *(P2D_inclusive - PP2D)) / sum(recency_wt) as
> city_quotient_inclusive,
> sum(recency_wt *(P2D_inclusive - PP2D)) / sum(recency_wt) as
> city_quotient_exclusive,
>