Re: Re: Flink CDC MySqlSplitReader问题
Hi, 我记得这段逻辑是为了保证在新增表后,binlog 读取能和新增表的快照读取一起进行,保证binlog读取不会中断。 这里应该是会先读binlog,然后再读snapshot,再是binlog。这样的切换,来保证binlog 能一直有数据读出来。 Best, Hang casel.chen 于2023年12月22日周五 10:44写道: > 那意思是会优先处理已经在增量阶段的表,再处理新增快照阶段的表?顺序反过来的话会有什么影响?如果新增表全量数据比较多会导致其他表增量处理变慢是么? > > > > > > > > > > > > > > > > > > 在 2023-12-20 21:40:05,"Hang Ruan" 写道: > >Hi,casel > > > >这段逻辑应该只有在处理到新增表的时候才会用到。 > >CDC 读取数据正常是会在所有SnapshotSplit读取完成后,才会下发BinlogSplit。 > >但是如果是在增量阶段重启作业,同时新增加了一些表,就会出现同时有BinlogSplit和SnapshotSplit的情况,此时才会走到这段逻辑。 > > > >Best, > >Hang > > > > > >key lou 于2023年12月20日周三 16:24写道: > > > >> 意思是当 有 binlog 就意味着 已经读完了 snapshot > >> > >> casel.chen 于2023年12月19日周二 16:45写道: > >> > >> > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢! > >> > > >> > > >> > MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then > read > >> > snapshot split”这一句话我不理解。 > >> > 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot > >> > split再读增量的binlog split么? > >> > > >> > > >> > private MySqlRecords pollSplitRecords() throws InterruptedException { > >> > Iterator dataIt; > >> > if (currentReader == null) { > >> > // (1) Reads binlog split firstly and then read snapshot > >> split > >> > if (binlogSplits.size() > 0) { > >> > // the binlog split may come from: > >> > // (a) the initial binlog split > >> > // (b) added back binlog-split in newly added table > >> process > >> > MySqlSplit nextSplit = binlogSplits.poll(); > >> > currentSplitId = nextSplit.splitId(); > >> > currentReader = getBinlogSplitReader(); > >> > currentReader.submitSplit(nextSplit); > >> > } else if (snapshotSplits.size() > 0) { > >> > MySqlSplit nextSplit = snapshotSplits.poll(); > >> > currentSplitId = nextSplit.splitId(); > >> > currentReader = getSnapshotSplitReader(); > >> > currentReader.submitSplit(nextSplit); > >> > } else { > >> > LOG.info("No available split to read."); > >> > } > >> > dataIt = currentReader.pollSplitRecords(); > >> > return dataIt == null ? finishedSplit() : > forRecords(dataIt); > >> > } else if (currentReader instanceof SnapshotSplitReader) { > >> > > >> > } > >> > ... > >> > } > >> >
Re: Flink CDC MySqlSplitReader问题
Hi,casel 这段逻辑应该只有在处理到新增表的时候才会用到。 CDC 读取数据正常是会在所有SnapshotSplit读取完成后,才会下发BinlogSplit。 但是如果是在增量阶段重启作业,同时新增加了一些表,就会出现同时有BinlogSplit和SnapshotSplit的情况,此时才会走到这段逻辑。 Best, Hang key lou 于2023年12月20日周三 16:24写道: > 意思是当 有 binlog 就意味着 已经读完了 snapshot > > casel.chen 于2023年12月19日周二 16:45写道: > > > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢! > > > > > > MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read > > snapshot split”这一句话我不理解。 > > 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot > > split再读增量的binlog split么? > > > > > > private MySqlRecords pollSplitRecords() throws InterruptedException { > > Iterator dataIt; > > if (currentReader == null) { > > // (1) Reads binlog split firstly and then read snapshot > split > > if (binlogSplits.size() > 0) { > > // the binlog split may come from: > > // (a) the initial binlog split > > // (b) added back binlog-split in newly added table > process > > MySqlSplit nextSplit = binlogSplits.poll(); > > currentSplitId = nextSplit.splitId(); > > currentReader = getBinlogSplitReader(); > > currentReader.submitSplit(nextSplit); > > } else if (snapshotSplits.size() > 0) { > > MySqlSplit nextSplit = snapshotSplits.poll(); > > currentSplitId = nextSplit.splitId(); > > currentReader = getSnapshotSplitReader(); > > currentReader.submitSplit(nextSplit); > > } else { > > LOG.info("No available split to read."); > > } > > dataIt = currentReader.pollSplitRecords(); > > return dataIt == null ? finishedSplit() : forRecords(dataIt); > > } else if (currentReader instanceof SnapshotSplitReader) { > > > > } > > ... > > } >
Re: Flink CDC MySqlSplitReader问题
意思是当 有 binlog 就意味着 已经读完了 snapshot casel.chen 于2023年12月19日周二 16:45写道: > 我在阅读flink-connector-mysql-cdc项目源码过程中遇到一个不清楚的地方,还请大佬指点,谢谢! > > > MySqlSplitReader类有一段代码如下,注释“(1) Reads binlog split firstly and then read > snapshot split”这一句话我不理解。 > 为什么要先读binlog split再读snapshot split?为保证记录的时序性,不是应该先读全量的snapshot > split再读增量的binlog split么? > > > private MySqlRecords pollSplitRecords() throws InterruptedException { > Iterator dataIt; > if (currentReader == null) { > // (1) Reads binlog split firstly and then read snapshot split > if (binlogSplits.size() > 0) { > // the binlog split may come from: > // (a) the initial binlog split > // (b) added back binlog-split in newly added table process > MySqlSplit nextSplit = binlogSplits.poll(); > currentSplitId = nextSplit.splitId(); > currentReader = getBinlogSplitReader(); > currentReader.submitSplit(nextSplit); > } else if (snapshotSplits.size() > 0) { > MySqlSplit nextSplit = snapshotSplits.poll(); > currentSplitId = nextSplit.splitId(); > currentReader = getSnapshotSplitReader(); > currentReader.submitSplit(nextSplit); > } else { > LOG.info("No available split to read."); > } > dataIt = currentReader.pollSplitRecords(); > return dataIt == null ? finishedSplit() : forRecords(dataIt); > } else if (currentReader instanceof SnapshotSplitReader) { > > } > ... > }