Sharon Xie created FLINK-35147:
----------------------------------
Summary: SinkMaterializer throws StateMigrationException when
widening the field type in the output table
Key: FLINK-35147
URL: https://issues.apache.org/jira/browse/FLINK-35147
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Reporter: Sharon Xie
Attachments: image-2024-04-17-14-15-21-647.png,
image-2024-04-17-14-15-35-297.png
When a field type in the output table is widened, for example from int to bigint or
from timestamp(3) to timestamp(6), SinkMaterializer fails to restore its state and
throws a StateMigrationException.
This is unexpected because the change is backward compatible: the new type can
represent every value of the old, narrower type.
Note that the planner accepts this change without error.
To reproduce
```
CREATE TABLE ltable (
  `id` integer primary key,
  `num` int
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'kafka.test:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  'topic' = 'test1'
);

CREATE TABLE rtable (
  `id` integer primary key,
  `ts` timestamp(3)
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'kafka.test:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  'topic' = 'test2'
);

CREATE TABLE output (
  `id` integer primary key,
  `num` int,
  `ts` timestamp(3)
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'kafka.test:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  'topic' = 'test3'
);

insert into
  `output`
select
  ltable.id,
  num,
  ts
from
  ltable
  join rtable on ltable.id = rtable.id
```
Run the job, stop it with a savepoint, then change the definition of the output table to:
```
CREATE TABLE output (
  `id` integer primary key,
  -- changing either of the types below triggers the issue
  `num` bigint,
  `ts` timestamp(6)
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'kafka.test:9092',
  'key.format' = 'json',
  'value.format' = 'json',
  'topic' = 'test3'
);
```
Restart the job from the savepoint created in the previous step.
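The report does not say how the job was stopped and restored. As a rough sketch only, assuming the Flink SQL Client (1.17 or later), the savepoint steps could look like the following. The job id, savepoint directory and savepoint path are placeholders, and newer Flink releases may use a different configuration key for the restore path.
```
-- Assumption: run in the Flink SQL Client; all ids and paths are placeholders.
SHOW JOBS;
SET 'state.savepoints.dir' = 'file:///tmp/flink-savepoints';
STOP JOB '<job_id>' WITH SAVEPOINT;

-- After re-creating `output` with the widened types, restore from the savepoint:
SET 'execution.savepoint.path' = '<savepoint_path>';
insert into
  `output`
select
  ltable.id,
  num,
  ts
from
  ltable
  join rtable on ltable.id = rtable.id;
```
If a new SQL Client session is used for the restore, ltable and rtable need to be created again before the insert is submitted.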
Sample screenshots
!image-2024-04-17-14-15-35-297.png!
!image-2024-04-17-14-15-21-647.png!