Hi,

Some context: after a refactoring, our jobs could no longer survive a restart. They started fine and checkpointed fine, but once a job restarted owing to a transient failure, the application was unable to come back up: the Job Manager was OOM'ing (even when I gave it 256GB of RAM!). The `_metadata` file for the checkpoint was 1.3GB (it's usually 11MB). Inside the `_metadata` file we saw `- 1402496 offsets: com.stripe.flink.backfill.kafka-archive-file-progress`. This happened to be the operator state we were no longer initializing or snapshotting after the refactoring.
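For concreteness, here's a rough sketch of the shape of the source before the refactoring (names and details simplified, this is not our actual code). After the refactoring, the class no longer declares this state and touches it in neither `initializeState` nor `snapshotState`:

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.collection.JavaConverters._
import scala.collection.mutable

// Before the refactoring: a source tracking per-file progress in operator ListState.
class ArchiveFileSource extends SourceFunction[String] with CheckpointedFunction {
  @volatile private var running = true
  private val offsets = mutable.Buffer.empty[java.lang.Long]
  @transient private var progressState: ListState[java.lang.Long] = _

  override def initializeState(ctx: FunctionInitializationContext): Unit = {
    val desc = new ListStateDescriptor[java.lang.Long](
      "kafka-archive-file-progress", classOf[java.lang.Long])
    progressState = ctx.getOperatorStateStore.getListState(desc)
    if (ctx.isRestored) {
      offsets ++= progressState.get().asScala
    }
  }

  override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
    progressState.clear() // clear before re-adding (see the PS below)
    offsets.foreach(progressState.add)
  }

  override def run(sctx: SourceFunction.SourceContext[String]): Unit = {
    while (running) { /* emit records, update `offsets` ... */ }
  }

  override def cancel(): Unit = { running = false }
}

// After the refactoring: same class, but initializeState and snapshotState
// no longer mention "kafka-archive-file-progress" at all.
```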
Before I dig further into this and try to find a smaller reproducible test case, I thought I would ask whether someone knows the expected behaviour in the following scenario: suppose you have an operator (in this case a Source) with some operator ListState. You run your Flink job for some time, then refactor the job so that it no longer uses that state (after the refactoring you're no longer initializing the operator state in `initializeState`, nor snapshotting it in `snapshotState`). If you launch the new code from a recent savepoint, what do we expect to happen to that state? Is the behaviour I described above anticipated?

My assumption would be that Flink does not read this state, and so it would be dropped from the next checkpoint or savepoint. Alternatively, I might assume it is not read but lingers in every future checkpoint or savepoint. However, what actually seems to be happening is that it is not read and is then replicated by every instance of the task each time a checkpoint happens (hence the accidentally exponential growth).

Thoughts?

PS - in case someone asks: I was sure that we were calling `.clear()` appropriately in `snapshotState` (we, uh, already learned that lesson :D)

Best,
Aaron Levin