Re: 使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-01 文章 Jane Chan
Hi, 可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个 query 在 1.16.2 上验证没有问题 [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/ Best, Jane On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 wrote: > flink ,kafka连接 jdbc连接版本都是1.15.2的 > > > > > > >

Re: Re: Flink SQL 如何优化以及处理反压

2023-03-01 文章 Guojun Li
可以看一下反压算子是否出现在同一台机器(排除单点故障)。比如使用了 rocksdb + hdd 盘;单机负载过高;磁盘打满等。 如果不是单点故障,可以打 jstack 查看对应的线程具体在执行什么样的操作,再进行相应的逻辑优化。 On Tue, Jan 31, 2023 at 6:01 PM lxk wrote: > 现在从web ui上看,瓶颈主要在于group by 聚合函数之后去重这个逻辑。 > 而且SQL这个并行度是全局设置的,没法针对某一个特定的算子设置并行度,并行度多了之后,资源又感觉有点吃紧。 > > > > > > > > > > > > > > > > > > 在

Re:使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-01 文章 陈佳豪
flink ,kafka连接 jdbc连接版本都是1.15.2的 在 2023-03-01 18:14:35,"陈佳豪" 写道: >问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 >String kafka = "CREATE TABLE `电话` (`rowid` >VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` >VARCHAR(2147483647),`63fd660536521f81a2cfabad`

使用flinksql将kafka的数据同步mysql,在源kafka进行删除操作,mysql收到op为d的数据进行删除操作但是与此同时会新增一条当前数据以前的旧数据。

2023-03-01 文章 陈佳豪
问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。 String kafka = "CREATE TABLE `电话` (`rowid` VARCHAR(2147483647),`63fd65fb36521f81a2cfab90` VARCHAR(2147483647),`63fd660536521f81a2cfabad` VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( 'connector' = 'kafka', 'topic' =

metrics.latency.interval指标应该如何查看?

2023-03-01 文章 陶浩然
我使用的flink版本是1.14.0,在flink-conf.yaml里添加了latency的配置 但是我在web-ui中没有找到这个指标 请问下是哪里出问题了。 flink任务是从kafka中读数据写入mysql中 public class FlinkSqlTask { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env =

flink sql接cdc数据源关联维表写入下游数据库发现漏数据

2023-03-01 文章 casel.chen
flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。 随后在写目标数据库中加了一些日志后发现同一主键的变更记录(前后发生间隔时间很短)被发送到了不同的TaskManager处理,导致新数据被旧数据覆盖,造成漏数据现象。 请问: 1. cdc数据源关联维表后会被分散到不同TaskManager吗?什么情况下会发生? 2. 如何解决这个问题?是需要在写目标表之前加一层窗口去重[1]吗? [1]