Re: 关于 mongo db 的splitVector 权限问题

2024-05-23 文章 Jiabao Sun
Hi,

splitVector 是 MongoDB 计算分片的内部命令,在副本集部署模式下也可以使用此命令来计算 chunk 区间。
如果没有 splitVector 权限,会自动降级为 sample 切分策略。

Best,
Jiabao

evio12...@gmail.com  于2024年5月23日周四 16:57写道:

>
> hello~
>
>
> 我正在使用 flink-cdc mongodb connector 2.3.0
> 
>  (
> https://github.com/apache/flink-cdc/blob/release-2.3/docs/content/connectors/mongodb-cdc.md)
> ,
> 文档中指出 mongo 账号需要这些权限 'splitVector', 'listDatabases', 'listCollections',
> 'collStats', 'find', and 'changeStream' ,
>
>
> 我现在使用的mongo是 replica-set , 但是了解到 splitVector 权限主要是对分片集,
> 如果DBA不授权 splitVector , 会有什么影响呢?
>
> --
> evio12...@gmail.com
>


Re: flink集群如何将日志直接写入elasticsearch中?

2024-03-13 文章 Jiabao Sun
比较简单的方式是启动一个filebeat进程,抓取 jobmanager.log 和t askmanager.log

Best,
Jiabao

kellygeorg...@163.com  于2024年3月13日周三 15:30写道:

> 有没有比较方便快捷的解决方案?
>
>
>


RE: Re:RE: Re:RE: 如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-22 文章 Jiabao Sun
Hi,

ResumeToken[1] can be considered globally unique[2].

Best,
Jiabao

[1] https://www.mongodb.com/docs/manual/changeStreams/#resume-tokens
[2] 
https://img.alicdn.com/imgextra/i4/O1CN010e81SP1vkgoyL0nhd_!!66211-0-tps-2468-1360.jpg

On 2024/01/22 09:36:42 "casel.chen" wrote:
> 
> 
> 
> V1版本依赖于DebeziumSourceFunction,后者依赖于DebeziumEngine产生changelog
> V2版本虽然依赖了 flink-connector-debezium 但没有用到debezium内部类
> 
> 
> 另外有一个问题:mongodb change stream断点续传用的resumeToken是像mysql binlog offset一样全局唯一么?
> 如果数据源是像kafka一样每个分片有binlog offset的话,
> 是不是要在对应xxxOffset类中要定义一个Map类型的offsetField 
> (类似mongodb对应ChangeStreamOffset中的resumeTokenField)? 
> 当前mongodb中定义的是Json String类型
> 
> 在 2024-01-22 11:03:55,"Jiabao Sun"  写道:
> >Hi,
> >
> >Flink CDC MongoDB connector V1是基于 mongo-kafka 实现的,没有基于 debezium 实现。
> >Flink CDC MongoDB connector V2是基于增量快照读框架实现的,不依赖 mongo-kafka 和 debezium 。
> >
> >Best,
> >Jiabao
> >
> >[1] https://github.com/mongodb/mongo-kafka
> >
> >
> >On 2024/01/22 02:57:38 "casel.chen" wrote:
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> Flink CDC MongoDB connector 还是基于debezium实现的
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 在 2024-01-22 10:14:32,"Jiabao Sun"  写道:
> >> >Hi,
> >> >
> >> >可以参考 Flink CDC MongoDB connector 的实现。
> >> >
> >> >Best,
> >> >Jiabao
> >> >
> >> >
> >> >On 2024/01/22 02:06:37 "casel.chen" wrote:
> >> >> 现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
> >> >> 3.x自行开发,查了一下现有大部分flink cdc source 
> >> >> connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
> >> >>  snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
> >> >> 想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!
> >> 
> 

RE: Re:RE: 如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-21 文章 Jiabao Sun
Hi,

Flink CDC MongoDB connector V1是基于 mongo-kafka 实现的,没有基于 debezium 实现。
Flink CDC MongoDB connector V2是基于增量快照读框架实现的,不依赖 mongo-kafka 和 debezium 。

Best,
Jiabao

[1] https://github.com/mongodb/mongo-kafka


On 2024/01/22 02:57:38 "casel.chen" wrote:
> 
> 
> 
> 
> 
> 
> 
> 
> 
> Flink CDC MongoDB connector 还是基于debezium实现的
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2024-01-22 10:14:32,"Jiabao Sun"  写道:
> >Hi,
> >
> >可以参考 Flink CDC MongoDB connector 的实现。
> >
> >Best,
> >Jiabao
> >
> >
> >On 2024/01/22 02:06:37 "casel.chen" wrote:
> >> 现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
> >> 3.x自行开发,查了一下现有大部分flink cdc source 
> >> connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
> >>  snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
> >> 想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!
> 

RE: 如何基于flink cdc 3.x自定义不基于debezium的全增量一体source connector?

2024-01-21 文章 Jiabao Sun
Hi,

可以参考 Flink CDC MongoDB connector 的实现。

Best,
Jiabao


On 2024/01/22 02:06:37 "casel.chen" wrote:
> 现有一种数据源不在debezium支持范围内,需要通过flink sql全增量一体消费,想着基于flink cdc 
> 3.x自行开发,查了一下现有大部分flink cdc source 
> connector都是基于debezium库开发的,只有oceanbase和tidb不是,但后二者用的source接口还是V1版本的,不是最新V2版本的incremental
>  snapshot,意味着全量阶段不支持checkpoint,如果作业失败需要重新从头消费。
> 想问一下有没有不基于debezium实现的V2版本source connector示例?谢谢!

RE: Re:RE: binlog文件丢失问题

2024-01-19 文章 Jiabao Sun
Hi,

日志中有包含 GTID 的内容吗?
用 SHOW VARIABLES LIKE 'gtid_mode’; 确认下是否开启了GTID呢?

Best,
Jiabao


On 2024/01/19 09:36:38 wyk wrote:
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 抱歉,具体报错和代码如下:
> 
> 
> 报错部分:
> Caused by: java.lang.IllegalStateException: The connector is trying to read 
> binlog starting at 
> Struct{version=1.5.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1705645599953,db=,server_id=0,file=mysql_bin.007132,pos=729790304,row=0},
>  but this is no longer available on the server. Reconfigure the connector to 
> use a snapshot when needed.
> at 
> com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:179)
> at 
> com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:112)
> at 
> com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:93)
> at 
> com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:65)
> at 
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:170)
> at 
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:75)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> ... 6 more
> 
> 
> 
> 
> 代码部分: 
> if (!isBinlogAvailable(mySqlOffsetContext)) {
> throw new IllegalStateException(
> "The connector is trying to read binlog starting at "
> + mySqlOffsetContext.getSourceInfo()
> + ", but this is no longer "
> + "available on the server. Reconfigure the connector to 
> use a snapshot when needed.");
> }
> 
> 在 2024-01-19 17:33:03,"Jiabao Sun"  写道:
> >Hi,
> >
> >你的图挂了,可以贴一下图床链接或者直接贴一下代码。
> >
> >Best,
> >Jiabao
> >
> >
> >On 2024/01/19 09:16:55 wyk wrote:
> >> 
> >> 
> >> 各位大佬好:
> >> 现在有一个binlog文件丢失问题,需要请教各位,具体问题描述如下:
> >> 
> >> 
> >> 问题描述:
> >> 场景: 公司mysql有两个备库: 备库1和备库2。
> >> 1. 现在备库1需要下线,需要将任务迁移至备库2
> >> 2.我正常将任务保存savepoint后,将链接信息修改为备库2从savepoint启动,这个时候提示报错binlog文件不存在问题,报错截图如下图一
> >> 3.我根据报错找到对应代码(如下图二)后,发现是一块校验binlog文件是否存在的逻辑,我理解的是我们从gtid启动不需要对binlog文件进行操作,就将这部分代码进行了注释,任务能够正常从savepoint启动,并且数据接入正常
> >> 
> >> 
> >> 
> >> 
> >> 疑问: 想问一下校验binlog文件是否存在这块逻辑是否需要,或者是应该修改为校验gtid是否存在,期待您的回复,谢谢
> >> 
> >> 
> >> 注意: 备库一个备库二的gtid是保持一致的
> >> 
> >> 
> >> 
> >> 
> >> 图一:
> >> 
> >> 
> >> 图二:
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> 

RE: binlog文件丢失问题

2024-01-19 文章 Jiabao Sun
Hi,

你的图挂了,可以贴一下图床链接或者直接贴一下代码。

Best,
Jiabao


On 2024/01/19 09:16:55 wyk wrote:
> 
> 
> 各位大佬好:
> 现在有一个binlog文件丢失问题,需要请教各位,具体问题描述如下:
> 
> 
> 问题描述:
> 场景: 公司mysql有两个备库: 备库1和备库2。
> 1. 现在备库1需要下线,需要将任务迁移至备库2
> 2.我正常将任务保存savepoint后,将链接信息修改为备库2从savepoint启动,这个时候提示报错binlog文件不存在问题,报错截图如下图一
> 3.我根据报错找到对应代码(如下图二)后,发现是一块校验binlog文件是否存在的逻辑,我理解的是我们从gtid启动不需要对binlog文件进行操作,就将这部分代码进行了注释,任务能够正常从savepoint启动,并且数据接入正常
> 
> 
> 
> 
> 疑问: 想问一下校验binlog文件是否存在这块逻辑是否需要,或者是应该修改为校验gtid是否存在,期待您的回复,谢谢
> 
> 
> 注意: 备库一个备库二的gtid是保持一致的
> 
> 
> 
> 
> 图一:
> 
> 
> 图二:
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 

RE: Re:RE: RE: flink cdc动态加表不生效

2024-01-18 文章 Jiabao Sun
Hi,

oracle cdc connector 已经接入增量快照读框架,动态加表也是可以统一去实现的。
可以去社区创建issue,也欢迎直接贡献。

Best,
Jiabao


On 2024/01/19 04:46:21 "casel.chen" wrote:
> 
> 
> 
> 
> 
> 
> 想知道oracle cdc connector不支持动态加表的原因是什么?可否自己扩展实现呢?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2024-01-19 11:53:49,"Jiabao Sun"  写道:
> >Hi,
> >
> >Oracle CDC connector[1] 目前是不支持动态加表的。
> >
> >Best,
> >Jiabao
> >
> >[1] 
> >https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/oracle-cdc.html
> >
> >
> >On 2024/01/19 03:37:41 Jiabao Sun wrote:
> >> Hi,
> >> 
> >> 请提供一下 flink cdc 的版本,使用的什么连接器。
> >> 如果方便的话,也请提供一下日志。
> >> 另外,table 的正则表达式可以匹配到新增的表吗?
> >> 
> >> Best,
> >> Jiabao
> >> 
> >> [1] 
> >> https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15
> >> 
> >> On 2024/01/19 03:27:22 王凯 wrote:
> >> > 在使用flink cdc进行数据同步时,添加--scan.newly-added-table.enabled=true 
> >> > 参数,当从savepoint重启时,新添加的表的数据不能同步
> >> > 
> >> > 
> >> > 王凯
> >> > 2813732...@qq.com
> >> > 
> >> > 
> >> > 
> >> > 
> 

RE: 退订

2024-01-18 文章 Jiabao Sun
Please send email to user-unsubscr...@flink.apache.org if you want to
unsubscribe the mail from u...@flink.apache.org, and you can refer [1][2]
for more details.

请发送任意内容的邮件到 user-unsubscr...@flink.apache.org 地址来取消订阅来自
u...@flink.apache.org 邮件组的邮件,你可以参考[1][2] 管理你的邮件订阅。

Best,
Jiabao

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On 2024/01/19 03:39:52 李乐 wrote:
> 退订

RE: RE: flink cdc动态加表不生效

2024-01-18 文章 Jiabao Sun
Hi,

Oracle CDC connector[1] 目前是不支持动态加表的。

Best,
Jiabao

[1] 
https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/oracle-cdc.html


On 2024/01/19 03:37:41 Jiabao Sun wrote:
> Hi,
> 
> 请提供一下 flink cdc 的版本,使用的什么连接器。
> 如果方便的话,也请提供一下日志。
> 另外,table 的正则表达式可以匹配到新增的表吗?
> 
> Best,
> Jiabao
> 
> [1] 
> https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15
> 
> On 2024/01/19 03:27:22 王凯 wrote:
> > 在使用flink cdc进行数据同步时,添加--scan.newly-added-table.enabled=true 
> > 参数,当从savepoint重启时,新添加的表的数据不能同步
> > 
> > 
> > 王凯
> > 2813732...@qq.com
> > 
> > 
> > 
> > 

RE: flink cdc动态加表不生效

2024-01-18 文章 Jiabao Sun
Hi,

请提供一下 flink cdc 的版本,使用的什么连接器。
如果方便的话,也请提供一下日志。
另外,table 的正则表达式可以匹配到新增的表吗?

Best,
Jiabao

[1] 
https://ververica.github.io/flink-cdc-connectors/release-3.0/content/connectors/mysql-cdc%28ZH%29.html#id15

On 2024/01/19 03:27:22 王凯 wrote:
> 在使用flink cdc进行数据同步时,添加--scan.newly-added-table.enabled=true 
> 参数,当从savepoint重启时,新添加的表的数据不能同步
> 
> 
> 王凯
> 2813732...@qq.com
> 
> 
> 
> 

RE: 实时数仓场景落地问题

2024-01-14 文章 Jiabao Sun
Hi,

可以尝试使用 Flink CDC + Apache Paimon 去构建实时数仓。
目前 Paimon 已经支持使用 Flink CDC 将数据整库入湖,可以使用较小的成本进行实时入湖。
另外利用 Paimon partial update的特性,可以以较小的计算成本去构建 ADS 层宽表。
Paimon 也可以同时支持批式计算和流式计算,对于时效性和计算成本可以使用灵活的计算方式做平衡。

Best,
Jiabao


On 2024/01/14 12:57:29 海风 wrote:
> hello,公司里业务会拿一张t+1的离线数仓表名,经常是ads应用层的,问你可不可以做成实时表,大家有碰到这类需求嘛?我的理解现在虽然有实时数仓,或者流批一体这样探索,但是远没有到层级很深的ads层t+1离线表可能以较小的成本去实现实时化。
> 引申的问题是当前实时数仓已有较大规模的场景落地么?有哪些场景落地呢?落地的效果成本与效果大概是怎么样的呢?
> 
> 
> 

RE: 退订

2024-01-14 文章 Jiabao Sun
Please send email to user-unsubscr...@flink.apache.org 
 if you want to
unsubscribe the mail from u...@flink.apache.org , 
and you can refer [1][2]
for more details.

请发送任意内容的邮件到 user-unsubscr...@flink.apache.org 
 地址来取消订阅来自
u...@flink.apache.org  邮件组的邮件,你可以参考[1][2] 
管理你的邮件订阅。

Best,
Jiabao

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On 2024/01/14 03:17:44 王春顺 wrote:
> 
> 退订

RE: 如何在IDE里面调试flink cdc 3.0作业?

2024-01-02 文章 Jiabao Sun
Hi,

可以参考下这篇文档[1],进行简单的测试。

Best,
Jiabao

[1] 
https://docs.google.com/document/d/1L6cJiqYkAsZ_nDa3MgRwV3SKQuw5OrMbqGC4YgzgKR4/edit#heading=h.aybxdd96r62i


On 2024/01/02 08:02:10 "casel.chen" wrote:
> 我想在Intellij Idea里面调试mysql-to-doris.yaml作业要如何操作呢?flink-cdc-cli模块需要依赖 
> flink-cdc-pipeline-connector-mysql 和 flink-cdc-pipeline-connector-doris 模块么?

Re: Flink CDC中如何在Snapshot阶段读取数据时进行限流?

2024-01-01 文章 Jiabao Sun
Hi,

GuavaFlinkConnectorRateLimiter 目前只在 flink-connector-gcp-pubsub[1] 有使用。
Flink CDC 还未支持限流[2],目前可以尝试降低 snapshot 并发数来缓解数据库压力。 

Best,
Jiabao

[1] 
https://github.com/apache/flink-connector-gcp-pubsub/blob/f5372f25cfc1954d00a4b2fc9342e8ed5a3ef3ab/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSource.java#L22
[2] https://github.com/ververica/flink-cdc-connectors/issues/510


> 2024年1月2日 11:39,casel.chen  写道:
> 
> 业务表存量数据很大,如果不加限流直接使用flink cdc读取snapshot阶段数据的话会造成业务库压力,触发数据库告警,影响在线业务。
> 请问Flink CDC中如何在Snapshot阶段读取数据时进行限流?
> 
> 
> 我看到社区之前有人提议过,但issue一直是open状态
> https://issues.apache.org/jira/browse/FLINK-18740
> 
> 
> 另外,我在flink最新master分支源码中有找到 
> GuavaFlinkConnectorRateLimiter,但没有找到调用它的例子,请问如何在flink作业中使用限流呢?



RE: FileSystem Connector如何优雅的支持同时写入多个路径

2023-12-29 文章 Jiabao Sun
Hi,

使用 SQL 的话不太好实现写入多个路径,
使用 DataStream 的话可以考虑自己实现一个 RichSinkFunction。

Best,
Jiabao

On 2023/12/29 08:37:34 jinzhuguang wrote:
> Flink版本:1.16.0
> 
> 看官网上的案例:
> CREATE TABLE MyUserTable (
>   column_name1 INT,
>   column_name2 STRING,
>   ...
>   part_name1 INT,
>   part_name2 STRING
> ) PARTITIONED BY (part_name1, part_name2) WITH (
>   'connector' = 'filesystem',   -- 必选:指定连接器类型
>   'path' = 'file:///path/to/whatever',  -- 必选:指定路径
>   'format' = '...', -- 必选:文件系统连接器指定 format
> -- 有关更多详情,请参考 Table Formats
>   'partition.default-name' = '...', -- 可选:默认的分区名,动态分区模式下分区字段值是 null 或空字符串
> 
>   -- 可选:该属性开启了在 sink 阶段通过动态分区字段来 shuffle 数据,该功能可以大大减少文件系统 sink 
> 的文件数,但是可能会导致数据倾斜,默认值是 false
>   'sink.shuffle-by-partition.enable' = '...',
>   ...
> )
> 目前只支持写入一个path,有没有大佬有过最佳实践,如何写入多个path。

RE: Flink SQL Windowing TVFs

2023-12-28 文章 Jiabao Sun
Hi,

在 1.14.0 版本中,CUMULATE 函数是需要用在GROUP BY聚合场景下的[1]。
部署到生产的 SQL 是否包含了 GROUP BY 表达式?
本地测试的Flink版本是不是1.14.0?

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate



On 2023/12/29 04:57:09 "jiaot...@mail.jj.cn" wrote:
> Hi,
>  我在使用1.14.0版本Flink,本地测试了CUMULATE(TABLE kafka, DESCRIPTOR(rowtime), 
> INTERVAL '60' SECOND, INTERVAL '1' DAYS)方法可以正常运行,但是当我将其部署到线上环境报了如下错误:
>  org.apache.flink.client.program.ProgramInvocationException: The main 
> method caused an error: Currently Flink doesn't support individual window 
> table-valued function CUMULATE(time_col=[rowtime], max_size=[8640 ms], 
> step=[1 min]).
>  Please use window table-valued function with the following computations:
>  1. aggregate using window_start and window_end as group keys.
>  2. topN using window_start and window_end as partition key.
>  3. join with join condition contains window starts equality of input 
> tables and window ends equality of input tables.
>  请问这是因为线上包版本导致的吗,如果是版本问题,具体是哪一个包呢
>  非常感谢
> 

Re: flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 Jiabao Sun
Hi,

是的,目前来说会 block 住。
flush + apply schema change 一般来说不会持续太长时间,
且 schema 变更一般来说是低频事件,即使 block 也不会有太大性能影响。

Best,
Jiabao


> 2023年12月28日 12:57,casel.chen  写道:
> 
> 
> 
> 
> 感谢解惑!
> 还有一个问题:如果一个 pipeline 涉及多张表数据同步,而只有一个表出现 schema 变更的话,其他表的数据处理也会 block 住吗?
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2023-12-28 01:16:40,"Jiabao Sun"  写道:
>> Hi,
>> 
>>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>>> 还要发送一次SchemaChangeEvent呢?
>> 
>> Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 
>> writer,参考 DorisEventSerializer
>> 
>>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>>> upstream的呢?
>> 被 block 的原因是 responseFuture没有 
>> complete,在SchemaOperator.sendRequestToCoordinator 使用 responseFuture.get() 
>> 在没有完成时会 block 住。 
>> 只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 
>> waitFlushSuccess的responseFuture 标记为 complete。
>> 参考 
>> SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150.
>> 
>> 保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。
>> 
>> Best,
>> Jiabao
>> 
>> [1] 
>> https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit
>> 
>>> 2023年12月27日 22:14,casel.chen  写道:
>>> 
>>> 看了infoq介绍flink cdc 3.0文章 
>>> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
>>> evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
>>> 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, 
>>> d6 其中d代表数据变更,s代表schema变更
>>> 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
>>> 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
>>> 而Task2处理 d3, d4, d5, s3, d6
>>> 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?
>>> 
>>> 
>>> SchemaOperator代码中
>>> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
>>> schemaChangeEvent) {
>>>   // The request will need to send a FlushEvent or block until flushing 
>>> finished
>>>   SchemaChangeResponse response = requestSchemaChange(tableId, 
>>> schemaChangeEvent);
>>>   if (response.isShouldSendFlushEvent()) {
>>>   LOG.info(
>>>   "Sending the FlushEvent for table {} in subtask {}.",
>>>   tableId,
>>>   getRuntimeContext().getIndexOfThisSubtask());
>>>   output.collect(new StreamRecord<>(new FlushEvent(tableId)));
>>>   output.collect(new StreamRecord<>(schemaChangeEvent));
>>>   // The request will block until flushing finished in each sink 
>>> writer
>>>   requestReleaseUpstream();
>>>   }
>>>   }
>>> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
>>> 还要发送一次SchemaChangeEvent呢?
>>> 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
>>> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
>>> upstream的呢?
>>> 求指教,谢谢!
>>> 



