leonardBang commented on code in PR #4166:
URL: https://github.com/apache/flink-cdc/pull/4166#discussion_r2513619463


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java:
##########
@@ -454,39 +453,153 @@ public static BinlogOffset getBinlogPosition(Map<String, ?> offset) {
     /** Returns the specific key contains in the split key range or not. */
     public static boolean splitKeyRangeContains(
             Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
-        // for all range
-        if (splitKeyStart == null && splitKeyEnd == null) {
-            return true;
+        return compareKeyWithRange(key, splitKeyStart, splitKeyEnd) == 0;
+    }
+
+    /**
+     * Sorts the list of FinishedSnapshotSplitInfo by splitStart in ascending order. This is
+     * required for binary search to work correctly.
+     *
+     * <p>Handles special cases: - Splits with null splitStart are considered as MIN value (sorted
+     * to front) - Splits with null splitEnd are considered as MAX value (sorted to back)
+     *
+     * <p>NOTE: Current implementation assumes single-field split keys (as indicated by
+     * getSplitKey()). If multi-field split keys are supported in the future, the comparison logic
+     * should be reviewed to ensure consistency with {@link
+     * #splitKeyRangeContains(Object[],Object[],Object[])}.
+     *
+     * @param splits List of splits to be sorted (sorted in-place)
+     */
+    public static void sortFinishedSplitInfos(List<FinishedSnapshotSplitInfo> splits) {
+        if (splits == null || splits.size() <= 1) {
+            return;
         }
-        // first split
-        if (splitKeyStart == null) {
-            int[] upperBoundRes = new int[key.length];
-            for (int i = 0; i < key.length; i++) {
-                upperBoundRes[i] = compareObjects(key[i], splitKeyEnd[i]);
+
+        splits.sort(
+                (leftSplit, rightSplit) -> {
+                    Object[] leftSplitStart = leftSplit.getSplitStart();
+                    Object[] rightSplitStart = rightSplit.getSplitStart();
+
+                    // Splits with null splitStart should come first (they are the first split)
+                    if (leftSplitStart == null && rightSplitStart == null) {
+                        return 0;
+                    }
+                    if (leftSplitStart == null) {
+                        return -1;
+                    }
+                    if (rightSplitStart == null) {
+                        return 1;
+                    }
+
+                    // Compare split starts
+                    return compareSplit(leftSplitStart, rightSplitStart);
+                });
+    }
+
+    /**
+     * Uses binary search to find the split containing the specified key in a sorted split list.
+     *
+     * <p>IMPORTANT: The splits list MUST be sorted by splitStart before calling this method. Use
+     * sortFinishedSplitInfos() to sort the list if needed.
+     *
+     * @param sortedSplits List of splits sorted by splitStart (MUST be sorted!)
+     * @param key The chunk key to search for
+     * @return The split containing the key, or null if not found
+     */
+    public static FinishedSnapshotSplitInfo findSplitByKeyBinary(
+            List<FinishedSnapshotSplitInfo> sortedSplits, Object[] key) {
+
+        if (sortedSplits == null || sortedSplits.isEmpty()) {
+            return null;
+        }
+

Review Comment:
   The current PR is already in good shape; I have only two suggestions:
   
   (1) Most tables use auto-increment primary keys, and INSERT operations 
outnumber UPDATE operations in the binlog, so the majority of binlog events 
will typically fall into either the last split or the first split. Leveraging 
this data locality, we can optimize further by first checking whether the 
record's key falls into the first or last split, and only then performing 
binary search on the remaining splits. 
   
   (2) We can also apply the same optimization to IncrementalSourceStreamFetcher, 
which is used by the other CDC connectors. 
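
A minimal sketch of what suggestion (1) could look like, under stated assumptions. `SplitLookupSketch`, `Split`, and `findSplit` are hypothetical stand-ins for `FinishedSnapshotSplitInfo` and `findSplitByKeyBinary`; keys are simplified to a single `long`, and each split is assumed to cover `[start, end)` with a null bound meaning unbounded. It is not the PR's actual code.

```java
import java.util.List;

/** Hypothetical, simplified illustration; not the Flink CDC implementation. */
class SplitLookupSketch {

    /** Hypothetical split: covers [start, end); null start = MIN, null end = MAX. */
    static final class Split {
        final String id;
        final Long start;
        final Long end;

        Split(String id, Long start, Long end) {
            this.id = id;
            this.start = start;
            this.end = end;
        }

        boolean contains(long key) {
            return (start == null || key >= start) && (end == null || key < end);
        }
    }

    /** Checks the last and first split first, then binary-searches the rest. */
    static Split findSplit(List<Split> sortedSplits, long key) {
        if (sortedSplits == null || sortedSplits.isEmpty()) {
            return null;
        }
        int lastIndex = sortedSplits.size() - 1;

        // Fast path 1: with auto-increment keys, new rows tend to land in the last split.
        Split last = sortedSplits.get(lastIndex);
        if (last.contains(key)) {
            return last;
        }
        // Fast path 2: the first split.
        Split first = sortedSplits.get(0);
        if (first.contains(key)) {
            return first;
        }

        // Slow path: binary search over the remaining splits (indices 1 .. lastIndex - 1).
        int low = 1;
        int high = lastIndex - 1;
        while (low <= high) {
            int mid = (low + high) >>> 1;
            Split candidate = sortedSplits.get(mid);
            if (candidate.contains(key)) {
                return candidate;
            }
            if (candidate.start != null && key < candidate.start) {
                high = mid - 1; // key lies before this split
            } else {
                low = mid + 1; // key lies at or after this split's end
            }
        }
        return null; // key falls outside every split
    }
}
```

The two fast-path checks cost at most two comparisons each, so the worst case stays logarithmic while the common case (keys in the last or first split) skips the search entirely.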
   
   WDYT? @huyuanfeng2018 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
