This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 164671558 [FLINK-37484][cdc-connector] unevenly chunk split also enables unbounded-chunk-first.enabled. (#3954)
164671558 is described below

commit 164671558ad41751d8c250b37f3341afecbba413
Author: Hongshun Wang <[email protected]>
AuthorDate: Tue Apr 8 14:25:42 2025 +0800

    [FLINK-37484][cdc-connector] unevenly chunk split also enables unbounded-chunk-first.enabled. (#3954)
---
 .../source/assigner/SnapshotSplitAssigner.java     |  22 +++--
 .../base/source/assigner/splitter/ChunkRange.java  |   5 +
 .../assigner/splitter/JdbcSourceChunkSplitter.java |  11 +--
 .../mysql/source/assigners/ChunkRange.java         |   5 +
 .../mysql/source/assigners/MySqlChunkSplitter.java |  12 +--
 .../assigners/MySqlSnapshotSplitAssigner.java      |  17 +++-
 .../source/assigners/MySqlChunkSplitterTest.java   |  30 ------
 .../assigners/MySqlSnapshotSplitAssignerTest.java  | 103 +++++++++++++++------
 .../postgres/source/PostgresSourceITCase.java      |  25 +++--
 9 files changed, 132 insertions(+), 98 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
index be5b04fe6..0b1a4100b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
@@ -366,16 +366,20 @@ public class SnapshotSplitAssigner<C extends 
SourceConfig> implements SplitAssig
                     hasRecordSchema = true;
                     
tableSchemas.putAll(splits.iterator().next().getTableSchemas());
                 }
-                final List<SchemalessSnapshotSplit> schemalessSnapshotSplits =
-                        splits.stream()
-                                .map(SnapshotSplit::toSchemalessSnapshotSplit)
-                                .collect(Collectors.toList());
+                List<String> splitIds = new ArrayList<>();
+                for (SnapshotSplit split : splits) {
+                    SchemalessSnapshotSplit schemalessSnapshotSplit =
+                            split.toSchemalessSnapshotSplit();
+                    splitIds.add(schemalessSnapshotSplit.splitId());
+                    if (sourceConfig.isAssignUnboundedChunkFirst() && 
split.getSplitEnd() == null) {
+                        // assign unbounded split first
+                        remainingSplits.add(0, schemalessSnapshotSplit);
+                    } else {
+                        remainingSplits.add(schemalessSnapshotSplit);
+                    }
+                }
+
                 chunkNum += splits.size();
-                remainingSplits.addAll(schemalessSnapshotSplits);
-                List<String> splitIds =
-                        schemalessSnapshotSplits.stream()
-                                .map(SchemalessSnapshotSplit::splitId)
-                                .collect(Collectors.toList());
                 
enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds);
 
                 if (!chunkSplitter.hasNextChunk()) {
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/ChunkRange.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/ChunkRange.java
index 5734fecea..59d56a275 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/ChunkRange.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/ChunkRange.java
@@ -83,4 +83,9 @@ public class ChunkRange {
     public int hashCode() {
         return Objects.hash(chunkStart, chunkEnd);
     }
+
+    @Override
+    public String toString() {
+        return "ChunkRange{chunkStart=" + chunkStart + ", chunkEnd=" + 
chunkEnd + '}';
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index cd1930a68..09fe4b90d 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -469,15 +469,8 @@ public abstract class JdbcSourceChunkSplitter implements 
ChunkSplitter {
                 break;
             }
         }
-        // add the unbounded split
-        // assign unbounded split first, both the largest and smallest 
unbounded chunks are
-        // completed
-        // in the first two splits
-        if (sourceConfig.isAssignUnboundedChunkFirst()) {
-            splits.add(0, ChunkRange.of(chunkStart, null));
-        } else {
-            splits.add(ChunkRange.of(chunkStart, null));
-        }
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
         return splits;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/ChunkRange.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/ChunkRange.java
index ff2affe28..8df82777c 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/ChunkRange.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/ChunkRange.java
@@ -83,4 +83,9 @@ class ChunkRange {
     public int hashCode() {
         return Objects.hash(chunkStart, chunkEnd);
     }
+
+    @Override
+    public String toString() {
+        return "ChunkRange{chunkStart=" + chunkStart + ", chunkEnd=" + 
chunkEnd + '}';
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
index b986b5e81..4821eaba2 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
@@ -316,15 +316,9 @@ public class MySqlChunkSplitter implements ChunkSplitter {
                 break;
             }
         }
-        // add the unbounded split
-        // assign unbounded split first, both the largest and smallest 
unbounded chunks are
-        // completed
-        // in the first two splits
-        if (sourceConfig.isAssignUnboundedChunkFirst()) {
-            splits.add(0, ChunkRange.of(chunkStart, null));
-        } else {
-            splits.add(ChunkRange.of(chunkStart, null));
-        }
+
+        // add the ending split
+        splits.add(ChunkRange.of(chunkStart, null));
         return splits;
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index f65d96c60..e295fb51f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -324,12 +324,19 @@ public class MySqlSnapshotSplitAssigner implements 
MySqlSplitAssigner {
                     
tableSchema.putAll(splits.iterator().next().getTableSchemas());
                     tableSchemas.putAll(tableSchema);
                 }
-                final List<MySqlSchemalessSnapshotSplit> 
schemaLessSnapshotSplits =
-                        splits.stream()
-                                
.map(MySqlSnapshotSplit::toSchemalessSnapshotSplit)
-                                .collect(Collectors.toList());
+
+                for (MySqlSnapshotSplit split : splits) {
+                    MySqlSchemalessSnapshotSplit schemalessSnapshotSplit =
+                            split.toSchemalessSnapshotSplit();
+                    if (sourceConfig.isAssignUnboundedChunkFirst() && 
split.getSplitEnd() == null) {
+                        // assign unbounded split first
+                        remainingSplits.add(0, schemalessSnapshotSplit);
+                    } else {
+                        remainingSplits.add(schemalessSnapshotSplit);
+                    }
+                }
+
                 chunkNum += splits.size();
-                remainingSplits.addAll(schemaLessSnapshotSplits);
                 if (!chunkSplitter.hasNextChunk()) {
                     remainingTables.remove(nextTable);
                 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
index 381de76ca..33f50333a 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
@@ -85,34 +85,4 @@ class MySqlChunkSplitterTest {
                         ChunkRange.of(2147483637, 2147483647),
                         ChunkRange.of(2147483647, null));
     }
-
-    @Test
-    public void testSplitEvenlySizedChunksEndingFirst() {
-        MySqlSourceConfig sourceConfig =
-                new MySqlSourceConfigFactory()
-                        .startupOptions(StartupOptions.initial())
-                        .databaseList("")
-                        .tableList("")
-                        .hostname("")
-                        .username("")
-                        .password("")
-                        .serverTimeZone(ZoneId.of("UTC").toString())
-                        .assignUnboundedChunkFirst(true)
-                        .createConfig(0);
-        MySqlChunkSplitter splitter = new MySqlChunkSplitter(null, 
sourceConfig);
-
-        List<ChunkRange> res =
-                splitter.splitEvenlySizedChunks(
-                        new TableId("catalog", "db", "tab"),
-                        Integer.MAX_VALUE - 20,
-                        Integer.MAX_VALUE,
-                        20,
-                        10,
-                        10);
-        Assertions.assertThat(res)
-                .containsExactly(
-                        ChunkRange.of(2147483647, null),
-                        ChunkRange.of(null, 2147483637),
-                        ChunkRange.of(2147483637, 2147483647));
-    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 37d9fe6fd..3511326cd 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -51,6 +51,7 @@ import static 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOpt
 import static 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
 import static 
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.ofEarliest;
 import static 
org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link MySqlSnapshotSplitAssigner}. */
 class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
@@ -76,7 +77,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customers_even_dist"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -88,7 +89,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customers"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -107,7 +108,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customers_even_dist", 
"customers_sparse_dist"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -127,7 +128,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"shopping_cart"},
                         "product_kind");
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -145,7 +146,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"evenly_shopping_cart"},
                         "product_no");
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -175,7 +176,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"shopping_cart_big"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -191,7 +192,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"address"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -206,7 +207,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"shopping_cart_dec"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -225,7 +226,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customer_card"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -243,7 +244,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         2000.0d,
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customers_sparse_dist"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
 
         // test sparse table with smaller distribution factor upper
         List<String> expected1 =
