Re: 关于upsert-kafka connector的问题
这里有对upsert-kafka完整的一个分析的讲解:深度解析 Flink upsert-kafka[1]。如果还有问题,可以继续咨询。 [1]https://flink-learning.org.cn/developers/flink-training-course3/ Shengkai Fang 于2021年4月25日周日 上午10:16写道: > 本质上,upsert-kafka是对kafka的封装,其内部仍然是一个消息队列,只是在消费的时候,我们形成一个视图。 > > 消息从flink进入到kafka之中,根据kafka的协议保证了at-least-once。 > > Best, > Shengkai > > op <520075...@qq.com> 于2021年4月23日周五 下午2:18写道: > >> >> 谢谢,upsert-kafka作为sink可以保证相同key的数据放在同一个partition内,假如对相同key的更新数据,由于网络等原因后更新的值A的比先更新的值B提前发送到kafka, >> 这个时候用upsert-kafka去消费数据更新这个key,收到A进行更新后,在收到B的时候会覆盖掉A对吗 >> >> >> >> >> -- 原始邮件 -- >> 发件人: >> "user-zh" >> < >> fskm...@gmail.com>; >> 发送时间: 2021年4月23日(星期五) 中午12:20 >> 收件人: "user-zh"> >> 主题: Re: 关于upsert-kafka connector的问题 >> >> >> >> 如果数据在upsert-kafka中已经做到了按序存储(相同key的数据放在同一个partition内),那么flink消费的时候可以做到保序。 >> >> Best, >> Shengkai > >
Re: 关于upsert-kafka connector的问题
本质上,upsert-kafka是对kafka的封装,其内部仍然是一个消息队列,只是在消费的时候,我们形成一个视图。 消息从flink进入到kafka之中,根据kafka的协议保证了at-least-once。 Best, Shengkai op <520075...@qq.com> 于2021年4月23日周五 下午2:18写道: > > 谢谢,upsert-kafka作为sink可以保证相同key的数据放在同一个partition内,假如对相同key的更新数据,由于网络等原因后更新的值A的比先更新的值B提前发送到kafka, > 这个时候用upsert-kafka去消费数据更新这个key,收到A进行更新后,在收到B的时候会覆盖掉A对吗 > > > > > -- 原始邮件 -- > 发件人: > "user-zh" > < > fskm...@gmail.com>; > 发送时间: 2021年4月23日(星期五) 中午12:20 > 收件人: "user-zh" > 主题: Re: 关于upsert-kafka connector的问题 > > > > 如果数据在upsert-kafka中已经做到了按序存储(相同key的数据放在同一个partition内),那么flink消费的时候可以做到保序。 > > Best, > Shengkai
?????? ????upsert-kafka connector??????
??upsert-kafkasinkkeypartition??keyA??B??kafka, ??upsert-kafka??key??A??B??A -- -- ??: "user-zh"
Re: 关于upsert-kafka connector的问题
如果数据在upsert-kafka中已经做到了按序存储(相同key的数据放在同一个partition内),那么flink消费的时候可以做到保序。 Best, Shengkai
?????? ????upsert-kafka connector??????
??upsert-kafka??key -- -- ??: "user-zh"
Re: 关于upsert-kafka connector的问题
Hi, 请问是有什么具体的问题吗? Best, Shengkai op <520075...@qq.com> 于2021年4月22日周四 下午6:05写道: > 用 upsert-kafka connector 作为source,会有key的插入和更新出现乱序导致结果不准的问题吗? > 谢谢
????upsert-kafka connector??????
?? upsert-kafka connector source??key??
Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问
Hi Jark. 对于 upsert-kafka connector 有两个疑问: 1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* ` ,我试了下每次都是从 earliest 开始; 2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize 算子之后会变成2条,这个不是很理解? Qishang 于2021年3月5日周五 上午11:14写道: > > 某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。 > 学到了,感谢。 > > Jark Wu 于2021年3月4日周四 下午11:11写道: > >> 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize >> 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json >> 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true >> 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 >> forward。 >> >> Best, >> Jark >> >> [1]: >> >> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate >> >> On Thu, 4 Mar 2021 at 15:30, Qishang wrote: >> >> > Hi 社区。 >> > Flink 1.12.1 >> > >> > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition >> ,设置大的并发,对于只有 >> > forword 的ETL没有作用。 >> > >> > insert into table_a select id,udf(a),b,c from table_b; >> > >> > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 >> > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? >> > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka >> 中生效吗?可以用在我上面说的场景上面吗? >> > >> > ``` >> > == Physical Execution Plan == >> > Stage 1 : Data Source >> > content : Source: TableSourceScan(table=[[default_catalog, >> > default_database, temp_table]], fields=[id...]) >> > >> > Stage 3 : Operator >> > content : ChangelogNormalize(key=[id]) >> > ship_strategy : HASH >> > >> > Stage 4 : Operator >> > content : Calc(select=[...]) >> > ship_strategy : FORWARD >> > >> > Stage 5 : Data Sink >> > content : Sink: Sink(table=[default_catalog.default_database.table_a], >> > fields=[id...]) >> > ship_strategy : FORWARD >> > ``` >> > >> >
Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问
某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。 学到了,感谢。 Jark Wu 于2021年3月4日周四 下午11:11写道: > 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize > 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json > 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true > 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 > forward。 > > Best, > Jark > > [1]: > > https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate > > On Thu, 4 Mar 2021 at 15:30, Qishang wrote: > > > Hi 社区。 > > Flink 1.12.1 > > > > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition > ,设置大的并发,对于只有 > > forword 的ETL没有作用。 > > > > insert into table_a select id,udf(a),b,c from table_b; > > > > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 > > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? > > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗? > > > > ``` > > == Physical Execution Plan == > > Stage 1 : Data Source > > content : Source: TableSourceScan(table=[[default_catalog, > > default_database, temp_table]], fields=[id...]) > > > > Stage 3 : Operator > > content : ChangelogNormalize(key=[id]) > > ship_strategy : HASH > > > > Stage 4 : Operator > > content : Calc(select=[...]) > > ship_strategy : FORWARD > > > > Stage 5 : Data Sink > > content : Sink: Sink(table=[default_catalog.default_database.table_a], > > fields=[id...]) > > ship_strategy : FORWARD > > ``` > > >
Re: Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问
1. 对于 upsert-kafka 会默认加上 ChangelogNormalize 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true 参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如 forward。 Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/#table-exec-source-cdc-events-duplicate On Thu, 4 Mar 2021 at 15:30, Qishang wrote: > Hi 社区。 > Flink 1.12.1 > > 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有 > forword 的ETL没有作用。 > > insert into table_a select id,udf(a),b,c from table_b; > > 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 > 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? > 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗? > > ``` > == Physical Execution Plan == > Stage 1 : Data Source > content : Source: TableSourceScan(table=[[default_catalog, > default_database, temp_table]], fields=[id...]) > > Stage 3 : Operator > content : ChangelogNormalize(key=[id]) > ship_strategy : HASH > > Stage 4 : Operator > content : Calc(select=[...]) > ship_strategy : FORWARD > > Stage 5 : Data Sink > content : Sink: Sink(table=[default_catalog.default_database.table_a], > fields=[id...]) > ship_strategy : FORWARD > ``` >
Flink SQL upsert-kafka connector 生成的 Stage ChangelogNormalize 算子的疑问
Hi 社区。 Flink 1.12.1 现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有 forword 的ETL没有作用。 insert into table_a select id,udf(a),b,c from table_b; 发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区 1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置? 2. 这个可以改变默认 Kakfka Partition 带来的并发限制,只在 upsert-kafka 中生效吗?可以用在我上面说的场景上面吗? ``` == Physical Execution Plan == Stage 1 : Data Source content : Source: TableSourceScan(table=[[default_catalog, default_database, temp_table]], fields=[id...]) Stage 3 : Operator content : ChangelogNormalize(key=[id]) ship_strategy : HASH Stage 4 : Operator content : Calc(select=[...]) ship_strategy : FORWARD Stage 5 : Data Sink content : Sink: Sink(table=[default_catalog.default_database.table_a], fields=[id...]) ship_strategy : FORWARD ```