Thanks Martijn!

One more question, Can we achieve(schema evolution) using DataStream APIs?

In flink documentation, I found [2] "The process of migrating state to
adapt to changed schemas happens automatically, and independently for each
state. This process is performed internally by Flink by first checking if
the new serializer for the state has different serialization schema than
the previous serializer; if so, the previous serializer is used to read the
state to objects, and written back to bytes again with the new serializer."

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/schema_evolution/#evolving-state-schema

Regards,
Shamit Jain



On Wed, Dec 29, 2021 at 5:09 AM Martijn Visser <mart...@ververica.com>
wrote:

> Hi Shamit,
>
> Adding columns means that you're trying to perform schema evolution, which
> isn't yet supported by Flink per the documentation [1] "Savepoints are only
> supported if both the query and the Flink version remain constant"
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
>
> On Wed, 29 Dec 2021 at 04:10, shamit jain <jainsha...@gmail.com> wrote:
>
>> Hello Experts,
>>
>>
>> I need help to understand whether we can deploy a job from a snapshot
>> after changing the code by adding one new column in an existing table.
>>
>> We are using flink-1.13.2 table API and RocksDB as backend. We have
>> changed the code and added one new column in the existing table and later,
>> tried to deploy from the snapshot. While deploying, I'm getting the below
>> error:-
>>
>>
>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>> serializer
>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336)
>> must not be incompatible with the old state serializer
>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
>>    at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>>    at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>>    at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>>    at
>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>    at
>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>>    at
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>>    at
>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>>    at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>    at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>    ...
>>
>>
>> After troubleshooting, I found we are getting this error while comparing
>> the fields of previous and latest table definition.
>>
>> This comparision is happening from flink-table-runtime library  class:
>> org.apache.flink.table.runtime.typeutils.RowDataSerializer -> method name:
>> resolveSchemaCompatibility()
>>
>>    if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
>>                 return TypeSerializerSchemaCompatibility.incompatible();
>>             }
>>
>>
>> Can you please help me to understand if we can add a new column in an
>> existing table and deploy from the snapshot?
>>
>> Regards,
>> Shamit Jain
>>
>

Reply via email to