Re: flink cdc 3.0 schema evolution原理是怎样的?

2023-12-27 文章 Jiabao Sun
Hi,

> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
> 还要发送一次SchemaChangeEvent呢?

Sink 也会收到 SchemaChangeEvent,因为 Sink 可能需要根据 Schema 变更的情况来调整 serializer 或 
writer,参考 DorisEventSerializer

> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
> upstream的呢?
被 block 的原因是 responseFuture没有 complete,在SchemaOperator.sendRequestToCoordinator 
使用 responseFuture.get() 在没有完成时会 block 住。 
只有当收到 FlushSuccessEvent 时,才会执行 schema 变更,当 schema 变更完毕后,将 
waitFlushSuccess的responseFuture 标记为 complete。
参考 
SchemaRegistryRequestHandler.handleSchemaChangeRequest:100~105,SchemaRegistryRequestHandler.flushSuccess:148~150.

保证顺序的问题比较复杂,可以参考一下源码和设计文档 [1]。

Best,
Jiabao

[1] 
https://docs.google.com/document/d/1tJ0JSnpe_a4BgLmTGQyG-hs4O7Ui8aUtdT4PVIkBWPY/edit

> 2023年12月27日 22:14,casel.chen  写道:
> 
> 看了infoq介绍flink cdc 3.0文章 
> https://xie.infoq.cn/article/a80608df71c5291186153600b,我对其中schema. 
> evolution设计原理想不明白,框架是如何做到schema change顺序性的。文章介绍得并不详细。
> 从mysql binlog产生changeEvent来看,所有的变更都是时间线性的,例如s1, d1, d2, s2, d3, d4, d5, s3, 
> d6 其中d代表数据变更,s代表schema变更
> 这意味着d1,d2使用的是s1 schema,而d3~d5用的是s2 schema,最后d6使用的是s3 schema。
> 如果flink开多个并发进行处理的话,这些变更序列会被分发到不同task上进行处理,例如2个并行度下,Task1处理 s1, d1, d2, s2, 
> 而Task2处理 d3, d4, d5, s3, d6
> 这时候数据schema版本顺序性如何保障?会不会用错误的schema版本处理了数据变更呢?
> 
> 
> SchemaOperator代码中
> private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
> schemaChangeEvent) {
>// The request will need to send a FlushEvent or block until flushing 
> finished
>SchemaChangeResponse response = requestSchemaChange(tableId, 
> schemaChangeEvent);
>if (response.isShouldSendFlushEvent()) {
>LOG.info(
>"Sending the FlushEvent for table {} in subtask {}.",
>tableId,
>getRuntimeContext().getIndexOfThisSubtask());
>output.collect(new StreamRecord<>(new FlushEvent(tableId)));
>output.collect(new StreamRecord<>(schemaChangeEvent));
>// The request will block until flushing finished in each sink 
> writer
>requestReleaseUpstream();
>}
>}
> 为什么最后output.collect(new StreamRecord<>(schemaChangeEvent)); 
> 还要发送一次SchemaChangeEvent呢?
> 当收到FlushSuccessEvent后SchemaRegistryRequestHandler不是已经调用MetadataApplier执行schemaChange动作了么?
> 最后一行requestReleaseUpstream()执行被block的原因是什么?是如何hold upstream然后再release 
> upstream的呢?
> 求指教,谢谢!
> 



