[jira] [Commented] (FLINK-21430) Appear append data when flink sql sink mysql on key conflict

2021-02-21 Thread Jark Wu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-21430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288161#comment-17288161 ]

Jark Wu commented on FLINK-21430:
-

I see. You are enriching your stream with your sink table. In this case data
consistency is currently not guaranteed: the job forms a cyclic graph, so when
you look up a row in JDBC, the latest result might not have been written into
it yet.
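Because the lookup reads back from the same table the job writes to, one possible way to sidestep the cycle is to drop the lookup join altogether. The GROUP BY in subquery `a` already keeps running totals per key, and since `hour_ddl` declares a primary key, the Flink JDBC sink writes in upsert mode (INSERT ... ON DUPLICATE KEY UPDATE for MySQL), so each new total simply overwrites the stored row. This is a minimal sketch, not a fix confirmed in this thread, and it assumes the Kafka topic still holds the complete event history (it is read from 'earliest-offset') so the aggregate alone can rebuild the totals.

```sql
-- Sketch only: assumes washroom_detail (Kafka, read from 'earliest-offset')
-- still holds every event, so no read-back from hour_ddl is needed.
INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num,
                      occupy_frequency, occupy_times, update_time, process_time)
SELECT building_id,
       sofa_id,
       local_date,
       day_hour,
       floor_num,
       -- running totals kept in Flink state; each new event re-emits
       -- the updated row for its key, which the sink upserts
       CAST(COUNT(1) AS INT),
       LISTAGG(occupy_times),
       NOW(),
       MAX(process_time)
FROM washroom_detail
-- same grouping keys as subquery "a" in the original job
GROUP BY building_id, sofa_id, local_date, day_hour, floor_num
```

The trade-off is that the totals live only in Flink state: if the topic's retention is shorter than the aggregated period, or the job restarts without its state, the counts are rebuilt from whatever Kafka still holds.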

> Appear append data when flink sql sink mysql on key conflict
> 
>
> Key: FLINK-21430
> URL: https://issues.apache.org/jira/browse/FLINK-21430
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.12.0
> Reporter: Yu Wang
> Priority: Major
>
> Kafka data:
> {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 06:39:05.088"}
> {"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 06:47:34.609"}
> Kafka DDL:
> CREATE TABLE washroom_detail (
>   building_id STRING,
>   sofa_id STRING,
>   floor_num INT,
>   occupy_status INT,
>   start_time BIGINT,
>   end_time BIGINT,
>   process_time TIMESTAMP,
>   occupy_times as concat(
>     date_format(TIMESTAMPADD(hour, 8, cast(start_time / 1000 as timestamp)), 'HH:mm'),
>     '-',
>     date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), 'HH:mm')),
>   local_date as date_format(cast(start_time / 1000 as timestamp), '-MM-dd'),
>   day_hour as cast(date_format(cast(start_time / 1000 as timestamp), 'HH') as INT) + 8
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = '',
>   'properties.bootstrap.servers' = 'xxx',
>   'properties.group.id' = '',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
> );
> MySQL DDL:
> create table hour_ddl (
>   building_id STRING,
>   sofa_id STRING,
>   local_date STRING,
>   `hour` INT,
>   floor_num INT,
>   occupy_frequency INT,
>   occupy_times STRING,
>   update_time TIMESTAMP,
>   process_time TIMESTAMP,
>   primary key (building_id, sofa_id, local_date, `hour`) NOT ENFORCED
> ) with (
>   'connector' = 'jdbc',
>   'url' = '',
>   'table-name' = '',
>   'username' = 'x',
>   'password' = 'xx'
> )
> Flink SQL DML:
> INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num,
>                       occupy_frequency, occupy_times, update_time, process_time)
> SELECT a.building_id,
>        a.sofa_id,
>        a.local_date,
>        a.day_hour,
>        a.floor_num,
>        CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, b.occupy_frequency) AS INT),
>        concat(if(b.occupy_times IS NULL, '', b.occupy_times),
>               if(b.occupy_times IS NULL, a.times, concat(',', a.times))),
>        NOW(),
>        a.process_time
> FROM
>   (SELECT building_id,
>           sofa_id,
>           local_date,
>           day_hour,
>           floor_num,
>           count(1) AS frequency,
>           LISTAGG(occupy_times) AS times,
>           MAX(process_time) AS process_time,
>           PROCTIME() AS compute_time
>    FROM washroom_detail
>    GROUP BY building_id,
>             sofa_id,
>             local_date,
>             day_hour,
>             floor_num) a
> LEFT JOIN hour_ddl FOR SYSTEM_TIME AS OF a.compute_time AS b
>   ON a.building_id = b.building_id
>   AND a.sofa_id = b.sofa_id
>   AND a.local_date = b.local_date
>   AND a.day_hour = b.`hour`
> WHERE a.process_time > b.process_time
>    OR b.process_time IS NULL
> Observed behavior:
> When MySQL does not yet have this record, the following row is inserted:
> occupy_frequency  occupy_times
> 1                 15:01-15:03
> On a primary-key conflict, the row is upserted as:
> occupy_frequency  occupy_times
> 3                 15:01-15:03,15:01-15:03,15:03-15:04
> The result should be:
> occupy_frequency  occupy_times
> 2                 15:01-15:03,15:03-15:04
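The reported numbers can be reproduced by feeding literal values through the same two expressions the INSERT uses. The literals are inferred from the report, not taken from a log: the stored row `b` is what the first insert wrote (1, '15:01-15:03'), and `a` is what the running GROUP BY aggregate emits once the second event arrives, which is exactly the expected row (2, '15:01-15:03,15:03-15:04'). The column names with `a_`/`b_` prefixes are hypothetical stand-ins for `a.*` and `b.*`.

```sql
-- Hypothetical literals: a_* = running aggregate after the 2nd event,
-- b_* = row already stored in MySQL after the 1st event.
SELECT CAST(a_frequency + IF(b_occupy_frequency IS NULL, 0, b_occupy_frequency) AS INT)
         AS occupy_frequency,
       concat(IF(b_occupy_times IS NULL, '', b_occupy_times),
              IF(b_occupy_times IS NULL, a_times, concat(',', a_times)))
         AS occupy_times
FROM (VALUES (2, '15:01-15:03,15:03-15:04', 1, '15:01-15:03'))
  AS t(a_frequency, a_times, b_occupy_frequency, b_occupy_times)
-- yields: 3, '15:01-15:03,15:01-15:03,15:03-15:04'
```

Since the aggregate is already cumulative, adding the looked-up row counts the first interval a second time, which matches the observed 3 and the duplicated '15:01-15:03'.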



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21430) Appear append data when flink sql sink mysql on key conflict

2021-02-21 Thread Yu Wang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-21430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288154#comment-17288154 ]

Yu Wang commented on FLINK-21430:
-

[~jark] Thanks for the reminder.



[jira] [Commented] (FLINK-21430) Appear append data when flink sql sink mysql on key conflict

2021-02-21 Thread Jark Wu (Jira)


[ https://issues.apache.org/jira/browse/FLINK-21430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17288146#comment-17288146 ]

Jark Wu commented on FLINK-21430:
-

Please update the description in English.
