Re:使用mysql-cdc 的scan.startup.mode = specific-offset的读取模式,运行一段时间后,报错

2021-06-07 Thread



我也遇到了这种情况,可能是你们的db做了主从切换。
因为binlog每台服务器的pos都不一样。
mysql5.6以后支持了GTID的同步方式,这个是全局唯一的。但是目前mysql-cdc貌似还不支持。
我目前的解决方案是出错后从最后的位置开始消费,可能会丢失一部分数据。














在 2021-06-07 16:00:21,"张海深" <18601255...@163.com> 写道:
>Hi all:
>最近用mysql-cdc的方式,使用Flink-sql整合数据,table的部分配置如下
>'debezium.min.row.count.to.stream.results'='1000',
>'scan.startup.mode'='specific-offset',
>'scan.startup.specific-offset.file'='mybinlog.29',
>'scan.startup.specific-offset.pos'='542607677'
>在上述稳定运行接分钟之后,异常抛出了以下错误:
>org.apache.kafka.connect.errors.ConnectException: The connector is trying to 
>read binlog starting at GTIDs 
>36b8bd0f-a435-11eb-9962-b4055d9ecb74:21820662-21926801,d713fc3c-afc6-11eb-a7b8-b4055d9ec5e6:185910-192349
> and binlog file 'mybinlog.29', pos=808323397, skipping 6 events plus 1 
>rows, but this is no longer available on the server. Reconfigure the connector 
>to use a snapshot when needed.
>at 
>io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:133)
>at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106)
>at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758)
>at 
>io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
>at 
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
>at 
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
>at java.lang.Thread.run(Thread.java:834)
>请问是什么情况下导致这个问题。


mysql主从切换导致通过flink mysql-cdc消费binlog 点位出错

2021-06-03 Thread
由于各种原因,dba进行了数据库主从切换。
目前我采用flink mysql-cdc采集binlog,但是数据库主从切换后,导致binlog的pos不一致。
flink 程序会自动重启,在经过配置的重启策略后就会挂掉,日志打印
org.apache.kafka.connect.errors.ConnectException: The connector is trying to 
read binlog starting at GTIDs 3fa7d5bb-65f3-11eb-9413-b0262879b560:1-730774004 
and binlog file 'mysql-bin.000650', pos=521310219, skipping 2 events plus 1 
rows, but this is no longer available on the server. Reconfigure the connector 
to use a snapshot when needed.


由于pos=521310219在新的数据库服务器上位置不对,flink最后一次自动保存的checkpoint已经存储了pos=521310219,导致通过flink
 -s  的方式无法接着继续消费,并且job无法成功启动。
不知道大家有什么好的办法解决这个问题?





flink sql cdc 采集mysql binlog 可以保留before,after的字段吗

2021-05-31 Thread
flink sql cdc 采集mysql binlog 可以保留before,after的字段吗?
按照官方的例子,定义表结构后,是最新的字段值?
能否同时保留before和after?

Re:Re: flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复

2021-05-28 Thread
稳定复现
checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。
我们jobmanager没有做ha,不知道是否是这个原因导致的?
日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。
目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。
>> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: execution.savepoint.path,
>> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100














