官网上没有,在github上https://github.com/ververica/flink-cdc-connectors
```
SourceFunction sourceFunction = MySQLSource.builder()
.hostname("localhost") .port(3306) .databaseList("inventory") // monitor all
tables under inventory database .username("flinkuser") .password("flinkpw")
.deserializer(new
我没发现官网没看到DataStream有cdc的connector貌似,你是怎么搞的呢。
javenjiangfsof 于2021年2月1日周一 下午1:40写道:
> DataStream API,像下面这样
> ```
> val list = ... //i use jdbc to get the init data
> val dimensionInitStream = env.fromCollection(list)
> val dimension =
>
DataStream API,像下面这样
```
val list = ... //i use jdbc to get the init data
val dimensionInitStream = env.fromCollection(list)
val dimension =
dimensionStream.union(dimensionInitStream).broadcast(descriptor)
mainStream.connect(dimensionStream)
...
```
FlinkSQL ?
javenjiangfsof 于2021年2月1日周一 上午11:40写道:
> Hi 社区的各位
>
> 最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
> +
> broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
> 1.初始化通过jdbc获取,通过fromCollection处理后,union
>
Hi 社区的各位
最近也是刚刚开始接触flink,现在是有这样的一个需求,主流是来自于kafka,需要关联的配置来自于mysql,由于配置会发生新增更新删除的操作,所以使用cdc
+
broadcaststate来做配置的关联,但是这会存在一个广播流的初始化问题,我看了网上和社区中其他关于broadcaststate这个问题的回复,大概有一下两种解决方案:
1.初始化通过jdbc获取,通过fromCollection处理后,union