[ https://issues.apache.org/jira/browse/FLINK-33608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiabao Sun updated FLINK-33608:
-------------------------------
    Description:
UpsertTestDynamicTableSink's keySerializationSchema should extract and serialize the primary key fields from RowData.

{code:sql}
CREATE TABLE UpsertFileSinkTable (
    user_id INT,
    user_name STRING,
    user_count BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-files',
    'key.format' = 'json',
    'value.format' = 'json',
    'output-filepath' = '..'
);

INSERT INTO UpsertFileSinkTable
SELECT user_id, user_name, COUNT(*) AS user_count
FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))
    AS UserCountTable(user_id, user_name)
GROUP BY user_id, user_name;
{code}

Results:
|| Key || Value ||
| {"user_id":1,"user_name":"Bob","user_count":2} | {"user_id":1,"user_name":"Bob","user_count":2} |
| {"user_id":22,"user_name":"Tom","user_count":1} | {"user_id":22,"user_name":"Tom","user_count":1} |
| {"user_id":42,"user_name":"Kim","user_count":3} | {"user_id":42,"user_name":"Kim","user_count":3} |

Expected:
| {"user_id":1} | {"user_id":1,"user_name":"Bob","user_count":2} |
| {"user_id":22} | {"user_id":22,"user_name":"Tom","user_count":1} |
| {"user_id":42} | {"user_id":42,"user_name":"Kim","user_count":3} |

  was:
UpsertTestDynamicTableSink's keySerializationSchema should extract and serialize the primary key fields from RowData.

> UpsertTestDynamicTableSink's keySerializationSchema should extract and serialize the primary key fields from RowData
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33608
>                 URL: https://issues.apache.org/jira/browse/FLINK-33608
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Common
>    Affects Versions: 1.18.0
>            Reporter: Jiabao Sun
>            Priority: Major
>              Labels: pull-request-available
>
> UpsertTestDynamicTableSink's keySerializationSchema should extract and serialize the primary key fields from RowData.
> {code:sql}
> CREATE TABLE UpsertFileSinkTable (
>     user_id INT,
>     user_name STRING,
>     user_count BIGINT,
>     PRIMARY KEY (user_id) NOT ENFORCED
> ) WITH (
>     'connector' = 'upsert-files',
>     'key.format' = 'json',
>     'value.format' = 'json',
>     'output-filepath' = '..'
> );
>
> INSERT INTO UpsertFileSinkTable
> SELECT user_id, user_name, COUNT(*) AS user_count
> FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))
>     AS UserCountTable(user_id, user_name)
> GROUP BY user_id, user_name;
> {code}
>
> Results:
> || Key || Value ||
> | {"user_id":1,"user_name":"Bob","user_count":2} | {"user_id":1,"user_name":"Bob","user_count":2} |
> | {"user_id":22,"user_name":"Tom","user_count":1} | {"user_id":22,"user_name":"Tom","user_count":1} |
> | {"user_id":42,"user_name":"Kim","user_count":3} | {"user_id":42,"user_name":"Kim","user_count":3} |
>
> Expected:
> | {"user_id":1} | {"user_id":1,"user_name":"Bob","user_count":2} |
> | {"user_id":22} | {"user_id":22,"user_name":"Tom","user_count":1} |
> | {"user_id":42} | {"user_id":42,"user_name":"Kim","user_count":3} |

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
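
For illustration only, the difference between the reported and the expected key output can be sketched in plain Java. This is not the Flink implementation and does not use Flink's RowData or SerializationSchema types: the class KeyProjectionSketch and its methods projectKey and toJson are hypothetical names, and a row is modelled as an ordered map from column name to value. The idea is simply that the key side should serialize a projection of the row onto the primary key columns, while the value side serializes the whole row.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: models a row as an ordered map, not as Flink's RowData.
public class KeyProjectionSketch {

    /** Copies only the primary key columns out of a row, in the order given by keyColumns. */
    public static Map<String, Object> projectKey(Map<String, Object> row, List<String> keyColumns) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (String column : keyColumns) {
            if (!row.containsKey(column)) {
                throw new IllegalArgumentException("Row has no primary key column: " + column);
            }
            key.put(column, row.get(column));
        }
        return key;
    }

    /** Minimal JSON rendering for string and numeric fields only, enough for this example. */
    public static String toJson(Map<String, Object> fields) {
        return fields.entrySet().stream()
                .map(e -> "\"" + e.getKey() + "\":"
                        + (e.getValue() instanceof String
                                ? "\"" + e.getValue() + "\""
                                : String.valueOf(e.getValue())))
                .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        // One aggregated row from the example query in the issue description.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("user_id", 42);
        row.put("user_name", "Kim");
        row.put("user_count", 3L);

        // Key: only the PRIMARY KEY column; value: the full row.
        String key = toJson(projectKey(row, List.of("user_id")));
        String value = toJson(row);
        System.out.println(key + " | " + value);
    }
}
```

With the sample row above, the key side contains only user_id, matching the shape of the "Expected" table, whereas serializing the unprojected row for the key reproduces the "Results" table.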