1. 对于 upsert-kafka 会默认加上 ChangelogNormalize 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 forward。
Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate On Thu, 4 Mar 2021 at 15:30, Qishang <zhongqish...@gmail.com> wrote: > Hi 社区。 > Flink 1.12.1 > > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有 > forword 的ETL没有作用。 > > insert into table_a select id,udf(a),b,c from table_b; > > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗? > > ``` > == Physical Execution Plan == > Stage 1 : Data Source > content : Source: TableSourceScan(table=[[default_catalog, > default_database, temp_table]], fields=[id...]) > > Stage 3 : Operator > content : ChangelogNormalize(key=[id]) > ship_strategy : HASH > > Stage 4 : Operator > content : Calc(select=[...]) > ship_strategy : FORWARD > > Stage 5 : Data Sink > content : Sink: Sink(table=[default_catalog.default_database.table_a], > fields=[id...]) > ship_strategy : FORWARD > ``` >