RE: lock up表过滤条件下推导致的bug

2023-12-25 文章 Jiabao Sun
Hi,

邮件中的图片没显示出来,麻烦把 SQL 贴出来一下。

Best,
Jiabao


On 2023/12/25 12:22:41 杨光跃 wrote:
> 我的sql如下:
> 、
> 
> 
> t_purch_apply_sent_route 是通过flink cdc创建的
> t_purch_apply_sent_route_goods 是普通的jdbc
> 我期望的结果是返回符合过滤条件的;但现在执行的结果,会返回t_purch_apply_sent_route表所有数据
> 这显然不符合我的预期,原因应该是因为过滤条件进行了过早的下推
> 这应该算是bug吧,或者要满足我的预期,该怎么写sql?
> 
> 
> 
> 

RE: Re:Flink脏数据处理

2023-12-21 文章 Jiabao Sun
Hi,

需要精准控制异常数据的话,就不太推荐flink sql了。
考虑使用DataStream将异常数据用侧流输出[1],再做补偿。

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/side_output/


On 2023/12/06 08:45:20 Xuyang wrote:
> Hi, 
> 目前flink sql主动收集脏数据的行为。有下面两种可行的办法:
> 1. 如果知道脏数据是什么格式,那么将脏数据打个标,不走正常的处理逻辑,只收集,然后由一个UDAF来负责在达到一定的量的时候cancen。
> 2. 如果不知道脏数据是什么格式,可以在处理数据的那一个节点上使用UDX来处理正常的数据和脏数据,同时统计脏数据的数量,在达到一定上限的时候抛异常。
> 
> 
> 但是这里在udx里抛异常应该只会导致作业fo,无法让作业达到失败的状态。
> 
> 
> 要想让作业达到失败的状态,如果在source端就可以识别到脏数据的话,需要魔改下source 
> connector,在识别到遇到多少脏数据的时候,不往后发数据就可以了。具体可以参考下[1]
> 
> 
> [1] 
> https://stackoverflow.com/questions/1153/how-to-stop-a-flink-streaming-job-from-program
> 
> 
> 
> --
> 
> Best!
> Xuyang
> 
> 
> 
> 
> 
> 在 2023-12-06 15:26:56,"刘建"  写道:
> >Hi:我想使用flinkSQL 进行数据同步,如将MySQL数据读取并写入到MySQL中, 如果中途存在脏数据, 下游就会写不进去, 
> >我如何收集这个脏数据呢, 当脏数据到达一定量的时候, 让该任务失败等等
> 