在 2021-05-28 18:15:38,"刘建刚"  写道:
>这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗?
>1、从savepoint恢复;
>2、作业开始定期做savepoint;
>3、作业failover。
>如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。
>如果还是有问题,需要通过日志来排查了。
>
>董建 <62...@163.com> 于2021年5月28日周五 下午5:37写道:
>
>> 我遇到的问题现象是这样的
>>
>>
>>
>>
>> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。
>>
>>
>>
>>
>> flink run -d -s
>> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>> -t yarn-per-job -m yarn-cluser -D yarn.application.name=
>> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
>>
>>
>>
>>
>> 2、flink-conf.xml
>>
>>
>>
>>
>> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
>>
>>
>>
>>
>> 3、代码checkpoint设置
>>
>>
>>
>>
>>StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>
>>
>>
>>env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100,
>> 10));
>>
>>
>>
>>
>>CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>>
>>
>>
>>
>>
>>  
>> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>>
>>
>>
>>env.enableCheckpointing(1 * 60 * 1000);
>>
>>
>>
>>
>>
>>  checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>
>>
>>
>>
>>checkpointConfig.setTolerableCheckpointFailureNumber(100);
>>
>>
>>
>>
>>checkpointConfig.setCheckpointTimeout(60 * 1000);
>>
>>
>>
>>
>>checkpointConfig.setMaxConcurrentCheckpoints(1);
>>
>>
>>
>>
>> 4、问题现象
>>
>>
>>
>>
>> a)运维同事切换yarn
>> resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器
>>
>>
>>
>>
>>
>> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200
>>
>>
>>
>>
>> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200
>> restore,从日志中看还是从chk-100 restore的。
>>
>>
>>
>>
>> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction
>> sourceMilApplysLogStream = MySQLSource.builder()
>>
>>
>>
>>
>>   重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费
>>
>>
>>
>>
>> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?
>>
>>
>>
>>
>> 2021-05-24 16:49:50,398 INFO
>> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
>> configuration property: execution.savepoint.path,
>> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>>
>>
>>
>> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费
>>
>>
>>
>>
>> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。


flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复

2021-05-28 Thread
我遇到的问题现象是这样的




1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。




flink run -d -s 
hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 
-t yarn-per-job -m yarn-cluser -D yarn.application.name= 
/tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod




2、flink-conf.xml




state.checkpoints.dir: hdfs:///user/flink/checkpoints/default




3、代码checkpoint设置




   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();




   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10));




   CheckpointConfig checkpointConfig = env.getCheckpointConfig();




   
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);




   env.enableCheckpointing(1 * 60 * 1000);




   checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);




   checkpointConfig.setTolerableCheckpointFailureNumber(100);




   checkpointConfig.setCheckpointTimeout(60 * 1000);




   checkpointConfig.setMaxConcurrentCheckpoints(1);




4、问题现象




a)运维同事切换yarn 
resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器




  b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200




c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 restore,从日志中看还是从chk-100 
restore的。




d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction 
sourceMilApplysLogStream = MySQLSource.builder()




  重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费




e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?




2021-05-24 16:49:50,398 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: execution.savepoint.path, 
hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100



预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费




现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。

flink mysql cdc支持mysql的json格式吗?

2021-05-18 Thread
flink mysql cdc支持mysql的json格式吗?

集群重启如何保证带状态的任务自动从最近一个checkpoint恢复?

2021-05-17 Thread
集群重启如何保证带状态的任务自动从最近一个checkpoint恢复?



./sql-client.sh embedded 这种方式提交的flink任务怎么设置state以及checkpoint?

2021-05-11 Thread
如题
./sql-client.sh embedded 这种方式提交的flink任务怎么设置state以及checkpoint?
还是只能把sql放在java里边,通过tableEnvironment.sqlQuery执行?

flink cdc 消费mysql binlog 每次都是从头开始消费问题

2021-04-23 Thread
大家好,我最近采用了flink cdc 对接mysql binlog ,每次重启或者停止job后,都是从表的第一条数据开始消费。
有做checkpoint和持久化,并且日志提示checkpoint成功,不知道为何重启应用始终是从头开始消费?
按照官方定义
.startupOptions(StartupOptions.initial()) 应该是历史+增量才对
flink 版本:1.12.2
flink cdc  版本:flink-sql-connector-mysql-cdc-1.4-SNAPSHOT.jar
相关核心代码:
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new 
RocksDBStateBackend(config.getProperty("stateBackend.path")).getCheckpointBackend());
CheckpointConfig checkpointConfig = env.getCheckpointConfig();

checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.enableCheckpointing(10 * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);


