Hi all,

We have just upgraded to Flink 1.19 and we are experiencing an issue when
the job tries to restore from *some* savepoints, not all. In these cases,
the failure manifests as the job being unable to create a checkpoint after
starting from the savepoint, with *Failure reason: Not all required tasks
are currently running...*, so the savepoint retrieval seems to never
complete. Looking at the Flink graph, *it seems that the slot that is stuck
is the one with the source and filters*, while the slot with the sink
recovers from the savepoint just fine. Our filters don't keep any custom
state, so it all points to the source, but that has not been changed with
the upgrade. We use FlinkKafkaConsumerBase.
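
To make the topology concrete, here is a stripped-down sketch of how the job
is wired; topic, broker, bucket names and the sink are placeholders, not our
real ones:
----------------
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class JobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new HashMapStateBackend());
        env.enableCheckpointing(60_000);
        // Checkpoints/savepoints go to S3 through the presto plugin (s3p scheme); path is a placeholder.
        env.getCheckpointConfig().setCheckpointStorage("s3p://some-bucket/checkpoints");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "our-consumer-group");  // placeholder

        env.addSource(new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props))
           .filter(value -> !value.isEmpty()) // our filters are stateless, no custom state
           .print();                          // placeholder sink; the real one writes elsewhere
        env.execute("job-sketch");
    }
}
----------------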

After enabling extra logging, it seems that for problematic savepoints
presto does a burst of calls to S3 (sometimes thousands), querying each
file within the savepoint multiple times but changing the byte range of the
request. Below are some sample logs (with reduced headers); the second
request is made within milliseconds of the first, changing the byte range
from 0-9223372036854775806 to 1242-9223372036854775806.
----------------
[30-May-2024 12:02:49.462 UTC] DEBUG com.amazonaws.request[] - Sending Request: GET https://xxxx.amazonaws.com/somepath/savepoints/savepoint-12345/78910abcde Headers: (Range: bytes=0-9223372036854775806)
[30-May-2024 12:02:49.657 UTC] DEBUG com.amazonaws.request[] - Sending Request: GET https://xxxx.amazonaws.com/somepath/savepoints/savepoint-12345/78910abcde Headers: (Range: bytes=1242-9223372036854775806)
----------------

We have confirmed that S3 does return the full file in each of these
requests, so the hypothesis is that somehow presto is struggling to read
the full content and retries almost immediately.
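
In case it helps, the two ranged GETs from the logs can be replayed outside
Flink with the plain AWS SDK v1 to double-check how many bytes actually come
back; bucket and key below are placeholders for the savepoint file in the
logs:
----------------
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.util.IOUtils;

public class RangedGetCheck {
    public static void main(String[] args) throws Exception {
        String bucket = "some-bucket";                                 // placeholder
        String key = "somepath/savepoints/savepoint-12345/78910abcde"; // placeholder

        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

        // The two ranges seen in the DEBUG logs: whole object, then from offset 1242.
        // withRange(start) sends an open-ended "bytes=start-" range, which is what the
        // huge explicit upper bound in the presto requests amounts to.
        for (long start : new long[] {0L, 1242L}) {
            GetObjectRequest req = new GetObjectRequest(bucket, key).withRange(start);
            try (S3Object obj = s3.getObject(req)) {
                byte[] body = IOUtils.toByteArray(obj.getObjectContent());
                System.out.printf("bytes=%d- -> read %d bytes, Content-Length %d%n",
                        start, body.length, obj.getObjectMetadata().getContentLength());
            }
        }
    }
}
----------------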

From various tests we have done, it seems that file size does not have an
impact, as some tests with files up to almost 200 MB don't fail. The one
thing we have noticed is that with Flink 1.19 the number of small files (in
the hundreds of KB in size) has increased compared to savepoints from 1.13,
which at the beginning resulted in the _metadata file being quite large and
causing Pekko errors when trying to send it from the jobmanager to the
taskmanagers. We made that error go away by reducing
*state.storage.fs.memory-threshold* to 100kb, which in Flink 1.13 we had
set to 1mb. I wonder what's triggering that increase in small files.

Other details:
- We use the Flink presto plugin without any custom configuration (presto
0.272 for Flink 1.19). Please note that the default socket timeout is 30s;
we have also tried increasing this value, to no avail.
- We have configured state.storage.fs.memory-threshold: 100kb (see the
config sketch after this list).
- We use the *hashmap* state backend.
- With our previous version of Flink (1.13) we never experienced similar
issues, even though checkpoints sometimes reached up to 8 GB. We haven't
changed any configuration regarding this during the upgrade, so the
defaults of 1.13 and 1.19 apply.
- One last emphasis: *savepoint restoration works sometimes*, so
configuration such as endpoints, authentication and so on should be
correct. Recovery from checkpoint works the majority of the time.
- This issue *only happens in environments that ingest a lot of data from
kafka*; environments with lower amounts of data recover from savepoint OK.
- Using a 1.13 savepoint with Flink 1.19 has always worked so far; it's
only savepoints created by Flink 1.19 that show this problem.
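
And for completeness, the state-related settings above restated
programmatically (a sketch only; in reality these live in our flink-conf,
and the key names are the 1.19 ones as I understand them):
----------------
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateConfigSketch {
    public static void main(String[] args) {
        // Same settings we described above; everything else is left at the 1.19 defaults.
        Configuration conf = new Configuration();
        conf.setString("state.backend.type", "hashmap");              // hashmap state backend
        conf.setString("state.storage.fs.memory-threshold", "100kb"); // was 1mb on our 1.13 setup
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // env is then used to build the job as in the sketch near the top of this mail
    }
}
----------------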

Any pointers or suggestions would be greatly appreciated!
Thanks in advance,

Nora
