Hi Neha,

I think you can first check whether the options `state.backend` and
`state.backend.incremental` you mentioned above appear under
`JobManager` -> `Configuration` in the Flink web UI. If they are not there,
you may be using the wrong conf file.
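
You can also fetch the effective configuration from the JobManager REST API,
for example (host and port below are placeholders; 8081 is the default REST
port):

curl http://<jobmanager-host>:8081/jobmanager/config | grep state.backend

Both options should show up there with the values you expect.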

Best,
Shammon FY


On Mon, Jul 17, 2023 at 5:04 PM Neha . <neh...@swiggy.in> wrote:

> Hi Shammon,
>
> state.backend: rocksdb
> state.backend.incremental: true
>
> This is already set in flink-conf.yaml. Is there anything else that should be
> taken care of for incremental checkpointing? Is there any related bug in
> Flink 1.16.1? We recently migrated to Flink 1.16.1 from Flink 1.13.6.
> What could cause incremental checkpointing to stop working?
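>
> For reference, the checkpoint-related block in our flink-conf.yaml looks like
> this (the checkpoint directory below is a placeholder, not the real path):
>
> state.backend: rocksdb
> state.backend.incremental: true
> state.checkpoints.dir: hdfs:///flink/checkpoints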
>
> On Mon, Jul 17, 2023 at 11:35 AM Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi Neha,
>>
>> I noticed that the `Checkpointed Data Size` is always equal to the `Full
>> Checkpoint Data Size`, so I think the job is taking full checkpoints instead
>> of incremental checkpoints. You can check it.
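>>
>> To check it outside the UI, you can fetch the checkpoint statistics from the
>> REST API (job id, host, and port below are placeholders):
>>
>> curl http://<jobmanager-host>:8081/jobs/<job-id>/checkpoints
>>
>> After the first checkpoint, an incremental checkpoint's checkpointed data
>> size should normally be much smaller than its full checkpoint data size.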
>>
>> Best,
>> Shammon FY
>>
>> On Mon, Jul 17, 2023 at 10:25 AM Neha . <neh...@swiggy.in> wrote:
>>
>>> Hello Shammon,
>>>
>>> Thank you for your assistance.
>>> I have already enabled incremental checkpointing; I am attaching a
>>> screenshot. Can you please elaborate on what makes you think it is not
>>> enabled? That might hint at the issue. The problem is that the checkpoint
>>> size is not going down and keeps increasing, while the savepoint size shows
>>> the correct behavior of going up and down with the throughput peaks.
>>>
>>> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>>>
>>>
>>> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY <zjur...@gmail.com> wrote:
>>>
>>>> Hi Neha,
>>>>
>>>> I think it is normal for the data size of a savepoint to be smaller
>>>> than the full data of a checkpoint. Flink uses RocksDB to store
>>>> checkpointed data, which is an LSM-structured store where the same key
>>>> can have multiple version records, while a savepoint traverses all keys
>>>> and stores only one record per key.
>>>>
>>>> But I noticed that you did not enable incremental checkpoints, which
>>>> results in each checkpoint saving the full data. You can refer to [1] for
>>>> more details and turn it on, which will reduce the data size of the
>>>> checkpoints.
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
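>>>>
>>>> If you set the backend in code rather than in flink-conf.yaml, a minimal
>>>> sketch looks like this (Flink 1.16 API; the checkpoint interval is just an
>>>> example value):
>>>>
>>>> import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>
>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> // 'true' enables incremental checkpoints for the RocksDB backend
>>>> env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
>>>> env.enableCheckpointing(60_000); // checkpoint every 60 seconds (example)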
>>>>
>>>> Best,
>>>> Shammon FY
>>>>
>>>>
>>>> On Sun, Jul 16, 2023 at 2:30 PM Neha . <neh...@swiggy.in> wrote:
>>>>
>>>>> Hello Shammon FY,
>>>>>
>>>>> This is a production issue for me. Could you please take a look and see
>>>>> if anything can be done?
>>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Neha . <neh...@swiggy.in>
>>>>> Date: Fri, Jul 14, 2023 at 4:06 PM
>>>>> Subject: Checkpoint size smaller than Savepoint size
>>>>> To: <user@flink.apache.org>
>>>>>
>>>>>
>>>>> Hello,
>>>>>
>>>>> According to Flink's documentation, checkpoints are designed to be
>>>>> lightweight. However, in my Flink pipeline, I have observed that the
>>>>> savepoint sizes are smaller than the checkpoint sizes. Is this expected
>>>>> behavior? What are the possible scenarios that can lead to this situation?
>>>>>
>>>>> Additionally, I have noticed that the checkpoint size in my datastream
>>>>> pipeline continues to grow, while the savepoint size behaves as expected.
>>>>> Could this be attributed to the usage of Common Table Expressions (CTEs)
>>>>> in Flink SQL?
>>>>>
>>>>> Flink version: 1.16.1
>>>>> Incremental checkpointing is enabled.
>>>>> StateBackend: RocksDB
>>>>> Time Characteristic: Ingestion
>>>>>
>>>>> SQL:
>>>>>
>>>>> SELECT
>>>>>   *
>>>>> from
>>>>>   (
>>>>>     With Actuals as (
>>>>>       SELECT
>>>>>         clientOrderId,
>>>>>         Cast(
>>>>>           ValueFromKeyCacheUDF(
>>>>>             concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>>>>           ) as INT
>>>>>         ) as zoneId,
>>>>>         cityId,
>>>>>         case
>>>>>           when status = 'ASSIGNED' then 1
>>>>>           else 0
>>>>>         end as acceptance_flag,
>>>>>         unicast_df.proctime
>>>>>       FROM
>>>>>         order_df
>>>>>         INNER JOIN unicast_df ON unicast_df.clientOrderId = order_df.order_id
>>>>>         AND order_df.proctime BETWEEN unicast_df.proctime - interval '70' minute
>>>>>         AND unicast_df.proctime + interval '10' minute
>>>>>         and unicast_df.status in ('ASSIGNED', 'REJECTED')
>>>>>     ),
>>>>>     zone_agg as (
>>>>>       select
>>>>>         zoneId,
>>>>>         (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
>>>>>         avg(cityId) as cityId,
>>>>>         COUNT(*) as `unicast_count`,
>>>>>         proctime() as proctime
>>>>>       from
>>>>>         Actuals
>>>>>       group by
>>>>>         HOP(
>>>>>           proctime(),
>>>>>           interval '5' minute,
>>>>>           interval '30' minute
>>>>>         ),
>>>>>         zoneId
>>>>>     ),
>>>>>     city_agg as(
>>>>>       select
>>>>>         cityId,
>>>>>         sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
>>>>>         proctime() as proctime
>>>>>       from
>>>>>         Actuals
>>>>>       group by
>>>>>         HOP(
>>>>>           proctime(),
>>>>>           interval '5' minute,
>>>>>           interval '30' minute
>>>>>         ),
>>>>>         cityId
>>>>>     ),
>>>>>     final as (
>>>>>       select
>>>>>         zone_agg.zoneId,
>>>>>         zone_agg.cityId,
>>>>>         avg(zone_agg.unicast_count) as unicast_count,
>>>>>         avg(zone_agg.zone_quotient) as zone_quotient,
>>>>>         avg(city_agg.city_quotient) as city_quotient
>>>>>       from
>>>>>         city_agg
>>>>>         INNER join zone_agg on zone_agg.cityId = city_agg.cityId
>>>>>         AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60' minute
>>>>>         AND zone_agg.proctime
>>>>>       group by
>>>>>         HOP(
>>>>>           proctime(),
>>>>>           interval '5' minute,
>>>>>           interval '30' minute
>>>>>         ),
>>>>>         zone_agg.zoneId,
>>>>>         zone_agg.cityId
>>>>>     ),
>>>>>     new_final as (
>>>>>       select
>>>>>         'zoneid_de_acceptance_rate#' || cast(zoneId as varchar) as key,
>>>>>         zone_quotient,
>>>>>         city_quotient,
>>>>>         case
>>>>>           when unicast_count > 5 then zone_quotient
>>>>>           else city_quotient
>>>>>         end as `value`
>>>>>       from
>>>>>         final
>>>>>     )
>>>>>     select
>>>>>       key,
>>>>>       case
>>>>>         when new_final.`value` > 1 then 1
>>>>>         else new_final.`value`
>>>>>       end as `value`,
>>>>>       zone_quotient,
>>>>>       city_quotient
>>>>>     from
>>>>>       new_final
>>>>>   )
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
