Hi all,

We have some jobs that write Parquet files to S3, bucketed by processing time in a structure like /year/month/day/hour.
On September 13th we migrated our Flink runtime from 1.14.5 to 1.15.2, and now some jobs crash at checkpointing because they cannot find S3 files from September 13th (those files have since been removed by our retention policy). Unable to explain why the job would try to access files that are 2-3 weeks old, I looked into a couple of checkpoint/savepoint files and found old files referenced there alongside the current ones. The only correlation I can make is the 1.14 -> 1.15 migration performed on September 13th. I have no idea how those files got stuck in the state and carried over from one checkpoint to the next.

Here is a (partially binary, lightly cleaned) excerpt from one checkpoint's metadata:

    bucket-states s3p://flink-state/prod/imp/landslide-eu-west-1/checkpoints/eb0ca029a6a45006216b7df464a9c44a/chk-255210/ae1d9611-4118-4068-92de-315b90ad733f
    writer_raw_states OPERATOR_STATE_DISTRIBUTION_MODE SPLIT_DISTRIBUTE VALUE_SERIALIZER org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer$BytePrimitiveArraySerializerSnapshot
    streaming_committer_raw_states OPERATOR_STATE_DISTRIBUTION_MODE SPLIT_DISTRIBUTE VALUE_SERIALIZER org.apache.flink.api.common.typ
    2022/10/05/09 s3a://raw-data-prod/default/imp/v3/ds-meru-prod-kinesis-eu-west-1-20211101-v3-ad-impression/2022/10/05/09
    default/imp/v3/ds-meru-prod-kinesis-eu-west-1-20211101-v3-ad-impression/2022/09/13/12/part-bdc414ff-a05f-4d0d-8c7e-0e0a62c00588-c13ff983-d266-4c7b-b13e-cb22f6681e5d-19.gz.parquet

In the checkpoint snippet above you can see a normal file, processed on October 5th, and a stuck one from September 13th. Any idea why the sink would keep old, already-written files in its state and pass them from checkpoint to checkpoint? Is this a bug, or a migration issue between 1.14 and 1.15?
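In case it helps anyone reproduce the diagnosis: the way I spotted the stale references was essentially a byte-scan of the checkpoint `_metadata` blob for date-partitioned path fragments. A rough, unofficial sketch of that check (the regex, function name, and cutoff date here are my own assumptions, not anything from the Flink API) could look like:

```python
"""Scan a Flink checkpoint _metadata blob for date-partitioned path
fragments (.../YYYY/MM/DD/...) older than a cutoff. This is a crude
diagnostic sketch, not an official Flink tool: it simply byte-scans
the file, so it may produce false positives on binary data."""

import re
import sys
from datetime import date

# Matches path fragments such as ".../2022/09/13/12/part-....parquet".
PATH_RE = re.compile(rb"[\w./-]*?/(\d{4})/(\d{2})/(\d{2})/[\w./-]*")

def find_stale_paths(blob: bytes, cutoff: date) -> list[str]:
    """Return decoded path fragments whose /year/month/day/ predates cutoff."""
    stale = []
    for m in PATH_RE.finditer(blob):
        y, mo, d = (int(m.group(i)) for i in (1, 2, 3))
        try:
            if date(y, mo, d) < cutoff:
                stale.append(m.group(0).decode("utf-8", "replace"))
        except ValueError:
            continue  # digits that do not form a real calendar date
    return stale

if __name__ == "__main__":
    # Usage: python scan_metadata.py /path/to/chk-NNNN/_metadata 2022-10-01
    with open(sys.argv[1], "rb") as f:
        for path in find_stale_paths(f.read(), date.fromisoformat(sys.argv[2])):
            print(path)
```

Running this against the checkpoint above flags the September 13th part file while leaving the current October 5th bucket alone, which is how the stuck reference stood out.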