Hi,

Some context: after a refactoring, we were unable to restart our jobs.
They started fine and checkpointed fine, but once a job restarted owing
to a transient failure, the application could not come back up. The
JobManager was OOMing (even when I gave it 256GB of RAM!). The
`_metadata` file for the checkpoint was 1.3GB (usually it's 11MB). Inside
the `_metadata` file we saw `- 1402496 offsets:
com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
to be the operator state we were no longer initializing or
snapshotting after the refactoring.

Before I dig further into this and try to find a smaller reproducible
test case, I thought I would ask whether someone knows what the
expected behaviour is in the following scenario:

Suppose you have an operator (in this case a Source) which has some
operator ListState. Suppose you run your Flink job for some time and
later refactor it such that you no longer use that state (so after
the refactoring you're no longer initializing this operator state in
initializeState, nor are you snapshotting it in snapshotState; a rough
sketch of what I mean is below). If you launch your new code from a
recent savepoint, what do we expect to happen to that state? Do we
anticipate the behaviour I described above?
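
To make the scenario concrete, here is a rough sketch of the shape of
the source before the refactoring (class, field, and variable names are
made up for illustration; only the state name matches what we saw in
`_metadata`). The refactoring removed the state handling from both
`initializeState` and `snapshotState`:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Hypothetical source, pre-refactoring: it registers operator ListState
// under "kafka-archive-file-progress", restores it on start, and rewrites
// it on every checkpoint.
public class ArchiveProgressSource extends RichSourceFunction<String>
        implements CheckpointedFunction {

    private transient ListState<Long> progressState;
    private final List<Long> offsets = new ArrayList<>();
    private volatile boolean running = true;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // After the refactoring this registration/restore is gone, so the
        // new job never reads the state the savepoint still carries.
        ListStateDescriptor<Long> descriptor =
                new ListStateDescriptor<>("kafka-archive-file-progress", Long.class);
        progressState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            for (Long offset : progressState.get()) {
                offsets.add(offset);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // After the refactoring this is gone too, so the new job never
        // clears or rewrites this state on checkpoints.
        progressState.clear();
        progressState.addAll(offsets);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // ... read archive files, emit records, track offsets ...
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```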

My assumption would be that Flink would not read this state, and so it
would be dropped from the next checkpoint or savepoint. Alternatively,
I could imagine it not being read but lingering in every future
checkpoint or savepoint. However, what actually seems to be happening
is that it's not read and is then possibly replicated by every instance
of the task each time a checkpoint happens (hence the accidentally
exponential growth).

Thoughts?

PS: in case someone asks, I made sure we were calling `.clear()`
appropriately in `snapshotState` before the refactoring (we, uh,
already learned that lesson :D).
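
For reference, the lesson in question, continuing the hypothetical
sketch above (`offsets` is the made-up field from that sketch):

```java
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Without the clear(), each checkpoint just appends to the previously
    // restored/snapshotted list and the state keeps growing.
    progressState.clear();
    progressState.addAll(offsets);
    // equivalently: progressState.update(offsets);
}
```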

Best,

Aaron Levin