@@ -257,7 +258,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         2.0d,
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customers_sparse_dist"});
-        Assertions.assertThat(splits1).isEqualTo(expected1);
+        assertThat(splits1).isEqualTo(expected1);
 
         // test sparse table that the approximate row count is bigger than 
chunk size
         List<String> expected2 =
@@ -268,7 +269,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         10d,
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customers_sparse_dist"});
-        Assertions.assertThat(splits2).isEqualTo(expected2);
+        assertThat(splits2).isEqualTo(expected2);
     }
 
     @Test
@@ -285,7 +286,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customers_dense_dist"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
 
         // test dense table with bigger dense distribution factor lower
         List<String> expected1 =
@@ -296,7 +297,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         0.9d,
                         new String[] {"customers_dense_dist"});
-        Assertions.assertThat(splits1).isEqualTo(expected1);
+        assertThat(splits1).isEqualTo(expected1);
     }
 
     @Test
@@ -308,7 +309,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customer_card_single_line"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -325,7 +326,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"shopping_cart"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -342,7 +343,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"shopping_cart"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -360,7 +361,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customers_even_dist"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -372,7 +373,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customers_even_dist"});
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -384,7 +385,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                     
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                     new String[] {"customer_card"});
         } catch (Throwable t) {
-            Assertions.assertThat(t)
+            assertThat(t)
                     .hasStackTraceContaining(
                             "The defined primary key [card_no] in Flink is not 
matched with actual primary key [card_no, level] in MySQL");
         }
@@ -425,7 +426,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {tableWithoutPrimaryKey},
                         "id");
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -448,7 +449,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {tableWithoutPrimaryKey},
                         "name");
