Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。
Hi, 可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个 query 在 1.16.2 上验证没有问题 [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/ Best, Jane On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 wrote: > flink ,kafka连接 jdbc连接版本都是1.15.2的 > > > > > > > > > > > > > > > > > > 在 2023-03-01 18:14:35,"陈佳豪" 写道: > >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 > >String kafka = "CREATE TABLE `电话` (`rowid` > VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` > VARCHAR(2147483647),`63fd660536521f81a2cfabad` > VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( > 'connector' = 'kafka', 'topic' = > 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f', > 'properties.bootstrap.servers' = '132.232.27.116:9092', > 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )"; > > > >String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称` > STRING,`手机` STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT > ENFORCED ) WITH ('connector' = 'jdbc','driver' = > 'com.mysql.cj.jdbc.Driver','url' = 'jdbc:mysql:// > 43.136.128.102:6506/meihua_test','username' = 'root','password' = > '123456','table-name' = '电话2' )"; > > > >String insert = "insert into `电话_1` select `t_1`.`rowID` as > `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from ( > select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as > `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as `座机` > from `电话` ) as t_1"; > > > >操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug? >
Re: Re: Flink SQL 如何优化以及处理反压
可以看一下反压算子是否出现在同一台机器(排除单点故障)。比如使用了 rocksdb + hdd 盘;单机负载过高;磁盘打满等。 如果不是单点故障,可以打 jstack 查看对应的线程具体在执行什么样的操作,再进行相应的逻辑优化。 On Tue, Jan 31, 2023 at 6:01 PM lxk wrote: > 现在从web ui上看,瓶颈主要在于group by 聚合函数之后去重这个逻辑。 > 而且SQL这个并行度是全局设置的,没法针对某一个特定的算子设置并行度,并行度多了之后,资源又感觉有点吃紧。 > > > > > > > > > > > > > > > > > > 在 2023-01-31 17:45:15,"weijie guo" 写道: > >最好先找到导致下游处理过慢的瓶颈算子,适当扩大一下并发。如果还不行,看下jstack的情况,可能需要调整逻辑。 > > > >Best regards, > > > >Weijie > > > > > >ssmq <374060...@qq.com.invalid> 于2023年1月31日周二 17:22写道: > > > >> 你可以测试不写入clickhouse是否还存在反压,如果不是因为写入瓶颈的话就从你的处理逻辑优化了 > >> > >> > >> 发件人: lxk > >> 发送时间: 2023年1月31日 15:16 > >> 收件人: user-zh@flink.apache.org > >> 主题: Flink SQL 如何优化以及处理反压 > >> > >> Flink版本:1.16.0 > >> 目前在使用Flink SQL进行多流关联,并写入Clickhouse中 > >> 具体代码如下: > >> select \ > >> header.id as id, \ > >> LAST_VALUE(header.order_status), \ > >> LAST_VALUE(header.customer_id), \ > >> LAST_VALUE(header.shop_id), \ > >> LAST_VALUE(header.parent_order_id), \ > >> LAST_VALUE(header.order_at), \ > >> LAST_VALUE(header.pay_at), \ > >> LAST_VALUE(header.channel_id), \ > >> LAST_VALUE(header.root_order_id), \ > >> LAST_VALUE(header.last_updated_at), \ > >> item.id as item_id, \ > >> LAST_VALUE(item.order_id) as order_id, \ > >> LAST_VALUE(item.row_num), \ > >> LAST_VALUE(item.goods_id), \ > >> LAST_VALUE(item.s_sku_code), \ > >> LAST_VALUE(item.qty), \ > >> LAST_VALUE(item.p_paid_sub_amt), \ > >> LAST_VALUE(item.p_sp_sub_amt), \ > >> LAST_VALUE(item.bom_type), \ > >> LAST_VALUE(item.last_updated_at) as item_last_updated_at, \ > >> LAST_VALUE(item.display_qty), \ > >> LAST_VALUE(delivery.del_type), \ > >> LAST_VALUE(delivery.time_slot_type), \ > >> LAST_VALUE(delivery.time_slot_date), \ > >> LAST_VALUE(delivery.time_slot_time_from), \ > >> LAST_VALUE(delivery.time_slot_time_to), \ > >> LAST_VALUE(delivery.sku_delivery_type), \ > >> LAST_VALUE(delivery.last_updated_at) as del_last_updated_at, \ > >> LAST_VALUE(promotion.id) as promo_id, \ > >> LAST_VALUE(promotion.order_item_id), \ > >> LAST_VALUE(promotion.p_promo_amt), \ > >> LAST_VALUE(promotion.promotion_category), \ > >> LAST_VALUE(promotion.promo_type), \ > >> LAST_VALUE(promotion.promo_sub_type), \ > >> LAST_VALUE(promotion.last_updated_at) as promo_last_updated_at, \ > >> LAST_VALUE(promotion.promotion_cost) \ > >> from \ > >> item \ > >> join \ > >> header \ > >> on item.order_id = header.id \ > >> left join \ > >> delivery \ > >> on item.order_id = delivery.order_id \ > >> left join \ > >> promotion \ > >> on item.id =promotion.order_item_id \ > >> group by header.id,item.id > >> 在Flink WEB UI 上发现程序反压很严重,而且时不时挂掉: > >> https://pic.imgdb.cn/item/63d8bebbface21e9ef3c92fe.jpg > >> > >> 参考了京东的一篇文章 > >> > https://flink-learning.org.cn/article/detail/1e86b8b38faaeefd5ed7f70858aa40bc > >> ,对相关参数做了调整,但是发现有些功能在Flink 1.16中已经做了相关优化了,同时加了这些参数之后对程序没有起到任何优化的作用。 > >> > >> conf.setString("table.exec.mini-batch.enabled", "true"); > >> conf.setString("table.exec.mini-batch.allow-latency", "15 s"); > >> conf.setString("table.exec.mini-batch.size", "5000"); > >> conf.setString("table.exec.state.ttl", "86400 s"); > >> conf.setString("table.exec.disabled-operators", "NestedLoopJoin"); > >> conf.setString("table.optimizer.join.broadcast-threshold", "-1"); > >> conf.setString("table.optimizer.multiple-input-enabled", "true"); > >> conf.setString("table.exec.shuffle-mode", "POINTWISE_EDGES_PIPELINED"); > >> conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "8"); > >> 想请教下,针对Flink SQL如何处理反压,同时有什么其他的优化手段? > >> > >> > >> > >> >
Re:使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。
flink ,kafka连接 jdbc连接版本都是1.15.2的 在 2023-03-01 18:14:35,"陈佳豪" 写道: >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 >String kafka = "CREATE TABLE `电话` (`rowid` >VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` >VARCHAR(2147483647),`63fd660536521f81a2cfabad` >VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( 'connector' >= 'kafka', 'topic' = >'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f', >'properties.bootstrap.servers' = '132.232.27.116:9092', 'scan.startup.mode' = >'earliest-offset', 'format' = 'debezium-json' )"; > >String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称` STRING,`手机` >STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT ENFORCED ) WITH ( >'connector' = 'jdbc','driver' = 'com.mysql.cj.jdbc.Driver','url' = >'jdbc:mysql://43.136.128.102:6506/meihua_test','username' = 'root', >'password' = '123456','table-name' = '电话2' )"; > >String insert = "insert into `电话_1` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` >as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from ( select `rowid` as >`rowID`,`63fd65fb36521f81a2cfab90` as `名称`,`63fd660536521f81a2cfabad` as >`手机`,`63fd660536521f81a2cfabae` as `座机` from `电话` ) as t_1"; > >操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug?
使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。
问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 String kafka = "CREATE TABLE `电话` (`rowid` VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` VARCHAR(2147483647),`63fd660536521f81a2cfabad` VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( 'connector' = 'kafka', 'topic' = 'sz_worksheet-63fdcff9ae76ba371276c1e5-63fd65fb36521f81a2cfab8f', 'properties.bootstrap.servers' = '132.232.27.116:9092', 'scan.startup.mode' = 'earliest-offset', 'format' = 'debezium-json' )"; String mysql = "CREATE TABLE `电话_1` (`rowID` VARCHAR(255),`名称` STRING,`手机` STRING,`座机` VARCHAR(255),PRIMARY KEY (`rowID`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc','driver' = 'com.mysql.cj.jdbc.Driver','url' = 'jdbc:mysql://43.136.128.102:6506/meihua_test','username' = 'root', 'password' = '123456','table-name' = '电话2' )"; String insert = "insert into `电话_1` select `t_1`.`rowID` as `rowID`,`t_1`.`名称` as `名称`,`t_1`.`手机` as `手机`,`t_1`.`座机` as `座机` from ( select `rowid` as `rowID`,`63fd65fb36521f81a2cfab90` as `名称`,`63fd660536521f81a2cfabad` as `手机`,`63fd660536521f81a2cfabae` as `座机` from `电话` ) as t_1"; 操作的语句如图所示,有大佬能帮忙看看解惑一下吗?是我语法问题还是本身就是flink 连接去的bug?
metrics.latency.interval指标应该如何查看?
我使用的flink版本是1.14.0,在flink-conf.yaml里添加了latency的配置 但是我在web-ui中没有找到这个指标 请问下是哪里出问题了。 flink任务是从kafka中读数据写入mysql中 public class FlinkSqlTask { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("10.215.142.175", 7080); env.getConfig().setParallelism(1); EnvironmentSettings streamSettings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build(); StreamTableEnvironment tableEnvironment =StreamTableEnvironment.create(env, streamSettings); tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT); StatementSet stmtSet = tableEnvironment.createStatementSet(); String originSql = "create table class0 (id int,name string) with ('connector'='kafka','topic'='test.280.91.test.class','properties.bootstrap.servers'='10.215.142.175:9092','debezium-json.schema-include'='true','scan.startup.mode' = 'latest-offset','properties.group.id'='test.280.91-dbhistory','format'='debezium-json')"; String targetSql = "create table class_test (id int,name string,primary key(id) NOT ENFORCED) with ('connector'='jdbc','url'='jdbc:mysql://10.215.142.98:43306/test?allowMultiQueries=trueuseUnicode=truecharacterEncoding=UTF-8useSSL=falseserverTimezone=GMT%2B8allowPublicKeyRetrieval=true','username'='root','password' ='ecidi@2019+Ecidi@2019','table-name'='class_test')"; String insertSql = "insert into class_test(id,name) select id,name from class0 where 1=1"; tableEnvironment.executeSql(originSql); tableEnvironment.executeSql(targetSql); stmtSet.addInsertSql(insertSql); stmtSet.execute(); } }
flink sql接cdc数据源关联维表写入下游数据库发现漏数据
flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。 随后在写目标数据库中加了一些日志后发现同一主键的变更记录(前后发生间隔时间很短)被发送到了不同的TaskManager处理,导致新数据被旧数据覆盖,造成漏数据现象。 请问: 1. cdc数据源关联维表后会被分散到不同TaskManager吗?什么情况下会发生? 2. 如何解决这个问题?是需要在写目标表之前加一层窗口去重[1]吗? [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sql/queries/window-deduplication/