Hi Shamit,

Yes, the DataStream API gives you more options, such as the state schema
evolution described in the link you've included. You could also use the
State Processor API [1] to read, write, and modify your savepoint.
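
To make the difference concrete, here is a toy sketch in plain Java (no
Flink dependency). It is only an illustration under stated assumptions, not
Flink's implementation: the class SchemaCompatSketch, the Compatibility enum,
and the methods strictCheck, evolvingCheck, and migrate are hypothetical
names. strictCheck is modeled on the RowDataSerializer check quoted further
down in this thread. evolvingCheck uses an assumed "appended fields are
allowed" rule to mimic the read-with-old, write-with-new migration that the
schema evolution page describes. Rows are modeled as Object[] instead of
bytes to keep it short.

```java
import java.util.Arrays;

class SchemaCompatSketch {

    // Hypothetical stand-in for the possible outcomes of a compatibility check.
    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    // Strict rule, modeled on the quoted snippet: any difference in the
    // field types makes the new serializer incompatible.
    static Compatibility strictCheck(String[] previousTypes, String[] newTypes) {
        if (!Arrays.equals(previousTypes, newTypes)) {
            return Compatibility.INCOMPATIBLE;
        }
        return Compatibility.COMPATIBLE_AS_IS;
    }

    // Toy evolving rule (an assumption for illustration): the new schema may
    // append fields, as long as the old fields are an unchanged prefix.
    static Compatibility evolvingCheck(String[] previousTypes, String[] newTypes) {
        if (Arrays.equals(previousTypes, newTypes)) {
            return Compatibility.COMPATIBLE_AS_IS;
        }
        boolean oldIsPrefix = newTypes.length > previousTypes.length
                && Arrays.equals(previousTypes, Arrays.copyOf(newTypes, previousTypes.length));
        return oldIsPrefix ? Compatibility.COMPATIBLE_AFTER_MIGRATION : Compatibility.INCOMPATIBLE;
    }

    // Migration step: take a row in the old layout and rewrite it in the new
    // layout. Appended fields are filled with null.
    static Object[] migrate(Object[] oldRow, int newArity) {
        return Arrays.copyOf(oldRow, newArity);
    }

    public static void main(String[] args) {
        String[] oldTypes = {"INT", "STRING"};
        String[] newTypes = {"INT", "STRING", "DOUBLE"}; // one column added

        System.out.println("strict:   " + strictCheck(oldTypes, newTypes));
        System.out.println("evolving: " + evolvingCheck(oldTypes, newTypes));

        Object[] restored = migrate(new Object[] {42, "alice"}, newTypes.length);
        System.out.println("migrated: " + Arrays.toString(restored));
    }
}
```

With the strict rule, the added column is rejected outright, which is the
situation behind the StateMigrationException. With the toy evolving rule, the
old row is carried over and the new field starts out empty.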

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/

On Wed, 29 Dec 2021 at 22:57, shamit jain <jainsha...@gmail.com> wrote:

> Thanks Martijn!
>
> One more question: can we achieve schema evolution using the DataStream API?
>
> In the Flink documentation, I found [2]: "The process of migrating state to
> adapt to changed schemas happens automatically, and independently for each
> state. This process is performed internally by Flink by first checking if
> the new serializer for the state has different serialization schema than
> the previous serializer; if so, the previous serializer is used to read the
> state to objects, and written back to bytes again with the new serializer."
>
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/schema_evolution/#evolving-state-schema
>
> Regards,
> Shamit Jain
>
>
>
> On Wed, Dec 29, 2021 at 5:09 AM Martijn Visser <mart...@ververica.com>
> wrote:
>
>> Hi Shamit,
>>
>> Adding columns means that you're trying to perform schema evolution,
>> which Flink's Table API doesn't support yet. Per the documentation [1]:
>> "Savepoints are only supported if both the query and the Flink version
>> remain constant".
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
>>
>> On Wed, 29 Dec 2021 at 04:10, shamit jain <jainsha...@gmail.com> wrote:
>>
>>> Hello Experts,
>>>
>>>
>>> I need help to understand whether we can deploy a job from a snapshot
>>> after changing the code by adding one new column in an existing table.
>>>
>>> We are using the Flink 1.13.2 Table API with RocksDB as the state backend.
>>> We changed the code to add one new column to the existing table and then
>>> tried to deploy from the snapshot. While deploying, I get the error
>>> below:
>>>
>>>
>>> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336) must not be incompatible with the old state serializer (org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
>>>    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>>>    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>>>    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>>>    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>>>    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>>>    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>>>    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>>    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>    ...
>>>
>>>
>>> After troubleshooting, I found that this error is raised while comparing
>>> the fields of the previous and latest table definitions.
>>>
>>> This comparison happens in the flink-table-runtime library, in the class
>>> org.apache.flink.table.runtime.typeutils.RowDataSerializer, method
>>> resolveSchemaCompatibility():
>>>
>>>    if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
>>>        return TypeSerializerSchemaCompatibility.incompatible();
>>>    }
>>>
>>>
>>> Can you please help me to understand if we can add a new column in an
>>> existing table and deploy from the snapshot?
>>>
>>> Regards,
>>> Shamit Jain
>>>
>>
