> I looked at the parameter you mentioned; isn't its default value AUTO? Do I need to set it to FORCE explicitly?
Sink upsert materialize is applied in the following circumstances:

1. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` is set to FORCE and the sink's primary key is non-empty.
2. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` is set to AUTO and the sink's primary key does not contain the upsert keys of the input update stream.

Note: the upsert materializing operator uses state to resolve the disorder problem, which may incur an additional performance regression.

Best,
Shuo

On Fri, Feb 24, 2023 at 10:02 AM casel.chen <casel_c...@126.com> wrote:
> I looked at the parameter you mentioned; isn't its default value AUTO? Do I need to set it to FORCE explicitly?
>
> Because of the disorder of ChangeLog data caused by Shuffle in distributed
> system, the data received by Sink may not be the order of global upsert. So
> add upsert materialize operator before upsert sink. It receives the
> upstream changelog records and generate an upsert view for the downstream.
> By default, the materialize operator will be added when a distributed
> disorder occurs on unique keys. You can also choose no
> materialization(NONE) or force materialization(FORCE).
>
> Possible values:
> "NONE"
> "AUTO"
> "FORCE"
>
> public static final ConfigOption<UpsertMaterialize> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
>         key("table.exec.sink.upsert-materialize")
>                 .enumType(UpsertMaterialize.class)
>                 .defaultValue(UpsertMaterialize.AUTO)
>                 .withDescription(
>                         Description.builder()
>                                 .text(
>                                         "Because of the disorder of ChangeLog data caused by Shuffle in distributed system, "
>                                                 + "the data received by Sink may not be the order of global upsert. "
>                                                 + "So add upsert materialize operator before upsert sink. It receives the "
>                                                 + "upstream changelog records and generate an upsert view for the downstream.")
>                                 .linebreak()
>                                 .text(
>                                         "By default, the materialize operator will be added when a distributed disorder "
>                                                 + "occurs on unique keys. You can also choose no materialization(NONE) "
>                                                 + "or force materialization(FORCE).")
>                                 .build());
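For illustration, a minimal sketch of setting this option to FORCE programmatically, assuming a Flink version that ships `table.exec.sink.upsert-materialize` (it is discussed above for 1.13+). The class name and the standalone `main` are only for the example, not code from this thread.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public class ForceUpsertMaterializeExample {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Force the planner to add the upsert materialize operator in front of the
        // upsert sink instead of relying on the AUTO heuristic. Roughly equivalent to
        // SET 'table.exec.sink.upsert-materialize' = 'FORCE' in the SQL client
        // (exact SET quoting syntax varies by Flink version).
        tEnv.getConfig()
                .getConfiguration()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
                        ExecutionConfigOptions.UpsertMaterialize.FORCE);

        // ... register sources/sinks and submit INSERT INTO statements as usual.
    }
}
```

As noted above, the materializer keeps state per sink key, so FORCE trades extra state and some throughput for resolving the disorder.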
> On 2023-02-22 15:34:27, "Shuo Cheng" <njucs...@gmail.com> wrote:
> > Hi,
> >
> > Re "how does Flink guarantee the chronological order of change records for the
> > same primary key (the upstream is a CDC source)?": check out
> > ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details about the
> > solution to the disordering problem in KeyBy shuffling.
> >
> > Best,
> > Shuo
> >
> > On Wed, Feb 22, 2023 at 10:23 AM casel.chen <casel_c...@126.com> wrote:
> > > If, as mentioned above [1], Flink actively does a keyBy so that records with
> > > the same primary key are gathered onto the same TM for processing, how does
> > > Flink guarantee the chronological order of change records for the same primary
> > > key (the upstream is a CDC source)?
> > >
> > > On 2023-02-20 09:50:50, "Shengkai Fang" <fskm...@gmail.com> wrote:
> > > > My understanding is that if the sink table has a primary key, it will
> > > > actively keyBy [1].
> > > >
> > > > Best,
> > > > Shengkai
> > > >
> > > > [1]
> > > > https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> > > >
> > > > On Mon, Feb 20, 2023 at 08:41, Shammon FY <zjur...@gmail.com> wrote:
> > > > > Hi,
> > > > >
> > > > > If the join keys do not include the primary key, then when the join result
> > > > > is written directly to the sink table you need to add an extra shuffle
> > > > > yourself, as RS mentioned above.
> > > > >
> > > > > Best,
> > > > > Shammon
> > > > >
> > > > > On Sun, Feb 19, 2023 at 1:43 PM RS <tinyshr...@163.com> wrote:
> > > > > > Hi,
> > > > > > The primary key configured in the connector controls how data is written
> > > > > > to storage; some storages can automatically update/deduplicate by primary
> > > > > > key on write. So I think what you want here is a shuffle at computation
> > > > > > time (before writing): you probably need to run a GROUP BY on the primary
> > > > > > key first and then the INSERT INTO.
> > > > > >
> > > > > > Thanks
> > > > > >
> > > > > > On 2023-02-17 15:56:51, "casel.chen" <casel_c...@126.com> wrote:
> > > > > > > The job joins a Kafka CDC source with several Redis dimension tables,
> > > > > > > then does a regular inner join with another stream table, and finally
> > > > > > > writes the widened table to MongoDB. I am using Flink 1.13.2 in SQL mode
> > > > > > > with debug logging enabled. In testing I found that records with the
> > > > > > > same primary key were logged on different taskmanagers (I log in the
> > > > > > > Sink Function's invoke method), and this behavior makes the final result
> > > > > > > table incorrect.
> > > > > > >
> > > > > > > Questions:
> > > > > > > When Flink SQL writes to a table with a primary key, can it guarantee
> > > > > > > that records with the same primary key are processed by the same
> > > > > > > taskmanager?
> > > > > > > Is this unsupported because my Flink version is old? From which Flink
> > > > > > > version is it supported?
> > > > > > > My understanding is that the purpose of defining a primary key on a
> > > > > > > Flink SQL result table is precisely to shuffle by that key, so that
> > > > > > > records with the same primary key are processed by the same taskmanager
> > > > > > > and the change order stays correct. Is my understanding right?
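To make the keyBy discussion above concrete, here is a small, self-contained sketch (not the original job): the sink table declares a primary key, which is what lets the planner (see the CommonExecSink reference above) partition the updating stream by that key before the sink. The datagen/blackhole connectors, table names, and schema are assumptions chosen only so the example runs on its own.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PrimaryKeySinkSketch {

    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Bounded insert-only demo source; it stands in for the CDC + dimension-join
        // pipeline described in the thread.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  id BIGINT,"
                        + "  amount BIGINT"
                        + ") WITH ("
                        + "  'connector' = 'datagen',"
                        + "  'number-of-rows' = '100'"
                        + ")");

        // The sink declares a primary key (NOT ENFORCED). With a primary key present,
        // the planner can key the changelog by 'id' before the sink operator, so all
        // changes for the same key are handled by the same subtask.
        tEnv.executeSql(
                "CREATE TABLE order_totals ("
                        + "  id BIGINT,"
                        + "  total BIGINT,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'blackhole'"
                        + ")");

        // The aggregation produces an updating stream whose upsert key is 'id',
        // matching the sink's primary key.
        tEnv.executeSql(
                        "INSERT INTO order_totals "
                                + "SELECT id, SUM(amount) AS total FROM orders GROUP BY id")
                .await();
    }
}
```

On top of this partitioning, the SinkUpsertMaterializer discussed earlier is added (or not) according to `table.exec.sink.upsert-materialize`.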