[ https://issues.apache.org/jira/browse/FLINK-21430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yu Wang updated FLINK-21430: ---------------------------- Description: kafka data: {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 06:39:05.088"} {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 06:47:34.609"} kafka ddl : CREATE TABLE washroom_detail ( building_id STRING, sofa_id STRING, floor_num INT, occupy_status INT, start_time BIGINT, end_time BIGINT, process_time TIMESTAMP, occupy_times as concat(date_format(TIMESTAMPADD(hour, 8, cast(start_time / 1000 as timestamp)), 'HH:mm'), '-', date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), 'HH:mm')), local_date as date_format(cast(start_time / 1000 as timestamp), 'yyyy-MM-dd'), day_hour as cast(date_format(cast(start_time / 1000 as timestamp), 'HH') as INT) + 8 ) WITH ( 'connector' = 'kafka', 'topic' = 'xxxxxxxx', 'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxx', 'properties.group.id' = 'xxxxxxxxxxxx', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); mysql ddl: create table hour_ddl ( building_id STRING, sofa_id STRING, local_date STRING, `hour` INT, floor_num INT, occupy_frequency INT, occupy_times STRING, update_time TIMESTAMP, process_time TIMESTAMP, primary key (building_id, sofa_id, local_date, `hour`) NOT ENFORCED ) with ( 'connector' = 'jdbc', 'url' = 'xxxxxxxx', 'table-name' = 'xxxxxxxx', 'username' = 'xxxxx' 'password' = 'xxxxxx' ) flink sql dml: INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num, occupy_frequency, occupy_times, update_time, process_time) SELECT a.building_id, a.sofa_id, a.local_date, a.day_hour, a.floor_num, CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, b.occupy_frequency) AS INT), concat(if(b.occupy_times IS NULL, '', b.occupy_times), if(b.occupy_times IS NULL, a.times, concat(',', a.times))), NOW(), 
a.process_time FROM (SELECT building_id, sofa_id, local_date, day_hour, floor_num, count(1) AS frequency, LISTAGG(occupy_times) AS times, MAX(process_time) AS process_time, PROCTIME() AS compute_time FROM washroom_detail GROUP BY building_id, sofa_id, local_date, day_hour, floor_num) a LEFT JOIN hour_ddl FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id AND a.sofa_id = b.sofa_id AND a.local_date = b.local_date AND a.day_hour = b.`hour` WHERE a.process_time > b.process_time OR b.process_time IS NULL Observed behavior: when MySQL does not yet contain this record, the following record is inserted: occupy_frequency occupy_times 1 15:01-15:03 On a primary-key conflict, the record is upserted as: occupy_frequency occupy_times 3 15:01-15:03,15:01-15:03,15:03-15:04 Expected result (the earlier value must not be appended a second time): {color:red}occupy_frequency occupy_times 2 15:01-15:03,15:03-15:04{color} was: kafka 数据格式: {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 06:39:05.088"} {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 06:47:34.609"} kafka ddl : CREATE TABLE washroom_detail ( building_id STRING, sofa_id STRING, floor_num INT, occupy_status INT, start_time BIGINT, end_time BIGINT, process_time TIMESTAMP, occupy_times as concat(date_format(TIMESTAMPADD(hour, 8, cast(start_time / 1000 as timestamp)), 'HH:mm'), '-', date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), 'HH:mm')), local_date as date_format(cast(start_time / 1000 as timestamp), 'yyyy-MM-dd'), day_hour as cast(date_format(cast(start_time / 1000 as timestamp), 'HH') as INT) + 8 ) WITH ( 'connector' = 'kafka', 'topic' = 'xxxxxxxx', 'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxx', 'properties.group.id' = 'xxxxxxxxxxxx', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); mysql ddl: create table hour_ddl ( 
building_id STRING, sofa_id STRING, local_date STRING, `hour` INT, floor_num INT, occupy_frequency INT, occupy_times STRING, update_time TIMESTAMP, process_time TIMESTAMP, primary key (building_id, sofa_id, local_date, `hour`) NOT ENFORCED ) with ( 'connector' = 'jdbc', 'url' = 'xxxxxxxx', 'table-name' = 'xxxxxxxx', 'username' = 'xxxxx' 'password' = 'xxxxxx' ) flink sql dml: INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num, occupy_frequency, occupy_times, update_time, process_time) SELECT a.building_id, a.sofa_id, a.local_date, a.day_hour, a.floor_num, CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, b.occupy_frequency) AS INT), concat(if(b.occupy_times IS NULL, '', b.occupy_times), if(b.occupy_times IS NULL, a.times, concat(',', a.times))), NOW(), a.process_time FROM (SELECT building_id, sofa_id, local_date, day_hour, floor_num, count(1) AS frequency, LISTAGG(occupy_times) AS times, MAX(process_time) AS process_time, PROCTIME() AS compute_time FROM washroom_detail GROUP BY building_id, sofa_id, local_date, day_hour, floor_num) a LEFT JOIN hour_ddl FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id AND a.sofa_id = b.sofa_id AND a.local_date = b.local_date AND a.day_hour = b.`hour` WHERE a.process_time > b.process_time OR b.process_time IS NULL 现象: 当mysql 没有数据时,插入一条记录: occupy_frequency occupy_times 1 15:01-15:03 当主键冲突时,现象是: occupy_frequency occupy_times 3 15:01-15:03,15:01-15:03,15:03-15:04 结果应该是: {color:red}occupy_frequency occupy_times 2 15:01-15:03,15:03-15:04{color} > Appear append data when flink sql sink mysql on key conflict > ------------------------------------------------------------ > > Key: FLINK-21430 > URL: https://issues.apache.org/jira/browse/FLINK-21430 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.12.0 > Reporter: Yu Wang > Priority: Major > > kafka data: > 
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 > 06:39:05.088"} > {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 > 06:47:34.609"} > kafka ddl : > CREATE TABLE washroom_detail ( > building_id STRING, > sofa_id STRING, > floor_num INT, > occupy_status INT, > start_time BIGINT, > end_time BIGINT, > process_time TIMESTAMP, > occupy_times as concat(date_format(TIMESTAMPADD(hour, 8, > cast(start_time / 1000 as timestamp)), 'HH:mm'), '-', > date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), > 'HH:mm')), > local_date as date_format(cast(start_time / 1000 as > timestamp), 'yyyy-MM-dd'), > day_hour as cast(date_format(cast(start_time / 1000 as > timestamp), 'HH') as INT) + 8 > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'xxxxxxxx', > 'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxx', > 'properties.group.id' = 'xxxxxxxxxxxx', > 'scan.startup.mode' = 'earliest-offset', > 'format' = 'json' > ); > mysql ddl: > create table hour_ddl > ( > building_id STRING, > sofa_id STRING, > local_date STRING, > `hour` INT, > floor_num INT, > occupy_frequency INT, > occupy_times STRING, > update_time TIMESTAMP, > process_time TIMESTAMP, > primary key (building_id, sofa_id, local_date, `hour`) > NOT ENFORCED > ) with ( > 'connector' = 'jdbc', > 'url' = 'xxxxxxxx', > 'table-name' = 'xxxxxxxx', > 'username' = 'xxxxx' > 'password' = 'xxxxxx' > ) > flink sql dml: > INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num, > occupy_frequency, occupy_times, update_time, process_time) > SELECT a.building_id, > a.sofa_id, > a.local_date, > a.day_hour, > a.floor_num, > CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, > b.occupy_frequency) AS INT), > concat(if(b.occupy_times IS NULL, '', b.occupy_times), > if(b.occupy_times IS NULL, a.times, 
concat(',', a.times))), > NOW(), > a.process_time > FROM > (SELECT building_id, > sofa_id, > local_date, > day_hour, > floor_num, > count(1) AS frequency, > LISTAGG(occupy_times) AS times, > MAX(process_time) AS process_time, > PROCTIME() AS compute_time > FROM washroom_detail > GROUP BY building_id, > sofa_id, > local_date, > day_hour, > floor_num) a > LEFT JOIN hour_ddl > FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id > AND a.sofa_id = b.sofa_id > AND a.local_date = b.local_date > AND a.day_hour = b.`hour` > WHERE a.process_time > b.process_time > OR b.process_time IS NULL > Observed behavior: > when MySQL does not yet contain this record, the following record is inserted: > occupy_frequency occupy_times > 1 15:01-15:03 > On a primary-key conflict, the record is upserted as: > occupy_frequency occupy_times > 3 15:01-15:03,15:01-15:03,15:03-15:04 > Expected result (the earlier value must not be appended a second time): > {color:red}occupy_frequency occupy_times > 2 15:01-15:03,15:03-15:04{color} -- This message was sent by Atlassian Jira (v8.3.4#803005)