[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-12-08 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17455065#comment-17455065
 ] 

Jingsong Lee commented on FLINK-20370:
--

Fixed via:

 

part1: Fix wrong results when sink primary key is not the same with query 
result's changelog upsert key

master: f8f6935adc841529ecdc0636174650cffbf73719

release-1.14: 7c5ddbd201005e55ab68b4db7ee74c7cbeb13400

 

part2: introduce 'table.exec.sink.keyed-shuffle' option to auto keyby on sink's 
pk if parallelism are not the same for insertOnly input

master: 9e76585f1fa110288604913a73d86ac2f1542777

 

part2 was not cherry-picked to 1.14 because it may change otherwise normal 
plans and lead to incompatibility.
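
To illustrate the idea behind part2, here is a minimal Python sketch (not Flink code) of a keyed shuffle on the sink's primary key. The names `subtask_for` and `keyed_shuffle` and the CRC32 hash are assumptions made for illustration; Flink's real partitioner uses its own key-group hashing. The point is only that every record with the same primary key is routed to the same sink subtask, so the per-key order is preserved even when the sink parallelism differs from the upstream parallelism.

```python
import zlib
from typing import List, Tuple

Row = Tuple[str, int]  # (city_name, count_customer); city_name plays the role of the sink pk


def subtask_for(pk: Tuple[str, ...], sink_parallelism: int) -> int:
    """Map a primary-key value to a sink subtask (illustrative hash, not Flink's)."""
    key_bytes = "\x00".join(pk).encode("utf-8")
    return zlib.crc32(key_bytes) % sink_parallelism


def keyed_shuffle(rows: List[Row], pk_index: int, sink_parallelism: int) -> List[List[Row]]:
    """Distribute rows to subtasks by primary key, keeping arrival order per subtask."""
    subtasks: List[List[Row]] = [[] for _ in range(sink_parallelism)]
    for row in rows:
        target = subtask_for((row[pk_index],), sink_parallelism)
        subtasks[target].append(row)
    return subtasks


rows = [("Beijing", 1), ("Shanghai", 5), ("Beijing", 2), ("Beijing", 3)]
partitions = keyed_shuffle(rows, pk_index=0, sink_parallelism=3)
```

All three "Beijing" rows end up in the same partition and in their original order, which is what an upsert sink needs to apply the latest value last.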

> Result is wrong when sink primary key is not the same with query
> 
>
>                 Key: FLINK-20370
>                 URL: https://issues.apache.org/jira/browse/FLINK-20370
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: Jark Wu
>            Assignee: lincoln lee
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0, 1.14.1
>
>
> Both sources are upsert-kafka tables that synchronize changes from MySQL 
> tables (source_city, source_customer). The sink is another MySQL table in 
> upsert mode with "city_name" as its primary key. The join key is "city_id". 
> In this case, the result is wrong when the {{source_city.city_name}} column 
> is updated in MySQL, because the UPDATE_BEFORE is ignored and the old 
> city_name is retained in the sink table. 
> {code}
> Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
> +- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
>    +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
>       :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
>       :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D])
>       :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
>       :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I])
>       :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
>       :              +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
>       :                 +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D])
>       :                    +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
>       :                       +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
>       +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
>          +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
>             +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>                +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
>                   +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
> {code}
> The documentation already advises users to use the key of the query as the 
> primary key of the sink: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> We should highlight this warning more prominently on the CREATE TABLE page. 
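
The failure described in the issue can be reproduced with a small Python sketch. This is a toy model rather than Flink code: the city names and counts are made-up sample data, and `apply_upsert` is a hypothetical helper standing in for an upsert sink whose primary key is city_name. When the join renames a city, it emits an UPDATE_BEFORE (-U) for the old name and an UPDATE_AFTER (+U) for the new one; a sink that drops the -U keeps the old row forever.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, int]     # (city_name, count_customer)
Change = Tuple[str, Row]  # (kind, row); kind is one of "+I", "-U", "+U", "-D"


def apply_upsert(changes: List[Change], drop_update_before: bool) -> Dict[str, int]:
    """Apply a changelog to a table keyed by city_name (the sink's primary key)."""
    table: Dict[str, int] = {}
    for kind, (city_name, count_customer) in changes:
        if kind in ("+I", "+U"):
            table[city_name] = count_customer
        elif kind == "-D" or (kind == "-U" and not drop_update_before):
            table.pop(city_name, None)
        # otherwise the -U is silently dropped, as an upsert sink would do
    return table


# City with id=1 is renamed from "Beijing" to "Peking" in source_city.
changelog = [
    ("+I", ("Beijing", 10)),
    ("-U", ("Beijing", 10)),
    ("+U", ("Peking", 10)),
]

stale = apply_upsert(changelog, drop_update_before=True)
correct = apply_upsert(changelog, drop_update_before=False)
```

With the -U dropped, `stale` still contains the "Beijing" row next to "Peking"; honoring the retraction leaves only "Peking". Treating -U as a delete is itself a simplification that is only safe when changes for a key arrive in order.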



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-12-07 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454921#comment-17454921
 ] 

lincoln lee commented on FLINK-20370:
-

I've created the cherry-pick: [https://github.com/apache/flink/pull/18048]. 
[~lzljs3620320], could you help review it?

 



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-12-07 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454467#comment-17454467
 ] 

Jingsong Lee commented on FLINK-20370:
--

[~MartijnVisser] Got it, I will do it asap.



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-12-06 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454447#comment-17454447
 ] 

Martijn Visser commented on FLINK-20370:


Given the impact, I think it makes sense to hold the 1.14.1 release until 
this is backported. What do you think, [~lzljs3620320] [~twalthr]? If so, it 
would be good to get a fix in for 1.14 asap, given that we want to start 
releasing it soon. 

> Result is wrong when sink primary key is not the same with query
> 
>
>                 Key: FLINK-20370
>                 URL: https://issues.apache.org/jira/browse/FLINK-20370
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: Jark Wu
>            Assignee: lincoln lee
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-12-06 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454329#comment-17454329
 ] 

Jingsong Lee commented on FLINK-20370:
--

> A user just needs to set `upsert-materialize` to `NONE`?

Yes. Given the popularity of CDC, Hudi, etc., I am +1 to backport.



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-12-06 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453964#comment-17453964
 ] 

Timo Walther commented on FLINK-20370:
--

Given that many people still wait for 1.14.1 and don't upgrade to a 1.XX.0 
release, I think it is still reasonable to perform the change at least for the 
1.14 branch. A user just needs to set `upsert-materialize` to `NONE`?
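
For readers unfamiliar with the option, a rough mental model of upsert materialization can be sketched in Python. This is a simplified illustration under stated assumptions, not Flink's actual operator: the class name `UpsertMaterializer`, its `process` method, and the sample rows are hypothetical. The idea is to remember, per sink primary key, every row that is still "alive", so that a late retraction of an older row does not wipe out a newer row with the same key.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, int]     # (city_name, count_customer)
Change = Tuple[str, Row]  # (kind, row); kind is one of "+I", "-U", "+U", "-D"


class UpsertMaterializer:
    """Toy model: keeps the live rows per sink primary key and emits clean upserts."""

    def __init__(self, pk_index: int) -> None:
        self.pk_index = pk_index
        self.state: Dict[str, List[Row]] = {}

    def process(self, kind: str, row: Row) -> List[Change]:
        key = row[self.pk_index]
        rows = self.state.setdefault(key, [])
        if kind in ("+I", "+U"):
            out_kind = "+I" if not rows else "+U"
            rows.append(row)
            return [(out_kind, row)]
        # Retraction (-U or -D): remove the most recent matching row, if any.
        if row not in rows:
            return []
        idx = len(rows) - 1 - rows[::-1].index(row)
        rows.pop(idx)
        if not rows:
            del self.state[key]
            return [("-D", row)]
        if idx == len(rows):
            # The visible (last) row was retracted: fall back to the previous one.
            return [("+U", rows[-1])]
        return []  # an older, already shadowed row was retracted: nothing to emit


materializer = UpsertMaterializer(pk_index=0)
emitted: List[Change] = []
for kind, row in [
    ("+I", ("Beijing", 10)),  # city 1 is named Beijing
    ("+U", ("Beijing", 7)),   # city 2 is renamed to Beijing
    ("-U", ("Beijing", 10)),  # late retraction: city 1 is renamed away
]:
    emitted.extend(materializer.process(kind, row))
```

The late `-U` produces no output, so the sink keeps `("Beijing", 7)`. The extra state is the cost of this approach, which is why being able to switch it off matters for plans that were already correct.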



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-12-05 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453752#comment-17453752
 ] 

Jingsong Lee commented on FLINK-20370:
--

Hi [~twalthr] 

We have evaluated the impact of this fix, and it does affect the compatibility 
of some correct plans. For example, a plan that:
 * generates changes
 * has no upsert keys
 * but has no problems with ordering

In this situation a materialization node is generated, which results in 
incompatibility.

If we agree on the following assumptions:
 * This incompatibility case is very rare
 * When this incompatibility occurs, the user can maintain compatibility by 
configuring the option

then we can consider a cherry-pick.

What do you think? [~lincoln.86xy] 
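
The trade-off above can be made concrete with a simplified decision rule, written here as a Python sketch. It is an assumption-laden model, not the planner's actual code: the function name `needs_upsert_materialize` and the subset check are illustrative only. It shows why a plan that produces changes but has no upsert keys gets a materialization node even when its ordering happens to be fine.

```python
from typing import FrozenSet, Optional, Set


def needs_upsert_materialize(
    sink_pk: FrozenSet[str],
    input_insert_only: bool,
    upsert_keys: Optional[Set[FrozenSet[str]]],
) -> bool:
    """Simplified model of when a materialization node would be added before the sink."""
    if not sink_pk or input_insert_only:
        return False  # no primary key to protect, or no updates to reorder
    if not upsert_keys:
        return True   # changes without any known upsert key: be conservative
    # Safe only if some upsert key of the query is contained in the sink pk.
    return not any(key <= sink_pk for key in upsert_keys)


# The case from this issue: query keyed by city_id, sink keyed by city_name.
issue_case = needs_upsert_materialize(
    frozenset({"city_name"}), False, {frozenset({"city_id"})}
)
# Matching keys: nothing to do.
matching_case = needs_upsert_materialize(
    frozenset({"city_id"}), False, {frozenset({"city_id"})}
)
# The incompatibility case: changes, no upsert keys, order happens to be fine.
no_upsert_key_case = needs_upsert_materialize(frozenset({"city_name"}), False, None)
```

In the third case the rule cannot tell that the order is already correct, so the plan changes; that is the situation in which a user would fall back to the configuration option.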



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-12-03 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453124#comment-17453124
 ] 

Timo Walther commented on FLINK-20370:
--

[~lzljs3620320] Are we planning to backport the change to 1.13 and 1.14? I 
already know of one user who would benefit from a 1.14 backport.



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-30 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450999#comment-17450999
 ] 

lincoln lee commented on FLINK-20370:
-

After discussing with [~wenlong.lwl] offline and checking the code, I can 
confirm it is safe for TransformationSinkProvider.
For DataStreamSinkProvider, which is designed for advanced connector 
developers, it is necessary to pay attention to how changes are shuffled so as 
not to mess up the changelog per parallel subtask.

 



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-29 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450880#comment-17450880
 ] 

Wenlong Lyu commented on FLINK-20370:
-

[~lincoln.86xy] Even if the sink implements ParallelismProvider, its parallelism 
can still be undefined, because it returns an optional value.
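
A minimal sketch of the point above, assuming a stand-in interface (`ParallelismProviderLike` and the `Comparison` enum are made up for this example and are not the real Flink types): when the provider's parallelism is optional, the comparison with the input has three outcomes rather than two.

```java
import java.util.Optional;

public class SinkParallelismSketch {

    /** Stand-in for a provider whose parallelism is optional; NOT the real Flink interface. */
    public interface ParallelismProviderLike {
        Optional<Integer> getParallelism();
    }

    /** Hypothetical result of comparing sink and input parallelism. */
    public enum Comparison { SAME, DIFFERENT, UNKNOWN }

    public static Comparison compare(ParallelismProviderLike sink, int inputParallelism) {
        // An empty Optional means the sink did not declare a parallelism,
        // so the planner cannot tell whether a shuffle is needed.
        return sink.getParallelism()
                .map(p -> p == inputParallelism ? Comparison.SAME : Comparison.DIFFERENT)
                .orElse(Comparison.UNKNOWN);
    }

    public static void main(String[] args) {
        System.out.println(compare(() -> Optional.of(4), 4));   // declared, equal to input
        System.out.println(compare(() -> Optional.of(2), 4));   // declared, differs from input
        System.out.println(compare(() -> Optional.empty(), 4)); // not declared
    }
}
```

The `UNKNOWN` branch is the case this comment is about: implementing the interface alone does not guarantee a value to compare against.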



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-29 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450263#comment-17450263
 ] 

lincoln lee commented on FLINK-20370:
-

[~wenlong.lwl] After looking into `TransformationSinkProvider` and 
`DataStreamSinkProvider`, I found it's unnecessary to add `keyby`, because:
1. `DataStreamSinkProvider` implements the `ParallelismProvider` interface, so 
its parallelism can be detected.
2. The internal `TransformationSinkProvider` is used for `ExternalDynamicSink`; 
when it is converted to a `datastream` it applies the input's parallelism, so 
its parallelism is always the same as the input's.

Please correct me if I'm wrong.



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-22 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447729#comment-17447729
 ] 

lincoln lee commented on FLINK-20370:
-

[~wenlong.lwl] Good question! I think it's safe to add 'keyby' by default for 
the two kinds of sink if we can't get the parallelism (users can choose to turn 
it off explicitly).

What do you think?  [~twalthr]  [~lzljs3620320] 



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-22 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447367#comment-17447367
 ] 

Wenlong Lyu commented on FLINK-20370:
-

Hi [~lincoln.86xy], what do you think we should do when the sink is a 
DataStreamSinkProvider/TransformationSinkProvider? In such cases we have no 
idea about the parallelism of the sink. I think this case is missing from the 
summary.



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-22 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447239#comment-17447239
 ] 

lincoln lee commented on FLINK-20370:
-

Following Jingsong's review comments, I've split the change into two parts. The 
first part solves 1.2 & 1.3 above, and a follow-up addresses 2.2 by introducing 
a config option 'table.exec.sink.pk-shuffle'.



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-10 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17441679#comment-17441679
 ] 

lincoln lee commented on FLINK-20370:
-

Thanks for all your input!
Based on Jingsong's summary and the discussion, let me try to summarize. Please 
correct me if I'm wrong.

An upsert sink can accept the following inputs:
1. input is a changelog stream 
1.1 primary key = upsert key: it's already ok 
1.2 primary key != upsert key (upsert key can be none): we should add 
upsertMaterialize
1.3 primary key contains upsert key: upsertMaterialize can be omitted, but the 
sink requires update_before

2. input is an append stream
2.1 sink has the same parallelism as the upstream operator: it's ok
2.2 sink's parallelism differs from the upstream operator: we should add a 
'keyby' on the primary key by default

The current pr already addresses 1.2 & 1.3, so only 2.2 remains to be done. 
The fix is simple, but to keep it configurable we should introduce another 
job-level config option similar to 'table.exec.sink.upsert-materialize' (since 
the fine-grained per-INSERT INTO setting from FLINK-24254 is not ready yet). 
I've temporarily named it 'table.exec.sink.pk-shuffle' and updated the pr. Your 
suggestions are welcome.

cc [~twalthr] [~lzljs3620320] [~wenlong.lwl] 
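
The case analysis above can be sketched as a small decision function. This only illustrates the summary and is not the planner's actual code; the class, enum, and method names are made up for the example, and for changelog input the parallelism arguments are deliberately ignored because the summary does not mention them.

```java
import java.util.Set;

public class UpsertSinkPlanSketch {

    /** Hypothetical labels for what is needed in front of the upsert sink. */
    public enum Action { NONE, UPSERT_MATERIALIZE, REQUIRE_UPDATE_BEFORE, KEY_BY_PRIMARY_KEY }

    /**
     * @param insertOnly true for an append stream (case 2), false for a changelog stream (case 1)
     * @param primaryKey primary key columns declared on the sink
     * @param upsertKey  upsert key columns of the query result, empty if there is none
     */
    public static Action decide(boolean insertOnly, Set<String> primaryKey, Set<String> upsertKey,
                                int inputParallelism, int sinkParallelism) {
        if (insertOnly) {
            // 2.1 same parallelism: nothing to do; 2.2 different parallelism: keyby on the pk.
            return inputParallelism == sinkParallelism ? Action.NONE : Action.KEY_BY_PRIMARY_KEY;
        }
        if (!upsertKey.isEmpty() && primaryKey.equals(upsertKey)) {
            return Action.NONE;                  // 1.1
        }
        if (!upsertKey.isEmpty() && primaryKey.containsAll(upsertKey)) {
            return Action.REQUIRE_UPDATE_BEFORE; // 1.3
        }
        return Action.UPSERT_MATERIALIZE;        // 1.2, including "no upsert key"
    }

    public static void main(String[] args) {
        // The query in this issue: sink pk is city_name, upsert key is city_id (case 1.2).
        System.out.println(decide(false, Set.of("city_name"), Set.of("city_id"), 4, 4));
        // Append-only input written by a sink with a different parallelism (case 2.2).
        System.out.println(decide(true, Set.of("id"), Set.of(), 4, 2));
    }
}
```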

 

 



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-08 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440534#comment-17440534
 ] 

Timo Walther commented on FLINK-20370:
--

[~wenlong.lwl] I guess for Kafka you are right. Kafka is doing the partitioning 
for us already with the default sink partitioner and the given Kafka key. The 
only missing case is a {{StreamTableEnvironment.toChangelogStream}} with a 
declared primary key: there the user needs to take care of the partitioning. 
But that might be acceptable. 



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-07 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440153#comment-17440153
 ] 

Wenlong Lyu commented on FLINK-20370:
-

[~twalthr] thanks for the explanation. 
However, I think when the sink is Kafka with a pk (I would assume that it is an 
upsert-kafka with a key format provided), the target Kafka partition of a record 
is decided by the generated key, even if it is written by different subtasks. 
In that case, I think adding a key-by can help keep the order of records with 
the same uid only when the input is already partitioned by uid. Is this the 
case you want to solve?  



[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-07 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440150#comment-17440150
 ] 

Jingsong Lee commented on FLINK-20370:
--

[~twalthr] Kafka partitioning should be another matter; it depends on the user's 
connector options, for example using upsert-kafka or `sink.partitioner`.
In my opinion, if the parallelism changes, for example when the parallelism of 
the source and the sink differ, the final result will be random, which may not 
meet the user's expectations. So the premise is that the parallelism changes.
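
A toy simulation of why a parallelism change can make the result random, under stated assumptions: records are spread round-robin over the sink subtasks, and the subtasks reach the external table in an arbitrary order (here the last subtask happens to write first). Everything in it (`SinkShuffleSketch`, `Row`, `run`) is made up for illustration and does not use Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SinkShuffleSketch {

    /** One insert-only record: a primary key plus a version that grows over time. */
    public static final class Row {
        final String key;
        final int version;

        public Row(String key, int version) {
            this.key = key;
            this.version = version;
        }
    }

    /** Distributes rows to sink subtasks, then applies them to a last-write-wins table. */
    public static Map<String, Integer> run(List<Row> input, int sinkParallelism,
                                           boolean keyByPrimaryKey) {
        List<List<Row>> subtasks = new ArrayList<>();
        for (int i = 0; i < sinkParallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        int next = 0;
        for (Row row : input) {
            int target = keyByPrimaryKey
                    ? Math.floorMod(row.key.hashCode(), sinkParallelism) // same key, same subtask
                    : next++ % sinkParallelism;                          // round-robin
            subtasks.get(target).add(row);
        }
        // One possible interleaving: the last subtask flushes before the first one.
        Map<String, Integer> table = new HashMap<>();
        for (int i = sinkParallelism - 1; i >= 0; i--) {
            for (Row row : subtasks.get(i)) {
                table.put(row.key, row.version);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Row> input = List.of(new Row("k1", 1), new Row("k1", 2));
        System.out.println(run(input, 2, false)); // prints {k1=1}: the older version wins
        System.out.println(run(input, 2, true));  // prints {k1=2}: per-key order is kept
    }
}
```

With a shuffle on the primary key, both versions of `k1` pass through the same subtask, so their relative order survives whichever subtask flushes first.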

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: lincoln lee
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-05 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439361#comment-17439361
 ] 

Timo Walther commented on FLINK-20370:
--

[~wenlong.lwl]

bq. If the input of the sink actually has the same pk as the sink, and is 
insert-only, I think there is no distribution disorder.

That is correct.

But let's assume the following schema:

{code}
-- A declares no key
CREATE TABLE A (uid INT, name STRING);

-- B declares uid as its primary key
CREATE TABLE B (uid INT, name STRING, PRIMARY KEY (uid));

INSERT INTO B SELECT * FROM A;
{code}

In databases this query is totally fine because partitions don't exist. Please 
correct me if I'm wrong, but in Flink it could cause issues: we don't 
shuffle on uid here, so records with the same uid could end up in different 
Kafka partitions.
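
The concern above can be illustrated with a small Python simulation. It is only a sketch under simplifying assumptions: `route` and `subtasks_per_uid` are hypothetical helpers, `uid % parallelism` stands in for Flink's real key hashing, and the sample records are invented. It shows that without a shuffle on uid the same key can reach several sink subtasks, while a keyed shuffle pins each uid to one subtask.

```python
from collections import defaultdict


def route(records, parallelism, keyed):
    """Return {subtask_index: [records]} for a sink with the given parallelism.

    keyed=True  -> partition on uid (stand-in for a keyBy on the primary key)
    keyed=False -> round-robin rebalance, unrelated to uid
    """
    subtasks = defaultdict(list)
    for position, (uid, name) in enumerate(records):
        index = uid % parallelism if keyed else position % parallelism
        subtasks[index].append((uid, name))
    return dict(subtasks)


def subtasks_per_uid(routing):
    """Map each uid to the set of subtasks that received a record for it."""
    seen = defaultdict(set)
    for index, recs in routing.items():
        for uid, _name in recs:
            seen[uid].add(index)
    return dict(seen)


# Invented sample data: three versions of uid=1 interleaved with uid=2.
records = [(1, "a"), (2, "x"), (1, "b"), (1, "c")]

rebalanced = subtasks_per_uid(route(records, parallelism=2, keyed=False))
keyed = subtasks_per_uid(route(records, parallelism=2, keyed=True))

print(rebalanced)  # uid 1 is spread over both subtasks
print(keyed)       # every uid is handled by exactly one subtask
```

Once one uid is handled by more than one subtask, nothing orders the writes of those subtasks relative to each other, so the last value written for that uid is not necessarily the latest one.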


> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: lincoln lee
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-05 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439220#comment-17439220
 ] 

Wenlong Lyu commented on FLINK-20370:
-

Hi [~twalthr], could you explain case 2 in more detail? What should we 
guarantee semantically in this case? If the input of the sink actually has the 
same pk as the sink and is insert-only, I think there is no distribution 
disorder. If the input of the sink doesn't have the same pk as the sink and is 
insert-only, I don't see what guarantee adding a key-by would provide.

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: lincoln lee
>Priority: Critical
> Fix For: 1.15.0
>
>


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-05 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439135#comment-17439135
 ] 

lincoln lee commented on FLINK-20370:
-

After discussing with [~jark] and [~lzljs3620320], we found that 
`upsertMaterialization` cannot solve this problem.
The root cause is that `FlinkChangelogModeInferenceProgram` did not consider 
the case where the sink's primary key differs from the input's upsert keys 
when inferring the required `ChangelogMode` from the sink node.
I'll update the PR later.
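
To illustrate why the required changelog mode matters here, the following Python sketch models an upsert sink as a dictionary keyed on the sink primary key. It is a toy model, not planner code: `apply_to_upsert_sink` is a hypothetical helper, the rows are reduced to (city_name, count_customer), and the city names are invented. The query's upsert key (city_id) is not part of the sink row, while the sink primary key is city_name.

```python
def apply_to_upsert_sink(changelog, pk_index, keep_update_before):
    """Apply a changelog to a dict standing in for an upsert table.

    Each change is (kind, row) with kind in {"+I", "-U", "+U", "-D"}.
    keep_update_before=False models a plan in which UPDATE_BEFORE is dropped.
    """
    table = {}
    for kind, row in changelog:
        key = row[pk_index]
        if kind in ("+I", "+U"):
            table[key] = row          # upsert by sink primary key
        elif kind == "-D":
            table.pop(key, None)      # delete by sink primary key
        elif kind == "-U" and keep_update_before:
            table.pop(key, None)      # retract the old row by sink primary key
        # a "-U" with keep_update_before=False is silently ignored
    return table


# The city keeps its city_id but its city_name changes.
changelog = [
    ("+I", ("Berlin", 10)),
    ("-U", ("Berlin", 10)),
    ("+U", ("Berlin-Mitte", 10)),
]

without_ub = apply_to_upsert_sink(changelog, pk_index=0, keep_update_before=False)
with_ub = apply_to_upsert_sink(changelog, pk_index=0, keep_update_before=True)

print(sorted(without_ub))  # the old city_name is still present
print(sorted(with_ub))     # only the new city_name remains
```

When the sink primary key equals the upsert key, the UPDATE_AFTER overwrites the old row and dropping UPDATE_BEFORE is harmless. When the keys differ, the old row lives under a different primary key value, so only the UPDATE_BEFORE can remove it.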

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: lincoln lee
>Priority: Critical
>


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-26 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17420489#comment-17420489
 ] 

Jingsong Lee commented on FLINK-20370:
--

4. The primary key contains the unique key. In this case, we can simply convert 
UPDATE_BEFORE/UPDATE_AFTER to DELETE/INSERT. This is an optimization for 
UpsertMaterialize.
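
The conversion mentioned above can be sketched in a few lines of Python. This is an illustration only: `updates_to_delete_insert` is a hypothetical name, the row kinds use the short notation (+I, -U, +U, -D), and the sample rows assume a unique key `id` with a sink primary key of (id, city_name).

```python
def updates_to_delete_insert(changelog):
    """Rewrite -U/+U into -D/+I and leave +I and -D untouched."""
    mapping = {"-U": "-D", "+U": "+I"}
    return [(mapping.get(kind, kind), row) for kind, row in changelog]


# Invented sample rows of the form (id, city_name).
changelog = [
    ("+I", (1, "a")),
    ("-U", (1, "a")),
    ("+U", (1, "b")),
]

converted = updates_to_delete_insert(changelog)
print(converted)
```

Under this assumption no state seems to be needed: two rows with different unique keys can never share a primary key, so the DELETE addresses exactly the row that the old version wrote.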

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Critical
>


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-10 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17413207#comment-17413207
 ] 

Timo Walther commented on FLINK-20370:
--

I opened FLINK-24254. Any opinions on my other suggestion above for the 3 use 
cases?

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-09 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412518#comment-17412518
 ] 

Jingsong Lee commented on FLINK-20370:
--

+1 for hints.

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-09 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412497#comment-17412497
 ] 

Timo Walther commented on FLINK-20370:
--

Even in case 2, if users declare a PRIMARY KEY for Kafka the same keys should 
end up in the same partition. Distributed disorder should only be allowed if 
the sink does not declare a PRIMARY KEY. We should aim for correctness and add 
a keyBy for case 1 and enable UpsertMaterialize for case 2. Of course, this can 
be disabled if necessary.
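
As a rough illustration of what upsert materialization buys in terms of correctness, here is a Python sketch. It is not Flink's operator, only a model of the general idea under the assumption that all live rows are kept per sink primary key and the most recent one is what the sink should hold. The class name, method name, and sample rows are hypothetical.

```python
class UpsertMaterializer:
    """Keeps all live rows per sink primary key and forwards the latest one."""

    def __init__(self, pk_index):
        self.pk_index = pk_index
        self.rows = {}  # primary key -> live rows, oldest first

    def process(self, kind, row):
        """Return the changes to forward to the sink for one input change."""
        key = row[self.pk_index]
        if kind in ("+I", "+U"):
            live = self.rows.setdefault(key, [])
            live.append(row)
            return [("+I" if len(live) == 1 else "+U", row)]
        # "-U" or "-D": retract one matching row, if there is one
        live = self.rows.get(key)
        if not live or row not in live:
            return []
        latest_before = live[-1]
        live.remove(row)
        if not live:
            del self.rows[key]
            return [("-D", row)]       # nothing left under this key
        if live[-1] != latest_before:
            return [("+U", live[-1])]  # fall back to the previous live row
        return []                      # an older row was retracted; sink unchanged


# The retraction of the old row arrives after the new row for the same key.
changes = [
    ("+I", ("Berlin", 1)),
    ("+I", ("Berlin", 2)),
    ("-D", ("Berlin", 1)),
    ("-D", ("Berlin", 2)),
]

materializer = UpsertMaterializer(pk_index=0)
emitted = [materializer.process(kind, row) for kind, row in changes]
print(emitted)
```

Without the bookkeeping, the late retraction of ("Berlin", 1) would delete the key in the sink even though ("Berlin", 2) is still valid. The price is state proportional to the number of live rows, which is why the ability to disable it matters.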

I mentioned it in a different issue already, but we should also introduce hints 
that allow these important settings for sources to be configured per INSERT 
INTO statement. I will open an issue for that if it doesn't exist yet.

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-08 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412323#comment-17412323
 ] 

Jingsong Lee commented on FLINK-20370:
--

[~jark] I updated the cases.
I think UpsertMaterialize can work. For the third case, we can add 
UpsertMaterialize, but sometimes it may be redundant.

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major
>


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-08 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412318#comment-17412318
 ] 

Jark Wu commented on FLINK-20370:
-

[~lzljs3620320] There is a 4th case: the input is a changelog, but it has no 
unique key. 

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-08 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412317#comment-17412317
 ] 

Jark Wu commented on FLINK-20370:
-

[~lzljs3620320] could UpsertMaterialize also solve this problem?

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-08 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412302#comment-17412302
 ] 

Jingsong Lee commented on FLINK-20370:
--

Yes, it seems that the only solution is single parallelism.

An upsert sink can accept the following inputs:
# primary key = unique key: this is OK
# input is append-only: this is sometimes OK
# primary key != unique key: the most problematic situation

Maybe the third case can be disabled, but that may make many situations 
difficult to use. We need a flag...
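
The cases listed above can be written down as a small decision function. This is only an illustrative sketch, not planner code: the function name `classify_upsert_input`, its parameters, and the use of plain set equality to compare keys are all assumptions made for the example. The branch for a changelog without a unique key covers the additional case raised elsewhere in this thread.

```python
from typing import FrozenSet, Optional


def classify_upsert_input(
    sink_primary_key: FrozenSet[str],
    query_unique_key: Optional[FrozenSet[str]],
    insert_only: bool,
) -> str:
    """Classify the input of an upsert sink (hypothetical helper, not Flink API)."""
    if query_unique_key is not None and query_unique_key == sink_primary_key:
        return "primary key = unique key: OK"
    if insert_only:
        return "append-only input: sometimes OK"
    if query_unique_key is None:
        return "changelog input without a unique key"
    return "primary key != unique key: problematic"


# The scenario from the issue description: the query is keyed by city_id,
# but the sink declares city_name as its primary key.
issue_case = classify_upsert_input(
    sink_primary_key=frozenset({"city_name"}),
    query_unique_key=frozenset({"city_id"}),
    insert_only=False,
)
print(issue_case)
```

A check like this could back the flag mentioned above: reject (or warn about) the last case unless the user opts in.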


> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-09-08 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411971#comment-17411971
 ] 

Timo Walther commented on FLINK-20370:
--

[~lzljs3620320] [~jark] What is the current status of this issue? As far as I 
understand it, it can still occur that inferred unique key != primary key of 
the sink, right? Why don't we add a check that can be disabled with a flag?

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Minor
>  Labels: auto-deprioritized-major


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-04-29 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17336005#comment-17336005
 ] 

Flink Jira Bot commented on FLINK-20370:


This issue was labeled "stale-major" 7 days ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Major
>  Labels: stale-major


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-04-22 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17327508#comment-17327508
 ] 

Flink Jira Bot commented on FLINK-20370:


This major issue is unassigned and itself and all of its Sub-Tasks have not 
been updated for 30 days. So, it has been labeled "stale-major". If this ticket 
is indeed "major", please either assign yourself or give an update. Afterwards, 
please remove the label. In 7 days the issue will be deprioritized.

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Major
>  Labels: stale-major


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2020-11-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17239147#comment-17239147
 ] 

Jark Wu commented on FLINK-20370:
-

[~fsk119], no, this will not be considered, because it has the same semantics as 
a batch query. 
You can run your code above in MySQL, and you will get the same result. 
However, running the example from the Jira description in MySQL gives a 
different result. 

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Major


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2020-11-26 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17239139#comment-17239139
 ] 

Shengkai Fang commented on FLINK-20370:
---

I am not sure whether this Jira will take the example below into 
consideration.

Example:
{code:sql}
-- Source table keyed by word
CREATE TABLE WordCountSource (
  word STRING,
  cnt INT,
  PRIMARY KEY (word) NOT ENFORCED
) WITH (
  ...
);

-- Sink table keyed by cnt, which differs from the source key
CREATE TABLE WordCountSink (
  word STRING,
  cnt INT,
  PRIMARY KEY (cnt) NOT ENFORCED
) WITH (
  ...
);

INSERT INTO WordCountSink SELECT * FROM WordCountSource;
{code}
If we have two records {{('a', 1), ('b', 1)}}, the row kind of the record 
{{('b', 1)}} is an insert for the source but an update for the sink.
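
The effect can be reproduced with a toy model of an upsert table, sketched below. The helper `materialize` is hypothetical and simply keeps the last row seen per key; it is not how any connector is implemented.

```python
def materialize(rows, key_index):
    """Keep the last row seen for each key, as a simple upsert table would."""
    table = {}
    for row in rows:
        table[row[key_index]] = row
    return table


rows = [("a", 1), ("b", 1)]

source_view = materialize(rows, key_index=0)  # PRIMARY KEY (word)
sink_view = materialize(rows, key_index=1)    # PRIMARY KEY (cnt)

print(len(source_view))  # 2
print(sink_view)         # {1: ('b', 1)}
```

Keyed by {{word}} both rows survive, while keyed by {{cnt}} the second row overwrites the first.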

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Major


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2020-11-26 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17239123#comment-17239123
 ] 

Jark Wu commented on FLINK-20370:
-

I think this is not trivial work. The problem exists not only in Join, 
but also in other operations (e.g. TopN, Aggregation). 
We should figure out a comprehensive solution for this. For example, do not 
ignore UPDATE_BEFORE when the PK is not equal?
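
To make the idea concrete, here is a minimal simulation of a sink keyed by {{city_name}} receiving a changelog whose key is {{city_id}}. Everything in it is an assumption for illustration: the function `apply_to_upsert_sink`, the city names, and the counts are invented, and changes are assumed to arrive in order on a single task, which sidesteps the parallelism problem.

```python
from typing import Dict, List, Tuple

# (row kind, city_name, count_customer); row kinds use Flink's short names:
# +I insert, -U update-before, +U update-after, -D delete.
Change = Tuple[str, str, int]


def apply_to_upsert_sink(changes: List[Change], drop_update_before: bool) -> Dict[str, int]:
    """Apply a changelog to a table keyed by city_name (the sink primary key)."""
    table: Dict[str, int] = {}
    for kind, city_name, count in changes:
        if kind == "-U" and drop_update_before:
            continue  # the update-before message is treated as redundant
        if kind in ("+I", "+U"):
            table[city_name] = count   # upsert on the sink primary key
        else:                          # "-U" or "-D": remove the old row
            table.pop(city_name, None)
    return table


# Hypothetical data: city 1 is renamed from "Berlin" to "Munich".
changelog = [
    ("+I", "Berlin", 2),
    ("-U", "Berlin", 2),
    ("+U", "Munich", 2),
]

stale = apply_to_upsert_sink(changelog, drop_update_before=True)
fixed = apply_to_upsert_sink(changelog, drop_update_before=False)
print(stale)  # {'Berlin': 2, 'Munich': 2}
print(fixed)  # {'Munich': 2}
```

With UPDATE_BEFORE dropped, the old {{city_name}} row is never removed; keeping it lets the sink retract the old row before writing the new one.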

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Priority: Major