Hi Neha,

I think you can first check whether the options `state.backend` and `state.backend.incremental` you mentioned above appear under `JobManager` -> `Configuration` in the Flink web UI. If they do not, you may be using the wrong conf file.
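This check can also be scripted against Flink's REST API, which serves the data behind the web UI. Below is a minimal Python sketch. It assumes the following:

- The JobManager REST endpoint is reachable at a placeholder address (`http://localhost:8081`). Adjust this for your deployment.
- `GET /jobmanager/config` returns a JSON list of `{"key": ..., "value": ...}` entries.
- Entries in the checkpoint `history` from `GET /jobs/<job-id>/checkpoints` expose `status`, `checkpointed_size` and `state_size`.

All helper names are hypothetical. The last helper quantifies the earlier observation that `Checkpointed Data Size` always equals `Full Checkpoint Data Size`.

```python
import json
from urllib.request import urlopen

# Placeholder address (an assumption); point it at your JobManager's REST port.
FLINK_REST_URL = "http://localhost:8081"

# The two keys from this thread and the values that turn on incremental
# RocksDB checkpoints.
EXPECTED = {
    "state.backend": "rocksdb",
    "state.backend.incremental": "true",
}


def fetch_jobmanager_config(base_url=FLINK_REST_URL):
    """Fetch the JobManager configuration as a {key: value} dict.

    Assumes GET /jobmanager/config returns a JSON list of
    {"key": ..., "value": ...} objects (the data behind the web UI's
    JobManager -> Configuration tab).
    """
    with urlopen(f"{base_url}/jobmanager/config") as resp:
        entries = json.load(resp)
    return {entry["key"]: entry["value"] for entry in entries}


def check_incremental_settings(config):
    """Return a list of problems; an empty list means both keys look right."""
    problems = []
    for key, expected in EXPECTED.items():
        actual = config.get(key)
        if actual is None:
            problems.append(f"{key} is missing (is the right conf file loaded?)")
        elif actual.strip().lower() != expected:
            problems.append(f"{key} = {actual!r}, expected {expected!r}")
    return problems


def full_checkpoint_ratio(history):
    """Share of completed checkpoints whose checkpointed size equals the full size.

    Assumes each entry in the "history" list of GET /jobs/<job-id>/checkpoints
    carries "status", "checkpointed_size" and "state_size" (shown in the web UI
    as "Checkpointed Data Size" and "Full Checkpoint Data Size"). Returns None
    if there are no completed checkpoints.
    """
    completed = [c for c in history if c.get("status") == "COMPLETED"]
    if not completed:
        return None
    same = sum(1 for c in completed if c["checkpointed_size"] == c["state_size"])
    return same / len(completed)
```

For example, `check_incremental_settings(fetch_jobmanager_config())` returns an empty list when both keys are set as expected. A single checkpoint where the two sizes are equal is not conclusive, because the first incremental checkpoint of a freshly started job has nothing to build on. A ratio close to 1.0 across many checkpoints, however, matches the full-checkpoint pattern described in this thread.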
Best,
Shammon FY

On Mon, Jul 17, 2023 at 5:04 PM Neha . <neh...@swiggy.in> wrote:
> Hi Shammon,
>
> state.backend: rocksdb
> state.backend.incremental: true
>
> This is already set in the flink-conf. Is there anything else that needs to
> be taken care of for incremental checkpointing? Is there any related bug in
> Flink 1.16.1? We recently migrated to Flink 1.16.1 from Flink 1.13.6.
> What could cause incremental checkpointing to stop working?
>
> On Mon, Jul 17, 2023 at 11:35 AM Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi Neha,
>>
>> I noticed that the `Checkpointed Data Size` is always equal to the `Full
>> Checkpoint Data Size`. I think the job is using full checkpoints instead of
>> incremental checkpoints; you can check it.
>>
>> Best,
>> Shammon FY
>>
>> On Mon, Jul 17, 2023 at 10:25 AM Neha . <neh...@swiggy.in> wrote:
>>
>>> Hello Shammon,
>>>
>>> Thank you for your assistance.
>>> I have already enabled incremental checkpointing; I am attaching a
>>> screenshot. Can you please elaborate on what makes you think it is not
>>> enabled? It might hint towards the issue. The problem is that the checkpoint
>>> size never goes down and keeps increasing, while the savepoint size shows the
>>> expected behavior of going up and down with the throughput peaks.
>>>
>>> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>>>
>>>
>>> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY <zjur...@gmail.com> wrote:
>>>
>>>> Hi Neha,
>>>>
>>>> I think it is normal for the data size of a savepoint to be smaller
>>>> than the full data size of a checkpoint. Flink uses RocksDB to store
>>>> checkpointed data, which is LSM-structured storage where the same key
>>>> can have multiple versioned records, while a savepoint traverses all keys
>>>> and stores only one record per key.
>>>>
>>>> But I noticed that you did not enable incremental checkpointing, which
>>>> resulted in each checkpoint saving the full data.
>>>> You can refer to [1] for more detail and turn it on, which will reduce
>>>> the data size of the checkpoint.
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>>>>
>>>> Best,
>>>> Shammon FY
>>>>
>>>>
>>>> On Sun, Jul 16, 2023 at 2:30 PM Neha . <neh...@swiggy.in> wrote:
>>>>
>>>>> Hello Shammon FY,
>>>>>
>>>>> This is a production issue for me. Can you please take a look and see
>>>>> whether anything can be done?
>>>>>
>>>>> ---------- Forwarded message ---------
>>>>> From: Neha . <neh...@swiggy.in>
>>>>> Date: Fri, Jul 14, 2023 at 4:06 PM
>>>>> Subject: Checkpoint size smaller than Savepoint size
>>>>> To: <user@flink.apache.org>
>>>>>
>>>>>
>>>>> Hello,
>>>>>
>>>>> According to Flink's documentation, checkpoints are designed to be
>>>>> lightweight. However, in my Flink pipeline, I have observed that the
>>>>> savepoint sizes are smaller than the checkpoint sizes. Is this expected
>>>>> behavior? What are the possible scenarios that can lead to this situation?
>>>>>
>>>>> Additionally, I have noticed that the checkpoint size in my datastream
>>>>> pipeline continues to grow while the savepoint size behaves as expected.
>>>>> Could this be attributed to the use of Common Table Expressions (CTEs)
>>>>> in Flink SQL?
>>>>>
>>>>> Flink version: 1.16.1
>>>>> Incremental checkpointing is enabled.
>>>>> StateBackend: RocksDB
>>>>> Time Characteristic: Ingestion
>>>>>
>>>>> SQL:
>>>>>
>>>>> SELECT
>>>>>   *
>>>>> from
>>>>>   (
>>>>>     With Actuals as (
>>>>>       SELECT
>>>>>         clientOrderId,
>>>>>         Cast(
>>>>>           ValueFromKeyCacheUDF(
>>>>>             concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>>>>           ) as INT
>>>>>         ) as zoneId,
>>>>>         cityId,
>>>>>         case
>>>>>           when status = 'ASSIGNED' then 1
>>>>>           else 0
>>>>>         end as acceptance_flag,
>>>>>         unicast.proctime
>>>>>       FROM
>>>>>         order
>>>>>         INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
>>>>>         AND order.proctime BETWEEN unicast.proctime - interval '70' minute
>>>>>         AND unicast.proctime + interval '10' minute
>>>>>         and unicast.status in ('ASSIGNED', 'REJECTED')
>>>>>     ),
>>>>>     zone_agg as (
>>>>>       select
>>>>>         zoneId,
>>>>>         (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
>>>>>         avg(cityId) as cityId,
>>>>>         COUNT(*) as `unicast_count`,
>>>>>         proctime() as proctime
>>>>>       from
>>>>>         Actuals
>>>>>       group by
>>>>>         HOP(
>>>>>           proctime(),
>>>>>           interval '5' minute,
>>>>>           interval '30' minute
>>>>>         ),
>>>>>         zoneId
>>>>>     ),
>>>>>     city_agg as(
>>>>>       select
>>>>>         cityId,
>>>>>         sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
>>>>>         proctime() as proctime
>>>>>       from
>>>>>         Actuals
>>>>>       group by
>>>>>         HOP(
>>>>>           proctime(),
>>>>>           interval '5' minute,
>>>>>           interval '30' minute
>>>>>         ),
>>>>>         cityId
>>>>>     ),
>>>>>     final as (
>>>>>       select
>>>>>         zone_agg.zoneId,
>>>>>         zone_agg.cityId,
>>>>>         avg(zone_agg.unicast_count) as unicast_count,
>>>>>         avg(zone_agg.zone_quotient) as zone_quotient,
>>>>>         avg(city_agg.city_quotient) as city_quotient
>>>>>       from
>>>>>         city_agg
>>>>>         INNER join zone_agg on zone_agg.cityId = city_agg.cityId
>>>>>         AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60' minute
>>>>>         AND zone_agg.proctime
>>>>>       group by
>>>>>         HOP(
>>>>>           proctime(),
>>>>>           interval '5' minute,
>>>>>           interval '30' minute
>>>>>         ),
>>>>>         zone_agg.zoneId,
>>>>>         zone_agg.cityId
>>>>>     ),
>>>>>     new_final as (
>>>>>       select
>>>>>         'zoneid_de_acceptance_rate#' || cast(zoneId as varchar) as key,
>>>>>         zone_quotient,
>>>>>         city_quotient,
>>>>>         case
>>>>>           when unicast_count > 5 then zone_quotient
>>>>>           else city_quotient
>>>>>         end as `value`
>>>>>       from
>>>>>         final
>>>>>     )
>>>>>     select
>>>>>       key,
>>>>>       case
>>>>>         when new_final.`value` > 1 then 1
>>>>>         else new_final.`value`
>>>>>       end as `value`,
>>>>>       zone_quotient,
>>>>>       city_quotient
>>>>>     from
>>>>>       new_final
>>>>>   )
>>>>>
>>>>> ------------------------------
>>>>> IMPORTANT NOTICE: The contents of this email and any attachments are
>>>>> confidential in nature and intended solely for the addressee, and are
>>>>> subject to the terms and conditions of disclosure as further described
>>>>> here: https://www.scd.swiggy.in/nda. If you are not the intended
>>>>> recipient or you do not agree to the terms and conditions of disclosure,
>>>>> please delete this email immediately, and notify the sender by return
>>>>> email. In the event that you continue to access the information herein or
>>>>> act upon it in any manner, the terms and conditions shall be deemed
>>>>> accepted by you.
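A toy model can illustrate Shammon's explanation of why a savepoint can be smaller than the full data of a RocksDB checkpoint. His point is that RocksDB is LSM-structured, so the same key can have several versions on disk, while a savepoint writes only one record per key.

The Python sketch below is deliberately simplified and is not RocksDB's or Flink's actual implementation. `ToyLSM` and its methods are hypothetical. A checkpoint that copies the store's files is modeled as capturing every record in every immutable file. A savepoint is modeled as iterating the merged view and writing each live key once.

```python
class ToyLSM:
    """Toy LSM store: a mutable memtable plus immutable "SST files".

    Simplified on purpose (no tombstones, levels or byte sizes); it only
    shows why a key can be stored several times until compaction merges it.
    """

    def __init__(self):
        self.memtable = {}
        self.sst_files = []  # list of dicts, oldest first

    def put(self, key, value):
        self.memtable[key] = value

    def flush(self):
        # Freeze the current memtable into a new immutable file.
        if self.memtable:
            self.sst_files.append(dict(self.memtable))
            self.memtable = {}

    def compact(self):
        # Merge all files, keeping only the newest version of each key.
        merged = {}
        for sst in self.sst_files:  # later (newer) files overwrite older ones
            merged.update(sst)
        self.sst_files = [merged] if merged else []

    def file_copy_checkpoint_records(self):
        """Records captured by copying every file as-is (all versions)."""
        self.flush()
        return sum(len(sst) for sst in self.sst_files)

    def savepoint_records(self):
        """Records written by visiting each live key once (latest version only)."""
        self.flush()
        live = {}
        for sst in self.sst_files:
            live.update(sst)
        return len(live)
```

As an example, write the same 100 keys in three rounds, calling `flush()` after each round. The file copy then holds 300 records, while the savepoint holds 100. After `compact()`, both are 100. This shows that the gap depends on how many stale versions are still waiting to be compacted.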