[https://issues.apache.org/jira/browse/FLINK-35147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Sharon Xie updated FLINK-35147:
-------------------------------
    Description: 
When a field type in the output table is widened, for example from int to 
bigint or from timestamp(3) to timestamp(6), SinkMaterializer fails to restore 
its state. This is unexpected because the change is backward compatible: the 
new, wider type can represent every value of the old, narrower type. 

Note that the planner accepts such a change without error. 
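To illustrate the compatibility claim at the value level, here is a minimal sketch using only the JDK (no Flink APIs; the class name WideningCheck is hypothetical). It checks that every int survives a round trip through long, and that a millisecond-precision timestamp, as held by timestamp(3), is unchanged when expressed at microsecond precision, as held by timestamp(6).

{code:java}
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: checks value-level widening only, not Flink state serialization.
public class WideningCheck {

    // int -> bigint: a Java int widens to long and narrows back unchanged.
    static boolean intToBigintIsLossless(int v) {
        long widened = v;          // widening primitive conversion
        return (int) widened == v; // casting back recovers the original value
    }

    // timestamp(3) -> timestamp(6): a value with millisecond precision
    // needs no rounding when represented with microsecond precision.
    static boolean ts3ToTs6IsLossless(LocalDateTime anyTs) {
        LocalDateTime ts3 = anyTs.truncatedTo(ChronoUnit.MILLIS); // simulate timestamp(3)
        LocalDateTime ts6 = ts3.truncatedTo(ChronoUnit.MICROS);   // simulate timestamp(6)
        return ts6.equals(ts3);
    }

    public static void main(String[] args) {
        int[] samples = {Integer.MIN_VALUE, -1, 0, 42, Integer.MAX_VALUE};
        for (int v : samples) {
            if (!intToBigintIsLossless(v)) {
                throw new AssertionError("int value lost: " + v);
            }
        }
        LocalDateTime ts = LocalDateTime.of(2024, 1, 1, 0, 0, 0, 123_456_789);
        if (!ts3ToTs6IsLossless(ts)) {
            throw new AssertionError("timestamp(3) value lost");
        }
        System.out.println("all widening conversions are lossless");
    }
}
{code}

This only shows that no value is lost; whether the bytes already stored in a savepoint can be reused as-is is a separate question.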

To reproduce:

1. Run the following SQL:
{code:sql}
CREATE TABLE ltable (
    `id` integer primary key,
    `num` int
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka.test:9092',
    'key.format' = 'json',
    'value.format' = 'json',
    'topic' = 'test1'
);

CREATE TABLE rtable (
    `id` integer primary key,
    `ts` timestamp(3)
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka.test:9092',
    'key.format' = 'json',
    'value.format' = 'json',
    'topic' = 'test2'
);

CREATE TABLE output (
    `id` integer primary key,
    `num` int,
    `ts` timestamp(3)
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka.test:9092',
    'key.format' = 'json',
    'value.format' = 'json',
    'topic' = 'test3'
);

insert into
    `output`
select
    ltable.id,
    num,
    ts
from
    ltable
    join rtable on ltable.id = rtable.id
{code}

2. Stop the job with a savepoint, then redefine the output table as follows:
{code:sql}
CREATE TABLE output (
    `id` integer primary key,
    -- changing either of the types below triggers the issue
    `num` bigint,
    `ts` timestamp(6)
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'kafka.test:9092',
    'key.format' = 'json',
    'value.format' = 'json',
    'topic' = 'test3'
);
{code}
3. Restart the job from the savepoint created in step 2.

Sample screenshots:

!image-2024-04-17-14-15-35-297.png|width=911,height=352!

!image-2024-04-17-14-15-21-647.png|width=1172,height=458!
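Value-level compatibility does not by itself mean that bytes written for the old type can be read with the new one. The following is a conceptual sketch only, using plain java.io rather than Flink's serializers; the class WidenOnRestore and its fixed-width row layout (id int, num int versus id int, num bigint) are hypothetical. It shows that reading old bytes with the new layout fails, while an explicit read-old, widen, write-new migration step succeeds. Under that assumption, restoring after a widening change would need such a migration step instead of rejecting the state.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical fixed-width row layouts; NOT how Flink's RowData serializer works.
public class WidenOnRestore {

    // Old layout: id int (4 bytes), num int (4 bytes).
    static byte[] writeOld(int id, int num) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(id);
        out.writeInt(num);
        return bytes.toByteArray();
    }

    // Migration: decode with the OLD layout, widen, re-encode with the NEW layout.
    static byte[] migrate(byte[] oldRow) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(oldRow));
        int id = in.readInt();
        long num = in.readInt(); // int -> bigint widening happens here
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(id);
        out.writeLong(num);
        return bytes.toByteArray();
    }

    // New layout reader: id int (4 bytes), num bigint (8 bytes).
    static long readNewNum(byte[] newRow) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(newRow));
        in.readInt();        // skip id
        return in.readLong(); // throws EOFException if fewer than 8 bytes remain
    }

    public static void main(String[] args) throws IOException {
        byte[] oldRow = writeOld(1, 42);
        try {
            readNewNum(oldRow); // reinterpret old bytes with the new layout
            System.out.println("unexpected: read succeeded");
        } catch (EOFException e) {
            System.out.println("old bytes cannot be read with the new layout");
        }
        System.out.println(readNewNum(migrate(oldRow))); // prints 42
    }
}
{code}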


> SinkMaterializer throws StateMigrationException when widening the field type 
> in the output table
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-35147
>                 URL: https://issues.apache.org/jira/browse/FLINK-35147
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: Sharon Xie
>            Priority: Major
>         Attachments: image-2024-04-17-14-15-21-647.png, 
> image-2024-04-17-14-15-35-297.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
