Re:使用mysql-cdc 的scan.startup.mode = specific-offset的读取模式,运行一段时间后,报错
我也遇到了这种情况,可能是你们的db做了主从切换。 因为binlog每台服务器的pos都不一样。 mysql5.6以后支持了GTID的同步方式,这个是全局唯一的。但是目前mysql-cdc貌似还不支持。 我目前的解决方案是出错后从最后的位置开始消费,可能会丢失一部分数据。 在 2021-06-07 16:00:21,"张海深" <18601255...@163.com> 写道: >Hi all: >最近用mysql-cdc的方式,使用Flink-sql整合数据,table的部分配置如下 >'debezium.min.row.count.to.stream.results'='1000', >'scan.startup.mode'='specific-offset', >'scan.startup.specific-offset.file'='mybinlog.29', >'scan.startup.specific-offset.pos'='542607677' >在上述稳定运行接分钟之后,异常抛出了以下错误: >org.apache.kafka.connect.errors.ConnectException: The connector is trying to >read binlog starting at GTIDs >36b8bd0f-a435-11eb-9962-b4055d9ecb74:21820662-21926801,d713fc3c-afc6-11eb-a7b8-b4055d9ec5e6:185910-192349 > and binlog file 'mybinlog.29', pos=808323397, skipping 6 events plus 1 >rows, but this is no longer available on the server. Reconfigure the connector >to use a snapshot when needed. >at >io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:133) >at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:106) >at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758) >at >io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171) >at >java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) >at >java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) >at java.lang.Thread.run(Thread.java:834) >请问是什么情况下导致这个问题。
mysql主从切换导致通过flink mysql-cdc消费binlog 点位出错
由于各种原因,dba进行了数据库主从切换。 目前我采用flink mysql-cdc采集binlog,但是数据库主从切换后,导致binlog的pos不一致。 flink 程序会自动重启,在经过配置的重启策略后就会挂掉,日志打印 org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs 3fa7d5bb-65f3-11eb-9413-b0262879b560:1-730774004 and binlog file 'mysql-bin.000650', pos=521310219, skipping 2 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed. 由于pos=521310219在新的数据库服务器上位置不对,flink最后一次自动保存的checkpoint已经存储了pos=521310219,导致通过flink -s 的方式无法接着继续消费,并且job无法成功启动。 不知道大家有什么好的办法解决这个问题?
flink sql cdc 采集mysql binlog 可以保留before,after的字段吗
flink sql cdc 采集mysql binlog 可以保留before,after的字段吗? 按照官方的例子,定义表结构后,是最新的字段值? 能否同时保留before和after?
Re:Re: flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复
稳定复现 checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。 我们jobmanager没有做ha,不知道是否是这个原因导致的? 日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。 目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。 >> org.apache.flink.configuration.GlobalConfiguration [] - Loading >> configuration property: execution.savepoint.path, >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 在 2021-05-28 18:15:38,"刘建刚" 写道: >这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗? >1、从savepoint恢复; >2、作业开始定期做savepoint; >3、作业failover。 >如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。 >如果还是有问题,需要通过日志来排查了。 > >董建 <62...@163.com> 于2021年5月28日周五 下午5:37写道: > >> 我遇到的问题现象是这样的 >> >> >> >> >> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。 >> >> >> >> >> flink run -d -s >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 >> -t yarn-per-job -m yarn-cluser -D yarn.application.name= >> /tmp/flink-1.0-SNAPSHOT.jar -c com.test.myStream --profile prod >> >> >> >> >> 2、flink-conf.xml >> >> >> >> >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default >> >> >> >> >> 3、代码checkpoint设置 >> >> >> >> >>StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> >> >> >>env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, >> 10)); >> >> >> >> >>CheckpointConfig checkpointConfig = env.getCheckpointConfig(); >> >> >> >> >> >> >> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); >> >> >> >> >>env.enableCheckpointing(1 * 60 * 1000); >> >> >> >> >> >> checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); >> >> >> >> >>checkpointConfig.setTolerableCheckpointFailureNumber(100); >> >> >> >> >>checkpointConfig.setCheckpointTimeout(60 * 1000); >> >> >> >> >>checkpointConfig.setMaxConcurrentCheckpoints(1); >> >> >> >> >> 4、问题现象 >> >> >> >> >> a)运维同事切换yarn >> resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器 >> >> >> >> >> >> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200 >> >> >> >> >> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 >> restore,从日志中看还是从chk-100 restore的。 >> >> >> >> >> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction >> sourceMilApplysLogStream = MySQLSource.builder() >> >> >> >> >> 重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费 >> >> >> >> >> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗? >> >> >> >> >> 2021-05-24 16:49:50,398 INFO >> org.apache.flink.configuration.GlobalConfiguration [] - Loading >> configuration property: execution.savepoint.path, >> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 >> >> >> >> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费 >> >> >> >> >> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。
flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复
我遇到的问题现象是这样的 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。 flink run -d -s hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 -t yarn-per-job -m yarn-cluser -D yarn.application.name= /tmp/flink-1.0-SNAPSHOT.jar -c com.test.myStream --profile prod 2、flink-conf.xml state.checkpoints.dir: hdfs:///user/flink/checkpoints/default 3、代码checkpoint设置 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 10)); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.enableCheckpointing(1 * 60 * 1000); checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); checkpointConfig.setTolerableCheckpointFailureNumber(100); checkpointConfig.setCheckpointTimeout(60 * 1000); checkpointConfig.setMaxConcurrentCheckpoints(1); 4、问题现象 a)运维同事切换yarn resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器 b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200 c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200 restore,从日志中看还是从chk-100 restore的。 d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction sourceMilApplysLogStream = MySQLSource.builder() 重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费 e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗? 2021-05-24 16:49:50,398 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.savepoint.path, hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。
flink mysql cdc支持mysql的json格式吗?
flink mysql cdc支持mysql的json格式吗?
集群重启如何保证带状态的任务自动从最近一个checkpoint恢复?
集群重启如何保证带状态的任务自动从最近一个checkpoint恢复?
./sql-client.sh embedded 这种方式提交的flink任务怎么设置state以及checkpoint?
如题 ./sql-client.sh embedded 这种方式提交的flink任务怎么设置state以及checkpoint? 还是只能把sql放在java里边,通过tableEnvironment.sqlQuery执行?
flink cdc 消费mysql binlog 每次都是从头开始消费问题
大家好,我最近采用了flink cdc 对接mysql binlog ,每次重启或者停止job后,都是从表的第一条数据开始消费。 有做checkpoint和持久化,并且日志提示checkpoint成功,不知道为何重启应用始终是从头开始消费? 按照官方定义 .startupOptions(StartupOptions.initial()) 应该是历史+增量才对 flink 版本:1.12.2 flink cdc 版本:flink-sql-connector-mysql-cdc-1.4-SNAPSHOT.jar 相关核心代码: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new RocksDBStateBackend(config.getProperty("stateBackend.path")).getCheckpointBackend()); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.enableCheckpointing(10 * 1000); checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); DebeziumSourceFunction sourceMilApplysLogStream = MySQLSource.builder() .hostname(config.getProperty("datasource.db")) .port(Integer.parseInt(config.getProperty("datasource.port"))) .username(config.getProperty("datasource.username")) .password(config.getProperty("datasource.password")) .databaseList(config.getProperty("datasource.databaseList")) .tableList(config.getProperty("datasource.tableList")) .deserializer(new DebeziumDeserialization()) .serverId(Integer.parseInt(config.getProperty("datasource.server-id"))) .startupOptions(StartupOptions.initial()) .build();
Re:回复:flink sql join 内存占用以及数据延迟问题咨询
感谢sllence大佬的耐心解答,还想继续请教一下: 1、假如是设置了持久化的状态后端,不知道是以什么样的格式来存储state的?是每个流单独一个state(原始数据),还是join后的结果进行state? 2、cdc 默认采用了regular join,全量数据都在内存中,所以数据量大的业务会对集群造成较大负担。在实际生产环境中,假如这种类型的任务非常多,集群资源是不是很快就会被耗尽了?不知道 是否可以认为是cdc的一个问题? 3、我的实际生产中有etl的join需求,这些etl涉及多张表的写入,但是无法确认在join的流中的延迟和乱序时间,所以是不是除了regular join就没有更好的选择了? 感谢! 在 2021-04-12 15:07:05,"sllence" 写道: 1、regular join(就是普通的join),无窗口的限制,会关联全部的历史数据 2、状态存在状态后端(state backend),内存是一种默认的状态后端,可以选择filesystem或rockdb之类的状态后端进行大状态以及持久化的状态存储。regular join默认会存储全量的状态进行join,可以设置ttl过期掉不用的状态,但可能会照成已过期的状态关联不上导致结果异常 3、cdc目前应该还不支持事件时间以及水印的定义,也就是还不支持窗口join,可以参考issue https://issues.apache.org/jira/browse/FLINK-20281 4、每种join的语法不一样,用哪种join就看sql怎么定义,但cdc目前只支持regular join 5、假如用了持久化的状态后端可以在job重新启动时指定路径进行恢复 在2021年04月12日 14:38,董建<62...@163.com> 写道: 最近看了 云邪 大佬关于flink cdc sql的视频,并且动手操作了 例子 https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B 感受到了flink sql 在实时流计算的便捷性以及强大,但同时也有一些疑问。如下: flink connector cdc 直接对接订单表,物流表,商品表表的binlog 1、通过flink进行3流join的时候,这个join是对应flink底层api的哪种join,是否受窗口大小以及时间现在? 2、假如是全量join , 这些数据是全部保存在内存中吗?如果业务表的数据很大或者每天的增量很大,flink使用这种方式,内存是否有瓶颈? 3、如果是具有窗口属性的join,假如流1join流2,如果流2延迟了,是否有可能导致join数据不正确(流2的数据由于延迟被丢下了) 4、flink sql join的时候对应的哪种join是否可以指定? 5、假如job失败了重启,这些join后的数据有state吗? 感谢!
flink sql join 内存占用以及数据延迟问题咨询
最近看了 云邪 大佬关于flink cdc sql的视频,并且动手操作了 例子 https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B 感受到了flink sql 在实时流计算的便捷性以及强大,但同时也有一些疑问。如下: flink connector cdc 直接对接订单表,物流表,商品表表的binlog 1、通过flink进行3流join的时候,这个join是对应flink底层api的哪种join,是否受窗口大小以及时间现在? 2、假如是全量join , 这些数据是全部保存在内存中吗?如果业务表的数据很大或者每天的增量很大,flink使用这种方式,内存是否有瓶颈? 3、如果是具有窗口属性的join,假如流1join流2,如果流2延迟了,是否有可能导致join数据不正确(流2的数据由于延迟被丢下了) 4、flink sql join的时候对应的哪种join是否可以指定? 5、假如job失败了重启,这些join后的数据有state吗? 感谢!