[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17455065#comment-17455065 ]

Jingsong Lee commented on FLINK-20370:
--------------------------------------

Fixed via:

part 1: Fix wrong results when the sink primary key is not the same as the query result's changelog upsert key
  - master: f8f6935adc841529ecdc0636174650cffbf73719
  - release-1.14: 7c5ddbd201005e55ab68b4db7ee74c7cbeb13400

part 2: Introduce the 'table.exec.sink.keyed-shuffle' option to automatically key by the sink's primary key for insert-only input when the parallelisms differ
  - master: 9e76585f1fa110288604913a73d86ac2f1542777

part 2 was not cherry-picked to 1.14 because it may affect existing valid plans and lead to plan incompatibility.

> Result is wrong when sink primary key is not the same with query
> ----------------------------------------------------------------
>
>                 Key: FLINK-20370
>                 URL: https://issues.apache.org/jira/browse/FLINK-20370
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: Jark Wu
>            Assignee: lincoln lee
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.15.0, 1.14.1
>
> Both sources are upsert-kafka tables which synchronize changes from the MySQL
> tables (source_city, source_customer). The sink is another MySQL table in
> upsert mode with "city_name" as its primary key, while the join key is
> "city_id". In this case, the result will be wrong when the
> {{source_city.city_name}} column is updated in MySQL, because the
> UPDATE_BEFORE is ignored and the old city_name row is retained in the sink
> table.
> {code}
> Sink(table=[default_catalog.default_database.sink_kafka_count_city], fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
> +- Calc(select=[city_name, CAST(count_customer) AS count_customer, CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
>    +- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, count_customer, sum_gender, id, city_name], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
>       :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
>       :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS sum_gender], changelogMode=[I,UA,D])
>       :     +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
>       :        +- LocalGroupAggregate(groupBy=[city_id], select=[city_id, COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], changelogMode=[I])
>       :           +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
>       :              +- ChangelogNormalize(key=[customer_id], changelogMode=[I,UB,UA,D])
>       :                 +- Exchange(distribution=[hash[customer_id]], changelogMode=[UA,D])
>       :                    +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
>       :                       +- TableSourceScan(table=[[default_catalog, default_database, source_customer]], fields=[customer_id, city_id, age, gender, update_time], changelogMode=[UA,D])
>       +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
>          +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
>             +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>                +- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], changelogMode=[UA,D])
>                   +- TableSourceScan(table=[[default_catalog, default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
> {code}
> We have suggested in the documentation that users use the same key for the
> query as the primary key on the sink:
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> We should highlight this caveat more prominently on the CREATE TABLE page.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
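The failure mode in the description can be illustrated with a small, self-contained simulation (plain Python, not Flink code; the city names and counts are made up for illustration): when the sink is upserted by city_name but the changelog's upsert key is actually the city id, ignoring the UPDATE_BEFORE leaves the old city_name row behind.

```python
# Hypothetical simulation of the bug: an upsert sink keyed by city_name
# receiving a changelog whose actual upsert key is the city id.
# It only illustrates why dropping UPDATE_BEFORE produces a stale row.

def apply_changelog(sink, changelog, honor_update_before):
    """Apply (op, city_name, count) events to a dict keyed by city_name."""
    for op, city_name, count in changelog:
        if op in ("+I", "+U"):        # insert / update-after: upsert by key
            sink[city_name] = count
        elif op in ("-U", "-D"):      # update-before / delete: retract by key
            if honor_update_before:
                sink.pop(city_name, None)
    return sink

# City id=1 is renamed from "Berlin" to "Munich"; the changelog carries a
# retraction of the old name followed by an insert of the new one.
changelog = [
    ("+I", "Berlin", 5),
    ("-U", "Berlin", 5),   # UPDATE_BEFORE for the rename
    ("+U", "Munich", 5),   # UPDATE_AFTER
]

wrong = apply_changelog({}, changelog, honor_update_before=False)
right = apply_changelog({}, changelog, honor_update_before=True)
print(wrong)  # {'Berlin': 5, 'Munich': 5}  <- stale "Berlin" row survives
print(right)  # {'Munich': 5}
```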
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454921#comment-17454921 ]

lincoln lee commented on FLINK-20370:
-------------------------------------

I've created the cherry-pick: [https://github.com/apache/flink/pull/18048]
[~lzljs3620320], would you help to review it?
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454467#comment-17454467 ]

Jingsong Lee commented on FLINK-20370:
--------------------------------------

[~MartijnVisser] Got it, I will do it asap.
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454447#comment-17454447 ]

Martijn Visser commented on FLINK-20370:
----------------------------------------

Given the impact, I think it makes sense to hold the 1.14.1 release until this is backported. What do you think, [~lzljs3620320] [~twalthr]? If so, it would be good to get a fix into 1.14 asap, given that we want to start that release soon.
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454329#comment-17454329 ]

Jingsong Lee commented on FLINK-20370:
--------------------------------------

> A user just needs to set `upsert-materialize` to `NONE`?

Yes. With the growing popularity of CDC, Hudi, etc., I am +1 to backport.
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453964#comment-17453964 ]

Timo Walther commented on FLINK-20370:
--------------------------------------

Given that many people still wait for 1.14.1 rather than upgrade to a 1.XX.0 release, I think it is still reasonable to perform the change at least for the 1.14 branch. A user just needs to set `upsert-materialize` to `NONE`?
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453752#comment-17453752 ]

Jingsong Lee commented on FLINK-20370:
--------------------------------------

Hi [~twalthr], we have evaluated the impact of this fix, and it does affect the compatibility of some plans that were already correct. For example, a plan that:
 * generates changes,
 * has no upsert keys,
 * but has no ordering problems.

In this situation the fix generates a materialization node, which makes the new plan incompatible with the old one. If we agree with the following assumptions:
 * this incompatible case is very rare, and
 * when it does occur, the user can restore compatibility through a configuration option,

then we can consider cherry-picking. What do you think? [~lincoln.86xy]
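The materialization node discussed above can be sketched roughly as follows (a simplified, hypothetical model in plain Python, not the actual Flink implementation): per sink primary key, it keeps the list of rows currently alive under that key and always exposes the most recent one, so a retraction of an older row cannot wipe out a newer row for the same key.

```python
# Hypothetical sketch of per-key upsert materialization before a sink.
from collections import defaultdict

class UpsertMaterializer:
    def __init__(self):
        self.state = defaultdict(list)  # sink key -> live rows, arrival order

    def add(self, key, row):
        """An insert/update-after arrives: remember the row, emit an upsert."""
        self.state[key].append(row)
        return ("upsert", key, self.state[key][-1])

    def retract(self, key, row):
        """An update-before/delete arrives: drop that row from state."""
        rows = self.state[key]
        if row in rows:
            rows.remove(row)
        if rows:
            # Another row is still alive under this key: re-emit it instead
            # of deleting, which is what prevents stale/missing results.
            return ("upsert", key, rows[-1])
        del self.state[key]
        return ("delete", key, None)

m = UpsertMaterializer()
m.add("city-1", ("Berlin", 5))
m.add("city-1", ("Berlin", 6))
# Retracting the older row must NOT delete the key: ("Berlin", 6) survives.
assert m.retract("city-1", ("Berlin", 5)) == ("upsert", "city-1", ("Berlin", 6))
assert m.retract("city-1", ("Berlin", 6)) == ("delete", "city-1", None)
```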
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17453124#comment-17453124 ]

Timo Walther commented on FLINK-20370:
--------------------------------------

[~lzljs3620320] are we planning to backport the change to 1.13 and 1.14? I already know of one user who would benefit from a 1.14 backport.
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450999#comment-17450999 ]

lincoln lee commented on FLINK-20370:
-------------------------------------

After discussing with [~wenlong.lwl] offline and confirming in the code, this is safe for TransformationSinkProvider. For DataStreamSinkProvider, which is designed for advanced connector developers, attention must be paid to how changes are shuffled, so that the changelog is not messed up per parallel subtask.
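The shuffling concern raised above can be sketched with a tiny model (plain Python, not Flink's partitioner; the key values are invented): routing every change by a hash of its primary key guarantees that all changes for the same key arrive at the same sink subtask, which preserves the per-key changelog order.

```python
# Hypothetical sketch of keying records by primary key before the sink:
# every change for the same key hashes to the same subtask, so the per-key
# changelog order is preserved across a parallelism change.
import zlib

def sink_subtask(primary_key, num_subtasks):
    """Deterministically route a record to a sink subtask by its key."""
    # crc32 is used for a stable, reproducible hash (Python's built-in
    # hash() of strings is salted per process).
    return zlib.crc32(str(primary_key).encode()) % num_subtasks

changes = [("+I", "id-7"), ("-U", "id-7"), ("+U", "id-7")]
subtasks = {sink_subtask(key, 4) for _, key in changes}
assert len(subtasks) == 1  # all changes for key "id-7" reach one subtask
```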
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450880#comment-17450880 ]

Wenlong Lyu commented on FLINK-20370:
-------------------------------------

[~lincoln.86xy] Even if a sink implements ParallelismProvider, the parallelism can still be undefined, because it returns an optional value.
> {code} > Sink(table=[default_catalog.default_database.sink_kafka_count_city], > fields=[city_name, count_customer, sum_gender], changelogMode=[NONE]) > +- Calc(select=[city_name, CAST(count_customer) AS count_customer, > CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D]) >+- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, > count_customer, sum_gender, id, city_name], > leftInputSpec=[JoinKeyContainsUniqueKey], > rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D]) > :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D]) > : +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, > COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS > sum_gender], changelogMode=[I,UA,D]) > : +- Exchange(distribution=[hash[city_id]], changelogMode=[I]) > :+- LocalGroupAggregate(groupBy=[city_id], select=[city_id, > COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], > changelogMode=[I]) > : +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D]) > : +- ChangelogNormalize(key=[customer_id], > changelogMode=[I,UB,UA,D]) > : +- Exchange(distribution=[hash[customer_id]], > changelogMode=[UA,D]) > :+- MiniBatchAssigner(interval=[3000ms], > mode=[ProcTime], changelogMode=[UA,D]) > : +- TableSourceScan(table=[[default_catalog, > default_database, source_customer]], fields=[customer_id, city_id, age, > gender, update_time], changelogMode=[UA,D]) > +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D]) > +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D]) > +- Exchange(distribution=[hash[id]], changelogMode=[UA,D]) >+- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], > changelogMode=[UA,D]) > +- TableSourceScan(table=[[default_catalog, > default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D]) > {code} > We have suggested users to use the same key of the query as the primary key > on sink in the documentation: > 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication. > We should make this attention to be more highlight in CREATE TABLE page. -- This message was sent by Atlassian Jira (v8.20.1#820001)
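The point above can be illustrated with a minimal, hypothetical sketch (the class and function names are illustrative, not Flink's actual API): when a provider's parallelism is an optional value, the planner cannot prove that sink and input parallelism match, so it has to treat them as possibly different.

```python
from typing import Optional

class SketchSinkProvider:
    """Stand-in for a sink provider whose declared parallelism is optional."""
    def __init__(self, parallelism: Optional[int] = None):
        self._parallelism = parallelism

    def get_parallelism(self) -> Optional[int]:
        # None means "undefined, left to the runtime to decide".
        return self._parallelism

def may_need_keyed_shuffle(provider: SketchSinkProvider,
                           input_parallelism: int) -> bool:
    p = provider.get_parallelism()
    # An unknown parallelism must be treated conservatively: it may differ.
    return p is None or p != input_parallelism
```

Under this assumption, an undefined parallelism is enough to justify a keyed shuffle by default.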
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17450263#comment-17450263 ] lincoln lee commented on FLINK-20370:

[~wenlong.lwl] After looking into `TransformationSinkProvider` and `DataStreamSinkProvider`, I found it unnecessary to add a `keyBy` because:
1. `DataStreamSinkProvider` implements the `ParallelismProvider` interface, so its parallelism can be detected.
2. The internal `TransformationSinkProvider` is used for `ExternalDynamicSink`; when converting to a DataStream it applies the input's parallelism, so it is always the same as the input's.
Please correct me if I'm wrong.
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447729#comment-17447729 ] lincoln lee commented on FLINK-20370:

[~wenlong.lwl] Good question! I think it's safe to add a 'keyby' by default for those two kinds of sinks when we can't determine the parallelism (users can explicitly turn it off). What do you think? [~twalthr] [~lzljs3620320]
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447367#comment-17447367 ] Wenlong Lyu commented on FLINK-20370:

Hi [~lincoln.86xy], what do you think we should provide when the sink is a DataStreamSinkProvider/TransformationSinkProvider? In such cases we have no idea about the parallelism of the sink; I think this is the case missed in the summary.
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17447239#comment-17447239 ] lincoln lee commented on FLINK-20370:

According to Jingsong's review comments, I've split the change in two. The first part solves 1.2 & 1.3 above, and a follow-up addresses 2.2, which will introduce a config option 'table.exec.sink.pk-shuffle'.
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17441679#comment-17441679 ] lincoln lee commented on FLINK-20370:

Thanks for all your input! Based on Jingsong's summary and the discussion, let me try to summarize; please correct me if I'm wrong. An upsert sink can accept the following inputs:
1. The input is a changelog stream:
1.1 primary key = upsert key: already works
1.2 primary key != upsert key (the upsert key can be none): we should add upsertMaterialize
1.3 primary key contains the upsert key: upsertMaterialize can be omitted, but the sink requires UPDATE_BEFORE
2. The input is an append stream:
2.1 the sink has the same parallelism as the upstream operator: works
2.2 the sink's parallelism differs from the upstream operator: we should add a 'keyby' on the primary key by default
The current PR already addresses 1.2 & 1.3, leaving 2.2 to be done. The fix is simple, but to make it configurable we should introduce another job-level config option similar to 'table.exec.sink.upsert-materialize' (since FLINK-24254, fine-grained settings per INSERT INTO, is not ready yet). I've temporarily named it 'table.exec.sink.pk-shuffle' and updated the PR; your suggestions are welcome. cc [~twalthr] [~lzljs3620320] [~wenlong.lwl]
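The case analysis above can be sketched as a small decision function. This is only an illustration of the summary, with made-up names and return values; it is not planner code.

```python
def sink_requirement(input_kind, primary_key, upsert_key=None,
                     input_parallelism=None, sink_parallelism=None):
    """Return what (if anything) must be inserted before an upsert sink."""
    pk = set(primary_key)
    uk = set(upsert_key) if upsert_key else None
    if input_kind == "changelog":
        if uk == pk:
            return None                      # 1.1: keys match, already fine
        if uk is not None and uk < pk:
            return "update_before required"  # 1.3: materialize can be omitted
        return "upsert-materialize"          # 1.2: pk != upsert key (or none)
    if input_kind == "append":
        if sink_parallelism == input_parallelism:
            return None                      # 2.1: same parallelism
        return "keyby(primary key)"          # 2.2: shuffle on pk by default
    raise ValueError(f"unknown input kind: {input_kind}")
```

For example, an append stream whose sink parallelism differs from the input's falls into case 2.2 and gets the keyed shuffle.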
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440534#comment-17440534 ] Timo Walther commented on FLINK-20370:

[~wenlong.lwl] I guess for Kafka you are right: Kafka already does the partitioning for us with the default sink partitioner and the given Kafka key. The only missing case is a {{StreamTableEnvironment.toChangelogStream}} with a declared primary key; there the user needs to take care of the partitioning. But that might be acceptable.
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440153#comment-17440153 ] Wenlong Lyu commented on FLINK-20370:

[~twalthr] thanks for the explanation. However, I think when the sink is Kafka with a primary key (I would assume upsert-kafka with a key format provided), the target Kafka partition of a record is decided by the generated key, even when it is written by a different subtask. In such a case, I think adding a key-by can help keep the order of records with the same uid only when the input is already partitioned by uid. Is this the case you want to solve?
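The partition-placement part of this can be illustrated with a toy stand-in for a key-based partitioner (this is not the Kafka client's actual hash, just a sketch): the target partition depends only on the record key, so records for the same uid land in the same partition no matter which sink subtask produced them; what remains undetermined is the append order between different producing subtasks.

```python
def partition_for_key(key: bytes, num_partitions: int) -> int:
    # Toy stand-in for a hash(key) % num_partitions style key partitioner.
    return sum(key) % num_partitions

# Two writes for the same uid, produced by different sink subtasks.
writes = [("subtask-0", b"uid-1", "old name"),
          ("subtask-1", b"uid-1", "new name")]
target_partitions = {partition_for_key(key, 4) for _, key, _ in writes}
# Both writes target the same partition; only their relative order is a race.
```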
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440150#comment-17440150 ] Jingsong Lee commented on FLINK-20370:

[~twalthr] Kafka partitioning is a separate matter that depends on the user's connector options, for example using upsert-kafka or `sink.partitioner`. In my opinion, if the parallelism changes, e.g. the parallelism of source and sink differs, the final result will be random, which may not meet the user's expectations. So the premise is a change in parallelism.
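The "random result" effect can be simulated outside Flink. In this sketch (illustrative only, not Flink code), an insert-only stream carrying two successive values for the same primary key is redistributed to sink subtasks: with a round-robin style redistribution the two records end up on different subtasks and the last writer wins nondeterministically, while a hash shuffle on the primary key co-locates them and preserves their order.

```python
def distribute(records, parallelism, key_fn=None):
    """Assign records to subtasks: by position (round-robin) or by key hash."""
    subtasks = [[] for _ in range(parallelism)]
    for i, rec in enumerate(records):
        idx = (hash(key_fn(rec)) if key_fn else i) % parallelism
        subtasks[idx].append(rec)
    return subtasks

# Two successive upserts for primary key 1; the second is the newer value.
stream = [("+I", 1, "Berlin"), ("+U", 1, "Munich")]

rebalance = distribute(stream, 2)                     # split across subtasks
keyed = distribute(stream, 2, key_fn=lambda r: r[1])  # co-located by pk
```

With `rebalance`, the two records sit on different subtasks, so which value reaches the external table last depends on scheduling; with `keyed`, both sit on one subtask in their original order.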
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439361#comment-17439361 ] Timo Walther commented on FLINK-20370: -- [~wenlong.lwl] bq. If the input of sink actually has the same pk with sink, and is insert-only, I think there is no distribution disorder. That is correct. But let's assume the following schema: {code} CREATE TABLE A (uid INT, name STRING) CREATE TABLE B (uid INT, name STRING, PRIMARY KEY(uid)) INSERT INTO B SELECT * FROM A; {code} In databases this query is totally fine because partitions don't exist. Please correct me if I'm wrong but in Flink it could cause issues, because we don't shuffle on uid here and different uid could end up in different Kafka partitions in the end. > Result is wrong when sink primary key is not the same with query > > > Key: FLINK-20370 > URL: https://issues.apache.org/jira/browse/FLINK-20370 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.12.0 >Reporter: Jark Wu >Assignee: lincoln lee >Priority: Critical > Labels: pull-request-available > Fix For: 1.15.0 > > > Both sources are upsert-kafka which synchronizes the changes from MySQL > tables (source_city, source_customer). The sink is another MySQL table which > is in upsert mode with "city_name" primary key. The join key is "city_id". > In this case, the result will be wrong when updating > {{source_city.city_name}} column in MySQL, as the UPDATE_BEFORE is ignored > and the old city_name is retained in the sink table. 
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439220#comment-17439220 ] Wenlong Lyu commented on FLINK-20370: - hi [~twalthr], could you explain more about case 2: what should we guarantee semantically in this case? If the input of the sink actually has the same pk as the sink, and is insert-only, I think there is no distribution disorder. If the input of the sink doesn't have the same pk as the sink, and is insert-only, I have no idea what guarantee can be provided by adding a key-by.
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17439135#comment-17439135 ] lincoln lee commented on FLINK-20370: - After discussing with [~jark] and [~lzljs3620320], we found that `upsertMaterialization` cannot solve this problem. The root cause is that `FlinkChangelogModeInferenceProgram` didn't consider the case where the sink's primary key differs from the input's upsert keys when inferring the required `ChangelogMode` from the sink node. I'll update the PR later.
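The inference rule described here can be sketched as a tiny predicate (plain Python with hypothetical names, not the planner's actual API): when none of the query's upsert keys matches the sink's primary key, the sink must request UPDATE_BEFORE messages from its input so the old row, stored under its old key value, can be retracted.

```python
def required_changelog_mode(sink_pk, upsert_keys):
    """Hypothetical sketch of the inference rule: if none of the input's
    upsert keys equals the sink primary key, UPDATE_BEFORE is required
    so the stale row (under its old key value) can be retracted."""
    sink_pk = set(sink_pk)
    if any(set(k) == sink_pk for k in upsert_keys):
        return ["I", "UA", "D"]        # plain upsert mode is safe
    return ["I", "UB", "UA", "D"]      # retractions needed

# Sink keyed on city_name, but the query's upsert key is id:
print(required_changelog_mode(["city_name"], [["id"]]))
# Matching keys: upsert without UPDATE_BEFORE is enough.
print(required_changelog_mode(["id"], [["id"]]))
```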
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17420489#comment-17420489 ] Jingsong Lee commented on FLINK-20370: -- 4. primary key contains unique key: in this case we can simply convert UPDATE_BEFORE/UPDATE_AFTER to DELETE/INSERT. This is an optimization for UpsertMaterialize.
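The optimization for the case where the primary key contains the unique key can be sketched as a simple changelog rewrite (plain Python; the RowKind abbreviations mirror Flink's I/UB/UA/D, but the function itself is hypothetical): every UPDATE_BEFORE becomes a DELETE and every UPDATE_AFTER an INSERT, which makes a separate materialization step unnecessary.

```python
def updates_to_delete_insert(changelog):
    """Rewrite UPDATE_BEFORE/UPDATE_AFTER pairs as DELETE/INSERT.
    Safe when the sink primary key contains the unique key, because the
    DELETE then removes exactly the row that the update replaces."""
    mapping = {"UB": "D", "UA": "I"}
    return [(mapping.get(kind, kind), row) for kind, row in changelog]

log = [("I", ("bj", 1)), ("UB", ("bj", 1)), ("UA", ("bj", 2))]
print(updates_to_delete_insert(log))
# [('I', ('bj', 1)), ('D', ('bj', 1)), ('I', ('bj', 2))]
```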
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17413207#comment-17413207 ] Timo Walther commented on FLINK-20370: -- I opened FLINK-24254. Any opinions on my other suggestion above for the 3 use cases?
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412518#comment-17412518 ] Jingsong Lee commented on FLINK-20370: -- hints +1
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412497#comment-17412497 ] Timo Walther commented on FLINK-20370: -- Even in case 2, if users declare a PRIMARY KEY for Kafka, records with the same key should end up in the same partition. Distribution disorder should only be allowed if the sink does not declare a PRIMARY KEY. We should aim for correctness: add a keyBy for case 1 and enable UpsertMaterialize for case 2. Of course, this can be disabled if necessary. I mentioned it in a different issue already, but we should also introduce hints that allow these important settings to be applied fine-grained per INSERT INTO. I will open an issue for that if it doesn't exist yet.
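The UpsertMaterialize step proposed here can be approximated as follows (a plain-Python sketch of the idea, not Flink's actual `SinkUpsertMaterializer`): track, per input upsert key, which sink-key row was last emitted, and when an update moves a record to a new sink key, retract the old row first. In the city example, renaming a city then produces a DELETE for the old name instead of leaving it behind in the sink table.

```python
def materialize(changelog, sink_key_of):
    """For each input record (kind, id, row), remember the sink row last
    written for that id; if an update changes the sink key, emit a DELETE
    for the stale sink row before emitting the new one."""
    last = {}   # id -> (sink_key, row) last sent downstream
    out = []
    for kind, rid, row in changelog:
        if kind in ("I", "UA"):
            sk = sink_key_of(row)
            if rid in last and last[rid][0] != sk:
                out.append(("D", last[rid][1]))   # retract stale sink row
            out.append(("I" if rid not in last else "UA", row))
            last[rid] = (sk, row)
        elif kind == "D" and rid in last:
            out.append(("D", last.pop(rid)[1]))
    return out

# Upsert stream keyed by city id; the sink primary key is city_name.
log = [("I", 1, {"id": 1, "city_name": "beijing"}),
       ("UA", 1, {"id": 1, "city_name": "peking"})]
result = materialize(log, sink_key_of=lambda r: r["city_name"])
# The rename now retracts the 'beijing' row before writing 'peking'.
```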
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412323#comment-17412323 ] Jingsong Lee commented on FLINK-20370: -- [~jark] I updated the cases. I think UpsertMaterialize can work. For the third case we can add UpsertMaterialize, but it may sometimes be redundant.
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412318#comment-17412318 ] Jark Wu commented on FLINK-20370: - [~lzljs3620320] there is a 4th case: the input is a changelog, but there is no unique key.
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412317#comment-17412317 ] Jark Wu commented on FLINK-20370: - [~lzljs3620320] could UpsertMaterialize also solve this problem?
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17412302#comment-17412302 ] Jingsong Lee commented on FLINK-20370:

Yes, it seems that the only solution is single parallelism. An upsert sink can accept these inputs:
# primary key = unique key: this is OK
# input is append-only: sometimes OK
# primary key != unique key: the most problematic situation

Maybe the third case can be disabled, but that would make many situations hard to use. We need a flag...
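The third case above can be made concrete with a toy simulation. This is plain Python, not Flink code; the function name and the city rows are invented for illustration. It models an upsert sink keyed by city_name that, as the issue describes, drops UPDATE_BEFORE events coming from a query whose upsert key is a different column:

```python
# Hypothetical simulation of case 3: the query's upsert key differs from
# the sink primary key (city_name), and UPDATE_BEFORE rows are dropped.

def apply_upsert_only(events, pk_index):
    """Apply a changelog to a sink table keyed by its primary key,
    ignoring UPDATE_BEFORE events (the buggy behavior)."""
    table = {}
    for kind, row in events:
        if kind in ("+I", "+U"):        # insert / update_after: upsert by sink pk
            table[row[pk_index]] = row
        elif kind == "-D":              # delete by sink pk
            table.pop(row[pk_index], None)
        # "-U" (UPDATE_BEFORE) is silently ignored here

# MySQL renames city 1 from "Berlin" to "Munich"; rows are (city_name, count).
    return table

events = [
    ("+I", ("Berlin", 5)),
    ("-U", ("Berlin", 5)),   # dropped by the sink
    ("+U", ("Munich", 5)),
]
result = apply_upsert_only(events, pk_index=0)
# Both rows survive: the old city_name is never retracted from the sink.
assert result == {"Berlin": ("Berlin", 5), "Munich": ("Munich", 5)}
```

Cases 1 and 2 never hit this path: with matching keys the new row overwrites the old one under the same sink key, and append-only input produces no UPDATE_BEFORE at all.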
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411971#comment-17411971 ] Timo Walther commented on FLINK-20370:

[~lzljs3620320] [~jark] What is the current status of this issue? As far as I understand it, it can still occur that the inferred unique key != the primary key of the sink, right? Why don't we add a check that can be disabled with a flag?
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17336005#comment-17336005 ] Flink Jira Bot commented on FLINK-20370:

This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion.
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17327508#comment-17327508 ] Flink Jira Bot commented on FLINK-20370:

This major issue is unassigned, and it and all of its Sub-Tasks have not been updated for 30 days, so it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17239147#comment-17239147 ] Jark Wu commented on FLINK-20370:

[~fsk119], no, this case will not be considered, because it has the same semantics as a batch query. You can run your code above in MySQL and you will get the same result. However, running the example from the JIRA description in MySQL will give a different result.
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17239139#comment-17239139 ] Shengkai Fang commented on FLINK-20370:

I am not sure whether this Jira will take the example below into consideration. Example:
{code:java}
CREATE TABLE WordCountSource (
  word STRING,
  cnt INT,
  PRIMARY KEY (word) NOT ENFORCED
) WITH ( ... )

CREATE TABLE WordCountSink (
  word STRING,
  cnt INT,
  PRIMARY KEY (cnt) NOT ENFORCED
) WITH ( ... )

INSERT INTO WordCountSink SELECT * FROM WordCountSource;
{code}
If we have two records {{('a', 1), ('b', 1)}}, the rowkind of the record {{('b', 1)}} is INSERT for the source but UPDATE for the sink.
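The collision in the WordCount example can be sketched with a few lines of plain Python (a toy model of the sink, not Flink internals). Because the sink's primary key is cnt, two source INSERTs that share the same cnt land on the same sink key, and the second one behaves as an update:

```python
# Toy model of an upsert sink whose primary key is cnt.
sink = {}  # keyed by cnt, the sink primary key
rowkinds = []
for word, cnt in [("a", 1), ("b", 1)]:
    # Both records are INSERTs from the source's point of view, but the
    # second collides on sink key 1 and effectively becomes an update.
    rowkinds.append("+I" if cnt not in sink else "+U")
    sink[cnt] = (word, cnt)

assert rowkinds == ["+I", "+U"]
# ('b', 1) silently replaced ('a', 1) under sink key 1.
assert sink == {1: ("b", 1)}
```

As the next comment notes, this particular collapse matches what a batch query writing into a cnt-keyed table would produce, so it is overwrite semantics rather than a wrong result.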
[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query
[ https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17239123#comment-17239123 ] Jark Wu commented on FLINK-20370:

I think this is not trivial work, and the problems are not only in Join but also in other operations (e.g. TopN, Aggregation). We should figure out a comprehensive solution for this. For example, do not ignore UPDATE_BEFORE when the PK is not equal?
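The suggestion above ("do not ignore UPDATE_BEFORE") can be illustrated with a small Python sketch, under the assumption that an UPDATE_BEFORE is applied as a retraction on the old sink key. This is illustrative only, not planner code; the function name is invented:

```python
# Sketch: when the sink pk differs from the query's upsert key, honor
# UPDATE_BEFORE as a delete on the OLD sink key instead of dropping it.

def apply_with_update_before(events, pk_index):
    table = {}
    for kind, row in events:
        if kind in ("+I", "+U"):        # insert / update_after: upsert by sink pk
            table[row[pk_index]] = row
        elif kind in ("-U", "-D"):      # retract the old row by its sink key
            table.pop(row[pk_index], None)
    return table

# Same rename as in the issue: city_name "Berlin" -> "Munich".
events = [
    ("+I", ("Berlin", 5)),
    ("-U", ("Berlin", 5)),   # now honored: removes the stale city_name row
    ("+U", ("Munich", 5)),
]
fixed = apply_with_update_before(events, pk_index=0)
# Only the new city_name remains in the sink.
assert fixed == {"Munich": ("Munich", 5)}
```

This only works when the upstream operators actually emit UPDATE_BEFORE (changelogMode [I,UB,UA,D]); in the plan from the description the UB events are already optimized away before the Sink, which is why a comprehensive fix has to span the planner, not just the sink.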