[jira] [Commented] (FLINK-21430) Appear append data when flink sql sink mysql on key conflict
[ https://issues.apache.org/jira/browse/FLINK-21430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288161#comment-17288161 ] Jark Wu commented on FLINK-21430: - I see. You are enriching your stream with your sink table. Currently, data consistency is not guaranteed in this case, because it is a cyclic graph: when you look up the data in JDBC, the data might not have been written into it yet. > Appear append data when flink sql sink mysql on key conflict > > > Key: FLINK-21430 > URL: https://issues.apache.org/jira/browse/FLINK-21430 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.12.0 >Reporter: Yu Wang >Priority: Major > > kafka data: > {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 > 06:39:05.088"} > {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 > 06:47:34.609"} > kafka ddl : > CREATE TABLE washroom_detail ( > building_id STRING, > sofa_id STRING, > floor_num INT, > occupy_status INT, > start_time BIGINT, > end_time BIGINT, > process_time TIMESTAMP, > occupy_times as concat(date_format(TIMESTAMPADD(hour, 8, > cast(start_time / 1000 as timestamp)), 'HH:mm'), '-', > date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), > 'HH:mm')), > local_date as date_format(cast(start_time / 1000 as > timestamp), '-MM-dd'), > day_hour as cast(date_format(cast(start_time / 1000 as > timestamp), 'HH') as INT) + 8 > ) WITH ( > 'connector' = 'kafka', > 'topic' = '', > 'properties.bootstrap.servers' = 'xxx', > 'properties.group.id' = '', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'json' > ); > mysql ddl: > create table hour_ddl > ( > building_idSTRING, > sofa_id STRING, > local_date STRING, > `hour` INT, > floor_num INT, > occupy_frequency INT, > occupy_times 
STRING, > update_time TIMESTAMP, > process_time TIMESTAMP, > primary key (building_id, sofa_id, local_date, `hour`) > NOT ENFORCED > ) with ( > 'connector' = 'jdbc', > 'url' = '', > 'table-name' = '', > 'username' = 'x' > 'password' = 'xx' > ) > flink sql dml: > INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num, > occupy_frequency, occupy_times, update_time, process_time) > SELECT a.building_id, >a.sofa_id, >a.local_date, >a.day_hour, >a.floor_num, >CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, > b.occupy_frequency) AS INT), >concat(if(b.occupy_times IS NULL, '', b.occupy_times), > if(b.occupy_times IS NULL, a.times, concat(',', a.times))), >NOW(), >a.process_time > FROM > (SELECT building_id, > sofa_id, > local_date, > day_hour, > floor_num, > count(1) AS frequency, > LISTAGG(occupy_times) AS times, > MAX(process_time) AS process_time, > PROCTIME() AS compute_time > FROM washroom_detail > GROUP BY building_id, > sofa_id, > local_date, > day_hour, > floor_num) a > LEFT JOIN hour_ddl > FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id > AND a.sofa_id = b.sofa_id > AND a.local_date = b.local_date > AND a.day_hour = b.`hour` > WHERE a.process_time > b.process_time > OR b.process_time IS NULL > observed behavior: > when MySQL does not have this record yet, this record is inserted: > occupy_frequency occupy_times > 1 15:01-15:03 > when the key conflicts, this record is upserted: > occupy_frequency occupy_times > 3 15:01-15:03,15:01-15:03,15:03-15:04 > the result should be the following record: > {color:red}occupy_frequency occupy_times > 2 > 15:01-15:03,15:03-15:04{color} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21430) Appear append data when flink sql sink mysql on key conflict
[ https://issues.apache.org/jira/browse/FLINK-21430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288154#comment-17288154 ] Yu Wang commented on FLINK-21430: - [~jark] Thanks for the reminder. > Appear append data when flink sql sink mysql on key conflict > > > Key: FLINK-21430 > URL: https://issues.apache.org/jira/browse/FLINK-21430 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.12.0 >Reporter: Yu Wang >Priority: Major > > kafka data: > {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 > 06:39:05.088"} > {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 > 06:47:34.609"} > kafka ddl : > CREATE TABLE washroom_detail ( > building_id STRING, > sofa_id STRING, > floor_num INT, > occupy_status INT, > start_time BIGINT, > end_time BIGINT, > process_time TIMESTAMP, > occupy_times as concat(date_format(TIMESTAMPADD(hour, 8, > cast(start_time / 1000 as timestamp)), 'HH:mm'), '-', > date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), > 'HH:mm')), > local_date as date_format(cast(start_time / 1000 as > timestamp), '-MM-dd'), > day_hour as cast(date_format(cast(start_time / 1000 as > timestamp), 'HH') as INT) + 8 > ) WITH ( > 'connector' = 'kafka', > 'topic' = '', > 'properties.bootstrap.servers' = 'xxx', > 'properties.group.id' = '', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'json' > ); > mysql ddl: > create table hour_ddl > ( > building_idSTRING, > sofa_id STRING, > local_date STRING, > `hour` INT, > floor_num INT, > occupy_frequency INT, > occupy_times STRING, > update_time TIMESTAMP, > process_time TIMESTAMP, > primary key (building_id, sofa_id, local_date, `hour`) > NOT ENFORCED > ) with ( > 'connector' = 'jdbc', > 'url' = '', > 'table-name' 
= '', > 'username' = 'x' > 'password' = 'xx' > ) > flink sql dml: > INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num, > occupy_frequency, occupy_times, update_time, process_time) > SELECT a.building_id, >a.sofa_id, >a.local_date, >a.day_hour, >a.floor_num, >CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, > b.occupy_frequency) AS INT), >concat(if(b.occupy_times IS NULL, '', b.occupy_times), > if(b.occupy_times IS NULL, a.times, concat(',', a.times))), >NOW(), >a.process_time > FROM > (SELECT building_id, > sofa_id, > local_date, > day_hour, > floor_num, > count(1) AS frequency, > LISTAGG(occupy_times) AS times, > MAX(process_time) AS process_time, > PROCTIME() AS compute_time > FROM washroom_detail > GROUP BY building_id, > sofa_id, > local_date, > day_hour, > floor_num) a > LEFT JOIN hour_ddl > FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id > AND a.sofa_id = b.sofa_id > AND a.local_date = b.local_date > AND a.day_hour = b.`hour` > WHERE a.process_time > b.process_time > OR b.process_time IS NULL > observed behavior: > when MySQL does not have this record yet, this record is inserted: > occupy_frequency occupy_times > 1 15:01-15:03 > when the key conflicts, this record is upserted: > occupy_frequency occupy_times > 3 15:01-15:03,15:01-15:03,15:03-15:04 > the result should be the following record: > {color:red}occupy_frequency occupy_times > 2 > 15:01-15:03,15:03-15:04{color} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-21430) Appear append data when flink sql sink mysql on key conflict
[ https://issues.apache.org/jira/browse/FLINK-21430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288146#comment-17288146 ] Jark Wu commented on FLINK-21430: - Please update the description in English. > Appear append data when flink sql sink mysql on key conflict > > > Key: FLINK-21430 > URL: https://issues.apache.org/jira/browse/FLINK-21430 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.12.0 >Reporter: Yu Wang >Priority: Major > > kafka 数据格式: > {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 > 06:39:05.088"} > {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 > 06:47:34.609"} > kafka ddl : > CREATE TABLE washroom_detail ( > building_id STRING, > sofa_id STRING, > floor_num INT, > occupy_status INT, > start_time BIGINT, > end_time BIGINT, > process_time TIMESTAMP, > occupy_times as concat(date_format(TIMESTAMPADD(hour, 8, > cast(start_time / 1000 as timestamp)), 'HH:mm'), '-', > date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), > 'HH:mm')), > local_date as date_format(cast(start_time / 1000 as > timestamp), '-MM-dd'), > day_hour as cast(date_format(cast(start_time / 1000 as > timestamp), 'HH') as INT) + 8 > ) WITH ( > 'connector' = 'kafka', > 'topic' = '', > 'properties.bootstrap.servers' = 'xxx', > 'properties.group.id' = '', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'json' > ); > mysql ddl: > create table hour_ddl > ( > building_idSTRING, > sofa_id STRING, > local_date STRING, > `hour` INT, > floor_num INT, > occupy_frequency INT, > occupy_times STRING, > update_time TIMESTAMP, > process_time TIMESTAMP, > primary key (building_id, sofa_id, local_date, `hour`) > NOT ENFORCED > ) with ( > 'connector' = 'jdbc', > 'url' = '', > 
'table-name' = '', > 'username' = 'x' > 'password' = 'xx' > ) > flink sql dml: > INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num, > occupy_frequency, occupy_times, update_time, process_time) > SELECT a.building_id, >a.sofa_id, >a.local_date, >a.day_hour, >a.floor_num, >CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, > b.occupy_frequency) AS INT), >concat(if(b.occupy_times IS NULL, '', b.occupy_times), > if(b.occupy_times IS NULL, a.times, concat(',', a.times))), >NOW(), >a.process_time > FROM > (SELECT building_id, > sofa_id, > local_date, > day_hour, > floor_num, > count(1) AS frequency, > LISTAGG(occupy_times) AS times, > MAX(process_time) AS process_time, > PROCTIME() AS compute_time > FROM washroom_detail > GROUP BY building_id, > sofa_id, > local_date, > day_hour, > floor_num) a > LEFT JOIN hour_ddl > FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id > AND a.sofa_id = b.sofa_id > AND a.local_date = b.local_date > AND a.day_hour = b.`hour` > WHERE a.process_time > b.process_time > OR b.process_time IS NULL > 现象: > 当mysql 没有数据时,插入一条记录: > occupy_frequencyoccupy_times > 1 15:01-15:03 > 当主键冲突时,现象是: > occupy_frequencyoccupy_times > 3 15:01-15:03,15:01-15:03,15:03-15:04 > 结果应该是: > {color:red}occupy_frequencyoccupy_times > 2 > 15:01-15:03,15:03-15:04{color} -- This message was sent by Atlassian Jira (v8.3.4#803005)