This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 2e4abdb68 [FLINK-36683][cdc-connector/mongo] Support metadata 'row_kind' virtual column
2e4abdb68 is described below
commit 2e4abdb68ed960f09610cb8be332b770a07ba53e
Author: Runkang He <[email protected]>
AuthorDate: Mon Apr 7 17:04:29 2025 +0800
[FLINK-36683][cdc-connector/mongo] Support metadata 'row_kind' virtual column
This closes #3705
---
.../docs/connectors/flink-sources/mongodb-cdc.md | 41 +++---
.../docs/connectors/flink-sources/mongodb-cdc.md | 24 ++--
.../mongodb/table/MongoDBReadableMetadata.java | 24 ++++
.../mongodb/source/MongoDBFullChangelogITCase.java | 147 +++++++++++++++++++++
.../mongodb/table/MongoDBTableFactoryTest.java | 7 +-
5 files changed, 215 insertions(+), 28 deletions(-)
diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
index 5f12524a8..f52b3be98 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -296,6 +296,17 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
<td>Boolean</td>
<td>是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。</td>
</tr>
+ <tr>
+ <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>
+ 快照读取阶段是否先分配 UnboundedChunk。<br>
+ 这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。<br>
+ 这是一项实验特性,默认为 false。
+ </td>
+ </tr>
</tbody>
</table>
</div>
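For reference, a minimal sketch (not part of this patch) of how the `scan.incremental.snapshot.unbounded-chunk-first.enabled` option documented above is set in a source DDL; the host, credentials, database, and collection values are placeholders.

```sql
CREATE TABLE products_src (
    _id STRING,
    name STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017',        -- placeholder
    'username' = 'flinkuser',           -- placeholder
    'password' = 'flinkpw',             -- placeholder
    'database' = 'inventory',           -- placeholder
    'collection' = 'products',          -- placeholder
    'scan.incremental.snapshot.enabled' = 'true',
    'scan.incremental.snapshot.unbounded-chunk-first.enabled' = 'true'
);
```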
@@ -333,15 +344,10 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
<td>它指示在数据库中进行更改的时间。 <br>如果记录是从表的快照而不是改变流中读取的,该值将始终为0。</td>
</tr>
<tr>
- <td>scan.incremental.snapshot.unbounded-chunk-first.enabled</td>
- <td>optional</td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>
- 快照读取阶段是否先分配 UnboundedChunk。<br>
- 这有助于降低 TaskManager 在快照阶段同步最后一个chunk时遇到内存溢出 (OOM) 的风险。<br>
- 这是一项实验特性,默认为 false。
- </td>
+ <td>row_kind</td>
+ <td>STRING NOT NULL</td>
+      <td>当前记录对应的 changelog 类型。注意:当 Source 算子选择为每条记录输出 row_kind 字段后,下游 SQL 算子在处理消息撤回时会因为这个字段不同而比对失败,
+建议只在简单的同步作业中引用该元数据列。<br>'+I' 表示 INSERT 数据,'-D' 表示 DELETE 数据,'-U' 表示 UPDATE_BEFORE 数据,'+U' 表示 UPDATE_AFTER 数据。</td>
</tr>
</tbody>
</table>
@@ -349,15 +355,16 @@ MongoDB 的更改事件记录在消息之前没有更新。因此,我们只能
扩展的 CREATE TABLE 示例演示了用于公开这些元数据字段的语法:
```sql
CREATE TABLE products (
- db_name STRING METADATA FROM 'database_name' VIRTUAL,
+ db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
- operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
- _id STRING, // 必须声明
- name STRING,
- weight DECIMAL(10,3),
- tags ARRAY<STRING>, -- array
- price ROW<amount DECIMAL(10,2), currency STRING>, -- 嵌入式文档
- suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
+ operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+ operation STRING METADATA FROM 'row_kind' VIRTUAL,
+ _id STRING, // 必须声明
+ name STRING,
+ weight DECIMAL(10,3),
+ tags ARRAY<STRING>, -- array
+ price ROW<amount DECIMAL(10,2), currency STRING>, -- 嵌入式文档
+ suppliers ARRAY<ROW<name STRING, address STRING>>, -- 嵌入式文档
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
diff --git a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
index aebbbf08d..c4fb00af7 100644
--- a/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mongodb-cdc.md
@@ -368,21 +368,29 @@ The following format metadata can be exposed as read-only (VIRTUAL) columns in a
<td>TIMESTAMP_LTZ(3) NOT NULL</td>
<td>It indicates the time that the change was made in the database. <br>If the record is read from snapshot of the table instead of the change stream, the value is always 0.</td>
</tr>
+ <tr>
+ <td>row_kind</td>
+ <td>STRING NOT NULL</td>
+      <td>It indicates the row kind of the changelog. Note: the downstream SQL operator may fail to match retraction messages because of this newly added column if
+the source operator chooses to output the 'row_kind' column for each record. It is recommended to use this metadata column only in simple synchronization jobs.
+<br>'+I' means INSERT message, '-D' means DELETE message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message.</td>
+ </tr>
</tbody>
</table>
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:
```sql
CREATE TABLE products (
- db_name STRING METADATA FROM 'database_name' VIRTUAL,
+ db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
- operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
- _id STRING, // must be declared
- name STRING,
- weight DECIMAL(10,3),
- tags ARRAY<STRING>, -- array
- price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
- suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
+ operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+ operation STRING METADATA FROM 'row_kind' VIRTUAL,
+ _id STRING, // must be declared
+ name STRING,
+ weight DECIMAL(10,3),
+ tags ARRAY<STRING>, -- array
+    price ROW<amount DECIMAL(10,2), currency STRING>,  -- embedded document
+    suppliers ARRAY<ROW<name STRING, address STRING>>,  -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
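As a usage note, and given the caveat in the metadata table above, the recommended way to consume the `operation` column mapped from 'row_kind' is a simple pass-through query rather than feeding it into stateful downstream operators. A minimal sketch (not part of this patch), assuming the `products` table from the example above has been registered:

```sql
-- Plain projection only: the row kind travels with the payload columns,
-- avoiding the retraction-comparison issue described in the metadata table.
SELECT operation, _id, name, weight FROM products;
```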
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
index c2baf021c..581873d20 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBReadableMetadata.java
@@ -19,7 +19,9 @@ package org.apache.flink.cdc.connectors.mongodb.table;
import org.apache.flink.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.cdc.debezium.table.RowDataMetadataConverter;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
@@ -81,6 +83,28 @@ public enum MongoDBReadableMetadata {
                    return TimestampData.fromEpochMillis(
                            (Long) source.get(AbstractSourceInfo.TIMESTAMP_KEY));
                }
+ }),
+
+ /**
+     * It indicates the row kind of the changelog. '+I' means INSERT message, '-D' means DELETE
+     * message, '-U' means UPDATE_BEFORE message and '+U' means UPDATE_AFTER message
+ */
+ ROW_KIND(
+ "row_kind",
+ DataTypes.STRING().notNull(),
+ new RowDataMetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(RowData rowData) {
+                    return StringData.fromString(rowData.getRowKind().shortString());
+ }
+
+ @Override
+ public Object read(SourceRecord record) {
+                    throw new UnsupportedOperationException(
+                            "Please call read(RowData rowData) method instead.");
+ }
});
private final String key;
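For illustration, the new ROW_KIND converter above reads `RowData#getRowKind()` and emits its short string. A self-contained sketch (not part of this patch) of that mapping, using Flink's `RowKind` enum; the demo class name is hypothetical:

```java
import org.apache.flink.types.RowKind;

public class RowKindShortStringDemo {
    public static void main(String[] args) {
        // Prints the mapping exposed through the 'row_kind' metadata column:
        // INSERT -> +I, UPDATE_BEFORE -> -U, UPDATE_AFTER -> +U, DELETE -> -D
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + kind.shortString());
        }
    }
}
```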
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
index 7500a64ae..257f7ce1f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.java
@@ -56,6 +56,7 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils.assertEqualsInAnyOrder;
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -541,6 +542,152 @@ class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
return records;
}
+    @ParameterizedTest(name = "parallelismSnapshot: {0}")
+    @ValueSource(booleans = {true, false})
+    public void testMetadataColumns(boolean parallelismSnapshot) throws Exception {
+        testMongoDBParallelSourceWithMetadataColumns(
+                DEFAULT_PARALLELISM, new String[] {"customers"}, true, parallelismSnapshot);
+    }
+
+    private void testMongoDBParallelSourceWithMetadataColumns(
+            int parallelism,
+            String[] captureCustomerCollections,
+            boolean skipSnapshotBackfill,
+            boolean parallelismSnapshot)
+            throws Exception {
+        String customerDatabase =
+                "customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
+
+        // A - enable system-level fulldoc pre & post image feature
+        MONGO_CONTAINER.executeCommand(
+                "use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
+
+        // B - enable collection-level fulldoc pre & post image for change capture collection
+        for (String collectionName : captureCustomerCollections) {
+            MONGO_CONTAINER.executeCommandInDatabase(
+                    String.format(
+                            "db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })",
+                            collectionName, collectionName),
+                    customerDatabase);
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE customers ("
+                                + " _id STRING NOT NULL,"
+                                + " cid BIGINT NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " database_name STRING METADATA VIRTUAL,"
+                                + " collection_name STRING METADATA VIRTUAL,"
+                                + " row_kind STRING METADATA VIRTUAL,"
+                                + " primary key (_id) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mongodb-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = '%s',"
+                                + " 'hosts' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database' = '%s',"
+                                + " 'collection' = '%s',"
+                                + " 'heartbeat.interval.ms' = '500',"
+                                + " 'scan.full-changelog' = 'true',"
+                                + " 'scan.incremental.snapshot.backfill.skip' = '%s'"
+                                + ")",
+                        parallelismSnapshot ? "true" : "false",
+                        MONGO_CONTAINER.getHostAndPort(),
+                        FLINK_USER,
+                        FLINK_USER_PASSWORD,
+                        customerDatabase,
+                        getCollectionNameRegex(customerDatabase, captureCustomerCollections),
+                        skipSnapshotBackfill);
+
+        MONGO_CONTAINER.executeCommandFileInDatabase("customer", customerDatabase);
+
+        // first step: check the snapshot data
+        List<String> snapshotForSingleTable =
+                Stream.of(
+                                "+I[%s, %s, +I, 101, user_1, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 102, user_2, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 103, user_3, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 109, user_4, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 110, user_5, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 111, user_6, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 118, user_7, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 121, user_8, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 123, user_9, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1009, user_10, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1010, user_11, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1011, user_12, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1012, user_13, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1013, user_14, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1014, user_15, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1015, user_16, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1016, user_17, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1017, user_18, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1018, user_19, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 1019, user_20, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 2000, user_21, Shanghai, 123567891234]")
+                        .map(s -> String.format(s, customerDatabase, captureCustomerCollections[0]))
+                        .collect(Collectors.toList());
+
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "select database_name, collection_name, row_kind, "
+                                + "cid, name, address, phone_number from customers");
+        CloseableIterator<Row> iterator = tableResult.collect();
+        JobID jobId = tableResult.getJobClient().get().getJobID();
+        List<String> expectedSnapshotData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerCollections.length; i++) {
+            expectedSnapshotData.addAll(snapshotForSingleTable);
+        }
+
+        assertEqualsInAnyOrder(
+                expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));
+
+        // second step: check the change stream data
+        for (String collectionName : captureCustomerCollections) {
+            makeFirstPartChangeStreamEvents(
+                    mongodbClient.getDatabase(customerDatabase), collectionName);
+        }
+        for (String collectionName : captureCustomerCollections) {
+            makeSecondPartChangeStreamEvents(
+                    mongodbClient.getDatabase(customerDatabase), collectionName);
+        }
+
+        List<String> changeEventsForSingleTable =
+                Stream.of(
+                                "-U[%s, %s, -U, 101, user_1, Shanghai, 123567891234]",
+                                "+U[%s, %s, +U, 101, user_1, Hangzhou, 123567891234]",
+                                "-D[%s, %s, -D, 102, user_2, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 102, user_2, Shanghai, 123567891234]",
+                                "-U[%s, %s, -U, 103, user_3, Shanghai, 123567891234]",
+                                "+U[%s, %s, +U, 103, user_3, Hangzhou, 123567891234]",
+                                "-U[%s, %s, -U, 1010, user_11, Shanghai, 123567891234]",
+                                "+U[%s, %s, +U, 1010, user_11, Hangzhou, 123567891234]",
+                                "+I[%s, %s, +I, 2001, user_22, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 2002, user_23, Shanghai, 123567891234]",
+                                "+I[%s, %s, +I, 2003, user_24, Shanghai, 123567891234]")
+                        .map(s -> String.format(s, customerDatabase, captureCustomerCollections[0]))
+                        .collect(Collectors.toList());
+        List<String> expectedChangeStreamData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerCollections.length; i++) {
+            expectedChangeStreamData.addAll(changeEventsForSingleTable);
+        }
+        List<String> actualChangeStreamData = fetchRows(iterator, expectedChangeStreamData.size());
+        assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData);
+        tableResult.getJobClient().get().cancel().get();
+    }
+
private void testMongoDBParallelSource(
MongoDBTestUtils.FailoverType failoverType,
MongoDBTestUtils.FailoverPhase failoverPhase,
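A note on the expected strings in the new test above: they rely on `Row#toString()`, which prefixes every row with its row kind, so a snapshot record with the 'row_kind' metadata column selected third prints as `+I[<db>, <collection>, +I, ...]`. The `fetchRows` helper is defined elsewhere in the test utilities; a minimal sketch (not part of this patch) of what such a helper does, under that assumption:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.types.Row;

// Hypothetical stand-in for the fetchRows helper referenced by the test above.
final class RowFetchSketch {
    static List<String> fetchRows(Iterator<Row> iter, int size) {
        List<String> rows = new ArrayList<>(size);
        while (size > 0 && iter.hasNext()) {
            // Row#toString() includes the row kind prefix, e.g. "+I[...]".
            rows.add(iter.next().toString());
            size--;
        }
        return rows;
    }
}
```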
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
index c4613351e..5960fed9f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
@@ -87,7 +87,8 @@ class MongoDBTableFactoryTest {
Column.physical("eee", DataTypes.TIMESTAMP(3)),
Column.metadata("time", DataTypes.TIMESTAMP_LTZ(3), "op_ts", true),
Column.metadata(
-        "_database_name", DataTypes.STRING(), "database_name", true)),
+        "_database_name", DataTypes.STRING(), "database_name", true),
+        Column.metadata("_row_kind", DataTypes.STRING(), "row_kind", true)),
Collections.emptyList(),
UniqueConstraint.primaryKey("pk", Collections.singletonList("_id")));
@@ -222,7 +223,7 @@ class MongoDBTableFactoryTest {
DynamicTableSource actualSource = createTableSource(SCHEMA_WITH_METADATA, properties);
MongoDBTableSource mongoDBSource = (MongoDBTableSource) actualSource;
mongoDBSource.applyReadableMetadata(
- Arrays.asList("op_ts", "database_name"),
+ Arrays.asList("op_ts", "database_name", "row_kind"),
SCHEMA_WITH_METADATA.toSourceRowDataType());
actualSource = mongoDBSource.copy();
@@ -255,7 +256,7 @@ class MongoDBTableFactoryTest {
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
- expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
+ expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "row_kind");
Assertions.assertThat(actualSource).isEqualTo(expectedSource);