RE: Re:Re:flink sql支持批量lookup join

2023-12-21 文章 Jiabao Sun
Hi, casel.

使用三次lookup join是可以实现的,加上缓存,性能应该不差。

WITH users AS (
SELECT *
  FROM (VALUES(1, 'zhangsan'), (2, 'lisi'), (3, 'wangwu')) T (id, name)
)
SELECT orders.id, 
   u1.name as creator_name,
   u2.name as approver_name,
   u3.name as deployer_name
FROM (
   SELECT *
  FROM (VALUES(1, 1, 2, 3)) T (id, creator_id, approver_id, deployer_id)
) AS orders
LEFT JOIN users AS u1 ON orders.creator_id = u1.id
LEFT JOIN users AS u2 ON orders.approver_id = u2.id
LEFT JOIN users AS u3 ON orders.deployer_id = u3.id;

Best,
Jiabao

On 2023/11/22 12:44:47 "casel.chen" wrote:
> 有一张维表 user,包含id和name字段
> id  | name
> -
> 1 | zhangsan
> 2 | lisi
> 3 | wangwu
> 
> 
> 现在实时来了一条交易数据 
> id  | creator_id  | approver_id  | deployer_id
> -
> 1   | 1| 2   | 3
> 
> 
> 希望lookup维表user返回各用户名称
> id   |  creator_name   |  approver_name  |  deployer_name
> 
> 1| zhangsan  |  lisi|. wangwu
> 
> 
> 
> 以上场景用flink sql要如何实现?
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2023-11-22 12:37:10,"Xuyang"  写道:
> >Hi, casel.
> >可以对“批量lookup join”再描述详细一点么?看上去是符合一个lookup join里直接带上k1=v1 and k2=v2 and 
> >k3=v3的用法的。
> >
> >
> >
> >
> >--
> >
> >Best!
> >Xuyang
> >
> >
> >
> >
> >在 2023-11-22 11:55:11,"casel.chen"  写道:
> >>一行数据带了三个待lookup查询的key,分别是key1,key2和key3
> >>
> >>
> >>id key1 key2 key3
> >>想实现批量lookup查询返回一行数据 id value1 value2 value3
> >>
> >>
> >>查了下目前包括jdbc connector在内的lookup都不支持批量查询,所以只能先将多列转成多行分别lookup再将多行转成多列,如下所示
> >>id key1 key2 key3
> >>先将多列转成多行
> >>id key1
> >>id key2
> >>id key3
> >>
> >>分别进行lookup join后得到
> >>id value1
> >>id value2
> >>id value3
> >>最后多行转多列返回一行数据
> >>
> >>id value1 value2 value3
> >>
> >>
> >>上述方案目前我能想到的是通过udtf + udaf来实现,但缺点是不具备通用性。Flink社区打算原生支持么?
> 

