[ https://issues.apache.org/jira/browse/FLINK-33608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jiabao Sun updated FLINK-33608:
-------------------------------
    Description: 
UpsertTestDynamicTableSink's keySerializationSchema should extract and serialize only the primary key fields from RowData. Currently the key is serialized from the entire row, so it is identical to the value.

{code:sql}
CREATE TABLE UpsertFileSinkTable (
  user_id INT,
  user_name STRING,
  user_count BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-files',
  'key.format' = 'json',
  'value.format' = 'json',
  'output-filepath' = '..'
);


INSERT INTO UpsertFileSinkTable
SELECT user_id, user_name, COUNT(*) AS user_count
FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))
  AS UserCountTable(user_id, user_name)
GROUP BY user_id, user_name;
{code}

Results:
|| Key || Value ||
| {"user_id":1,"user_name":"Bob","user_count":2} | {"user_id":1,"user_name":"Bob","user_count":2} |
| {"user_id":22,"user_name":"Tom","user_count":1} | {"user_id":22,"user_name":"Tom","user_count":1} |
| {"user_id":42,"user_name":"Kim","user_count":3} | {"user_id":42,"user_name":"Kim","user_count":3} |

Expected:
|| Key || Value ||
| {"user_id":1} | {"user_id":1,"user_name":"Bob","user_count":2} |
| {"user_id":22} | {"user_id":22,"user_name":"Tom","user_count":1} |
| {"user_id":42} | {"user_id":42,"user_name":"Kim","user_count":3} |
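
The expected behavior can be illustrated with a minimal, Flink-free sketch. It is not the actual UpsertTestDynamicTableSink code: the class KeyProjectionSketch, its methods projectKey and toJson, and the array-based row model are all hypothetical stand-ins for RowData and the JSON format. It only shows the idea of projecting the primary key positions out of the row before serializing the key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a row is modelled as parallel arrays of field names
// and values. The real sink operates on RowData and a SerializationSchema.
class KeyProjectionSketch {

    // Keep only the fields at the given positions, preserving their order.
    static Map<String, Object> projectKey(String[] fieldNames, Object[] row, int[] keyIndexes) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (int index : keyIndexes) {
            key.put(fieldNames[index], row[index]);
        }
        return key;
    }

    // Minimal JSON rendering for flat rows of numbers and strings (no escaping).
    static String toJson(Map<String, Object> fields) {
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, Object> field : fields.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append('"').append(field.getKey()).append("\":");
            Object value = field.getValue();
            if (value instanceof String) {
                json.append('"').append(value).append('"');
            } else {
                json.append(value);
            }
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        String[] fieldNames = {"user_id", "user_name", "user_count"};
        Object[] row = {1, "Bob", 2L};
        int[] primaryKeyIndexes = {0};   // PRIMARY KEY (user_id)
        int[] allIndexes = {0, 1, 2};    // the value keeps every column

        String key = toJson(projectKey(fieldNames, row, primaryKeyIndexes));
        String value = toJson(projectKey(fieldNames, row, allIndexes));
        System.out.println(key + " | " + value);
    }
}
```

With the first row of the example, the key contains only user_id while the value keeps all three columns, matching the first line of the Expected table.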


  was:UpsertTestDynamicTableSink's keySerializationSchema should extract and serialize the primary key fields from RowData.


> UpsertTestDynamicTableSink's keySerializationSchema should extract and serialize the primary key fields from RowData
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33608
>                 URL: https://issues.apache.org/jira/browse/FLINK-33608
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Common
>    Affects Versions: 1.18.0
>            Reporter: Jiabao Sun
>            Priority: Major
>              Labels: pull-request-available



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
