Hi Shamit,

Yes, there are more possibilities when using the DataStream API, such as the approach in the link you've included. You could also use the State Processor API [1] to read, write & modify your savepoint.
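If you want to try the State Processor API route, here is a minimal, untested sketch of reading keyed state from a savepoint on Flink 1.13 (where the API is still DataSet-based). The savepoint path, operator uid, state name and state type below are all placeholders you'd replace with your own; also note that state written by a Table API job is kept in Flink-internal RowData form, so reading it back is more involved than this plain String example:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadSavepointSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path; point this at your actual savepoint directory.
        ExistingSavepoint savepoint = Savepoint.load(
                env, "file:///tmp/savepoint-123", new EmbeddedRocksDBStateBackend());

        // "my-uid" must be the uid assigned to the stateful operator;
        // the reader below must match the shape of that operator's state.
        DataSet<String> entries = savepoint.readKeyedState("my-uid", new ReaderFunction());
        entries.print();
    }

    static class ReaderFunction extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<String> state;

        @Override
        public void open(Configuration parameters) {
            // The descriptor name and type must match what the job registered.
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", String.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }
}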
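On the DataStream side, state schema evolution is supported out of the box for POJO and Avro state types (see the docs you quoted below). As a rough sketch with made-up class and field names: if your state type is a proper Flink POJO, you can add a field between savepoints and the PojoSerializer will migrate the restored state, initializing the new field to its default value. Adding or removing fields is fine; changing the type of an existing field or the class name is not.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ClickCounter extends RichFlatMapFunction<String, Long> {

    // Version 1 of the state type: a Flink POJO (public no-arg constructor,
    // public fields). In version 2 of the job you could add e.g.
    // `public String lastUrl;` here; the PojoSerializer treats that as a
    // compatible change and initializes the field to null in restored state.
    public static class CounterState {
        public long clicks;
    }

    private transient ValueState<CounterState> state;

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", CounterState.class));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        // Use on a keyed stream, e.g. stream.keyBy(...).flatMap(new ClickCounter()).
        CounterState current = state.value();
        if (current == null) {
            current = new CounterState();
        }
        current.clicks++;
        state.update(current);
        out.collect(current.clicks);
    }
}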
Best regards,

Martijn

[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/

On Wed, 29 Dec 2021 at 22:57, shamit jain <jainsha...@gmail.com> wrote:

> Thanks Martijn!
>
> One more question: can we achieve schema evolution using the DataStream APIs?
>
> In the Flink documentation, I found [2]: "The process of migrating state to
> adapt to changed schemas happens automatically, and independently for each
> state. This process is performed internally by Flink by first checking if
> the new serializer for the state has different serialization schema than
> the previous serializer; if so, the previous serializer is used to read the
> state to objects, and written back to bytes again with the new serializer."
>
> [2] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/schema_evolution/#evolving-state-schema
>
> Regards,
> Shamit Jain
>
> On Wed, Dec 29, 2021 at 5:09 AM Martijn Visser <mart...@ververica.com> wrote:
>
>> Hi Shamit,
>>
>> Adding columns means that you're trying to perform schema evolution,
>> which isn't yet supported by Flink per the documentation [1]: "Savepoints
>> are only supported if both the query and the Flink version remain constant."
>>
>> Best regards,
>>
>> Martijn
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
>>
>> On Wed, 29 Dec 2021 at 04:10, shamit jain <jainsha...@gmail.com> wrote:
>>
>>> Hello Experts,
>>>
>>> I need help to understand whether we can deploy a job from a snapshot
>>> after changing the code by adding one new column in an existing table.
>>>
>>> We are using Flink 1.13.2 Table API and RocksDB as the state backend.
>>> We changed the code to add one new column to the existing table and then
>>> tried to deploy from the snapshot. While deploying, I'm getting the
>>> below error:
>>>
>>> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336)
>>> must not be incompatible with the old state serializer
>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>>>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>     ...
>>>
>>> After troubleshooting, I found we are getting this error while comparing
>>> the fields of the previous and latest table definitions.
>>>
>>> This comparison happens in the flink-table-runtime library class
>>> org.apache.flink.table.runtime.typeutils.RowDataSerializer, in the
>>> method resolveSchemaCompatibility():
>>>
>>> if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
>>>     return TypeSerializerSchemaCompatibility.incompatible();
>>> }
>>>
>>> Can you please help me understand whether we can add a new column to an
>>> existing table and deploy from the snapshot?
>>>
>>> Regards,
>>> Shamit Jain