RE: flink1.15-flink1.18官方提供写入Elasticsearch的接口报序列化异常

2023-12-18 文章 Jiabao Sun
Hi,

createIndexRequest是否不是静态的,scala的话可以在object中声明该方法。
Lambda中访问非静态方法,并且外部类不是可序列化的,可能会导致lambda无法被序列化。

Best,
Jiabao


On 2023/12/12 07:53:53 李世钰 wrote:
> val result: ElasticsearchSink[String] = new Elasticsearch7SinkBuilder[String]
>   // This instructs the sink to emit after every element, otherwise they would
>   // be buffered
>   .setBulkFlushMaxActions(1)
>   .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
>   .setEmitter(
> (element: String, context: SinkWriter.Context, indexer: RequestIndexer) 
> =
>   indexer.add(createIndexRequest(element)))
>   .build()
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: The elasticsearch emitter must be serializable. 
> 
> Caused by: java.lang.IllegalStateException: The elasticsearch emitter must be 
> serializable.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> at 
> org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBuilderBase.setEmitter(ElasticsearchSinkBuilderBase.java:77)
> at 
> org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder.setEmitter(Elasticsearch7SinkBuilder.java:63)

Re: Flink cdc 2.0 历史数据太大,导致log积压怎么解决

2023-09-20 文章 Jiabao Sun
Hi,
生产环境的binlog还是建议至少保留7天,可以提高故障恢复时间容忍度。
另外,可以尝试增加snapshot的并行度和资源来提升snapshot速度,snapshot完成后可以从savepoint恢复并减少资源。
Best,
Jiabao
--
From:jinzhuguang 
Send Time:2023年9月20日(星期三) 20:56
To:user-zh 
Subject:Flink cdc 2.0 历史数据太大,导致log积压怎么解决
以mysql 
cdc为例,现在的f整体流程是先同步全量数据,再开启增量同步;我看代码目前增量的初始offset选择的是所有全量split的最小的highwatermark。那我如果全量数据很大,TB级别,全量同步可能需要很久,但是binlog又不能删除,这样堆积起来会占用很大的空间,不知道这个问题现在有什么常见的解法吗?


