[
https://issues.apache.org/jira/browse/FLINK-38568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Leonard Xu resolved FLINK-38568.
--------------------------------
Resolution: Implemented
Implemented via master: 7a6bfd85df5fed9b2c0e0dda8e8492a682c429f6
> Performance bottleneck in BinlogSplitReader with large number of snapshot
> splits
> --------------------------------------------------------------------------------
>
> Key: FLINK-38568
> URL: https://issues.apache.org/jira/browse/FLINK-38568
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: yuanfenghu
> Assignee: yuanfenghu
> Priority: Major
> Labels: pull-request-available
> Fix For: cdc-3.6.0
>
>
> h2. Background
> When using MySQL CDC connector with large tables split into thousands of
> chunks (e.g., 10,000+), the BinlogSplitReader.shouldEmit() method causes
> severe performance degradation.Performance Impact (observed in production):
> * CPU usage: 25.12% spent in splitKeyRangeContains()
> * 38,403 comparisons per binlog record
> * Algorithm: O ( n ) linear search through all finished snapshot splits
> Root Causes:
> * Linear search: For each binlog record, the code iterates through all
> finished splits to find which split contains the record
> * Unsorted list: The finished splits list is not sorted, preventing
> optimization
> h2. Solution
> 1. Sort splits by boundary in BinlogSplitReader.configureFilter():
> {code:java}
> splitsInfoMap.values().forEach(RecordUtils::sortFinishedSplitInfos); {code}
> 2. Replace linear search with binary search in BinlogSplitReader.shouldEmit():
> {code:java}
> for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
> if (RecordUtils.splitKeyRangeContains(...)) { return true; }}
>
> FinishedSnapshotSplitInfo matchedSplit =
> RecordUtils.findSplitByKeyBinary(finishedSplitsInfo.get(tableId),
> chunkKey);return matchedSplit != null &&
> position.isAfter(matchedSplit.getHighWatermark());{code}
> h2. Performance Improvement
> ||Metric||Before||After||Improvement||
> |Algorithm|O ( n )|O(log n)|-|
> |Comparisons|38,403|~16|2,400x|
> |CPU usage|25.12%|<0.01%|2,500x|
> |Time|34ms|<0.015ms|2,200x|
>
> {code:java}
> //代码占位符
> `---ts=2025-10-24 18:09:07.334;thread_name=Source Data Fetcher for Source:
> MySQL Source -> Parse -> Side Output -> Case-insensitive Convert
> (1/4)#0;id=111;is_daemon=false;priority=5;TCCL=org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader@19a11164
> `---[136.386382ms]
> org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader:shouldEmit()
> +---[0.00% 0.002533ms ]
> org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:isDataChangeRecord()
> #248
> +---[0.00% 0.002138ms ]
> org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getTableId()
> #249
> +---[0.01% 0.010575ms ]
> org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getBinlogPosition()
> #253
> +---[0.04% 0.04841ms ]
> org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader:hasEnterPureBinlogPhase()
> #254
> +---[0.00% 6.7E-4ms ]
> org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getDatabaseSchema()
> #262
> +---[0.00% 0.006651ms ]
> io.debezium.connector.mysql.MySqlDatabaseSchema:tableFor() #262
> +---[0.00% 7.69E-4ms ]
> org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getSourceConfig()
> #263
> +---[0.00% 5.72E-4ms ]
> org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig:getChunkKeyColumns()
> #263
> +---[0.00% 6.17E-4ms ]
> org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getSourceConfig()
> #264
> +---[0.00% 4.89E-4ms ]
> org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig:isTreatTinyInt1AsBoolean()
> #264
> +---[0.02% 0.02135ms ]
> org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils:getChunkKeyColumnType()
> #261
> +---[0.00% 0.002676ms ]
> org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getStructContainsChunkKey()
> #266
> +---[0.00% 6.1E-4ms ]
> org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext:getSchemaNameAdjuster()
> #269
> +---[0.00% 0.003123ms ]
> org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:getSplitKey()
> #268
> +---[13.63%
> min=4.54E-4ms,max=0.007806ms,total=18.587447ms,count=38403]
> org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo:getSplitStart()
> #272
> +---[13.80%
> min=4.55E-4ms,max=0.095948ms,total=18.823901ms,count=38403]
> org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo:getSplitEnd()
> #272
> +---[25.12%
> min=7.74E-4ms,max=0.038913ms,total=34.265124ms,count=38403]
> org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils:splitKeyRangeContains()
> #271
> +---[0.00% 9.15E-4ms ]
> org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo:getHighWatermark()
> #273
> `---[0.03% 0.042434ms ]
> org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset:isAfter()
> #273 {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)