Re: Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-23 文章 Shuo Cheng
> 你说的这个参数我看了默认值不是auto吗?需要我显式地指定为force?

Sink upsert materialize would be applied in the following circumstances:
1. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` set to FORCE and sink's primary key
nonempty.
2. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` set to AUTO and sink's primary key
doesn't contain upsert keys of the input update stream.

Note: upsert materializing operator use state to resolve disorder problems
which may incur additional performance regression.

Best,
Shuo

On Fri, Feb 24, 2023 at 10:02 AM casel.chen  wrote:

> 你说的这个参数我看了默认值不是auto吗?需要我显式地指定为force?
>
>
> Because of the disorder of ChangeLog data caused by Shuffle in distributed
> system, the data received by Sink may not be the order of global upsert. So
> add upsert materialize operator before upsert sink. It receives the
> upstream changelog records and generate an upsert view for the downstream.
> By default, the materialize operator will be added when a distributed
> disorder occurs on unique keys. You can also choose no
> materialization(NONE) or force materialization(FORCE).
>
> Possible values:
> "NONE"
> "AUTO"
> "FORCE"
>
>
> public static final ConfigOption
> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
> key("table.exec.sink.upsert-materialize")
> .enumType(UpsertMaterialize.class)
> .defaultValue(UpsertMaterialize.AUTO)
> .withDescription(
> Description.builder()
> .text(
> "Because of the disorder of
> ChangeLog data caused by Shuffle in distributed system, "
> + "the data received
> by Sink may not be the order of global upsert. "
> + "So add upsert
> materialize operator before upsert sink. It receives the "
> + "upstream changelog
> records and generate an upsert view for the downstream.")
> .linebreak()
> .text(
> "By default, the materialize
> operator will be added when a distributed disorder "
> + "occurs on unique
> keys. You can also choose no materialization(NONE) "
> + "or force
> materialization(FORCE).")
> .build());
>
>
>
>
>
> 在 2023-02-22 15:34:27,"Shuo Cheng"  写道:
> >Hi,
> >
> >Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?",  *checking out
> >ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details
> about
> >solution of disordering problems in KeyBy shuffling.
> >
> >Best,
> >Shuo
> >
> >On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:
> >
> >>
> >>
> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
> >>
> >>
> >> 在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
> >> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
> >> >
> >> >Best,
> >> >Shengkai
> >> >
> >> >[1]
> >> >
> >>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >> >
> >> >Shammon FY  于2023年2月20日周一 08:41写道:
> >> >
> >> >> Hi
> >> >>
> >> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
> >> >>
> >> >> Best,
> >> >> Shammon
> >> >>
> >> >>
> >> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
> >> >>
> >> >> > Hi,
> >> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by
> 主键,然后再执行insert
> >> into
> >> >> >
> >> >> >
> >> >> > Thanks
> >> >> >
> >> >> >
> >> >> >
> >> >> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
> >> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> >> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> >> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >> >> > >
> >> >> > >
> >> >> > >请问:
> >> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >> >> > >我理解flink
> >> >> >
> >> >>
> >>
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >> >> > >
> >> >> >
> >> >>
> >>
>


Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-21 文章 Shuo Cheng
Hi,

Re *"如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?",  *checking out
ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details about
solution of disordering problems in KeyBy shuffling.

Best,
Shuo

On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:

>
> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
>
>
> 在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
> >
> >Best,
> >Shengkai
> >
> >[1]
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >
> >Shammon FY  于2023年2月20日周一 08:41写道:
> >
> >> Hi
> >>
> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
> >>
> >> Best,
> >> Shammon
> >>
> >>
> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
> >>
> >> > Hi,
> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert
> into
> >> >
> >> >
> >> > Thanks
> >> >
> >> >
> >> >
> >> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >> > >
> >> > >
> >> > >请问:
> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >> > >我理解flink
> >> >
> >>
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >> > >
> >> >
> >>
>


Re: Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-21 文章 Weihua Hu
如果想保证每次写入 mysql 的事件是最新的,需要在 Flink 内部针对事件时间排序取 TOP 1, 可以参考[1]。 但是需要注意这需要使用
state,你可以需要指定合适的 TTL[2] 来保证 state 不会过大

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/topn/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-state-ttl

Best,
Weihua


On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:

>
> 如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?
>
>
> 在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
> >我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
> >
> >Best,
> >Shengkai
> >
> >[1]
> >
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >
> >Shammon FY  于2023年2月20日周一 08:41写道:
> >
> >> Hi
> >>
> >> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
> >>
> >> Best,
> >> Shammon
> >>
> >>
> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
> >>
> >> > Hi,
> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert
> into
> >> >
> >> >
> >> > Thanks
> >> >
> >> >
> >> >
> >> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
> >> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
> >> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
> >> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
> >> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
> >> > >
> >> > >
> >> > >请问:
> >> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
> >> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
> >> > >我理解flink
> >> >
> >>
> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
> >> > >
> >> >
> >>
>