[ 
https://issues.apache.org/jira/browse/FLINK-21430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Wang updated FLINK-21430:
----------------------------
    Description: 
kafka data:
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20
 06:39:05.088"}
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20
 06:47:34.609"}

kafka ddl :
CREATE TABLE washroom_detail (
                 building_id STRING,
                 sofa_id STRING,
                 floor_num INT,
                 occupy_status INT,
                 start_time BIGINT,
                 end_time BIGINT,
                 process_time TIMESTAMP,
                 occupy_times as concat(date_format(TIMESTAMPADD(hour, 8, 
cast(start_time / 1000 as timestamp)), 'HH:mm'), '-', 
date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), 
'HH:mm')),
                 local_date as date_format(cast(start_time / 1000 as 
timestamp), 'yyyy-MM-dd'),
                 day_hour as cast(date_format(cast(start_time / 1000 as 
timestamp), 'HH') as INT) + 8
                ) WITH (
                 'connector' = 'kafka',
                 'topic' = 'xxxxxxxx',
                 'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxx',
                 'properties.group.id' = 'xxxxxxxxxxxx',
                 'scan.startup.mode' = 'earliest-offset',
                 'format' = 'json'
                );


mysql ddl:

  create table hour_ddl
                (
                    building_id        STRING,
                    sofa_id      STRING,
                    local_date STRING,
                    `hour`      INT,
                    floor_num         INT,
                    occupy_frequency         INT,
                    occupy_times         STRING,
                    update_time         TIMESTAMP,
                    process_time         TIMESTAMP,
                    primary key (building_id, sofa_id, local_date, `hour`) NOT 
ENFORCED
                ) with (
                      'connector' = 'jdbc',
                      'url' = 'xxxxxxxx',
                      'table-name' = 'xxxxxxxx',
                      'username' = 'xxxxx'
                      'password' = 'xxxxxx'
                      )


flink sql dml:

INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num, 
occupy_frequency, occupy_times, update_time, process_time)
SELECT a.building_id,
       a.sofa_id,
       a.local_date,
       a.day_hour,
       a.floor_num,
       CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, b.occupy_frequency) 
AS INT),
       concat(if(b.occupy_times IS NULL, '', b.occupy_times), if(b.occupy_times 
IS NULL, a.times, concat(',', a.times))),
       NOW(),
       a.process_time
FROM
(SELECT building_id,
        sofa_id,
        local_date,
        day_hour,
        floor_num,
        count(1) AS frequency,
        LISTAGG(occupy_times) AS times,
        MAX(process_time) AS process_time,
        PROCTIME() AS compute_time
 FROM washroom_detail
 GROUP BY building_id,
          sofa_id,
          local_date,
          day_hour,
          floor_num) a
LEFT JOIN hour_ddl
FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id
AND a.sofa_id = b.sofa_id
AND a.local_date = b.local_date
AND a.day_hour = b.`hour`
WHERE a.process_time > b.process_time
OR b.process_time IS NULL

appearance:
when mysql has not this record,insert this record:
occupy_frequency        occupy_times
              1                      15:01-15:03
when key conflict , upsert this record:
occupy_frequency        occupy_times
              3                      15:01-15:03,15:01-15:03,15:03-15:04
result should be the following record:
{color:red}occupy_frequency        occupy_times
              2                                  15:01-15:03,15:03-15:04{color}


  was:
kafka 数据格式:
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20
 06:39:05.088"}
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20
 06:47:34.609"}

kafka ddl :
CREATE TABLE washroom_detail (
                 building_id STRING,
                 sofa_id STRING,
                 floor_num INT,
                 occupy_status INT,
                 start_time BIGINT,
                 end_time BIGINT,
                 process_time TIMESTAMP,
                 occupy_times as concat(date_format(TIMESTAMPADD(hour, 8, 
cast(start_time / 1000 as timestamp)), 'HH:mm'), '-', 
date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), 
'HH:mm')),
                 local_date as date_format(cast(start_time / 1000 as 
timestamp), 'yyyy-MM-dd'),
                 day_hour as cast(date_format(cast(start_time / 1000 as 
timestamp), 'HH') as INT) + 8
                ) WITH (
                 'connector' = 'kafka',
                 'topic' = 'xxxxxxxx',
                 'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxx',
                 'properties.group.id' = 'xxxxxxxxxxxx',
                 'scan.startup.mode' = 'earliest-offset',
                 'format' = 'json'
                );


mysql ddl:
  create table hour_ddl
                (
                    building_id        STRING,
                    sofa_id      STRING,
                    local_date STRING,
                    `hour`      INT,
                    floor_num         INT,
                    occupy_frequency         INT,
                    occupy_times         STRING,
                    update_time         TIMESTAMP,
                    process_time         TIMESTAMP,
                    primary key (building_id, sofa_id, local_date, `hour`) NOT 
ENFORCED
                ) with (
                      'connector' = 'jdbc',
                      'url' = 'xxxxxxxx',
                      'table-name' = 'xxxxxxxx',
                      'username' = 'xxxxx'
                      'password' = 'xxxxxx'
                      )


flink sql dml:
INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num, 
occupy_frequency, occupy_times, update_time, process_time)
SELECT a.building_id,
       a.sofa_id,
       a.local_date,
       a.day_hour,
       a.floor_num,
       CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, b.occupy_frequency) 
