You could consider trying out the experimental version upgrade that was
introduced as part of FLIP-190: https://cwiki.apache.org/confluence/x/KZBnCw

On Tue, Mar 21, 2023 at 12:11 PM Ashish Khatkar via user <
user@flink.apache.org> wrote:

> Hi Shammon,
>
> Schema evolution works with avro type state, and Flink Table API uses
> RowData and has a serializer (RowDataSerializer) for it which doesn't
> allow change in column structure. Regarding state processor api, we are not
> creating any state in our service, we simply use Flink sql as a blackbox
> and let it handle the state. We simply create sql tables out of avro schema
> in StreamTableEnvironment and run queries on those sql tables by creating
> StatementSet and calling the execute() on it.
>
> I found flink doc on upgrade and evolution [1] and according to the doc it
> is not possible to achieve this.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
>
> Cheers
> Ashish
>
> On Tue, Mar 21, 2023 at 1:51 AM Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi Ashish
>>
>> State compatibility is a complex issue, and you can review the state
>> evolution [1] and state processor [2] docs to see if there's a solution for
>> your problem.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>>
>> Best,
>> Shammon FY
>>
>>
>> On Fri, Mar 17, 2023 at 8:48 PM Ashish Khatkar via user <
>> user@flink.apache.org> wrote:
>>
>>> Hi all,
>>>
>>> I need help in understanding if we can add columns with defaults, let's
>>> say NULL to the existing table and recover the job from the savepoint.
>>>
>>> We are using flink-1.16.0 table API and RocksDB as backend to provide a
>>> service to our users to run sql queries. The tables are created using the
>>> avro schema and when the schema is changed in a compatible manner i.e
>>> adding a field with default, we are unable to recover the job from the
>>> savepoint. This is the error we get after the schema is upgraded.
>>>
>>> Caused by: org.apache.flink.util.StateMigrationException: The new state 
>>> serializer 
>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@aad5b03a) must 
>>> not be incompatible with the old state serializer 
>>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@9d089984).
>>>
>>> We tried to debug the issue and this error originates from
>>>
>>> org.apache.flink.table.runtime.typeutils.RowDataSerializer.RowDataSerializerSnapshot
>>>  -> resolveSchemaCompatibility line 343:345
>>>
>>> which checks the length of the type array and also the logicalType for
>>> each element or you can say columns.
>>>
>>> Is there a way to restore and evolve a table using table-api when the
>>> avro schema evolves in a compatible manner? If not, is there any plan to
>>> provide upgrades and evolutions with table apis?
>>>
>>> Cheers,
>>> Ashish Khatkar
>>>
>>

Reply via email to