You could consider trying out the experimental version upgrade that was introduced as part of FLIP-190: https://cwiki.apache.org/confluence/x/KZBnCw

On Tue, Mar 21, 2023 at 12:11 PM Ashish Khatkar via user <user@flink.apache.org> wrote:

> Hi Shammon,
>
> Schema evolution works with avro type state, and Flink Table API uses
> RowData and has a serializer (RowDataSerializer) for it which doesn't
> allow change in column structure. Regarding state processor api, we are not
> creating any state in our service, we simply use Flink sql as a blackbox
> and let it handle the state. We simply create sql tables out of avro schema
> in StreamTableEnvironment and run queries on those sql tables by creating
> StatementSet and calling the execute() on it.
>
> I found flink doc on upgrade and evolution [1] and according to the doc it
> is not possible to achieve this.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
>
> Cheers
> Ashish
>
> On Tue, Mar 21, 2023 at 1:51 AM Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi Ashish
>>
>> State compatibility is a complex issue, and you can review the state
>> evolution [1] and state processor [2] docs to see if there's a solution for
>> your problem.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>
>> Best,
>> Shammon FY
>>
>>
>> On Fri, Mar 17, 2023 at 8:48 PM Ashish Khatkar via user <
>> user@flink.apache.org> wrote:
>>
>>> Hi all,
>>>
>>> I need help in understanding if we can add columns with defaults, let's
>>> say NULL to the existing table and recover the job from the savepoint.
>>>
>>> We are using flink-1.16.0 table API and RocksDB as backend to provide a
>>> service to our users to run sql queries. The tables are created using the
>>> avro schema and when the schema is changed in a compatible manner i.e
>>> adding a field with default, we are unable to recover the job from the
>>> savepoint. This is the error we get after the schema is upgraded.
>>>
>>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>>> serializer
>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@aad5b03a) must
>>> not be incompatible with the old state serializer
>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@9d089984).
>>>
>>> We tried to debug the issue and this error originates from
>>>
>>> org.apache.flink.table.runtime.typeutils.RowDataSerializer.RowDataSerializerSnapshot
>>> -> resolveSchemaCompatibility line 343:345
>>>
>>> which checks the length of the type array and also the logicalType for
>>> each element or you can say columns.
>>>
>>> Is there a way to restore and evolve a table using table-api when the
>>> avro schema evolves in a compatible manner? If not, is there any plan to
>>> provide upgrades and evolutions with table apis?
>>>
>>> Cheers,
>>> Ashish Khatkar
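
For anyone reading this thread later: the check described in the original message (same number of columns, and the same logical type for each column) can be modelled with a few lines of plain Java. This is only a simplified sketch of the behaviour as reported above, not Flink's actual code. The class name, the enum, and the use of plain strings in place of Flink's LogicalType are all assumptions made for illustration.

```java
// Simplified, hypothetical model of the compatibility check described in the
// thread. It does NOT use or reproduce Flink classes; column types are plain
// strings standing in for logical types.
class RowCompatibilitySketch {

    enum Result { COMPATIBLE, INCOMPATIBLE }

    // Compatible only if both rows have the same number of columns and every
    // column has the same type at the same position.
    static Result resolve(String[] oldTypes, String[] newTypes) {
        if (oldTypes.length != newTypes.length) {
            return Result.INCOMPATIBLE; // a column was added or removed
        }
        for (int i = 0; i < oldTypes.length; i++) {
            if (!oldTypes[i].equals(newTypes[i])) {
                return Result.INCOMPATIBLE; // a column type changed
            }
        }
        return Result.COMPATIBLE;
    }

    public static void main(String[] args) {
        String[] savepointRow = {"INT", "STRING"};
        String[] sameRow = {"INT", "STRING"};
        String[] rowWithNewColumn = {"INT", "STRING", "STRING"};

        System.out.println(resolve(savepointRow, sameRow));          // COMPATIBLE
        System.out.println(resolve(savepointRow, rowWithNewColumn)); // INCOMPATIBLE
    }
}
```

Under this model, adding a nullable column with a default still changes the length of the type array, which matches the StateMigrationException reported above even though the change is compatible at the Avro level.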