AS INT),
       concat(if(b.occupy_times IS NULL, '', b.occupy_times), if(b.occupy_times 
IS NULL, a.times, concat(',', a.times))),
       NOW(),
       a.process_time
FROM
(SELECT building_id,
        sofa_id,
        local_date,
        day_hour,
        floor_num,
        count(1) AS frequency,
        LISTAGG(occupy_times) AS times,
        MAX(process_time) AS process_time,
        PROCTIME() AS compute_time
 FROM washroom_detail
 GROUP BY building_id,
          sofa_id,
          local_date,
          day_hour,
          floor_num) a
LEFT JOIN hour_ddl
FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id
AND a.sofa_id = b.sofa_id
AND a.local_date = b.local_date
AND a.day_hour = b.`hour`
WHERE a.process_time > b.process_time
OR b.process_time IS NULL

现象:
当mysql 没有数据时,插入一条记录:
occupy_frequency        occupy_times
              1                      15:01-15:03
当主键冲突时,现象是:
occupy_frequency        occupy_times
              3                      15:01-15:03,15:01-15:03,15:03-15:04
结果应该是:
{color:red}occupy_frequency        occupy_times
              2                                  15:01-15:03,15:03-15:04{color}



> Appear append data when flink sql sink mysql on key conflict
> ------------------------------------------------------------
>
>                 Key: FLINK-21430
>                 URL: https://issues.apache.org/jira/browse/FLINK-21430
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.12.0
>            Reporter: Yu Wang
>            Priority: Major
>
> kafka data:
> {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20
>  06:39:05.088"}
> {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20
>  06:47:34.609"}
> kafka ddl :
> CREATE TABLE washroom_detail (
>                  building_id STRING,
>                  sofa_id STRING,
>                  floor_num INT,
>                  occupy_status INT,
>                  start_time BIGINT,
>                  end_time BIGINT,
>                  process_time TIMESTAMP,
>                  occupy_times as concat(date_format(TIMESTAMPADD(hour, 8, 
> cast(start_time / 1000 as timestamp)), 'HH:mm'), '-', 
> date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), 
> 'HH:mm')),
>                  local_date as date_format(cast(start_time / 1000 as 
> timestamp), 'yyyy-MM-dd'),
>                  day_hour as cast(date_format(cast(start_time / 1000 as 
> timestamp), 'HH') as INT) + 8
>                 ) WITH (
>                  'connector' = 'kafka',
>                  'topic' = 'xxxxxxxx',
>                  'properties.bootstrap.servers' = 'xxxxxxxxxxxxxxx',
>                  'properties.group.id' = 'xxxxxxxxxxxx',
>                  'scan.startup.mode' = 'earliest-offset',
>                  'format' = 'json'
>                 );
> mysql ddl:
>   create table hour_ddl
>                 (
>                     building_id        STRING,
>                     sofa_id      STRING,
>                     local_date STRING,
>                     `hour`      INT,
>                     floor_num         INT,
>                     occupy_frequency         INT,
>                     occupy_times         STRING,
>                     update_time         TIMESTAMP,
>                     process_time         TIMESTAMP,
>                     primary key (building_id, sofa_id, local_date, `hour`) 
> NOT ENFORCED
>                 ) with (
>                       'connector' = 'jdbc',
>                       'url' = 'xxxxxxxx',
>                       'table-name' = 'xxxxxxxx',
>                       'username' = 'xxxxx'
>                       'password' = 'xxxxxx'
>                       )
> flink sql dml:
> INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num, 
> occupy_frequency, occupy_times, update_time, process_time)
> SELECT a.building_id,
>        a.sofa_id,
>        a.local_date,
>        a.day_hour,
>        a.floor_num,
>        CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, 
> b.occupy_frequency) AS INT),
>        concat(if(b.occupy_times IS NULL, '', b.occupy_times), 
> if(b.occupy_times IS NULL, a.times, concat(',', a.times))),
>        NOW(),
>        a.process_time
> FROM
> (SELECT building_id,
>         sofa_id,
>         local_date,
>         day_hour,
>         floor_num,
>         count(1) AS frequency,
>         LISTAGG(occupy_times) AS times,
>         MAX(process_time) AS process_time,
>         PROCTIME() AS compute_time
>  FROM washroom_detail
>  GROUP BY building_id,
>           sofa_id,
>           local_date,
>           day_hour,
>           floor_num) a
> LEFT JOIN hour_ddl
> FOR SYSTEM_TIME AS OF a.compute_time AS b ON a.building_id = b.building_id
> AND a.sofa_id = b.sofa_id
> AND a.local_date = b.local_date
> AND a.day_hour = b.`hour`
> WHERE a.process_time > b.process_time
> OR b.process_time IS NULL
> appearance:
> when mysql has not this record,insert this record:
> occupy_frequency        occupy_times
>               1                      15:01-15:03
> when key conflict , upsert this record:
> occupy_frequency        occupy_times
>               3                      15:01-15:03,15:01-15:03,15:03-15:04
> result should be the following record:
> {color:red}occupy_frequency        occupy_times
>               2                                  
> 15:01-15:03,15:03-15:04{color}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to