Hi all,

We have just upgraded to Flink 1.19 and are experiencing issues when the job tries to restore from *some* savepoints, not all. In these cases the failure manifests as the job being unable to create a checkpoint after starting from the savepoint, reporting *Failure reason: Not all required tasks are currently running...*, so the savepoint retrieval never seems to complete. Looking at the Flink graph, *the slot that is stuck appears to be the one with the source and filters*; the slot with the sink recovers from the savepoint just fine. Our filters don't keep any custom state, so everything points to the source, but that has not been changed in the upgrade. We use FlinkKafkaConsumerBase.
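For reference, the "extra logging" mentioned below was enabled roughly like this (a sketch, assuming Flink's default log4j2-style log4j.properties; the logger name is taken from the request logs we captured):

```properties
# Assumption: standard Flink log4j.properties (log4j2 properties syntax).
# Turn on request-level logging for the AWS SDK used by the Presto S3 plugin.
logger.awsrequest.name = com.amazonaws.request
logger.awsrequest.level = DEBUG
```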
After enabling extra logging, it seems that for problematic savepoints Presto issues a burst of calls to S3 (sometimes thousands), querying each file within the savepoint multiple times but changing the byte range of the request. Below are some sample logs (with reduced headers); the second request is made within milliseconds of the first, changing the byte range from 0-9223372036854775806 to 1242-9223372036854775806.

----------------
[30-May-2024 12:02:49.462 UTC] DEBUG com.amazonaws.request[] - Sending Request: GET https://xxxx.amazonaws.com /somepath/savepoints/savepoint-12345/78910abcde Headers: (Range: bytes=0-9223372036854775806)
[30-May-2024 12:02:49.657 UTC] DEBUG com.amazonaws.request[] - Sending Request: GET https://xxxx.amazonaws.com /somepath/savepoints/savepoint-12345/78910abcde Headers: (Range: bytes=1242-9223372036854775806)
----------------

We have confirmed that S3 does return the full file for each of these requests, so our hypothesis is that Presto is somehow struggling to read the full content and retries almost immediately. From the various tests we have run, file size does not seem to have an impact, as some tests with files up to almost 200 MB don't fail. One thing we have noticed is that with Flink 1.19 the number of small files (in the hundreds of KB) has increased compared to savepoints from 1.13. At first this made the _metadata file quite large, causing Pekko errors when the jobmanager tried to send it to the taskmanagers. We made that error go away by reducing *state.storage.fs.memory-threshold* to 100kb, which in Flink 1.13 we had set to 1mb. I wonder what's triggering that increase in small files.

Other details:
- We use the Flink Presto plugin without any custom configuration (Presto 0.272 for Flink 1.19). Note that the default socket timeout is 30s; we have also tried increasing this value, to no avail.
- We have configured state.storage.fs.memory-threshold: 100kb.
- We use the *hashmap* state backend.
- With our previous version of Flink (1.13) we never experienced similar issues, even though checkpoints sometimes reached up to 8 GB. We haven't changed any related configuration during the upgrade, so the defaults in 1.13 and 1.19 apply.
- One last emphasis: *savepoint restoration works sometimes*, so configuration such as endpoints, authentication and so on should be correct. Recovery from a checkpoint works the majority of the time.
- This issue *only happens in environments that ingest a lot of data from Kafka*; environments with lower data volumes recover from savepoints fine.
- Using a 1.13 savepoint with Flink 1.19 has always worked so far; it's only savepoints created by Flink 1.19 that show this problem.

Any pointers or suggestions would be greatly appreciated!

Thanks in advance,
Nora