This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 164671558 [FLINK-37484][cdc-connector] Make unbounded-chunk-first.enabled
also apply to unevenly split chunks. (#3954)
164671558 is described below
commit 164671558ad41751d8c250b37f3341afecbba413
Author: Hongshun Wang <[email protected]>
AuthorDate: Tue Apr 8 14:25:42 2025 +0800
[FLINK-37484][cdc-connector] Make unbounded-chunk-first.enabled also apply
to unevenly split chunks. (#3954)
---
.../source/assigner/SnapshotSplitAssigner.java | 22 +++--
.../base/source/assigner/splitter/ChunkRange.java | 5 +
.../assigner/splitter/JdbcSourceChunkSplitter.java | 11 +--
.../mysql/source/assigners/ChunkRange.java | 5 +
.../mysql/source/assigners/MySqlChunkSplitter.java | 12 +--
.../assigners/MySqlSnapshotSplitAssigner.java | 17 +++-
.../source/assigners/MySqlChunkSplitterTest.java | 30 ------
.../assigners/MySqlSnapshotSplitAssignerTest.java | 103 +++++++++++++++------
.../postgres/source/PostgresSourceITCase.java | 25 +++--
9 files changed, 132 insertions(+), 98 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
index be5b04fe6..0b1a4100b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java
@@ -366,16 +366,20 @@ public class SnapshotSplitAssigner<C extends
SourceConfig> implements SplitAssig
hasRecordSchema = true;
tableSchemas.putAll(splits.iterator().next().getTableSchemas());
}
- final List<SchemalessSnapshotSplit> schemalessSnapshotSplits =
- splits.stream()
- .map(SnapshotSplit::toSchemalessSnapshotSplit)
- .collect(Collectors.toList());
+ List<String> splitIds = new ArrayList<>();
+ for (SnapshotSplit split : splits) {
+ SchemalessSnapshotSplit schemalessSnapshotSplit =
+ split.toSchemalessSnapshotSplit();
+ splitIds.add(schemalessSnapshotSplit.splitId());
+ if (sourceConfig.isAssignUnboundedChunkFirst() &&
split.getSplitEnd() == null) {
+ // assign unbounded split first
+ remainingSplits.add(0, schemalessSnapshotSplit);
+ } else {
+ remainingSplits.add(schemalessSnapshotSplit);
+ }
+ }
+
chunkNum += splits.size();
- remainingSplits.addAll(schemalessSnapshotSplits);
- List<String> splitIds =
- schemalessSnapshotSplits.stream()
- .map(SchemalessSnapshotSplit::splitId)
- .collect(Collectors.toList());
enumeratorMetrics.getTableMetrics(nextTable).addNewSplits(splitIds);
if (!chunkSplitter.hasNextChunk()) {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/ChunkRange.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/ChunkRange.java
index 5734fecea..59d56a275 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/ChunkRange.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/ChunkRange.java
@@ -83,4 +83,9 @@ public class ChunkRange {
public int hashCode() {
return Objects.hash(chunkStart, chunkEnd);
}
+
+ @Override
+ public String toString() {
+ return "ChunkRange{chunkStart=" + chunkStart + ", chunkEnd=" +
chunkEnd + '}';
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
index cd1930a68..09fe4b90d 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/splitter/JdbcSourceChunkSplitter.java
@@ -469,15 +469,8 @@ public abstract class JdbcSourceChunkSplitter implements
ChunkSplitter {
break;
}
}
- // add the unbounded split
- // assign unbounded split first, both the largest and smallest
unbounded chunks are
- // completed
- // in the first two splits
- if (sourceConfig.isAssignUnboundedChunkFirst()) {
- splits.add(0, ChunkRange.of(chunkStart, null));
- } else {
- splits.add(ChunkRange.of(chunkStart, null));
- }
+ // add the ending split
+ splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/ChunkRange.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/ChunkRange.java
index ff2affe28..8df82777c 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/ChunkRange.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/ChunkRange.java
@@ -83,4 +83,9 @@ class ChunkRange {
public int hashCode() {
return Objects.hash(chunkStart, chunkEnd);
}
+
+ @Override
+ public String toString() {
+ return "ChunkRange{chunkStart=" + chunkStart + ", chunkEnd=" +
chunkEnd + '}';
+ }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
index b986b5e81..4821eaba2 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java
@@ -316,15 +316,9 @@ public class MySqlChunkSplitter implements ChunkSplitter {
break;
}
}
- // add the unbounded split
- // assign unbounded split first, both the largest and smallest
unbounded chunks are
- // completed
- // in the first two splits
- if (sourceConfig.isAssignUnboundedChunkFirst()) {
- splits.add(0, ChunkRange.of(chunkStart, null));
- } else {
- splits.add(ChunkRange.of(chunkStart, null));
- }
+
+ // add the ending split
+ splits.add(ChunkRange.of(chunkStart, null));
return splits;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index f65d96c60..e295fb51f 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -324,12 +324,19 @@ public class MySqlSnapshotSplitAssigner implements
MySqlSplitAssigner {
tableSchema.putAll(splits.iterator().next().getTableSchemas());
tableSchemas.putAll(tableSchema);
}
- final List<MySqlSchemalessSnapshotSplit>
schemaLessSnapshotSplits =
- splits.stream()
-
.map(MySqlSnapshotSplit::toSchemalessSnapshotSplit)
- .collect(Collectors.toList());
+
+ for (MySqlSnapshotSplit split : splits) {
+ MySqlSchemalessSnapshotSplit schemalessSnapshotSplit =
+ split.toSchemalessSnapshotSplit();
+ if (sourceConfig.isAssignUnboundedChunkFirst() &&
split.getSplitEnd() == null) {
+ // assign unbounded split first
+ remainingSplits.add(0, schemalessSnapshotSplit);
+ } else {
+ remainingSplits.add(schemalessSnapshotSplit);
+ }
+ }
+
chunkNum += splits.size();
- remainingSplits.addAll(schemaLessSnapshotSplits);
if (!chunkSplitter.hasNextChunk()) {
remainingTables.remove(nextTable);
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
index 381de76ca..33f50333a 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
@@ -85,34 +85,4 @@ class MySqlChunkSplitterTest {
ChunkRange.of(2147483637, 2147483647),
ChunkRange.of(2147483647, null));
}
-
- @Test
- public void testSplitEvenlySizedChunksEndingFirst() {
- MySqlSourceConfig sourceConfig =
- new MySqlSourceConfigFactory()
- .startupOptions(StartupOptions.initial())
- .databaseList("")
- .tableList("")
- .hostname("")
- .username("")
- .password("")
- .serverTimeZone(ZoneId.of("UTC").toString())
- .assignUnboundedChunkFirst(true)
- .createConfig(0);
- MySqlChunkSplitter splitter = new MySqlChunkSplitter(null,
sourceConfig);
-
- List<ChunkRange> res =
- splitter.splitEvenlySizedChunks(
- new TableId("catalog", "db", "tab"),
- Integer.MAX_VALUE - 20,
- Integer.MAX_VALUE,
- 20,
- 10,
- 10);
- Assertions.assertThat(res)
- .containsExactly(
- ChunkRange.of(2147483647, null),
- ChunkRange.of(null, 2147483637),
- ChunkRange.of(2147483637, 2147483647));
- }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index 37d9fe6fd..3511326cd 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -51,6 +51,7 @@ import static
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOpt
import static
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static
org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.ofEarliest;
import static
org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link MySqlSnapshotSplitAssigner}. */
class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase {
@@ -76,7 +77,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customers_even_dist"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -88,7 +89,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customers"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -107,7 +108,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customers_even_dist",
"customers_sparse_dist"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -127,7 +128,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"shopping_cart"},
"product_kind");
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -145,7 +146,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"evenly_shopping_cart"},
"product_no");
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -175,7 +176,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"shopping_cart_big"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -191,7 +192,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"address"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -206,7 +207,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"shopping_cart_dec"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -225,7 +226,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customer_card"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -243,7 +244,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
2000.0d,
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customers_sparse_dist"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
// test sparse table with smaller distribution factor upper
List<String> expected1 =
@@ -257,7 +258,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
2.0d,
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customers_sparse_dist"});
- Assertions.assertThat(splits1).isEqualTo(expected1);
+ assertThat(splits1).isEqualTo(expected1);
// test sparse table that the approximate row count is bigger than
chunk size
List<String> expected2 =
@@ -268,7 +269,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
10d,
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customers_sparse_dist"});
- Assertions.assertThat(splits2).isEqualTo(expected2);
+ assertThat(splits2).isEqualTo(expected2);
}
@Test
@@ -285,7 +286,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customers_dense_dist"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
// test dense table with bigger dense distribution factor lower
List<String> expected1 =
@@ -296,7 +297,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
0.9d,
new String[] {"customers_dense_dist"});
- Assertions.assertThat(splits1).isEqualTo(expected1);
+ assertThat(splits1).isEqualTo(expected1);
}
@Test
@@ -308,7 +309,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customer_card_single_line"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -325,7 +326,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"shopping_cart"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -342,7 +343,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"shopping_cart"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -360,7 +361,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customers_even_dist"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -372,7 +373,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customers_even_dist"});
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -384,7 +385,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customer_card"});
} catch (Throwable t) {
- Assertions.assertThat(t)
+ assertThat(t)
.hasStackTraceContaining(
"The defined primary key [card_no] in Flink is not
matched with actual primary key [card_no, level] in MySQL");
}
@@ -425,7 +426,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {tableWithoutPrimaryKey},
"id");
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -448,7 +449,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {tableWithoutPrimaryKey},
"name");
- Assertions.assertThat(splits).isEqualTo(expected);
+ assertThat(splits).isEqualTo(expected);
}
@Test
@@ -461,6 +462,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
new String[] {"customers_even_dist"},
"id",
+ false,
false);
final MySqlSnapshotSplitAssigner assigner =
@@ -471,10 +473,10 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
false,
getMySqlSplitEnumeratorContext());
- Assertions.assertThat(assigner.needToDiscoveryTables()).isTrue();
+ assertThat(assigner.needToDiscoveryTables()).isTrue();
assigner.open();
- Assertions.assertThat(assigner.getNext()).isPresent();
- Assertions.assertThat(assigner.needToDiscoveryTables()).isFalse();
+ assertThat(assigner.getNext()).isPresent();
+ assertThat(assigner.needToDiscoveryTables()).isFalse();
}
@Test
@@ -486,7 +488,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
"customers_even_dist [10] [18]",
"customers_even_dist [18] null",
"customer_card_single_line null null");
- Assertions.assertThat(
+ assertThat(
getTestAssignSnapshotSplitsFromCheckpoint(
AssignerStatus.INITIAL_ASSIGNING_FINISHED))
.isEqualTo(expected);
@@ -500,12 +502,31 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
"customers_even_dist null [10]",
"customers_even_dist [10] [18]",
"customers_even_dist [18] null");
- Assertions.assertThat(
+ assertThat(
getTestAssignSnapshotSplitsFromCheckpoint(
AssignerStatus.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED))
.isEqualTo(expected);
}
+ @Test
+ void testSplitEvenlySizedChunksEndingFirst() {
+ List<String> expected =
+ Arrays.asList(
+ "evenly_shopping_cart [109] null",
+ "evenly_shopping_cart null [105]",
+ "evenly_shopping_cart [105] [109]");
+ List<String> splits =
+ getTestAssignSnapshotSplits(
+ customerDatabase,
+ 4,
+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+ new String[] {"evenly_shopping_cart"},
+ "product_no",
+ true);
+ assertThat(splits).isEqualTo(expected);
+ }
+
private List<String> getTestAssignSnapshotSplits(
int splitSize,
double distributionFactorUpper,
@@ -527,6 +548,24 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
double distributionFactorLower,
String[] captureTables,
String chunkKeyColumn) {
+ return getTestAssignSnapshotSplits(
+ database,
+ splitSize,
+ distributionFactorUpper,
+ distributionFactorLower,
+ captureTables,
+ chunkKeyColumn,
+ false);
+ }
+
+ private List<String> getTestAssignSnapshotSplits(
+ UniqueDatabase database,
+ int splitSize,
+ double distributionFactorUpper,
+ double distributionFactorLower,
+ String[] captureTables,
+ String chunkKeyColumn,
+ boolean assignUnboundedChunkFirst) {
MySqlSourceConfig configuration =
getConfig(
database,
@@ -535,7 +574,8 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
distributionFactorLower,
captureTables,
chunkKeyColumn,
- false);
+ false,
+ assignUnboundedChunkFirst);
List<TableId> remainingTables =
Arrays.stream(captureTables)
.map(t -> database.getDatabaseName() + "." + t)
@@ -567,7 +607,8 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
captureTables,
null,
- true);
+ true,
+ false);
List<TableId> remainingTables = new ArrayList<>();
List<TableId> alreadyProcessedTables = new ArrayList<>();
alreadyProcessedTables.add(processedTable);
@@ -684,7 +725,8 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
double distributionLower,
String[] captureTables,
String chunkKeyColumn,
- boolean scanNewlyAddedTableEnabled) {
+ boolean scanNewlyAddedTableEnabled,
+ boolean assignUnboundedChunkFirst) {
Map<ObjectPath, String> chunkKeys = new HashMap<>();
for (String table : captureTables) {
chunkKeys.put(new ObjectPath(database.getDatabaseName(), table),
chunkKeyColumn);
@@ -708,6 +750,7 @@ class MySqlSnapshotSplitAssignerTest extends
MySqlSourceTestBase {
.serverTimeZone(ZoneId.of("UTC").toString())
.chunkKeyColumn(chunkKeys)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+ .assignUnboundedChunkFirst(assignUnboundedChunkFirst)
.createConfig(0);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index d2c50d15b..30dbc77d2 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -53,6 +53,7 @@ import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
@@ -305,6 +306,19 @@ class PostgresSourceITCase extends PostgresTestBase {
Collections.singletonMap("scan.incremental.snapshot.backfill.skip", "true"));
}
+ @Test
+ void testReadSingleTableWithSingleParallelismAndUnboundedChunkFirst()
throws Exception {
+ testPostgresParallelSource(
+ DEFAULT_PARALLELISM,
+ DEFAULT_SCAN_STARTUP_MODE,
+ PostgresTestUtils.FailoverType.TM,
+ PostgresTestUtils.FailoverPhase.SNAPSHOT,
+ new String[] {"Customers"},
+ RestartStrategies.fixedDelayRestart(1, 0),
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled", "true"));
+ }
+
@ParameterizedTest
@ValueSource(strings = {"initial", "latest-offset"})
void testDebeziumSlotDropOnStop(String scanStartupMode) throws Exception {
@@ -370,9 +384,8 @@ class PostgresSourceITCase extends PostgresTestBase {
optionalJobClient.get().cancel().get();
}
- @ParameterizedTest
- @ValueSource(strings = {"initial", "latest-offset"})
- void testSnapshotOnlyModeWithDMLPostHighWaterMark(String scanStartupMode)
throws Exception {
+ @Test
+ void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
// The data num is 21, set fetchSize = 22 to test the job is bounded.
List<String> records =
testBackfillWhenWritingEvents(
@@ -403,9 +416,8 @@ class PostgresSourceITCase extends PostgresTestBase {
assertEqualsInAnyOrder(expectedRecords, records);
}
- @ParameterizedTest
- @ValueSource(strings = {"initial", "latest-offset"})
- void testSnapshotOnlyModeWithDMLPreHighWaterMark(String scanStartupMode)
throws Exception {
+ @Test
+ void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception {
// The data num is 21, set fetchSize = 22 to test the job is bounded
List<String> records =
testBackfillWhenWritingEvents(
@@ -1066,6 +1078,7 @@ class PostgresSourceITCase extends PostgresTestBase {
PostgresTestUtils.FailoverPhase failoverPhase,
String[] captureCustomerTables)
throws Exception {
+
waitUntilJobRunning(tableResult);
CloseableIterator<Row> iterator = tableResult.collect();
Optional<JobClient> optionalJobClient = tableResult.getJobClient();