Hi,

Yes, we're using UNION state. I would assume, though, that if you are
not reading the UNION state, it would either stick around as a
constant factor in your state size or get cleared.

If this is true, I should try to recreate a small example and file a
bug, because as things stand it would be impossible to remove union
state from your operators.
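
A toy model of the growth I suspect may help. This is plain Java with no
Flink dependency, not Flink's actual restore code, and the class and
method names are made up for illustration. It assumes that union
redistribution hands every subtask the full concatenated list, and that
each subtask then re-snapshots everything it received without reading or
clearing it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of union-state growth; NOT Flink code.
public class UnionStateModel {

    // Union redistribution: every subtask receives the concatenation
    // of the lists snapshotted by all subtasks.
    static List<List<String>> unionRedistribute(List<List<String>> snapshot, int parallelism) {
        List<String> union = new ArrayList<>();
        for (List<String> perSubtask : snapshot) {
            union.addAll(perSubtask);
        }
        List<List<String>> restored = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            restored.add(new ArrayList<>(union));
        }
        return restored;
    }

    static int totalEntries(List<List<String>> state) {
        int total = 0;
        for (List<String> perSubtask : state) {
            total += perSubtask.size();
        }
        return total;
    }

    // Assumption under test: after each restore, every subtask snapshots
    // the full list it received, untouched (never read, never cleared).
    static int entriesAfterRestores(int parallelism, int entriesPerSubtask, int restores) {
        List<List<String>> state = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            List<String> entries = new ArrayList<>();
            for (int j = 0; j < entriesPerSubtask; j++) {
                entries.add("offset-" + i + "-" + j);
            }
            state.add(entries);
        }
        for (int r = 0; r < restores; r++) {
            state = unionRedistribute(state, parallelism);
        }
        return totalEntries(state);
    }

    public static void main(String[] args) {
        for (int r = 0; r <= 3; r++) {
            System.out.println("restores=" + r + " entries=" + entriesAfterRestores(4, 10, r));
        }
    }
}
```

In this model, four subtasks with 10 entries each go from 40 entries to
160, 640 and 2560 after one, two and three restores: the total multiplies
by the parallelism every time. Under even-split redistribution the same
40 entries would just be divided among the subtasks, so the total would
stay constant.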

On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu <qcx978132...@gmail.com> wrote:
>
> Hi
>
> Do you use UNION state in your scenario? When using UNION state, the JM may
> encounter an OOM because each TDD (TaskDeploymentDescriptor) contains the state of all subtasks [1].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
>
> Best,
> Congxian
>
>
> On Wed, Nov 27, 2019 at 3:55 AM Aaron Levin <aaronle...@stripe.com> wrote:
>>
>> Hi,
>>
>> Some context: after a refactoring, we were unable to restart our jobs.
>> They started and checkpointed fine, but once the job restarted
>> owing to a transient failure, the application was unable to start. The
>> Job Manager was OOM'ing (even when I gave it 256 GB of RAM!). The
>> `_metadata` file for the checkpoint was 1.3 GB (usually 11 MB). Inside
>> the `_metadata` file we saw `- 1402496 offsets:
>> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
>> to be the operator state we were no longer initializing or
>> snapshotting after the refactoring.
>>
>> Before I dig further into this and try to find a smaller reproducible
>> test case I thought I would ask if someone knows what the expected
>> behaviour is for the following scenario:
>>
>> Suppose you have an operator (in this case a Source) which has some
>> operator ListState. Suppose you run your flink job for some time and
>> then later refactor your job such that you no longer use that state
>> (so after the refactoring you're no longer initializing this operator
>> state in initializeState, nor are you snapshotting the operator state
>> in snapshotState). If you launch your new code from a recent
>> savepoint, what do we expect to happen to the state? Do we anticipate
>> the behaviour I explained above?
>>
>> My assumption would be that Flink would not read this state and so it
>> would be removed from the next checkpoint or savepoint. Alternatively,
>> I might assume it would not be read but would linger around every
>> future checkpoint or savepoint. However, what seems to be happening
>> is that it's not read and is then possibly replicated by every
>> instance of the task every time a checkpoint happens (hence the
>> accidentally exponential behaviour).
>>
>> Thoughts?
>>
>> PS - in case someone asks: I was sure that we were calling `.clear()`
>> appropriately in `snapshotState` (we, uh, already learned that lesson
>> :D)
>>
>> Best,
>>
>> Aaron Levin
