This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 23e23d652 [FLINK-39372][connect/mysql] Fix comparison logic for binlog
filename with various digits (#4358)
23e23d652 is described below
commit 23e23d6525394f8efff0761c8ba82e031425ce5f
Author: yuxiqian <[email protected]>
AuthorDate: Wed Apr 1 12:37:17 2026 +0800
[FLINK-39372][connect/mysql] Fix comparison logic for binlog filename with
various digits (#4358)
---
.../mysql/source/offset/BinlogOffset.java | 3 +
.../mysql/source/offset/BinlogOffsetTest.java | 85 ++++++++++++++--------
2 files changed, 58 insertions(+), 30 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
index 67ffc3b82..a7f86b62e 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffset.java
@@ -245,6 +245,9 @@ public class BinlogOffset implements
Comparable<BinlogOffset>, Serializable {
if (this.getFilename() != null
&& that.getFilename() != null
&& this.getFilename().compareToIgnoreCase(that.getFilename())
!= 0) {
+ if (this.getFilename().length() != that.getFilename().length()) {
+ return Integer.compare(this.getFilename().length(),
that.getFilename().length());
+ }
return this.getFilename().compareToIgnoreCase(that.getFilename());
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java
index 58f34c16a..5767a6562 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetTest.java
@@ -35,7 +35,7 @@ public class BinlogOffsetTest {
BinlogOffset.builder()
.setGtidSet(String.join(",", PART_OF_GTID_SET_2,
PART_OF_GTID_SET_1))
.build();
- assetCompareTo(offset1, offset2, 0);
+ assertCompareTo(offset1, offset2, 0);
// The test uses GTID instead of position for comparison.
offset1 =
@@ -48,7 +48,7 @@ public class BinlogOffsetTest {
.setGtidSet(String.join(",", PART_OF_GTID_SET_2,
PART_OF_GTID_SET_1))
.setBinlogFilePosition("binlog.001", 456)
.build();
- assetCompareTo(offset1, offset2, 0);
+ assertCompareTo(offset1, offset2, 0);
// Test different GTID sets where one contains another
BinlogOffset offset3 =
BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build();
@@ -58,8 +58,8 @@ public class BinlogOffsetTest {
.build();
// offset3 should be before offset4
- assetCompareTo(offset3, offset4, -1);
- assetCompareTo(offset4, offset3, 1);
+ assertCompareTo(offset3, offset4, -1);
+ assertCompareTo(offset4, offset3, 1);
// The test uses GTID instead of position for comparison.
offset3 =
@@ -72,16 +72,16 @@ public class BinlogOffsetTest {
.setGtidSet("abcd:1-5") // Contains offset3's GTID set
.setBinlogFilePosition("binlog.001", 23)
.build();
- assetCompareTo(offset3, offset4, -1);
- assetCompareTo(offset4, offset3, 1);
+ assertCompareTo(offset3, offset4, -1);
+ assertCompareTo(offset4, offset3, 1);
// Test completely different GTID sets
BinlogOffset offset5 =
BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_1).build();
BinlogOffset offset6 =
BinlogOffset.builder().setGtidSet(PART_OF_GTID_SET_2).build();
// offsets don't contain each other, result is always 1
- assetCompareTo(offset5, offset6, 1);
- assetCompareTo(offset6, offset5, 1);
+ assertCompareTo(offset5, offset6, 1);
+ assertCompareTo(offset6, offset5, 1);
}
@Test
@@ -92,8 +92,8 @@ public class BinlogOffsetTest {
BinlogOffset offset2 =
BinlogOffset.builder().setGtidSet(FULL_GTID_SET).setSkipEvents(10).build();
- assetCompareTo(offset1, offset2, -1);
- assetCompareTo(offset2, offset1, 1);
+ assertCompareTo(offset1, offset2, -1);
+ assertCompareTo(offset2, offset1, 1);
// Test same GTID and skip events but different skip rows
BinlogOffset offset3 =
@@ -109,8 +109,8 @@ public class BinlogOffsetTest {
.setSkipRows(20)
.build();
- assetCompareTo(offset3, offset4, -1);
- assetCompareTo(offset4, offset3, 1);
+ assertCompareTo(offset3, offset4, -1);
+ assertCompareTo(offset4, offset3, 1);
}
@Test
@@ -125,8 +125,8 @@ public class BinlogOffsetTest {
BinlogOffset.builder().setBinlogFilePosition("binlog.001",
456).build();
// When one has GTID and another doesn't, the one without GTID is
considered older
- assetCompareTo(offsetWithGtid, offsetWithoutGtid, 1);
- assetCompareTo(offsetWithoutGtid, offsetWithGtid, -1);
+ assertCompareTo(offsetWithGtid, offsetWithoutGtid, 1);
+ assertCompareTo(offsetWithoutGtid, offsetWithGtid, -1);
// Test the reverse scenario
BinlogOffset offsetWithGtid2 =
@@ -137,8 +137,8 @@ public class BinlogOffsetTest {
.setSkipEvents(5)
.build();
- assetCompareTo(offsetWithGtid2, offsetWithoutGtid2, 1);
- assetCompareTo(offsetWithoutGtid2, offsetWithGtid2, -1);
+ assertCompareTo(offsetWithGtid2, offsetWithoutGtid2, 1);
+ assertCompareTo(offsetWithoutGtid2, offsetWithGtid2, -1);
}
@Test
@@ -148,23 +148,23 @@ public class BinlogOffsetTest {
BinlogOffset.builder().setBinlogFilePosition("binlog.001",
123).build();
BinlogOffset offset2 =
BinlogOffset.builder().setBinlogFilePosition("binlog.001",
123).build();
- assetCompareTo(offset1, offset2, 0);
+ assertCompareTo(offset1, offset2, 0);
// Test different file names
BinlogOffset offset3 =
BinlogOffset.builder().setBinlogFilePosition("binlog.001",
123).build();
BinlogOffset offset4 =
BinlogOffset.builder().setBinlogFilePosition("binlog.002",
123).build();
- assetCompareTo(offset3, offset4, -1);
- assetCompareTo(offset4, offset3, 1);
+ assertCompareTo(offset3, offset4, -1);
+ assertCompareTo(offset4, offset3, 1);
// Test different positions in same file
BinlogOffset offset5 =
BinlogOffset.builder().setBinlogFilePosition("binlog.001",
100).build();
BinlogOffset offset6 =
BinlogOffset.builder().setBinlogFilePosition("binlog.001",
200).build();
- assetCompareTo(offset5, offset6, -1);
- assetCompareTo(offset6, offset5, 1);
+ assertCompareTo(offset5, offset6, -1);
+ assertCompareTo(offset6, offset5, 1);
}
@Test
@@ -180,8 +180,8 @@ public class BinlogOffsetTest {
.setBinlogFilePosition("binlog.001", 123)
.setSkipEvents(10)
.build();
- assetCompareTo(offset1, offset2, -1);
- assetCompareTo(offset2, offset1, 1);
+ assertCompareTo(offset1, offset2, -1);
+ assertCompareTo(offset2, offset1, 1);
// Test with skip rows
BinlogOffset offset3 =
@@ -196,8 +196,32 @@ public class BinlogOffsetTest {
.setSkipEvents(5)
.setSkipRows(20)
.build();
- assetCompareTo(offset3, offset4, -1);
- assetCompareTo(offset4, offset3, 1);
+ assertCompareTo(offset3, offset4, -1);
+ assertCompareTo(offset4, offset3, 1);
+ }
+
+ @Test
+ public void testCompareToWithDifferentFilenameLength() {
+ BinlogOffset offset1 =
+ BinlogOffset.builder().setBinlogFilePosition("mysql.999",
123).build();
+ BinlogOffset offset2 =
+ BinlogOffset.builder().setBinlogFilePosition("mysql.1000",
123).build();
+ assertCompareTo(offset1, offset2, -1);
+ assertCompareTo(offset2, offset1, 1);
+
+ BinlogOffset offset3 =
+ BinlogOffset.builder().setBinlogFilePosition("binlog.99",
100).build();
+ BinlogOffset offset4 =
+ BinlogOffset.builder().setBinlogFilePosition("binlog.100",
100).build();
+ assertCompareTo(offset3, offset4, -1);
+ assertCompareTo(offset4, offset3, 1);
+
+ BinlogOffset offset5 =
+ BinlogOffset.builder().setBinlogFilePosition("mysql.9999",
50).build();
+ BinlogOffset offset6 =
+ BinlogOffset.builder().setBinlogFilePosition("mysql.10000",
50).build();
+ assertCompareTo(offset5, offset6, -1);
+ assertCompareTo(offset6, offset5, 1);
}
@Test
@@ -217,8 +241,8 @@ public class BinlogOffsetTest {
.build();
// Should compare based on timestamp since server IDs are different
- assetCompareTo(offset1, offset2, -1);
- assetCompareTo(offset2, offset1, 1);
+ assertCompareTo(offset1, offset2, -1);
+ assertCompareTo(offset2, offset1, 1);
// Test same timestamps but different server IDs
BinlogOffset offset3 =
@@ -239,11 +263,12 @@ public class BinlogOffsetTest {
// But since server IDs are different and timestamps are same, it will
fall through to file
// position comparison
// Since file positions are same, it will compare skip events (default
0)
- assetCompareTo(offset3, offset4, 0);
+ assertCompareTo(offset3, offset4, 0);
}
- private void assetCompareTo(BinlogOffset offset1, BinlogOffset offset2,
int expected) {
+ private void assertCompareTo(BinlogOffset offset1, BinlogOffset offset2,
int expected) {
int actual = offset1.compareTo(offset2);
- Assertions.assertThat(expected).isEqualTo(actual);
+ // compareTo does not guarantee returning -1, 0, or 1. Just check the
sign.
+ Assertions.assertThat(Integer.signum(actual)).isEqualTo(expected);
}
}