Re: Is it possible to do state migration with checkpoints?

2020-07-23 post by Sivaprasanna
Adding dev@ to get some traction. Any help would be greatly appreciated.

Thanks.

On Thu, Jul 23, 2020 at 11:48 AM Sivaprasanna 
wrote:

> +user-zh@flink.apache.org 
>
> A follow-up question: I tried taking a savepoint, but the job failed
> immediately. This happens every time I take a savepoint. The job runs on a
> YARN cluster, and it fails with a "container running out of memory" error.
> The state size averages around 1.2 GB but sometimes peaks at ~4.5 GB
> (please refer to the screenshot below). The job runs with 2 GB of task
> manager heap and 2 GB of task manager managed memory. I increased the
> managed memory to 6 GB, assuming the failure had something to do with
> RocksDB, but it failed even with 6 GB of managed memory. I suspect I am
> missing some configuration; the relevant settings are sketched below. Can
> you folks please help me with this?
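>
> For reference, these are roughly the memory settings in play. The keys
> below assume the Flink 1.10+ memory model and are set programmatically
> here only for illustration; in our job they live in flink-conf.yaml:
>
> import org.apache.flink.configuration.Configuration;
>
> public class MemorySettingsSketch {
>     public static void main(String[] args) {
>         // Values mirror the setup described above.
>         Configuration conf = new Configuration();
>         conf.setString("taskmanager.memory.task.heap.size", "2g"); // task manager heap
>         conf.setString("taskmanager.memory.managed.size", "6g");   // managed memory (used by RocksDB)
>     }
> }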
>
> [image: Screenshot 2020-07-23 at 10.34.29 AM.png]
>
> On Wed, Jul 22, 2020 at 7:32 PM Sivaprasanna 
> wrote:
>
>> Hi,
>>
>> We are trying out state schema migration for one of our stateful
>> pipelines. We use a few Avro-type states. Changes made to the job:
>> 1. Updated the schema of one of the states by adding a new 'boolean'
>> field with a default value (a rough sketch of the state declaration
>> follows below).
>> 2. Modified the code by removing a couple of ValueStates.
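>>
>> For clarity, this is roughly how the evolved state is declared; the class
>> and field names below are illustrative stand-ins, not the actual ones from
>> our job:
>>
>> import org.apache.flink.api.common.state.ValueState;
>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>> import org.apache.flink.util.Collector;
>>
>> // MyAvroRecord stands in for an Avro-generated SpecificRecord whose schema
>> // gained a new field with a default, e.g.
>> //   {"name": "flagged", "type": "boolean", "default": false}
>> public class TrackerFunction
>>         extends KeyedProcessFunction<String, MyAvroRecord, MyAvroRecord> {
>>
>>     private transient ValueState<MyAvroRecord> lastSeen;
>>
>>     @Override
>>     public void open(Configuration parameters) {
>>         // Declaring the state with the Avro-generated type makes Flink use
>>         // its Avro serializer, which supports schema evolution on restore.
>>         lastSeen = getRuntimeContext().getState(new ValueStateDescriptor<>(
>>                 "last-seen", TypeInformation.of(MyAvroRecord.class)));
>>     }
>>
>>     @Override
>>     public void processElement(MyAvroRecord value, Context ctx,
>>             Collector<MyAvroRecord> out) throws Exception {
>>         lastSeen.update(value);
>>         out.collect(value);
>>     }
>> }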
>>
>> To push these changes, I stopped the live job and resubmitted the new jar,
>> restoring from the latest *checkpoint* path. However, the job failed with
>> the following error:
>>
>> java.lang.RuntimeException: Error while getting state
>> at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>> at
>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>> ...
>> ...
>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>> serializer cannot be incompatible.
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>>
>> I was going through the state schema evolution documentation. It mentions
>> that we need to take a *savepoint* and restart the job with the savepoint
>> path. We are using the RocksDB state backend with incremental checkpoints
>> enabled (configured roughly as sketched below). Can we not use the latest
>> available checkpoint when dealing with state schema changes?
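>>
>> The backend setup looks roughly like this (the checkpoint URI below is a
>> placeholder, not our actual path):
>>
>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> public class BackendSetupSketch {
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>         // RocksDB state backend; the second constructor argument enables
>>         // incremental checkpoints.
>>         env.setStateBackend(
>>                 new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
>>         env.enableCheckpointing(60_000L); // checkpoint every 60 seconds
>>     }
>> }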
>>
>> The complete stack trace is attached to this mail.
>>
>> -
>> Sivaprasanna
>>
>