RE: 咨询求助: Least函数输入正常 但是返回值异常

2023-08-20 文章 Jiabao Sun
Hi,

方便提供一下复现的用例吗?

Best,
Jiabao


On 2023/08/21 02:19:53 guifeng huang wrote:
> (Flink1.15版本)
> 咨询求助: Least函数输入参数(Double类型)正常, 在Flink shell里测试函数无问题, 结果符合预期. 
> 但是实际生产流里进行使用的时候发现返回结果有异, 以下是3种case
> - 返回结果正确, 符合预期
> - 返回0, 不符合预期, 未知原因
> - 返回结果和理论正确值有微小的gap, 找了几个case都是1位数值里的差距.
> 看看有没有其他的老师遇到过同样的问题 

Re: Flink消费MySQL

2023-08-07 文章 Jiabao Sun
Hi,

可以尝试使用 flink-cdc-connectors 去实时关联。
使用 regular join 需要保留两张表完整的状态,表数据量较大建议使用 rocksdb backend。
被关联的表变化不大的话可以考虑 lookup join。

Best,
Jiabao


> 2023年8月8日 上午11:10,小昌同学  写道:
> 
> 谢谢老师指导呀;
> 我目前的需求是想把两张MySQL的表数据读取出来,然后进行实时关联,我现在能想到的就是要么使用cdc实时读取,要么就是写一个循环去读MySQL中的数据
> 老师这一块有更好的建议嘛
> 
> 
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
>  回复的原邮件 
> | 发件人 | Shammon FY |
> | 发送日期 | 2023年8月8日 10:37 |
> | 收件人 |  |
> | 主题 | Re: Flink消费MySQL |
> Hi,
> 
> 你代码里的ResultSet读取完成后需要将resultSet关闭掉,避免资源泄漏
> 
> 至于你提到的Mysql数据读完程序就结束具体是指哪块?mysql是bounded
> source,数据消费完成并且整个作业计算完成后,就会结束,这是正常情况
> 
> Best,
> Shammon FY
> 
> On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:
> 
> 各位老师好
> ,我这边在本地使用通过继承RichSourceFunction类,实现从MySQL中读取数据,但是为啥程序将MySQL中的全部数据获取出来后,程序就自动停止了啊;
> 以下是我的代码:
> |
> public class MysqlSource2 extends RichSourceFunction {
> PreparedStatement ps;
> private Connection connection;
> 
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> connection = getConnection();
> String sql="select * from actiontype;";
> ps = connection.prepareStatement(sql);
> }
> 
> private static Connection getConnection(){
> Connection con=null;
> String driverClass= FlinkConfig.config.getProperty("driverClass");
> String url=FlinkConfig.config.getProperty("jdbcUrl");
> String user=FlinkConfig.config.getProperty("jdbcUser");
> String passWord=FlinkConfig.config.getProperty("passWord");
> 
> try {
> Class.forName(driverClass);
> con= DriverManager.getConnection(url,user,passWord);
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> return con;
> }
> 
> @Override
> public void run(SourceContext ctx) throws Exception {
> ResultSet resultSet = ps.executeQuery();
> while (resultSet.next()){
> ActionType actionType = new ActionType(
> resultSet.getString("action"),
> resultSet.getString("action_name")
> );
> ctx.collect(actionType);
> }
> }
> 
> @Override
> public void close() throws Exception {
> super.close();
> if (null!=connection){
> connection.close();
> }
> if (null!=ps){
> ps.close();
> }
> }
> 
> @Override
> public void cancel() {
> }
> };
> 
> 
> |
> 
> 
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |



