Using flink sql to sync kafka data to mysql: deletes do not take effect.

2023-02-23 Post by 陈佳豪
- The table DDL is as follows
String mysqlSinkDDL = "CREATE TABLE `电话` " +
"(`rowID` VARCHAR(255),`名称` STRING,`手机` VARCHAR(255),`座机` VARCHAR(255),  " +
"  PRIMARY KEY (`rowID`) NOT ENFORCED  ) " +
" WITH " +
"('connector' = 'jdbc',   " +
" 'driver' = 'com.mysql.cj.jdbc.Driver',   " +
" 'url' = 'jdbc:mysql://XX:6506/meihua_test',  " +
"  'username' = 'root',  " +
"  'password' = '123456',  " +
"  'table-name' = '电话'  )";

String kafkaSourceDDL = "CREATE TABLE `电话_1` " +
"(`rowid` VARCHAR(100)," +
"`63f73b332e77497da91286f0` VARCHAR(100)," +
"`63f73b3f2e77497da91286fb` VARCHAR(100)," +
"`63f73b3f2e77497da91286fc` VARCHAR(100)," +
"`op` STRING ," +
" PRIMARY KEY (rowid) NOT ENFORCED )" +
" WITH " +
"( 'connector' = 'kafka', " +
"'topic' = 'sz_worksheet-63f82984f3ec743e45b0d561-63f73b332e77497da91286ef'," +
" 'properties.bootstrap.servers' = 'XX:9092'," +
" 'scan.startup.mode' = 'earliest-offset', " +
"'format' = 'debezium-json' )";
- The INSERT statement is as follows
String insert = "insert into `电话` " +
        "select `t_1`.`rowID` as `rowID`, `t_1`.`名称` as `名称`, `t_1`.`手机` as `手机`, `t_1`.`座机` as `座机` " +
        "from ( select `rowid` as `rowID`, `63f73b332e77497da91286f0` as `名称`, " +
        "`63f73b3f2e77497da91286fb` as `手机`, `63f73b3f2e77497da91286fc` as `座机` from `电话_1` ) as t_1";
- The operation data is as follows


{
  "op": "d",
  "before": {
    "rowid": "f251af39-1a95-4d6f-b4cb-cdf93d5d1b6d"
  },
  "after": null
}
The current result is that inserts and updates work, but deletes do not. Is it that the INSERT INTO statement simply cannot handle this? The data goes through the debezium-json serialization format.
Could anyone take a look? Thanks.
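For reference, a minimal sketch of one way to check whether the debezium-json source actually produces -D (delete) changelog rows before they reach the JDBC sink. It assumes the two CREATE TABLE statements above have already been executed in the same TableEnvironment; the changelog_probe table and its two columns are made up for illustration.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ChangelogProbe {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Execute the thread's CREATE TABLE statements for `电话` (jdbc) and
        // `电话_1` (kafka, 'format' = 'debezium-json') here before the probe below.

        // The print sink tags every row with its RowKind (+I, -U, +U, -D), so a delete
        // event read from the Kafka topic should show up as a -D row in the task logs.
        tEnv.executeSql(
                "CREATE TABLE changelog_probe ("
                        + "  `rowID` VARCHAR(255),"
                        + "  `名称` STRING"
                        + ") WITH ('connector' = 'print')");

        tEnv.executeSql(
                "INSERT INTO changelog_probe "
                        + "SELECT `rowid` AS `rowID`, `63f73b332e77497da91286f0` AS `名称` FROM `电话_1`");
    }
}

If -D rows do show up here but the MySQL rows are not removed, the problem is more likely on the JDBC sink side; if they never show up, the delete events are already being lost at the source/format stage.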

Re: Problem with flink taskmanager restart failure

2023-02-23 Post by Weihua Hu
Changing the failover strategy from region to full enlarges the scope affected by a single Task failure; you can refer to the community docs:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/task_failure_recovery/

Best,
Weihua


On Fri, Feb 24, 2023 at 2:12 PM 唐世伟  wrote:

> Thanks for the reply. The logs have already been deleted because they exceeded YARN's retention period. Also, would changing the failover strategy from region to full avoid this problem?
>
> > On Feb 23, 2023, at 11:36 AM, Weihua Hu wrote:
> >
> > Hi,
> >
> > When the other tasks are being cancelled, their state is first set to cancelling, and a task failure at that point does not trigger failover a second time.
> > You could check whether the job is split into multiple regions; failures from multiple regions are counted together.
> >
> > Or could you paste the logs?
> >
> >
> > Best,
> > Weihua
> >
> >
> > On Thu, Feb 23, 2023 at 11:16 AM 唐世伟  wrote:
> >
> >> We have a Flink job that writes to a dozen or so Doris tables at the same time. Whenever Doris has a problem the job dies, and Flink's restart strategy has no effect.
> >> The Flink restart configuration is as follows:
> >> restart-strategy: failure-rate
> >> restart-strategy.failure-rate.delay: 60 s
> >> restart-strategy.failure-rate.failure-rate-interval: 10 min
> >> restart-strategy.failure-rate.max-failures-per-interval: 3
> >>
> >> Looking at the job logs, we found that when a write to Doris fails the job enters the restart flow and tries to cancel the other operators. Each cancel of an operator triggers a checkpoint on that operator, but because there are many other operators writing to Doris tables, those checkpoints also try to flush data to Doris, which fails again and makes the cancel fail. Every such failure is counted toward the restart attempts, so the restart limit is eventually exceeded and the job dies. Isn't this somewhat unreasonable? In theory, once the execution has failed there is no need to count these extra failures toward the restart attempts, which is what ultimately makes the restart fail. Is there any way to adjust this?
>
>


Re: Problem with flink taskmanager restart failure

2023-02-23 Post by 唐世伟
Thanks for the reply. The logs have already been deleted because they exceeded YARN's retention period. Also, would changing the failover strategy from region to full avoid this problem?

> On Feb 23, 2023, at 11:36 AM, Weihua Hu wrote:
> 
> Hi,
> 
> When the other tasks are being cancelled, their state is first set to cancelling, and a task failure at that point does not trigger failover a second time.
> You could check whether the job is split into multiple regions; failures from multiple regions are counted together.
> 
> Or could you paste the logs?
> 
> 
> Best,
> Weihua
> 
> 
> On Thu, Feb 23, 2023 at 11:16 AM 唐世伟  wrote:
> 
>> We have a Flink job that writes to a dozen or so Doris tables at the same time. Whenever Doris has a problem the job dies, and Flink's restart strategy has no effect.
>> The Flink restart configuration is as follows:
>> restart-strategy: failure-rate
>> restart-strategy.failure-rate.delay: 60 s
>> restart-strategy.failure-rate.failure-rate-interval: 10 min
>> restart-strategy.failure-rate.max-failures-per-interval: 3
>> 
>> Looking at the job logs, we found that when a write to Doris fails the job enters the restart flow and tries to cancel the other operators. Each cancel of an operator triggers a checkpoint on that operator, but because there are many other operators writing to Doris tables, those checkpoints also try to flush data to Doris, which fails again and makes the cancel fail. Every such failure is counted toward the restart attempts, so the restart limit is eventually exceeded and the job dies. Isn't this somewhat unreasonable? In theory, once the execution has failed there is no need to count these extra failures toward the restart attempts, which is what ultimately makes the restart fail. Is there any way to adjust this?
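For reference, a minimal sketch of the same restart settings expressed in code, together with the failover-strategy key discussed above ('region' vs 'full'). In a real deployment these keys normally stay in flink-conf.yaml; the programmatic form here is only to keep the example self-contained.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Same failure-rate restart strategy as in the thread: at most 3 failures
        // within 10 minutes are tolerated, with a 60 s delay between restart attempts.
        conf.setString("restart-strategy", "failure-rate");
        conf.setString("restart-strategy.failure-rate.delay", "60 s");
        conf.setString("restart-strategy.failure-rate.failure-rate-interval", "10 min");
        conf.setString("restart-strategy.failure-rate.max-failures-per-interval", "3");

        // Failover strategy: 'region' restarts only the pipelined region containing the
        // failed task, 'full' restarts all tasks of the job on any task failure.
        conf.setString("jobmanager.execution.failover-strategy", "region");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define the sources and Doris sinks on env, then call env.execute().
    }
}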



Re: Re: Re: [Urgent] When Flink SQL writes to a table with a primary key, is it guaranteed that records with the same primary key are processed by the same taskmanager?

2023-02-23 Post by Shuo Cheng
> I checked the parameter you mentioned; isn't its default value AUTO? Do I need to set it to FORCE explicitly?

Sink upsert materialization is applied in the following circumstances:
1. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` is set to FORCE and the sink's primary key
is non-empty.
2. `TABLE_EXEC_SINK_UPSERT_MATERIALIZE` is set to AUTO and the sink's primary key
does not contain the upsert keys of the input update stream.

Note: the upsert materializing operator uses state to resolve disorder problems,
which may incur an additional performance regression.
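
For reference, a minimal sketch of explicitly forcing the materialization, assuming a Flink release that ships ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE; in the SQL client the equivalent would be SET 'table.exec.sink.upsert-materialize' = 'FORCE';.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;

public class UpsertMaterializeForce {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Force the planner to insert the upsert materialize operator in front of the
        // upsert sink, instead of relying on the AUTO heuristic described above.
        tEnv.getConfig()
                .getConfiguration()
                .set(
                        ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
                        ExecutionConfigOptions.UpsertMaterialize.FORCE);

        // ... register the tables and submit the INSERT INTO statements as usual.
    }
}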

Best,
Shuo

On Fri, Feb 24, 2023 at 10:02 AM casel.chen  wrote:

> I checked the parameter you mentioned; isn't its default value AUTO? Do I need to set it to FORCE explicitly?
>
>
> Because of the disorder of ChangeLog data caused by Shuffle in distributed
> system, the data received by Sink may not be the order of global upsert. So
> add upsert materialize operator before upsert sink. It receives the
> upstream changelog records and generate an upsert view for the downstream.
> By default, the materialize operator will be added when a distributed
> disorder occurs on unique keys. You can also choose no
> materialization(NONE) or force materialization(FORCE).
>
> Possible values:
> "NONE"
> "AUTO"
> "FORCE"
>
>
> public static final ConfigOption<UpsertMaterialize> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
>         key("table.exec.sink.upsert-materialize")
>                 .enumType(UpsertMaterialize.class)
>                 .defaultValue(UpsertMaterialize.AUTO)
>                 .withDescription(
>                         Description.builder()
>                                 .text(
>                                         "Because of the disorder of ChangeLog data caused by Shuffle in distributed system, "
>                                                 + "the data received by Sink may not be the order of global upsert. "
>                                                 + "So add upsert materialize operator before upsert sink. It receives the "
>                                                 + "upstream changelog records and generate an upsert view for the downstream.")
>                                 .linebreak()
>                                 .text(
>                                         "By default, the materialize operator will be added when a distributed disorder "
>                                                 + "occurs on unique keys. You can also choose no materialization(NONE) "
>                                                 + "or force materialization(FORCE).")
>                                 .build());
>
>
>
>
>
> On 2023-02-22 15:34:27, "Shuo Cheng" wrote:
> >Hi,
> >
> >Re *"how to guarantee the chronological order of change records with the same primary key (the upstream is a CDC source)?"*, check out
> >ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details about the
> >solution of disordering problems in KeyBy shuffling.
> >
> >Best,
> >Shuo
> >
> >On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:
> >
> >>
> >>
> >> If, as mentioned above [1], a keyBy is applied so that records with the same primary key are routed to the same TM for processing, how does Flink guarantee the chronological order of change records with the same primary key (the upstream is a CDC source)?
> >>
> >>
> >> On 2023-02-20 09:50:50, "Shengkai Fang" wrote:
> >> >My understanding is that if the sink table has a pk, a keyBy is applied automatically [1].
> >> >
> >> >Best,
> >> >Shengkai
> >> >
> >> >[1]
> >> >
> >>
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
> >> >
> >> >Shammon FY wrote on Mon, Feb 20, 2023 at 08:41:
> >> >
> >> >> Hi
> >> >>
> >> >> If the join keys do not include the primary key, then when writing the join result directly to the sink table you need to add an extra shuffle yourself, as RS mentioned above.
> >> >>
> >> >> Best,
> >> >> Shammon
> >> >>
> >> >>
> >> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
> >> >>
> >> >> > Hi,
> >> >> > connector里面配置的主键控制是写入存储的,有些存储在写入数据的时候,能根据主键自动更新去重
> >> >> > 所以我感觉,你这里的应该是想在计算的时候shuffle(写入之前),你应该需要先执行一个 group by
> 主键,然后再执行insert
> >> into
> >> >> >
> >> >> >
> >> >> > Thanks
> >> >> >
> >> >> >
> >> >> >
> >> >> > On 2023-02-17 15:56:51, "casel.chen" wrote:
> >> >> > >The job scenario: a Kafka CDC source is joined with several Redis dimension tables and then with other stream tables via a dual-stream regular inner join, and the widened result is written to MongoDB. We use Flink 1.13.2 in SQL mode with debug logging enabled.
> >> >> > >In testing we found that records with the same primary key were logged on different taskmanagers (I log inside the Sink Function's invoke method), and this behavior makes the data in the final result table incorrect.
> >> >> > >
> >> >> > >
> >> >> > >A few questions:
> >> >> > >When Flink SQL writes data to a table with a primary key, is it guaranteed that records with the same primary key are processed by the same taskmanager?
> >> >> > >Is this unsupported because our Flink version is old? From which Flink version is it supported?
> >> >> > >My understanding is that the point of defining a primary key on a Flink SQL result table is precisely to shuffle by the primary key, so that data with the same primary key is processed by the same taskmanager and the change order stays correct. I'm not sure whether that understanding is right.
> >> >> > >
> >> >> >
> >> >>
> >>
>


Re:Re: Re: [Urgent] When Flink SQL writes to a table with a primary key, is it guaranteed that records with the same primary key are processed by the same taskmanager?

2023-02-23 Post by casel.chen
I checked the parameter you mentioned; isn't its default value AUTO? Do I need to set it to FORCE explicitly?


Because of the disorder of ChangeLog data caused by Shuffle in distributed 
system, the data received by Sink may not be the order of global upsert. So add 
upsert materialize operator before upsert sink. It receives the upstream 
changelog records and generate an upsert view for the downstream.
By default, the materialize operator will be added when a distributed disorder 
occurs on unique keys. You can also choose no materialization(NONE) or force 
materialization(FORCE).

Possible values:
"NONE"
"AUTO"
"FORCE"


public static final ConfigOption<UpsertMaterialize> TABLE_EXEC_SINK_UPSERT_MATERIALIZE =
        key("table.exec.sink.upsert-materialize")
                .enumType(UpsertMaterialize.class)
                .defaultValue(UpsertMaterialize.AUTO)
                .withDescription(
                        Description.builder()
                                .text(
                                        "Because of the disorder of ChangeLog data caused by Shuffle in distributed system, "
                                                + "the data received by Sink may not be the order of global upsert. "
                                                + "So add upsert materialize operator before upsert sink. It receives the "
                                                + "upstream changelog records and generate an upsert view for the downstream.")
                                .linebreak()
                                .text(
                                        "By default, the materialize operator will be added when a distributed disorder "
                                                + "occurs on unique keys. You can also choose no materialization(NONE) "
                                                + "or force materialization(FORCE).")
                                .build());





On 2023-02-22 15:34:27, "Shuo Cheng" wrote:
>Hi,
>
>Re *"how to guarantee the chronological order of change records with the same primary key (the upstream is a CDC source)?"*, check out
>ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE for details about the
>solution of disordering problems in KeyBy shuffling.
>
>Best,
>Shuo
>
>On Wed, Feb 22, 2023 at 10:23 AM casel.chen  wrote:
>
>>
>> If, as mentioned above [1], a keyBy is applied so that records with the same primary key are routed to the same TM for processing, how does Flink guarantee the chronological order of change records with the same primary key (the upstream is a CDC source)?
>>
>>
>> On 2023-02-20 09:50:50, "Shengkai Fang" wrote:
>> >My understanding is that if the sink table has a pk, a keyBy is applied automatically [1].
>> >
>> >Best,
>> >Shengkai
>> >
>> >[1]
>> >
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java#L188
>> >
>> >Shammon FY wrote on Mon, Feb 20, 2023 at 08:41:
>> >
>> >> Hi
>> >>
>> >> If the join keys do not include the primary key, then when writing the join result directly to the sink table you need to add an extra shuffle yourself, as RS mentioned above.
>> >>
>> >> Best,
>> >> Shammon
>> >>
>> >>
>> >> On Sun, Feb 19, 2023 at 1:43 PM RS  wrote:
>> >>
>> >> > Hi,
>> >> > The primary key configured in the connector controls how data is written to storage; some storages can automatically update/deduplicate by primary key when writing.
>> >> > So I think what you want here is a shuffle at computation time (before the write); you probably need to do a GROUP BY on the primary key first and then execute the INSERT INTO.
>> >> >
>> >> >
>> >> > Thanks
>> >> >
>> >> >
>> >> >
>> >> > On 2023-02-17 15:56:51, "casel.chen" wrote:
>> >> > >The job scenario: a Kafka CDC source is joined with several Redis dimension tables and then with other stream tables via a dual-stream regular inner join, and the widened result is written to MongoDB. We use Flink 1.13.2 in SQL mode with debug logging enabled.
>> >> > >In testing we found that records with the same primary key were logged on different taskmanagers (I log inside the Sink Function's invoke method), and this behavior makes the data in the final result table incorrect.
>> >> > >
>> >> > >
>> >> > >A few questions:
>> >> > >When Flink SQL writes data to a table with a primary key, is it guaranteed that records with the same primary key are processed by the same taskmanager?
>> >> > >Is this unsupported because our Flink version is old? From which Flink version is it supported?
>> >> > >My understanding is that the point of defining a primary key on a Flink SQL result table is precisely to shuffle by the primary key, so that data with the same primary key is processed by the same taskmanager and the change order stays correct. I'm not sure whether that understanding is right.
>> >> > >
>> >> >
>> >>
>>


Reply:

2023-02-23 Post by 704669594
Unsubscribe



[ANNOUNCE] Apache Flink Kubernetes Operator 1.4.0 released

2023-02-23 文章 Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.4.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Release highlights:

   - Flink Job Autoscaler initial implementation
   - Stability improvements
   - Support for Zookeeper based HA

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at: https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12352604

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,

Gyula Fora