-        Assertions.assertThat(splits).isEqualTo(expected);
+        assertThat(splits).isEqualTo(expected);
     }
 
     @Test
@@ -461,6 +462,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         new String[] {"customers_even_dist"},
                         "id",
+                        false,
                         false);
 
         final MySqlSnapshotSplitAssigner assigner =
@@ -471,10 +473,10 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         false,
                         getMySqlSplitEnumeratorContext());
 
-        Assertions.assertThat(assigner.needToDiscoveryTables()).isTrue();
+        assertThat(assigner.needToDiscoveryTables()).isTrue();
         assigner.open();
-        Assertions.assertThat(assigner.getNext()).isPresent();
-        Assertions.assertThat(assigner.needToDiscoveryTables()).isFalse();
+        assertThat(assigner.getNext()).isPresent();
+        assertThat(assigner.needToDiscoveryTables()).isFalse();
     }
 
     @Test
@@ -486,7 +488,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         "customers_even_dist [10] [18]",
                         "customers_even_dist [18] null",
                         "customer_card_single_line null null");
-        Assertions.assertThat(
+        assertThat(
                         getTestAssignSnapshotSplitsFromCheckpoint(
                                 AssignerStatus.INITIAL_ASSIGNING_FINISHED))
                 .isEqualTo(expected);
@@ -500,12 +502,31 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         "customers_even_dist null [10]",
                         "customers_even_dist [10] [18]",
                         "customers_even_dist [18] null");
-        Assertions.assertThat(
+        assertThat(
                         getTestAssignSnapshotSplitsFromCheckpoint(
                                 
AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED))
                 .isEqualTo(expected);
     }
 
+    @Test
+    void testSplitEvenlySizedChunksEndingFirst() {
+        List<String> expected =
+                Arrays.asList(
+                        "evenly_shopping_cart [109] null",
+                        "evenly_shopping_cart null [105]",
+                        "evenly_shopping_cart [105] [109]");
+        List<String> splits =
+                getTestAssignSnapshotSplits(
+                        customerDatabase,
+                        4,
+                        
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {"evenly_shopping_cart"},
+                        "product_no",
+                        true);
+        assertThat(splits).isEqualTo(expected);
+    }
+
     private List<String> getTestAssignSnapshotSplits(
             int splitSize,
             double distributionFactorUpper,
@@ -527,6 +548,24 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
             double distributionFactorLower,
             String[] captureTables,
             String chunkKeyColumn) {
+        return getTestAssignSnapshotSplits(
+                database,
+                splitSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                captureTables,
+                chunkKeyColumn,
+                false);
+    }
+
+    private List<String> getTestAssignSnapshotSplits(
+            UniqueDatabase database,
+            int splitSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            String[] captureTables,
+            String chunkKeyColumn,
+            boolean assignUnboundedChunkFirst) {
         MySqlSourceConfig configuration =
                 getConfig(
                         database,
@@ -535,7 +574,8 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         distributionFactorLower,
                         captureTables,
                         chunkKeyColumn,
-                        false);
+                        false,
+                        assignUnboundedChunkFirst);
         List<TableId> remainingTables =
                 Arrays.stream(captureTables)
                         .map(t -> database.getDatabaseName() + "." + t)