Re: 如何把自己新增的定制化connector deploy snapshot版本到私服仓库

2023-08-02 文章 Jiabao Sun
你好,

不需要将所有的依赖都改为snapshot,仅需要将我们项目内的版本加上 snapshot 后缀。
可以在项目中统一替换版本号 1.x.x -> 1.x.x-SNAPSHOT,或者使用 mvn versions:set 
-DnewVersion=1.x.x-SNAPSHOT 设置。


> 2023年8月2日 下午2:25,jinzhuguang  写道:
> 
> 非常感谢你的提醒,我现在用maven工具修改了所有的版本号为snapshot,但是flink-connectors(connectors的父模块)也变成snapshot,打包的时候仓库里找不到他了,而且也没法想flink-runtime这些包手动改下版本好,这种该怎么办
> 
>> 2023年7月27日 11:05,Jiabao Sun  写道:
>> 
>> 你好,
>> 
>> 通常在 pom 中引入 maven-deploy-plugin,并且通过  声明私服地址,使用 mvn 
>> clean deploy 命令部署到nexus私服。
>> 部署到 SNAPSHOT 仓库需要项目版本号包含 -SNAPSHOT 后缀,可以在IDE中全局替换,也可以使用 
>> versions-maven-plugin 统一设置。
>> 
>> 
>>   
>>   
>>   
>>  org.apache.maven.plugins
>>  maven-deploy-plugin
>>  2.8.2
>>  
>>${maven.deploy.skip}
>>  
>>
>>   
>>   
>> 
>>   
>>   
>>   private-snapshots
>>   
>> https://xxx.xxx.xxx/nexus/content/repositories/snapshots/
>>   
>>   
>>   private-releases
>>   https://xxx.xxx.xxx/nexus/content/repositories/releases/
>>   
>>   
>> 
>> 
>> 
>>> 2023年7月27日 上午10:48,jinzhuguang  写道:
>>> 
>>> 我是基于flink 1.16.0开发的,由于版本号没有snapshot,现在又无法发布release版本的,我该怎么办?



