Hi,

可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个
query 在 1.16.2 上验证没有问题

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/

Best,
Jane

On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 <jagec...@yeah.net> wrote:

> flink ,kafka连接 jdbc连接版本都是1.15.2的
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2023-03-01 18:14:35,"陈佳豪" <jagec...@yeah.net> 写道:
> >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。
> >String kafka = "CREATE TABLE `电话` (    `rowid`
> VARCHAR(2147483647),`63fd65fb36521f81a2cfab90`
> VARCHAR(2147483647),`63fd660536521f81a2cfabad`
> VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535)  ) WITH (
> 'connector' = 'kafka', 'topic' =
> 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f',
> 'properties.bootstrap.servers' = '132.232.27.116:9092',
> 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )";
> >
> >String mysql = "CREATE TABLE `电话_1` (    `rowID` VARCHAR(255),`名称`
> STRING,`手机` STRING,`座机` VARCHAR(255),    PRIMARY KEY (`rowID`) NOT
> ENFORCED  )  WITH (    'connector' = 'jdbc',    'driver' =
> 'com.mysql.cj.jdbc.Driver',    'url' = 'jdbc:mysql://
> 43.136.128.102:6506/meihua_test',    'username' = 'root',    'password' =
> '123456',    'table-name' = '电话2'  )";
> >
> >String insert = "insert into `电话_1` select `t_1`.`rowID` as
> `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from (
> select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as
> `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as `座机`
> from `电话` ) as t_1";
> >
> >操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug?
>

回复