@@ -567,7 +607,8 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                         
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
                         captureTables,
                         null,
-                        true);
+                        true,
+                        false);
         List<TableId> remainingTables = new ArrayList<>();
         List<TableId> alreadyProcessedTables = new ArrayList<>();
         alreadyProcessedTables.add(processedTable);
@@ -684,7 +725,8 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
             double distributionLower,
             String[] captureTables,
             String chunkKeyColumn,
-            boolean scanNewlyAddedTableEnabled) {
+            boolean scanNewlyAddedTableEnabled,
+            boolean assignUnboundedChunkFirst) {
         Map<ObjectPath, String> chunkKeys = new HashMap<>();
         for (String table : captureTables) {
             chunkKeys.put(new ObjectPath(database.getDatabaseName(), table), 
chunkKeyColumn);
@@ -708,6 +750,7 @@ class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
                 .serverTimeZone(ZoneId.of("UTC").toString())
                 .chunkKeyColumn(chunkKeys)
                 .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+                .assignUnboundedChunkFirst(assignUnboundedChunkFirst)
                 .createConfig(0);
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index d2c50d15b..30dbc77d2 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -53,6 +53,7 @@ import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Assumptions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -305,6 +306,19 @@ class PostgresSourceITCase extends PostgresTestBase {
                 
Collections.singletonMap("scan.incremental.snapshot.backfill.skip", "true"));
     }
 
+    @Test
+    void testReadSingleTableWithSingleParallelismAndUnboundedChunkFirst() 
throws Exception {
+        testPostgresParallelSource(
+                DEFAULT_PARALLELISM,
+                DEFAULT_SCAN_STARTUP_MODE,
+                PostgresTestUtils.FailoverType.TM,
+                PostgresTestUtils.FailoverPhase.SNAPSHOT,
+                new String[] {"Customers"},
+                RestartStrategies.fixedDelayRestart(1, 0),
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled", "true"));
+    }
+
     @ParameterizedTest
     @ValueSource(strings = {"initial", "latest-offset"})
     void testDebeziumSlotDropOnStop(String scanStartupMode) throws Exception {
@@ -370,9 +384,8 @@ class PostgresSourceITCase extends PostgresTestBase {
         optionalJobClient.get().cancel().get();
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"initial", "latest-offset"})
-    void testSnapshotOnlyModeWithDMLPostHighWaterMark(String scanStartupMode) 
throws Exception {
+    @Test
+    void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
         // The data num is 21, set fetchSize = 22 to test the job is bounded.
         List<String> records =
                 testBackfillWhenWritingEvents(
@@ -403,9 +416,8 @@ class PostgresSourceITCase extends PostgresTestBase {
         assertEqualsInAnyOrder(expectedRecords, records);
     }
 
-    @ParameterizedTest
-    @ValueSource(strings = {"initial", "latest-offset"})
-    void testSnapshotOnlyModeWithDMLPreHighWaterMark(String scanStartupMode) 
throws Exception {
+    @Test
+    void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception {
         // The data num is 21, set fetchSize = 22 to test the job is bounded
         List<String> records =
                 testBackfillWhenWritingEvents(
@@ -1066,6 +1078,7 @@ class PostgresSourceITCase extends PostgresTestBase {
             PostgresTestUtils.FailoverPhase failoverPhase,
             String[] captureCustomerTables)
             throws Exception {
+
         waitUntilJobRunning(tableResult);
         CloseableIterator<Row> iterator = tableResult.collect();
         Optional<JobClient> optionalJobClient = tableResult.getJobClient();

Reply via email to