This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bad12d4d0 [cdc] Add debezium-bson format document and bugfix bson value convert to java value (#4935)
0bad12d4d0 is described below

commit 0bad12d4d04a6fb52bae2203964d7e70365a76fb
Author: chuangchuang <[email protected]>
AuthorDate: Wed Mar 12 15:08:05 2025 +0800

    [cdc] Add debezium-bson format document and bugfix bson value convert to java value (#4935)
---
 docs/content/cdc-ingestion/debezium-bson.md        | 329 +++++++++++++++++++++
 docs/content/cdc-ingestion/kafka-cdc.md            |   4 +
 .../format/debezium/DebeziumBsonRecordParser.java  |  84 +++++-
 .../format/debezium/DebeziumJsonRecordParser.java  |   4 +-
 .../KafkaDebeziumJsonDeserializationSchema.java    |   9 +-
 .../action/cdc/mongodb/BsonValueConvertor.java     |  46 ++-
 .../debezium/DebeziumBsonRecordParserTest.java     | 267 +++++++++++++++++
 .../kafka/debezium-bson/table/event/event-bson.txt |  19 ++
 .../debezium-bson/table/event/event-delete.txt     |  23 ++
 .../debezium-bson/table/event/event-insert.txt     |  19 ++
 .../kafka/debezium-bson/table/event/event-json.txt |  19 ++
 .../debezium-bson/table/event/event-update.txt     |  21 ++
 12 files changed, 807 insertions(+), 37 deletions(-)

diff --git a/docs/content/cdc-ingestion/debezium-bson.md b/docs/content/cdc-ingestion/debezium-bson.md
new file mode 100644
index 0000000000..52eb0422d5
--- /dev/null
+++ b/docs/content/cdc-ingestion/debezium-bson.md
@@ -0,0 +1,329 @@
+---
+title: "Debezium BSON"
+weight: 6
+type: docs
+aliases:
+- /cdc-ingestion/debezium-bson.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Debezium BSON Format
+
+
+The debezium-bson format is one of the formats supported by <a href="{{< ref "/cdc-ingestion/kafka-cdc" >}}">Kafka CDC</a>.
+It is produced when Debezium captures changes from MongoDB, and it is similar to the
+<a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/formats/debezium/">debezium-json</a> format.
+However, MongoDB has no fixed schema and the field types of each document may differ, so the before/after fields
+in the JSON are string types, whereas the debezium-json format requires JSON object types.
+
+
+## Prepare MongoDB BSON Jar
+
+Download the BSON library from the [Maven repository](https://mvnrepository.com/artifact/org.mongodb/bson):
+
+```
+bson-*.jar
+```
+
+## Introduction
+
+{{< hint info >}}
+The debezium-bson format requires insert/update/delete event messages to include the full document, as well as a field that represents the state of the document before the change.
+This requires setting Debezium's capture.mode to change_streams_update_full_with_pre_image and [capture.mode.full.update.type](https://debezium.io/documentation/reference/stable/connectors/mongodb.html#mongodb-property-capture-mode-full-update-type) to post_image.
+Before MongoDB 6.0 it was not possible to obtain 'Update Before' information, so the id field in the Kafka key is used as the 'Update Before' information instead.
+{{< /hint >}}
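+
+For reference, a minimal Debezium MongoDB connector configuration with these options might look like the sketch below (the connector name, connection string, and topic prefix are placeholders):
+
+```json
+{
+  "name": "mongodb-customers-connector",
+  "config": {
+    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
+    "mongodb.connection.string": "mongodb://127.0.0.1:27017/?replicaSet=rs0",
+    "topic.prefix": "mongodb_inventory",
+    "collection.include.list": "inventory.customers",
+    "capture.mode": "change_streams_update_full_with_pre_image",
+    "capture.mode.full.update.type": "post_image"
+  }
+}
+```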
+
+Here is a simple example of an update operation captured from a MongoDB customers collection, in JSON format:
+
+```json
+{
+  "schema": {
+    "type": "struct",
+    "fields": [
+      {
+        "type": "string",
+        "optional": true,
+        "name": "io.debezium.data.Json",
+        "version": 1,
+        "field": "before"
+      },
+      {
+        "type": "string",
+        "optional": true,
+        "name": "io.debezium.data.Json",
+        "version": 1,
+        "field": "after"
+      },
+      ...
+    ]
+  },
+  "payload": {
+    "before": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : 
\"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, 
\"tags\":[\"success\"]}",
+    "after": "{\"_id\": {\"$oid\" : \"596e275826f08b2730779e1f\"}, \"name\" : 
\"Anne\", \"create_time\" : {\"$numberLong\" : \"1558965506000\"}, 
\"tags\":[\"passion\",\"success\"]}",
+    "source": {
+      "db": "inventory",
+      "rs": "rs0",
+      "collection": "customers",
+      ...
+    },
+    "op": "u",
+    "ts_ms": 1558965515240,
+    "ts_us": 1558965515240142,
+    "ts_ns": 1558965515240142879
+  }
+}
+```
+
+This document from the MongoDB customers collection has 4 fields: _id is a BSON ObjectId, name is a string,
+create_time is a long, and tags is an array of strings. The following is the processing result in the debezium-bson format:
+
+Document Schema:
+
+| Field Name | Field Type | Key         |
+|------------|------------|-------------|
+| _id        | STRING     | Primary Key |
+| name       | STRING     |             |
+| create_time| STRING     |             |
+| tags       | STRING     |             |
+
+Records:
+
+| RowKind | _id                      | name  | create_time                | tags                  |
+|---------|--------------------------|-------|----------------------------|-----------------------|
+|   -U    | 596e275826f08b2730779e1f | Anne  | 1558965506000              | ["success"]           |
+|   +U    | 596e275826f08b2730779e1f | Anne  | 1558965506000              | ["passion","success"] |
+
+
+### How it works
+Because the schema field of the event message does not carry the field information of the document, the debezium-bson format does not require event messages to have schema information. Events are processed as follows (see the sketch after this list):
+
+- Parse the before/after fields of the event message into a BsonDocument.
+- Recursively traverse all fields of the BsonDocument and convert each BsonValue to a Java object.
+- Convert all top-level fields of before/after to string type; _id is always the primary key.
+- If a top-level field of before/after is a basic type (such as Integer/Long, etc.), convert it directly to a string; otherwise, convert it to a JSON string.
+
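+The following is a minimal, hypothetical sketch of these conversion rules using the org.mongodb:bson library. It is not the parser's actual code: it covers only the common cases, and it simplifies array handling (string elements are not re-quoted):
+
+```java
+import org.bson.BsonDocument;
+import org.bson.BsonValue;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class TopLevelFieldSketch {
+
+    /** Converts the extended-JSON string in before/after into string fields. */
+    public static Map<String, String> toStringFields(String documentJson) {
+        BsonDocument doc = BsonDocument.parse(documentJson);
+        Map<String, String> row = new LinkedHashMap<>();
+        for (Map.Entry<String, BsonValue> entry : doc.entrySet()) {
+            row.put(entry.getKey(), toStringValue(entry.getValue()));
+        }
+        return row;
+    }
+
+    private static String toStringValue(BsonValue value) {
+        switch (value.getBsonType()) {
+            case NULL:
+            case UNDEFINED:
+                return null; // null and undefined both map to NULL
+            case STRING:
+                return value.asString().getValue();
+            case OBJECT_ID:
+                return value.asObjectId().getValue().toHexString();
+            case INT32:
+                return String.valueOf(value.asInt32().getValue());
+            case INT64:
+                return String.valueOf(value.asInt64().getValue());
+            case DOUBLE:
+                // String.valueOf also yields "NaN", "Infinity", "-Infinity"
+                return String.valueOf(value.asDouble().getValue());
+            case BOOLEAN:
+                return String.valueOf(value.asBoolean().getValue());
+            case DOCUMENT:
+                // Nested documents are rendered as JSON strings.
+                return value.asDocument().toJson();
+            case ARRAY:
+                // Simplified: elements are converted recursively and joined.
+                return value.asArray().getValues().stream()
+                        .map(TopLevelFieldSketch::toStringValue)
+                        .collect(Collectors.joining(",", "[", "]"));
+            default:
+                throw new UnsupportedOperationException(
+                        "Type not covered by this sketch: " + value.getBsonType());
+        }
+    }
+}
+```
+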
+Below is a list of top-level field BsonValue conversion examples:
+
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">BsonValue Type</th>
+        <th class="text-left" style="width: 40%">Json Value</th>
+        <th class="text-left" style="width: 40%">Conversion Result String</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>BsonString</h5></td>
+        <td>"hello"</td>
+        <td>"hello"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonInt32</h5></td>
+        <td>123</td>
+        <td>"123"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonInt64</h5></td>
+        <td>
+            <ul>
+                <li>1735934393769</li>
+                <li>{"$numberLong": "1735934393769"}</li>
+            </ul>
+        </td>
+        <td>"1735934393769"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonDouble</h5></td>
+        <td>
+            <ul>
+                <li>{"$numberDouble": "3.14"}</li>
+                <li>{"$numberDouble": "NaN"}</li>
+                <li>{"$numberDouble": "Infinity"}</li>
+                <li>{"$numberDouble": "-Infinity"}</li>
+            </ul>
+        </td>
+        <td>
+            <ul>
+                <li>"3.14"</li>
+                <li>"NaN"</li>
+                <li>"Infinity"</li>
+                <li>"-Infinity"</li>
+            </ul>
+        </td>
+    </tr>
+    <tr>
+        <td><h5>BsonBoolean</h5></td>
+        <td>
+            <ul>
+                <li>true</li>
+                <li>false</li>
+            </ul>
+        </td>
+        <td>
+            <ul>
+                <li>"true"</li>
+                <li>"false"</li>
+            </ul>
+        </td>
+    </tr>
+    <tr>
+        <td><h5>BsonArray</h5></td>
+        <td>[1,2,{"$numberLong": "1735934393769"}]</td>
+        <td>"[1,2,1735934393769]"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonObjectId</h5></td>
+        <td>{"$oid": "596e275826f08b2730779e1f"}</td>
+        <td>"596e275826f08b2730779e1f"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonDateTime</h5></td>
+        <td>{"$date": 1735934393769 }</td>
+        <td>"1735934393769"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonNull</h5></td>
+        <td>null</td>
+        <td>null</td>
+    </tr>
+    <tr>
+        <td><h5>BsonUndefined</h5></td>
+        <td>{"$undefined": true}</td>
+        <td>null</td>
+    </tr>
+    <tr>
+        <td><h5>BsonBinary</h5></td>
+        <td>{"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "0"}</td>
+        <td>"uE2/4v5MSVOiJZkOo3APKQ=="</td>
+    </tr>
+    <tr>
+        <td><h5>BsonBinary(type=UUID)</h5></td>
+        <td>{"$binary": "uE2/4v5MSVOiJZkOo3APKQ==", "$type": "4"}</td>
+        <td>"b84dbfe2-fe4c-4953-a225-990ea3700f29"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonDecimal128</h5></td>
+        <td>
+            <ul>
+                <li>{"$numberDecimal": "3.14"}</li>
+                <li>{"$numberDecimal": "NaN"}</li>
+            </ul>
+        </td>
+        <td>
+            <ul>
+                <li>"3.14"</li>
+                <li>"NaN"</li>
+            </ul>
+        </td>
+    </tr>
+    <tr>
+        <td><h5>BsonRegularExpression</h5></td>
+        <td>{"$regularExpression": {"pattern": "^pass$", "options": "i"}}</td>
+        <td>"/^pass$/i"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonSymbol</h5></td>
+        <td>{"$symbol": "symbol"}</td>
+        <td>"symbol"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonTimestamp</h5></td>
+        <td>{"$timestamp": {"t": 1736997330, "i": 2}}</td>
+        <td>"1736997330"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonMinKey</h5></td>
+        <td>{"$minKey": 1}</td>
+        <td>"BsonMinKey"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonMaxKey</h5></td>
+        <td>{"$maxKey": 1}</td>
+        <td>"BsonMaxKey"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonJavaScript</h5></td>
+        <td>{"$code": "function(){}"}</td>
+        <td>"function(){}"</td>
+    </tr>
+    <tr>
+        <td><h5>BsonJavaScriptWithScope</h5></td>
+        <td>{"$code": "function(){}", "$scope": {"name": "Anne"}}</td>
+        <td>'{"$code": "function(){}", "$scope": {"name": "Anne"}}'</td>
+    </tr>
+    <tr>
+        <td><h5>BsonDocument</h5></td>
+        <td>
+<pre>
+{
+  "decimalPi": {"$numberDecimal": "3.14"},
+  "doublePi": {"$numberDouble": "3.14"},
+  "doubleNaN": {"$numberDouble": "NaN"},
+  "decimalNaN": {"$numberDecimal": "NaN"},
+  "long": {"$numberLong": "100"},
+  "bool": true,
+  "array": [
+    {"$numberInt": "1"},
+    {"$numberLong": "2"}
+  ]
+}
+</pre>
+        </td>
+        <td>
+<pre>
+'{
+  "decimalPi":3.14,
+  "doublePi":3.14,
+  "doubleNaN":"NaN",
+  "decimalNaN":"NaN",
+  "long":100,
+  "bool":true,
+  "array":[1,2]
+}'
+</pre>
+        </td>
+    </tr>
+    </tbody>
+</table>
+
+
+### How to use
+Use the debezium-bson format by adding the kafka_conf parameter **value.format=debezium-bson**. Let's take table synchronization as an example:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    kafka_sync_table \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --table ods_mongodb_customers \
+    --primary_keys _id \
+    --kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \
+    --kafka_conf topic=customers \
+    --kafka_conf properties.group.id=123456 \
+    --kafka_conf value.format=debezium-bson \
+    --catalog_conf metastore=filesystem \
+    --table_conf bucket=4 \
+    --table_conf changelog-producer=input \
+    --table_conf sink.parallelism=4
+```
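+
+Database synchronization works the same way with the kafka_sync_database action; a hypothetical sketch (warehouse path, topic, and group id are placeholders):
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    kafka_sync_database \
+    --warehouse hdfs:///path/to/warehouse \
+    --database test_db \
+    --kafka_conf properties.bootstrap.servers=127.0.0.1:9020 \
+    --kafka_conf topic=customers \
+    --kafka_conf properties.group.id=123456 \
+    --kafka_conf value.format=debezium-bson \
+    --catalog_conf metastore=filesystem \
+    --table_conf bucket=4 \
+    --table_conf changelog-producer=input
+```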
+
+
diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md
index 20ea3e6199..dfad34fad3 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -67,6 +67,10 @@ If a message in a Kafka topic is a change event captured from another database u
          <td><a href="https://docs.aws.amazon.com/dms/latest/userguide/Welcome.html">aws-dms-json</a></td>
         <td>True</td>
         </tr>
+        <tr>
+         <td><a href="{{< ref "/cdc-ingestion/debezium-bson" >}}">debezium-bson</a></td>
+        <td>True</td>
+        </tr>
     </tbody>
 </table>
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
index 10dcd56b80..397575c8e5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
@@ -22,13 +22,17 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
 
 import org.bson.BsonDocument;
 import org.bson.BsonValue;
@@ -37,14 +41,23 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_BEFORE;
 import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_PAYLOAD;
 import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_SCHEMA;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.FIELD_TYPE;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_DELETE;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_INSERT;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_READE;
+import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils.OP_UPDATE;
+import static org.apache.paimon.utils.JsonSerdeUtil.fromJson;
+import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
 import static org.apache.paimon.utils.JsonSerdeUtil.writeValueAsString;
 
 /**
@@ -63,19 +76,47 @@ public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser {
 
     private static final String FIELD_COLLECTION = "collection";
     private static final String FIELD_OBJECT_ID = "_id";
+    private static final String FIELD_KEY_ID = "id";
     private static final List<String> PRIMARY_KEYS = Collections.singletonList(FIELD_OBJECT_ID);
 
+    private ObjectNode keyRoot;
+
     public DebeziumBsonRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
         super(typeMapping, computedColumns);
     }
 
+    @Override
+    public List<RichCdcMultiplexRecord> extractRecords() {
+        String operation = getAndCheck(FIELD_TYPE).asText();
+        List<RichCdcMultiplexRecord> records = new ArrayList<>();
+        switch (operation) {
+            case OP_INSERT:
+            case OP_READE:
+                processRecord(getData(), RowKind.INSERT, records);
+                break;
+            case OP_UPDATE:
+                processDeleteRecord(operation, records);
+                processRecord(getData(), RowKind.INSERT, records);
+                break;
+            case OP_DELETE:
+                processDeleteRecord(operation, records);
+                break;
+            default:
+                throw new UnsupportedOperationException("Unknown record operation: " + operation);
+        }
+        return records;
+    }
+
     @Override
     protected void setRoot(CdcSourceRecord record) {
-        JsonNode node = (JsonNode) record.getValue();
-        if (node.has(FIELD_SCHEMA)) {
-            root = node.get(FIELD_PAYLOAD);
-        } else {
-            root = node;
+        root = (JsonNode) record.getValue();
+        if (root.has(FIELD_SCHEMA)) {
+            root = root.get(FIELD_PAYLOAD);
+        }
+
+        keyRoot = (ObjectNode) record.getKey();
+        if (!isNull(keyRoot) && keyRoot.has(FIELD_SCHEMA)) {
+            keyRoot = (ObjectNode) keyRoot.get(FIELD_PAYLOAD);
         }
     }
 
@@ -128,4 +169,37 @@ public class DebeziumBsonRecordParser extends DebeziumJsonRecordParser {
     protected String format() {
         return "debezium-bson";
     }
+
+    public boolean checkBeforeExists() {
+        return !isNull(root.get(FIELD_BEFORE));
+    }
+
+    private void processDeleteRecord(String operation, List<RichCdcMultiplexRecord> records) {
+        if (checkBeforeExists()) {
+            processRecord(getBefore(operation), RowKind.DELETE, records);
+        } else {
+            // Before version 6.0 of MongoDB, it was not possible to obtain 'Update Before'
+            // information. Therefore, data is first deleted using the key 'id'
+            JsonNode idNode = null;
+            Preconditions.checkArgument(
+                    !isNull(keyRoot) && !isNull(idNode = keyRoot.get(FIELD_KEY_ID)),
+                    "Invalid %s format: missing '%s' field in key when '%s' is '%s' for: %s.",
+                    format(),
+                    FIELD_KEY_ID,
+                    FIELD_TYPE,
+                    operation,
+                    keyRoot);
+
+            // Deserialize id from json string to JsonNode
+            Map<String, JsonNode> record =
+                    Collections.singletonMap(
+                            FIELD_OBJECT_ID, fromJson(idNode.asText(), JsonNode.class));
+
+            try {
+                processRecord(new TextNode(writeValueAsString(record)), RowKind.DELETE, records);
+            } catch (JsonProcessingException e) {
+                throw new RuntimeException("Failed to deserialize key record.", e);
+            }
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
index 66ddc2e1ca..6e06e3adca 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
@@ -109,11 +109,11 @@ public class DebeziumJsonRecordParser extends AbstractJsonRecordParser {
         return records;
     }
 
-    private JsonNode getData() {
+    protected JsonNode getData() {
         return getAndCheck(dataField());
     }
 
-    private JsonNode getBefore(String op) {
+    protected JsonNode getBefore(String op) {
         return getAndCheck(FIELD_BEFORE, FIELD_TYPE, op);
     }
 
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
index 76211cf56d..507c9eb63c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumJsonDeserializationSchema.java
@@ -64,7 +64,14 @@ public class KafkaDebeziumJsonDeserializationSchema
         }
 
         try {
-            return new CdcSourceRecord(objectMapper.readValue(message.value(), JsonNode.class));
+            byte[] key = message.key();
+            JsonNode keyNode = null;
+            if (key != null) {
+                keyNode = objectMapper.readValue(key, JsonNode.class);
+            }
+
+            JsonNode valueNode = objectMapper.readValue(message.value(), JsonNode.class);
+            return new CdcSourceRecord(null, keyNode, valueNode);
         } catch (Exception e) {
             LOG.error("Invalid Json:\n{}", new String(message.value()));
             throw e;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
index 16d43821bd..666ef92242 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/BsonValueConvertor.java
@@ -47,8 +47,8 @@ import org.bson.BsonValue;
 import org.bson.types.Decimal128;
 import org.bson.types.ObjectId;
 
-import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,15 +61,22 @@ public class BsonValueConvertor {
         return bsonTimestamp.getTime();
     }
 
-    private static BigDecimal convert(BsonDecimal128 bsonValue) {
+    private static Number convert(BsonDecimal128 bsonValue) {
         return convert(bsonValue.decimal128Value());
     }
 
-    private static BigDecimal convert(Decimal128 bsonValue) {
-        if (bsonValue.isNaN() || bsonValue.isInfinite()) {
-            return null;
+    private static Number convert(Decimal128 bsonValue) {
+        if (bsonValue.isNaN()) {
+            return Double.NaN;
+        } else if (bsonValue.isInfinite()) {
+            if (bsonValue.isNegative()) {
+                return Double.NEGATIVE_INFINITY;
+            } else {
+                return Double.POSITIVE_INFINITY;
+            }
+        } else {
+            return bsonValue.bigDecimalValue();
         }
-        return bsonValue.bigDecimalValue();
     }
 
     private static String convert(BsonObjectId objectId) {
@@ -84,7 +91,7 @@ public class BsonValueConvertor {
         if (BsonBinarySubType.isUuid(bsonBinary.getType())) {
             return bsonBinary.asUuid().toString();
         } else {
-            return toHex(bsonBinary.getData());
+            return Base64.getEncoder().encodeToString(bsonBinary.getData());
         }
     }
 
@@ -99,11 +106,7 @@ public class BsonValueConvertor {
     }
 
     private static Double convert(BsonDouble bsonDouble) {
-        double value = bsonDouble.getValue();
-        if (Double.isNaN(value) || Double.isInfinite(value)) {
-            return null;
-        }
-        return value;
+        return bsonDouble.getValue();
     }
 
     private static String convert(BsonString string) {
@@ -136,8 +139,8 @@ public class BsonValueConvertor {
 
     private static Map<String, Object> convert(BsonJavaScriptWithScope javascriptWithScope) {
         return CollectionUtil.map(
-                Pair.of("code", javascriptWithScope.getCode()),
-                Pair.of("scope", convert(javascriptWithScope.getScope())));
+                Pair.of("$code", javascriptWithScope.getCode()),
+                Pair.of("$scope", convert(javascriptWithScope.getScope())));
     }
 
     private static String convert(BsonNull bsonNull) {
@@ -224,19 +227,4 @@ public class BsonValueConvertor {
                         "Unsupported BSON type: " + bsonValue.getBsonType());
         }
     }
-
-    public static String toHex(byte[] bytes) {
-        StringBuilder sb = new StringBuilder();
-
-        for (byte b : bytes) {
-            String s = Integer.toHexString(255 & b);
-            if (s.length() < 2) {
-                sb.append("0");
-            }
-
-            sb.append(s);
-        }
-
-        return sb.toString();
-    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
new file mode 100644
index 0000000000..78d3bcc3de
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.format.debezium;
+
+import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.action.cdc.format.DataFormat;
+import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
+import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.TextNode;
+
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Test for DebeziumBsonRecordParser. */
+public class DebeziumBsonRecordParserTest {
+
+    private static final Logger log = LoggerFactory.getLogger(DebeziumBsonRecordParserTest.class);
+    private static List<CdcSourceRecord> insertList = new ArrayList<>();
+    private static List<CdcSourceRecord> updateList = new ArrayList<>();
+    private static List<CdcSourceRecord> deleteList = new ArrayList<>();
+
+    private static ArrayList<CdcSourceRecord> bsonRecords = new ArrayList<>();
+    private static ArrayList<CdcSourceRecord> jsonRecords = new ArrayList<>();
+
+    private static Map<String, String> keyEvent = new HashMap<>();
+
+    private static KafkaDeserializationSchema<CdcSourceRecord> kafkaDeserializationSchema = null;
+
+    private static Map<String, String> beforeEvent = new HashMap<>();
+
+    private static Map<String, String> afterEvent = new HashMap<>();
+
+    @BeforeAll
+    public static void beforeAll() throws Exception {
+        DataFormat dataFormat = new DebeziumBsonDataFormatFactory().create();
+        kafkaDeserializationSchema = dataFormat.createKafkaDeserializer(null);
+
+        keyEvent.put("_id", "67ab25755c0d5ac87eb8c632");
+
+        beforeEvent.put("_id", "67ab25755c0d5ac87eb8c632");
+        beforeEvent.put("created_at", "1736207571013");
+        beforeEvent.put("created_by", "peter");
+        beforeEvent.put("tags", "[\"pending\"]");
+        beforeEvent.put("updated_at", "1739455297970");
+
+        afterEvent.put("_id", "67ab25755c0d5ac87eb8c632");
+        afterEvent.put("created_at", "1736207571013");
+        afterEvent.put("created_by", "peter");
+        afterEvent.put("tags", "[\"succeed\"]");
+        afterEvent.put("updated_at", "1739455397970");
+
+        String insertRes = "kafka/debezium-bson/table/event/event-insert.txt";
+        String updateRes = "kafka/debezium-bson/table/event/event-update.txt";
+        String deleteRes = "kafka/debezium-bson/table/event/event-delete.txt";
+        String bsonPth = "kafka/debezium-bson/table/event/event-bson.txt";
+        String jsonPath = "kafka/debezium-bson/table/event/event-json.txt";
+
+        parseCdcSourceRecords(insertRes, insertList);
+
+        parseCdcSourceRecords(updateRes, updateList);
+
+        parseCdcSourceRecords(deleteRes, deleteList);
+
+        parseCdcSourceRecords(bsonPth, bsonRecords);
+
+        parseCdcSourceRecords(jsonPath, jsonRecords);
+    }
+
+    @AfterAll
+    public static void afterAll() {
+        insertList.clear();
+        updateList.clear();
+        deleteList.clear();
+        bsonRecords.clear();
+        jsonRecords.clear();
+    }
+
+    private static void parseCdcSourceRecords(String resourcePath, List<CdcSourceRecord> records)
+            throws Exception {
+        URL url = DebeziumBsonRecordParserTest.class.getClassLoader().getResource(resourcePath);
+        List<String> line = Files.readAllLines(Paths.get(url.toURI()));
+        String key = null;
+        for (String json : line) {
+            if (StringUtils.isNullOrWhitespaceOnly(json) || !json.startsWith("{")) {
+                continue;
+            }
+            if (key == null) {
+                key = json;
+            } else {
+                // test kafka deserialization
+                records.add(deserializeKafkaSchema(key, json));
+                key = null;
+            }
+        }
+    }
+
+    @Test
+    public void extractInsertRecord() throws Exception {
+        DebeziumBsonRecordParser parser =
+                new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        Assertions.assertFalse(insertList.isEmpty());
+        for (CdcSourceRecord cdcRecord : insertList) {
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assertions.assertEquals(records.size(), 1);
+
+            CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assertions.assertEquals(result.kind(), RowKind.INSERT);
+            Assertions.assertEquals(beforeEvent, result.data());
+
+            String dbName = parser.getDatabaseName();
+            Assertions.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assertions.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+
+    @Test
+    public void extractUpdateRecord() throws Exception {
+        DebeziumBsonRecordParser parser =
+                new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        Assertions.assertFalse(updateList.isEmpty());
+        for (CdcSourceRecord cdcRecord : updateList) {
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assertions.assertEquals(records.size(), 2);
+
+            CdcRecord updateBefore = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assertions.assertEquals(updateBefore.kind(), RowKind.DELETE);
+            if (parser.checkBeforeExists()) {
+                Assertions.assertEquals(beforeEvent, updateBefore.data());
+            } else {
+                Assertions.assertEquals(keyEvent, updateBefore.data());
+            }
+
+            CdcRecord updateAfter = records.get(1).toRichCdcRecord().toCdcRecord();
+            Assertions.assertEquals(updateAfter.kind(), RowKind.INSERT);
+            Assertions.assertEquals(afterEvent, updateAfter.data());
+
+            String dbName = parser.getDatabaseName();
+            Assertions.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assertions.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+
+    @Test
+    public void extractDeleteRecord() throws Exception {
+        DebeziumBsonRecordParser parser =
+                new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+        Assertions.assertFalse(deleteList.isEmpty());
+        for (CdcSourceRecord cdcRecord : deleteList) {
+            Schema schema = parser.buildSchema(cdcRecord);
+            Assertions.assertEquals(schema.primaryKeys(), Arrays.asList("_id"));
+
+            List<RichCdcMultiplexRecord> records = parser.extractRecords();
+            Assertions.assertEquals(records.size(), 1);
+
+            CdcRecord result = records.get(0).toRichCdcRecord().toCdcRecord();
+            Assertions.assertEquals(result.kind(), RowKind.DELETE);
+            if (parser.checkBeforeExists()) {
+                Assertions.assertEquals(beforeEvent, result.data());
+            } else {
+                Assertions.assertEquals(keyEvent, result.data());
+            }
+
+            String dbName = parser.getDatabaseName();
+            Assertions.assertEquals(dbName, "bigdata_test");
+
+            String tableName = parser.getTableName();
+            Assertions.assertEquals(tableName, "sync_test_table");
+
+            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
+            Assertions.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+        }
+    }
+
+    @Test
+    public void bsonConvertJsonTest() throws Exception {
+        DebeziumBsonRecordParser parser =
+                new DebeziumBsonRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
+
+        Assertions.assertFalse(jsonRecords.isEmpty());
+        for (int i = 0; i < jsonRecords.size(); i++) {
+            CdcSourceRecord bsonRecord = bsonRecords.get(i);
+            CdcSourceRecord jsonRecord = jsonRecords.get(i);
+
+            JsonNode bsonTextNode =
+                    new TextNode(JsonSerdeUtil.writeValueAsString(bsonRecord.getValue()));
+            Map<String, String> resultMap = parser.extractRowData(bsonTextNode, RowType.builder());
+
+            ObjectNode expectNode = (ObjectNode) jsonRecord.getValue();
+
+            expectNode
+                    .fields()
+                    .forEachRemaining(
+                            entry -> {
+                                String key = entry.getKey();
+                                String expectValue = null;
+                                if (!JsonSerdeUtil.isNull(entry.getValue())) {
+                                    expectValue = entry.getValue().asText();
+                                }
+                                Assertions.assertEquals(expectValue, resultMap.get(key));
+                            });
+        }
+    }
+
+    private static CdcSourceRecord deserializeKafkaSchema(String key, String value)
+            throws Exception {
+        return kafkaDeserializationSchema.deserialize(
+                new ConsumerRecord<>("topic", 0, 0, key.getBytes(), value.getBytes()));
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt
new file mode 100644
index 0000000000..be744a1147
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-bson.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "dummy"}
+{"id": {"$oid": "596e275826f08b2730779e1f"},"name": "Sally","age": 
25,"created_time": {"$numberLong": "1735934393769"},"updated_time": 
1735934393769,"deleted_time": {"$date": 1735934393769 },"register_time": 
{"$timestamp": {"t": 1736997330, "i": 2}},"double_nan": {"$numberDouble": 
"NaN"},"double_inf": {"$numberDouble": "Infinity"},"double_ninf": 
{"$numberDouble": "-Infinity"},"double_zero": {"$numberDouble": 
"0"},"boolean_true": true,"boolean_false": false,"array": ["a", "b", 
"c"],"array [...]
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt
new file mode 100644
index 0000000000..a1de2bc514
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-delete.txt
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": null,"after": null,"updateDescription": null,"source": {"version": 
"2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 
1739321992000,"snapshot": "false","db": "bigdata_test","sequence": 
null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": 
"sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": 
null},"op": "d","ts_ms": 1739321992890,"transaction": null}
+{"id": "\"67ab25755c0d5ac87eb8c632\""}
+{"before": null,"after": null,"updateDescription": null,"source": {"version": 
"2.7.0.Final","connector": "mongodb","name": "mongodb_bigdata_test","ts_ms": 
1739321992000,"snapshot": "false","db": "bigdata_test","sequence": 
null,"ts_us": 1739321992000000,"ts_ns": 1739321992000000000,"collection": 
"sync_test_table","ord": 28,"lsid": null,"txnNumber": null,"wallTime": 
null},"op": "d","ts_ms": 1739321992890,"transaction": null}
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": 
{\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": 
[\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","after": 
null,"updateDescription": null,"source": {"version": "2.7.0.Final","connector": 
"mongodb","name": "mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": 
"false","db": "bigdata_test","sequence": null,"ts_us": 
1739321992000000,"ts_ns": 1739321992000000000,"collection": " [...]
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt
new file mode 100644
index 0000000000..ff97846015
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-insert.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": null,"after": "{\"_id\": {\"$oid\": 
\"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": 
\"1736207571013\"},\"created_by\": \"peter\",\"tags\": 
[\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","updateDescription": 
null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": 
"mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": 
"bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 
1739321992000000000,"collection": " [...]
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt
new file mode 100644
index 0000000000..115ee5a91b
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-json.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "dummy"}
+{"id":"596e275826f08b2730779e1f","name":"Sally","age":"25","created_time":"1735934393769","updated_time":"1735934393769","deleted_time":"1735934393769","register_time":"1736997330","double_nan":"NaN","double_inf":"Infinity","double_ninf":"-Infinity","double_zero":"0.0","boolean_true":"true","boolean_false":"false","array":"[\"a\",\"b\",\"c\"]","array_mix":"[\"1\",2,1735934393769]","decimal":"3.14","decimal_nan":"NaN","regex":"/^pass$/i","symbol":"symbol","minKey":"BsonMinKey","maxKey":"B
 [...]
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt
new file mode 100644
index 0000000000..035ce3ff87
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium-bson/table/event/event-update.txt
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": null,"after": "{\"_id\": {\"$oid\": 
\"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": 
\"1736207571013\"},\"created_by\": \"peter\",\"tags\": 
[\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","updateDescription": 
null,"source": {"version": "2.7.0.Final","connector": "mongodb","name": 
"mongodb_bigdata_test","ts_ms": 1739321992000,"snapshot": "false","db": 
"bigdata_test","sequence": null,"ts_us": 1739321992000000,"ts_ns": 
1739321992000000000,"collection": " [...]
+{"id": "{\"$oid\": \"67ab25755c0d5ac87eb8c632\"}"}
+{"before": "{\"_id\": {\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": 
{\"$numberLong\": \"1736207571013\"},\"created_by\": \"peter\",\"tags\": 
[\"pending\"],\"updated_at\": {\"$date\": 1739455297970}}","after": "{\"_id\": 
{\"$oid\": \"67ab25755c0d5ac87eb8c632\"},\"created_at\": {\"$numberLong\": 
\"1736207571013\"},\"created_by\": \"peter\",\"tags\": 
[\"succeed\"],\"updated_at\": {\"$date\": 1739455397970}}","updateDescription": 
null,"source": {"version": "2.7.0.Final","connector [...]

