Alexis Sarda-Espinosa created FLINK-34325:
---------------------------------------------
Summary: Inconsistent state with data loss after OutOfMemoryError
in Job Manager
Key: FLINK-34325
URL: https://issues.apache.org/jira/browse/FLINK-34325
Project: Flink
Issue Type: Bug
Affects Versions: 1.17.1
Environment: Flink on Kubernetes with HA, RocksDB with incremental
checkpoints on Azure
Reporter: Alexis Sarda-Espinosa
Attachments: jobmanager_log.txt
I have a job that uses broadcast state to maintain a cache of required
metadata. I am currently evaluating the memory requirements of my specific use
case, and I ran into a weird situation that seems worrisome.
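For context, the cache is maintained roughly as follows, together with the
custom gauge (mentioned further below) that exposes its approximate size. This
is a simplified sketch, not the actual job code; class, type, and metric names
are illustrative.

{code:java}
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative only: a broadcast-state cache of metadata, keyed by an ID extracted from the message.
public class MetadataCacheFunction extends BroadcastProcessFunction<String, String, String> {

    static final MapStateDescriptor<String, String> CACHE_DESCRIPTOR =
            new MapStateDescriptor<>("metadata-cache", Types.STRING, Types.STRING);

    // Incremented on every broadcast update, so only an approximation of the cache size.
    private transient long approximateCacheSize;

    @Override
    public void open(Configuration parameters) {
        // Custom gauge exposing the approximate number of cached entries.
        getRuntimeContext().getMetricGroup().gauge(
                "approximateCacheSize",
                new Gauge<Long>() {
                    @Override
                    public Long getValue() {
                        return approximateCacheSize;
                    }
                });
    }

    @Override
    public void processBroadcastElement(String metadata, Context ctx, Collector<String> out) throws Exception {
        BroadcastState<String, String> cache = ctx.getBroadcastState(CACHE_DESCRIPTOR);
        cache.put(keyOf(metadata), metadata); // the cache grows as metadata messages arrive
        approximateCacheSize++;
    }

    @Override
    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        ReadOnlyBroadcastState<String, String> cache = ctx.getBroadcastState(CACHE_DESCRIPTOR);
        out.collect(event + "|" + cache.get(keyOf(event))); // enrich events with the cached metadata
    }

    private static String keyOf(String value) {
        return value.split(",", 2)[0]; // placeholder key extraction
    }
}
{code}

The metadata stream is broadcast with the same descriptor and connected to the
main stream, e.g.
{{events.connect(metadata.broadcast(MetadataCacheFunction.CACHE_DESCRIPTOR)).process(new MetadataCacheFunction())}}.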
All sources in my job are Kafka sources. I wrote a large number of messages to
Kafka to force the broadcast state's cache to grow. At some point, this caused
a "{{java.lang.OutOfMemoryError: Java heap space}}" error in the Job Manager. I
would have expected the whole Java process of the JM to crash, but the job was
simply restarted. What's worrisome is that, after 2 restarts, the job resumed
from the latest successful checkpoint and completely ignored all the events I
had written to Kafka. I can verify this because I have a custom metric that
exposes the approximate size of the cache, and because the job did not end up
in a crash loop, which it presumably would have if it had re-read all those
messages from Kafka over and over again.
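For completeness, the state backend and checkpointing are configured roughly
like this; the checkpoint interval and the Azure path below are placeholders,
not the real values:

{code:java}
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static StreamExecutionEnvironment configure() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints enabled.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Periodic checkpoints stored in Azure; interval and path are placeholders.
        env.enableCheckpointing(60_000L);
        env.getCheckpointConfig()
                .setCheckpointStorage("abfss://<container>@<account>.dfs.core.windows.net/flink/checkpoints");

        return env;
    }
}
{code}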
I'm attaching an excerpt of the Job Manager's logs. My main concerns are:
# It seems the memory error in the JM did not prevent the Kafka offsets from
being "rolled back", so the Kafka events that should have ended up in the
broadcast state's cache were ultimately ignored (the way the sources are set up
is sketched after this list).
# Is it normal that the state is somehow "materialized" in the JM and is thus
affected by the size of the JM's heap? Is this particular to the use of
broadcast state? I found this very surprising.
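Regarding the first point: the sources are created with the {{KafkaSource}}
builder, roughly as in the simplified sketch below (topic, group, and server
names are illustrative). As far as I understand, the consumer offsets are part
of the source's checkpointed state, which is why a restore from an older
checkpoint also rewinds the offsets.

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MetadataSourceSketch {
    public static DataStream<String> metadataStream(StreamExecutionEnvironment env) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")     // placeholder
                .setTopics("metadata-topic")           // placeholder
                .setGroupId("metadata-cache-consumer") // placeholder
                // Only used when the job starts without checkpoint state; after a restore,
                // the source resumes from the offsets stored in the checkpoint.
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "metadata-source");
    }
}
{code}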
--
This message was sent by Atlassian Jira
(v8.20.10#820010)