Hi, this version does support it. Shouldn't the insert statement be "insert into" rather than "update into"?
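
A minimal sketch of what that could look like, under stated assumptions: the table names, the source view `some_source_view`, and the connection values are hypothetical placeholders, not taken from your job. With the Flink 1.11 JDBC connector, declaring a PRIMARY KEY in the DDL makes the sink write in upsert mode, and the statement itself stays a plain INSERT INTO. It is probably also worth checking that the MySQL table has a primary or unique key on the same columns, since the upsert relies on the database detecting the duplicate key.

```sql
-- Sketch only: names and connection values are hypothetical placeholders.
CREATE TABLE terminal_minute_sink (
  request_date VARCHAR,
  terminal_no  VARCHAR,
  logon_time   VARCHAR,
  logout_time  VARCHAR,
  -- The key declared here switches the JDBC sink to upsert mode.
  PRIMARY KEY (request_date, terminal_no) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://localhost:3306/mydb',  -- placeholder URL
  'table-name' = 'terminal_minute'                    -- placeholder table
);

-- Plain INSERT INTO; there is no UPSERT INTO statement in Flink SQL.
INSERT INTO terminal_minute_sink
SELECT request_date, terminal_no, logon_time, logout_time
FROM some_source_view;  -- hypothetical source
```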
On 2020-11-16 17:04:23, "鱼子酱" <384939...@qq.com> wrote:
>When using a MySQL database with Flink SQL, with the primary key already defined, records with the same primary key are not updated; instead the rows keep accumulating.
>Is this not supported yet, or am I using it incorrectly?
>Version: Flink 1.11.1
>
>The two key SQL statements are as follows:
>
>  create table open_and_close_terminal_minute_1 (
>    request_date varchar
>    ,terminal_no varchar
>    ,logon_time varchar
>    ,logout_time varchar
>    ,insert_time varchar
>    ,PRIMARY KEY (request_date,terminal_no) NOT ENFORCED
>  ) with (
>    'connector' = 'jd……
>    'url' = 'jdbc:mys……se',
>    'table-name' = 'c……,
>    'driver' = 'com.m……
>    'username' = 'ana……
>    'password' = 'ana……
>    'sink.buffer-flus……
>  )
>
>  upsert into open_and_close_terminal_minute_1
>  select request_date ,terminal_no ,logon_time ,logout_time
>    ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19)
>  from
>  ( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,10) as request_date
>      ,cast(terminalNo as varchar) as terminal_no
>      ,DATE_FORMAT(min(times),'yyyy-MM-dd HH:mm:ss.SSS') as logon_time
>      ,DATE_FORMAT(max(times),'yyyy-MM-dd HH:mm:ss.SSS') as logout_time
>    from caslog INNER join itoa_b_terminal_shop for system_time as of caslog.proc_time
>      on cast(caslog.terminalNo as varchar)= itoa_b_terminal_shop.rowkey
>    where
>      errCode=0 and attr=0
>    group by TUMBLE(times, INTERVAL '1' MINUTE),terminalNo
>  )
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/