Re:Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-21 文章 casel.chen
如果像楼上所说[1]主动keyby将同一主键记录汇聚到同一个TM处理的话,Flink如何确保同一主键变更记录(上游接的是cdc数据源)的时间先后顺序性呢?


在 2023-02-20 09:50:50,"Shengkai Fang"  写道:
>我理解如果sink 表上带有 pk,就会主动 keyby 的[1]。
>
>Best,
>Shengkai
>
>[1]
>https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
>
>Shammon FY  于2023年2月20日周一 08:41写道:
>
>> Hi
>>
>> 如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
>>
>> Best,
>> Shammon
>>
>>
>> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
>>
>> > Hi,
>> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>> >
>> >
>> > Thanks
>> >
>> >
>> >
>> > 在 2023-02-17 15:56:51,"casel.chen"  写道:
>> > >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>> > join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>> > >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
>> > Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>> > >
>> > >
>> > >请问:
>> > >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>> > >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>> > >我理解flink
>> >
>> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>> > >
>> >
>>


Re:Re: [急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-19 文章 casel.chen
Flink SQL作业示意如下:


create table user_source_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING,
  dept_id BIGINT NOT NULL,
  proctime AS PROCTIME()
) with (
 'connector' = 'kafka', 
 'format' = 'canal-json',
 ...
);


create table department_dim_table (
   id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
   name STRING
) with (
 'connector' = 'jdbc',
 ...
);


create table user_rich_sink_table (
  id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
  name STRING,
  dept_name STRING
) with (
 'connector' = 'jdbc'
 ...
);


insert into user_rich_sink_table 
select id, name, d.name as dept_name 
from user_source_table u
  left join department_dim_table for system_time as of u.proctime as d 
  on u.dept_id = d.id;


用户id是主键,按你所说需要在最后insert into语句之前自己显示加group by用户id再insert?
现在是发现当作业并行度大于1时,相同用户id的记录会落到不同TaskManager上,造成数据更新状态不一致。





在 2023-02-20 08:41:20,"Shammon FY"  写道:
>Hi
>
>如果join计算的关联字段里没有主键,在将join结果直接写入sink表时,像上面RS提到的,需要自己再增加一次shuffle操作
>
>Best,
>Shammon
>
>
>On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
>
>> Hi,
>> connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>> 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>>
>>
>> Thanks
>>
>>
>>
>> 在 2023-02-17 15:56:51,"casel.chen"  写道:
>> >作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner
>> join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>> >测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink
>> Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>> >
>> >
>> >请问:
>> >flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>> >是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>> >我理解flink
>> sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>> >
>>


Re:Re:[急] flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?

2023-02-19 文章 casel.chen






你说的这个在写入之前进行shuffle(先执行一个group by主键)这个操作我认为应该是Flink框架层面的事情,不应该在作业层面显式添加。
Flink框架应该在执行sink的时候判断目标表是否有主键,如果有主键的话应该插入一个group by算子将相同主键的记录发到同一个TaskManager处理。
我听说 Flink新版本1.15还是1.16不记得了已经改进了这个问题,有谁知道吗?有相关issue或PR链接没?











在 2023-02-19 13:43:29,"RS"  写道:
>Hi,
>connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
>所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by 主键,然后再执行insert into
>
>
>Thanks
>
>
>
>在 2023-02-17 15:56:51,"casel.chen"  写道:
>>作业场景是kafka cdc数据源关联几张redis维表再和其他流表进行双流regular inner 
>>join,最后将打宽表写入mongodb。使用的是flink 1.13.2 sql模式。开了debug日志。
>>测试下来发现相同主键的记录在不同的taskmanager上打了日志(我是在Sink 
>>Function的invoke方法打的日志),该行为导致最终结果表数据不正确。
>>
>>
>>请问:
>>flink sql写数据到带主键表时能确保相同主键的记录都被同一个taskmanager处理吗?
>>是因为flink版本旧不支持吗?从flink哪个版本开始支持的呢?
>>我理解flink 
>>sql结果表上定义主键的目的就是期望按主键进行shuffle,确保相同主键的数据被同一个taskmanager处理以确保正确的变更顺序,不知道我理解得对不对。
>>