This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 91fc7bb50 [FLINK-35740][cdc-connector][mysql] Allow column as chunk 
key even it's not primary key
91fc7bb50 is described below

commit 91fc7bb506eb86f16e75624000f732b5e1ec573a
Author: SeungMin <semi...@gmail.com>
AuthorDate: Wed Jul 24 20:22:49 2024 +0900

    [FLINK-35740][cdc-connector][mysql] Allow column as chunk key even it's not 
primary key
    
    This closes #3448.
---
 .../docs/connectors/flink-sources/mysql-cdc.md     |  8 ++--
 .../docs/connectors/flink-sources/mysql-cdc.md     |  8 ++--
 .../connectors/base/options/JdbcSourceOptions.java |  3 +-
 .../mysql/source/config/MySqlSourceOptions.java    |  3 +-
 .../connectors/mysql/source/utils/ChunkUtils.java  | 11 +++--
 .../connectors/mysql/source/utils/RecordUtils.java | 17 +++-----
 .../assigners/MySqlSnapshotSplitAssignerTest.java  | 47 +++++++++++++++++++++-
 7 files changed, 69 insertions(+), 28 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md 
b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
index 1d81555de..3a2f2a6a6 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -500,7 +500,7 @@ CREATE TABLE products (
 * (3)在快照读取之前,Source 不需要数据库锁权限。
 
 如果希望 source 并行运行,则每个并行 reader 都应该具有唯一的 server id,因此`server id`的范围必须类似于 
`5400-6400`,
-且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 首先通过表的主键将表划分成多个块(chunk),
+且范围必须大于并行度。在增量快照读取过程中,MySQL CDC Source 源首先会根据您指定的表块键将表分块(chunk)
 然后 MySQL CDC Source 将多个块分配给多个 reader 以并行读取表的数据。
 
 #### 并发读取
@@ -550,7 +550,7 @@ MySQL 集群中你监控的服务器出现故障后, 你只需将受监视的服
 
 当 MySQL CDC Source 启动时,它并行读取表的快照,然后以单并行度的方式读取表的 binlog。
 
-在快照阶段,根据表的主键和表行的大小将快照切割成多个快照块。
+在快照阶段,快照会根据表的分块键和表行的大小切割成多个快照块。
 快照块被分配给多个快照读取器。每个快照读取器使用 [区块读取算法](#snapshot-chunk-reading) 并将读取的数据发送到下游。
 Source 会管理块的进程状态(完成或未完成),因此快照阶段的 Source 可以支持块级别的 checkpoint。
 如果发生故障,可以恢复 Source 并继续从最后完成的块中读取块。
@@ -565,7 +565,9 @@ Flink 定期为 Source 执行 checkpoint,在故障转移的情况下,作业
 
 在执行增量快照读取时,MySQL CDC source 需要一个用于分片的的算法。
 MySQL CDC Source 使用主键列将表划分为多个分片(chunk)。 默认情况下,MySQL CDC source 
会识别表的主键列,并使用主键中的第一列作为用作分片列。
-如果表中没有主键, 增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 来回退到旧的快照读取机制。
+如果表中没有主键,用户必须指定 `scan.incremental.snapshot.chunk.key-column`、
+否则增量快照读取将失败,你可以禁用 `scan.incremental.snapshot.enabled` 恢复到旧的快照读取机制。
+请注意,使用不在主键中的列作为分块键可能会降低表的查询性能。
 
 对于数值和自动增量拆分列,MySQL CDC Source 按固定步长高效地拆分块。
 例如,如果你有一个主键列为`id`的表,它是自动增量 BIGINT 类型,最小值为`0`,最大值为`100`,
diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md 
b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
index a677e440f..8bee137ef 100644
--- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -530,7 +530,7 @@ Incremental snapshot reading is a new mechanism to read 
snapshot of a table. Com
 If you would like the source run in parallel, each parallel reader should have 
an unique server id, so the 'server-id' must be a range like '5400-6400', 
 and the range must be larger than the parallelism.
 
-During the incremental snapshot reading, the MySQL CDC Source firstly splits 
snapshot chunks (splits) by primary key of table,
+During the incremental snapshot reading, the MySQL CDC Source firstly splits 
snapshot chunks (splits) by user specified chunk key of table,
 and then MySQL CDC Source assigns the chunks to multiple readers to read the 
data of snapshot chunk.
 
 #### Controlling Parallelism
@@ -580,7 +580,7 @@ The CDC job may restart fails in this case. So the 
heartbeat event will help upd
 
 When the MySQL CDC source is started, it reads snapshot of table parallelly 
and then reads binlog of table with single parallelism.
 
-In snapshot phase, the snapshot is cut into multiple snapshot chunks according 
to primary key of table and the size of table rows.
+In snapshot phase, the snapshot is cut into multiple snapshot chunks according 
to chunk key of table and the size of table rows.
 Snapshot chunks is assigned to multiple snapshot readers. Each snapshot reader 
reads its received chunks with [chunk reading 
algorithm](#snapshot-chunk-reading) and send the read data to downstream.
 The source manages the process status (finished or not) of chunks, thus the 
source of snapshot phase can support checkpoint in chunk level.
 If a failure happens, the source can be restored and continue to read chunks 
from last finished chunks.
@@ -596,7 +596,9 @@ Flink performs checkpoints for the source periodically, in 
case of failover, the
 
 When performing incremental snapshot reading, MySQL CDC source need a 
criterion which used to split the table.
 MySQL CDC Source use a splitting column to split the table to multiple splits 
(chunks). By default, MySQL CDC source will identify the primary key column of 
the table and use the first column in primary key as the splitting column.
-If there is no primary key in the table, incremental snapshot reading will 
fail and you can disable `scan.incremental.snapshot.enabled` to fallback to old 
snapshot reading mechanism.
+If there is no primary key in the table, user must specify 
`scan.incremental.snapshot.chunk.key-column`, 
+otherwise incremental snapshot reading will fail and you can disable 
`scan.incremental.snapshot.enabled` to fallback to old snapshot reading 
mechanism.
+Please note that using a column not in primary key as a chunk key can result 
in slower table query performance.
 
 For numeric and auto incremental splitting column, MySQL CDC Source 
efficiently splits chunks by fixed step length.
 For example, if you had a table with a primary key column of `id` which is 
auto-incremental BIGINT type, the minimum value was `0` and maximum value was 
`100`,
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/JdbcSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/JdbcSourceOptions.java
index fedd6664e..422476364 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/JdbcSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/options/JdbcSourceOptions.java
@@ -109,6 +109,5 @@ public class JdbcSourceOptions extends SourceOptions {
                     .noDefaultValue()
                     .withDescription(
                             "The chunk key of table snapshot, captured tables 
are split into multiple chunks by a chunk key when read the snapshot of table."
-                                    + "By default, the chunk key is the first 
column of the primary key and the chunk key is the RowId in oracle."
-                                    + "This column must be a column of the 
primary key.");
+                                    + "By default, the chunk key is the first 
column of the primary key and the chunk key is the RowId in oracle.");
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index 1a1c3f254..f3424c8df 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -243,8 +243,7 @@ public class MySqlSourceOptions {
                     .noDefaultValue()
                     .withDescription(
                             "The chunk key of table snapshot, captured tables 
are split into multiple chunks by a chunk key when read the snapshot of table."
-                                    + "By default, the chunk key is the first 
column of the primary key."
-                                    + "This column must be a column of the 
primary key.");
+                                    + "By default, the chunk key is the first 
column of the primary key.");
 
     @Experimental
     public static final ConfigOption<Boolean> 
SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
index 2b40533bb..493256099 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/ChunkUtils.java
@@ -57,9 +57,8 @@ public class ChunkUtils {
 
     /**
      * Get the chunk key column. This column could be set by `chunkKeyColumn`. 
If the table doesn't
-     * have primary keys, `chunkKeyColumn` must be set. If the table has 
primary keys,
-     * `chunkKeyColumn` must be a column of them or else null. When the 
parameter `chunkKeyColumn`
-     * is not set and the table has primary keys, return the first column of 
primary keys.
+     * have primary keys, `chunkKeyColumn` must be set. When the parameter 
`chunkKeyColumn` is not
+     * set and the table has primary keys, return the first column of primary 
keys.
      */
     public static Column getChunkKeyColumn(Table table, Map<ObjectPath, 
String> chunkKeyColumns) {
         List<Column> primaryKeys = table.primaryKeyColumns();
@@ -68,7 +67,8 @@ public class ChunkUtils {
             throw new ValidationException(
                     "'scan.incremental.snapshot.chunk.key-column' must be set 
when the table doesn't have primary keys.");
         }
-        List<Column> searchColumns = primaryKeys.isEmpty() ? table.columns() : 
primaryKeys;
+
+        List<Column> searchColumns = table.columns();
         if (chunkKeyColumn != null) {
             Optional<Column> targetColumn =
                     searchColumns.stream()
@@ -79,9 +79,8 @@ public class ChunkUtils {
             }
             throw new ValidationException(
                     String.format(
-                            "Chunk key column '%s' doesn't exist in the %s 
[%s] of the table %s.",
+                            "Chunk key column '%s' doesn't exist in the 
columns [%s] of the table %s.",
                             chunkKeyColumn,
-                            primaryKeys.isEmpty() ? "user specified columns" : 
"primary keys",
                             searchColumns.stream()
                                     .map(Column::name)
                                     .collect(Collectors.joining(",")),
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
index 8a3a904f9..d85944adb 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/RecordUtils.java
@@ -82,12 +82,7 @@ public class RecordUtils {
     }
 
     public static Struct getStructContainsChunkKey(SourceRecord record) {
-        // If the table has primary keys, chunk key is in the record key struct
-        if (record.key() != null) {
-            return (Struct) record.key();
-        }
-
-        // If the table doesn't have primary keys, chunk key is in the after 
struct for insert or
+        // Use chunk key in the after struct for insert or
         // the before struct for delete/update
         Envelope.Operation op = Envelope.operationFor(record);
         Struct value = (Struct) record.value();
@@ -109,9 +104,9 @@ public class RecordUtils {
         if (isDataChangeRecord(binlogRecord)) {
             Struct value = (Struct) binlogRecord.value();
             if (value != null) {
-                Struct keyStruct = getStructContainsChunkKey(binlogRecord);
+                Struct chunkKeyStruct = 
getStructContainsChunkKey(binlogRecord);
                 if (splitKeyRangeContains(
-                        getSplitKey(splitBoundaryType, nameAdjuster, 
keyStruct),
+                        getSplitKey(splitBoundaryType, nameAdjuster, 
chunkKeyStruct),
                         splitStart,
                         splitEnd)) {
                     boolean hasPrimaryKey = binlogRecord.key() != null;
@@ -124,7 +119,7 @@ public class RecordUtils {
                                     snapshotRecords,
                                     binlogRecord,
                                     hasPrimaryKey
-                                            ? keyStruct
+                                            ? (Struct) binlogRecord.key()
                                             : createReadOpValue(
                                                     binlogRecord, 
Envelope.FieldName.AFTER),
                                     false);
@@ -152,7 +147,7 @@ public class RecordUtils {
                             upsertBinlog(
                                     snapshotRecords,
                                     binlogRecord,
-                                    hasPrimaryKey ? keyStruct : 
structFromAfter,
+                                    hasPrimaryKey ? (Struct) 
binlogRecord.key() : structFromAfter,
                                     false);
                             break;
                         case DELETE:
@@ -160,7 +155,7 @@ public class RecordUtils {
                                     snapshotRecords,
                                     binlogRecord,
                                     hasPrimaryKey
-                                            ? keyStruct
+                                            ? (Struct) binlogRecord.key()
                                             : createReadOpValue(
                                                     binlogRecord, 
Envelope.FieldName.BEFORE),
                                     true);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
index fe4ccd1a2..759827d9d 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java
@@ -166,7 +166,7 @@ public class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
             assertTrue(
                     ExceptionUtils.findThrowableWithMessage(
                                     t,
-                                    "Chunk key column 'errorCol' doesn't exist 
in the primary keys [card_no,level] of the table")
+                                    "Chunk key column 'errorCol' doesn't exist 
in the columns [card_no,level,name,note] of the table")
                             .isPresent());
         }
     }
@@ -416,6 +416,51 @@ public class MySqlSnapshotSplitAssignerTest extends 
MySqlSourceTestBase {
         }
     }
 
+    @Test
+    public void testAssignTableWithoutPrimaryKeyWithChunkKeyColumn() {
+        String tableWithoutPrimaryKey = "customers_no_pk";
+        List<String> expected =
+                Arrays.asList(
+                        "customers_no_pk null [462]",
+                        "customers_no_pk [462] [823]",
+                        "customers_no_pk [823] [1184]",
+                        "customers_no_pk [1184] [1545]",
+                        "customers_no_pk [1545] [1906]",
+                        "customers_no_pk [1906] null");
+        List<String> splits =
+                getTestAssignSnapshotSplits(
+                        customerDatabase,
+                        4,
+                        
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {tableWithoutPrimaryKey},
+                        "id");
+        assertEquals(expected, splits);
+    }
+
+    @Test
+    public void 
testAssignTableWithPrimaryKeyWithChunkKeyColumnNotInPrimaryKey() {
+        String tableWithoutPrimaryKey = "customers";
+        List<String> expected =
+                Arrays.asList(
+                        "customers null [user_12]",
+                        "customers [user_12] [user_15]",
+                        "customers [user_15] [user_18]",
+                        "customers [user_18] [user_20]",
+                        "customers [user_20] [user_4]",
+                        "customers [user_4] [user_7]",
+                        "customers [user_7] null");
+        List<String> splits =
+                getTestAssignSnapshotSplits(
+                        customerDatabase,
+                        4,
+                        
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
+                        
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
+                        new String[] {tableWithoutPrimaryKey},
+                        "name");
+        assertEquals(expected, splits);
+    }
+
     @Test
     public void testEnumerateTablesLazily() {
         final MySqlSourceConfig configuration =

Reply via email to