Hi Shammon,

Schema evolution is supported for Avro-typed state, but the Flink Table API
uses RowData, and its serializer (RowDataSerializer) does not allow any
change in column structure. Regarding the State Processor API: we do not
create any state ourselves in our service. We use Flink SQL as a black box
and let it manage the state. We create SQL tables from the Avro schema in a
StreamTableEnvironment, then run queries on those tables by creating a
StatementSet and calling execute() on it.
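
To make the limitation concrete, here is a minimal sketch in plain Java of
the kind of check described in my original mail below: the restored and the
new row layouts must have the same number of columns and the same type at
every position. This is a simplified model, not Flink's actual code. The
class RowCompatibilitySketch, the method isCompatible, and the use of plain
strings as type names are all made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class RowCompatibilitySketch {

    // Hypothetical stand-in for the serializer snapshot check: each list holds
    // the column types of a row, in order, as plain strings.
    public static boolean isCompatible(List<String> savedTypes, List<String> newTypes) {
        // A different column count is rejected outright.
        if (savedTypes.size() != newTypes.size()) {
            return false;
        }
        // Every position must carry exactly the same type.
        for (int i = 0; i < savedTypes.size(); i++) {
            if (!savedTypes.get(i).equals(newTypes.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> saved = Arrays.asList("BIGINT", "STRING");
        // Same table after adding one nullable column.
        List<String> evolved = Arrays.asList("BIGINT", "STRING", "STRING");

        System.out.println(isCompatible(saved, saved));   // prints "true"
        System.out.println(isCompatible(saved, evolved)); // prints "false"
    }
}
```

Under this model, adding a column with a NULL default still changes the
column count, so the savepoint is rejected even though the Avro schema
change itself is compatible.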

I found the Flink documentation on upgrades and evolution [1], and according
to it this is not possible to achieve.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution

Cheers
Ashish

On Tue, Mar 21, 2023 at 1:51 AM Shammon FY <zjur...@gmail.com> wrote:

> Hi Ashish
>
> State compatibility is a complex issue, and you can review the state
> evolution [1] and state processor [2] docs to see if there's a solution for
> your problem.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>
> Best,
> Shammon FY
>
>
> On Fri, Mar 17, 2023 at 8:48 PM Ashish Khatkar via user <
> user@flink.apache.org> wrote:
>
>> Hi all,
>>
>> I need help understanding whether we can add columns with defaults, say
>> NULL, to an existing table and recover the job from a savepoint.
>>
>> We are using the Flink 1.16.0 Table API with RocksDB as the state backend
>> to provide a service that lets our users run SQL queries. The tables are
>> created from an Avro schema, and when the schema is changed in a
>> compatible manner, i.e. by adding a field with a default, we are unable to
>> recover the job from the savepoint. This is the error we get after the
>> schema is upgraded:
>>
>> Caused by: org.apache.flink.util.StateMigrationException: The new state 
>> serializer 
>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@aad5b03a) must 
>> not be incompatible with the old state serializer 
>> (org.apache.flink.table.runtime.typeutils.RowDataSerializer@9d089984).
>>
>> We tried to debug the issue and this error originates from
>>
>> org.apache.flink.table.runtime.typeutils.RowDataSerializer.RowDataSerializerSnapshot
>>  -> resolveSchemaCompatibility line 343:345
>>
>> which checks the length of the type array and also the logical type of
>> each element, that is, of each column.
>>
>> Is there a way to restore and evolve a table using the Table API when the
>> Avro schema evolves in a compatible manner? If not, is there any plan to
>> support upgrades and evolution in the Table API?
>>
>> Cheers,
>> Ashish Khatkar
>>
>
