This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aeb5e8c9 [FLINK-37224][docs] Add the missing documents and parameters 
of MongoDB CDC
6aeb5e8c9 is described below

commit 6aeb5e8c9fb2a59b46adc3bf2349a59e9deac2f7
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Thu Feb 6 10:46:16 2025 +0800

    [FLINK-37224][docs] Add the missing documents and parameters of MongoDB CDC
    
    This closes #3895
---
 .../docs/connectors/flink-sources/mongodb-cdc.md   | 56 ++++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md 
b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
index 636c229f1..9491de622 100644
--- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
@@ -488,6 +488,62 @@ public class MongoDBIncrementalSourceExample {
 - 如果使用数据库正则表达式,则需要 `readAnyDatabase` 角色。
 - 增量快照功能仅支持 MongoDB 4.0 之后的版本。
 
+### 完整的 Changelog
+
+MongoDB 6.0 以及更高的版本支持发送变更流事件,其中包含文档的更新前和更新后的内容(或者说数据的前后镜像)。
+
+- 前镜像是指被替换、更新或删除之前的文档。对于插入操作没有前镜像。
+
+- 后镜像是指被替换、更新或删除之后的文档。对于删除操作没有后镜像。
+
+MongoDB CDC 能够使用前镜像和后镜像来生成完整的变更日志流,包括插入、更新前、更新后和删除的数据行,从而避免了额外的 
`ChangelogNormalize` 下游节点。
+
+为了启用此功能,你需要满足以下条件:
+
+- MongoDB 的版本必须为 6.0 或更高版本。
+- 启用 `preAndPostImages` 功能。
+
+```javascript
+db.runCommand({
+  setClusterParameter: {
+    changeStreamOptions: {
+      preAndPostImages: {
+        expireAfterSeconds: 'off' // replace with custom image expiration time
+      }
+    }
+  }
+})
+```
+
+- 为希望监控的 collection 启用 `changeStreamPreAndPostImages` 功能:
+```javascript
+db.runCommand({
+  collMod: "<< collection name >>", 
+  changeStreamPreAndPostImages: {
+    enabled: true 
+  } 
+})
+```
+
+在 DataStream 中开启 MongoDB CDC 的 `scan.full-changelog` 功能:
+
+```java
+MongoDBSource.builder()
+    .scanFullChangelog(true)
+    ...
+    .build()
+```
+
+或者使用 Flink SQL:
+
+```SQL
+CREATE TABLE mongodb_source (...) WITH (
+    'connector' = 'mongodb-cdc',
+    'scan.full-changelog' = 'true',
+    ...
+)
+```
+
 数据类型映射
 ----------------
 [BSON](https://docs.mongodb.com/manual/reference/bson-types/) **二进制 
JSON**的缩写是一种类似 JSON 格式的二进制编码序列,用于在 MongoDB 中存储文档和进行远程过程调用。

Reply via email to