DebeziumSourceFunction sourceMilApplysLogStream = 
MySQLSource.builder()
.hostname(config.getProperty("datasource.db"))
.port(Integer.parseInt(config.getProperty("datasource.port")))
.username(config.getProperty("datasource.username"))
.password(config.getProperty("datasource.password"))
.databaseList(config.getProperty("datasource.databaseList"))
.tableList(config.getProperty("datasource.tableList"))
.deserializer(new DebeziumDeserialization())

.serverId(Integer.parseInt(config.getProperty("datasource.server-id")))
.startupOptions(StartupOptions.initial())
.build();





Re:回复:flink sql join 内存占用以及数据延迟问题咨询

2021-04-12 Thread
感谢sllence大佬的耐心解答,还想继续请教一下:
1、假如是设置了持久化的状态后端,不知道是以什么样的格式来存储state的?是每个流单独一个state(原始数据),还是join后的结果进行state?
2、cdc 默认采用了regular 
join,全量数据都在内存中,所以数据量大的业务会对集群造成较大负担。在实际生产环境中,假如这种类型的任务非常多,集群资源是不是很快就会被耗尽了?不知道 
是否可以认为是cdc的一个问题?
3、我的实际生产中有etl的join需求,这些etl涉及多张表的写入,但是无法确认在join的流中的延迟和乱序时间,所以是不是除了regular 
join就没有更好的选择了?


感谢!


















在 2021-04-12 15:07:05,"sllence"  写道:

1、regular join(就是普通的join),无窗口的限制,会关联全部的历史数据
2、状态存在状态后端(state 
backend),内存是一种默认的状态后端,可以选择filesystem或rockdb之类的状态后端进行大状态以及持久化的状态存储。regular 
join默认会存储全量的状态进行join,可以设置ttl过期掉不用的状态,但可能会照成已过期的状态关联不上导致结果异常
3、cdc目前应该还不支持事件时间以及水印的定义,也就是还不支持窗口join,可以参考issue 
https://issues.apache.org/jira/browse/FLINK-20281
4、每种join的语法不一样,用哪种join就看sql怎么定义,但cdc目前只支持regular join
5、假如用了持久化的状态后端可以在job重新启动时指定路径进行恢复


在2021年04月12日 14:38,董建<62...@163.com> 写道:
最近看了 云邪 大佬关于flink cdc sql的视频,并且动手操作了 例子
https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B
感受到了flink sql 在实时流计算的便捷性以及强大,但同时也有一些疑问。如下:
flink  connector cdc 直接对接订单表,物流表,商品表表的binlog
1、通过flink进行3流join的时候,这个join是对应flink底层api的哪种join,是否受窗口大小以及时间现在?
2、假如是全量join , 这些数据是全部保存在内存中吗?如果业务表的数据很大或者每天的增量很大,flink使用这种方式,内存是否有瓶颈?
3、如果是具有窗口属性的join,假如流1join流2,如果流2延迟了,是否有可能导致join数据不正确(流2的数据由于延迟被丢下了)
4、flink sql join的时候对应的哪种join是否可以指定?
5、假如job失败了重启,这些join后的数据有state吗?
感谢!

flink sql join 内存占用以及数据延迟问题咨询

2021-04-12 Thread
最近看了 云邪 大佬关于flink cdc sql的视频,并且动手操作了 例子
https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B
感受到了flink sql 在实时流计算的便捷性以及强大,但同时也有一些疑问。如下:
flink  connector cdc 直接对接订单表,物流表,商品表表的binlog
1、通过flink进行3流join的时候,这个join是对应flink底层api的哪种join,是否受窗口大小以及时间现在?
2、假如是全量join , 这些数据是全部保存在内存中吗?如果业务表的数据很大或者每天的增量很大,flink使用这种方式,内存是否有瓶颈?
3、如果是具有窗口属性的join,假如流1join流2,如果流2延迟了,是否有可能导致join数据不正确(流2的数据由于延迟被丢下了)
4、flink sql join的时候对应的哪种join是否可以指定?
5、假如job失败了重启,这些join后的数据有state吗?
感谢!