Re: 如何把自己新增的定制化connector deploy snapshot版本到私服仓库

2023-07-26 文章 Jiabao Sun
你好,

通常在 pom 中引入 maven-deploy-plugin,并且通过  声明私服地址,使用 mvn 
clean deploy 命令部署到nexus私服。
部署到 SNAPSHOT 仓库需要项目版本号包含 -SNAPSHOT 后缀,可以在IDE中全局替换,也可以使用 versions-maven-plugin 
统一设置。





org.apache.maven.plugins
maven-deploy-plugin
2.8.2

  ${maven.deploy.skip}

  





private-snapshots
https://xxx.xxx.xxx/nexus/content/repositories/snapshots/


private-releases
https://xxx.xxx.xxx/nexus/content/repositories/releases/





> 2023年7月27日 上午10:48,jinzhuguang  写道:
> 
> 我是基于flink 1.16.0开发的,由于版本号没有snapshot,现在又无法发布release版本的,我该怎么办?



RE: flink如何正确使用mybatis

2023-07-26 文章 Jiabao Sun
SqlSession 需要关闭,建议使用 SqlSessionManager,可以不用手动关闭 SqlSession。


On 2023/07/18 02:13:16 lxk wrote:
> 在flink内需要使用mybatis做些简化查询的工作,目前我的使用方式如下
> 
> public class MybatisUtil {
> 
> private static final Logger LOGGER = 
> LogFactory.createNewLogger("MybatisUtil");
> private static ThreadLocal tl = new ThreadLocal();
> private static SqlSessionFactory factory = null;
> //private static  SqlSession sqlSession = null;
> static {
> // 1 读取配置文件 config.xml
> InputStream in = null;
> try {
> in = Resources.getResourceAsStream("batis.xml");
> } catch (IOException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> throw new RuntimeException(e);
> }
> // 2 创建SqlSessionFactory
> factory = new SqlSessionFactoryBuilder().build(in);
> }
> 
> 
> 
> public static SqlSession getSqlSession(){
> SqlSession sqlSession = tl.get();
> if(sqlSession == null){
> sqlSession = factory.openSession();
> tl.set(sqlSession);
> LOGGER.info("sqlSession创建成功,连接为:{},时间为:{}", sqlSession,LocalTimeUtil.now());
> }
> return sqlSession;
> }
> 
> 
> }
> 以上是工具类
> 我在open方法中获取sqlsession,然后在invoke方法中使用mapper
> public void open(Configuration parameters) throws Exception {
> sqlSession = MybatisUtil.getSqlSession();
> }
> 
> public List map(HeaderFullWithPreOrder headerFullWithPreOrder) 
> throws Exception {
> SelectAddCartMapper mapper = sqlSession.getMapper(SelectAddCartMapper.class);
> ...其他省略
> }
> 
> 想问下这种方式使用是否正确。以及sqlsession是否需要关闭,看见相关帖子有说如果sqlsession不关闭的话会把连接打满
>