Hi,
可以尝试用 EXPLAIN CHANGELOG_MODE[1] 把 plan 打出来看看, 或者尝试升级到 FLINK 1.16 版本, 这个
query 在 1.16.2 上验证没有问题
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/explain/
Best,
Jane
On Wed, Mar 1, 2023 at 6:22 PM 陈佳豪 wrote:
> flink ,kafka连接 jdbc连接版本都是1.15.2的
>
>
>
>
>
>
>
可以看一下反压算子是否出现在同一台机器(排除单点故障)。比如使用了 rocksdb + hdd 盘;单机负载过高;磁盘打满等。
如果不是单点故障,可以打 jstack 查看对应的线程具体在执行什么样的操作,再进行相应的逻辑优化。
On Tue, Jan 31, 2023 at 6:01 PM lxk wrote:
> 现在从web ui上看,瓶颈主要在于group by 聚合函数之后去重这个逻辑。
> 而且SQL这个并行度是全局设置的,没法针对某一个特定的算子设置并行度,并行度多了之后,资源又感觉有点吃紧。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在
flink ,kafka连接 jdbc连接版本都是1.15.2的
在 2023-03-01 18:14:35,"陈佳豪" 写道:
>问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。
>String kafka = "CREATE TABLE `电话` (`rowid`
>VARCHAR(2147483647),`63fd65fb36521f81a2cfab90`
>VARCHAR(2147483647),`63fd660536521f81a2cfabad`
问题如标题所示,就是删除操作的时候mysql的表数据不对,每次都会新增当前主键的旧数据。
String kafka = "CREATE TABLE `电话` (`rowid`
VARCHAR(2147483647),`63fd65fb36521f81a2cfab90`
VARCHAR(2147483647),`63fd660536521f81a2cfabad`
VARCHAR(65535),`63fd660536521f81a2cfabae` VARCHAR(65535) ) WITH ( 'connector'
= 'kafka', 'topic' =
我使用的flink版本是1.14.0,在flink-conf.yaml里添加了latency的配置
但是我在web-ui中没有找到这个指标
请问下是哪里出问题了。
flink任务是从kafka中读数据写入mysql中
public class FlinkSqlTask {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
flink sql上游接kafka canal json topic消费mysql同步过来的变更,接着关联几张维表,最后写入下游数据库发现漏数据。
随后在写目标数据库中加了一些日志后发现同一主键的变更记录(前后发生间隔时间很短)被发送到了不同的TaskManager处理,导致新数据被旧数据覆盖,造成漏数据现象。
请问:
1. cdc数据源关联维表后会被分散到不同TaskManager吗?什么情况下会发生?
2. 如何解决这个问题?是需要在写目标表之前加一层窗口去重[1]吗?
[1]