This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 3457a922b [FLINK-36611][pipeline-connector/kafka] Support output schema for Kafka debezium-json format
3457a922b is described below

commit 3457a922b5da331243fb3a4a1bc26a1b58e81b36
Author: MOBIN <[email protected]>
AuthorDate: Mon Apr 7 23:04:12 2025 +0800

    [FLINK-36611][pipeline-connector/kafka] Support output schema for Kafka debezium-json format
    
    This closes #3791
---
 .../docs/connectors/pipeline-connectors/kafka.md   |  88 +++++-
 .../docs/connectors/pipeline-connectors/kafka.md   |  89 +++++-
 .../flink-cdc-pipeline-connector-kafka/pom.xml     |  23 ++
 .../kafka/json/ChangeLogJsonFormatFactory.java     |  13 +-
 .../DebeziumJsonRowDataSerializationSchema.java    | 163 +++++++++++
 .../debezium/DebeziumJsonSerializationSchema.java  | 317 +++++++++++++++++----
 .../kafka/json/debezium/DebeziumJsonStruct.java    |  87 ++++++
 .../kafka/sink/KafkaDataSinkFactory.java           |   2 +
 .../kafka/sink/KafkaDataSinkOptions.java           |   7 +
 .../utils/JsonRowDataSerializationSchemaUtils.java |  44 +++
 .../DebeziumJsonSerializationSchemaTest.java       | 117 ++++++++
 .../cdc/pipeline/tests/MysqlToKafkaE2eITCase.java  |  76 +++++
 .../mysqlToKafka/debezium-json-with-schema.txt     |  37 +++
 13 files changed, 997 insertions(+), 66 deletions(-)

diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md 
b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
index 136eb4d4d..419499b75 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/kafka.md
@@ -150,6 +150,13 @@ Pipeline 连接器配置项
       <td>String</td>
       <td>自定义的上游表名到下游 Kafka Topic 名的映射关系。 每个映射关系由 `;` 分割,上游表的 TableId 和下游 
Kafka 的 Topic 名由 `:` 分割。 举个例子,我们可以配置 `sink.tableId-to-topic.mapping` 的值为 
`mydb.mytable1:topic1;mydb.mytable2:topic2`。 </td>
     </tr>
+    <tr>
+      <td>sink.debezium-json.include-schema.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>如果设置为 true,每条 Debezium 记录都将包含 Debezium schema 信息。仅当 `value.format` 为 `debezium-json` 时生效。</td>
+    </tr>
     </tbody>
 </table>    
 </div>
@@ -180,6 +187,63 @@ Pipeline 连接器配置项
   }
 }
 ```
+当`sink.debezium-json.include-schema.enabled=true`时,输出示例如下:
+```json
+{
+  "schema":{
+    "type":"struct",
+    "fields":[
+      {
+        "type":"struct",
+        "fields":[
+          {
+            "type":"string",
+            "optional":true,
+            "field":"col1"
+          },
+          {
+            "type":"string",
+            "optional":true,
+            "field":"col2"
+          }
+        ],
+        "optional":true,
+        "field":"before"
+      },
+      {
+        "type":"struct",
+        "fields":[
+          {
+            "type":"string",
+            "optional":true,
+            "field":"col1"
+          },
+          {
+            "type":"string",
+            "optional":true,
+            "field":"col2"
+          }
+        ],
+        "optional":true,
+        "field":"after"
+      }
+    ],
+    "optional":false
+  },
+  "payload":{
+    "before": null,
+    "after": {
+      "col1": "1",
+      "col2": "1"
+    },
+    "op": "c",
+    "source": {
+      "db": "default_namespace",
+      "table": "table1"
+    }
+  }
+}
+```
 
 #### canal-json
 参考 [Canal | Apache 
Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata),
 canal-json 格式会包含 `old`,`data`,`type`,`database`,`table`,`pkNames` 几个元素, 但是 
`ts` 并不会包含在其中。   
@@ -204,12 +268,16 @@ Pipeline 连接器配置项
 
 数据类型映射
 ----------------
+[Literal type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types): 反映数据的实际存储类型 (对应 debezium schema 中的 type 字段)<br>
+[Semantic type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types): 反映数据的逻辑类型 (对应 debezium schema 中的 name 字段)。
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
     <thead>
       <tr>
         <th class="text-left">CDC type</th>
         <th class="text-left">JSON type</th>
+        <th class="text-left">Literal type</th>
+        <th class="text-left">Semantic type</th>
         <th class="text-left" style="width:60%;">NOTE</th>
       </tr>
     </thead>
@@ -217,29 +285,35 @@ Pipeline 连接器配置项
     <tr>
       <td>TINYINT</td>
       <td>TINYINT</td>
+      <td>INT16</td>
       <td></td>
     </tr>
     <tr>
       <td>SMALLINT</td>
       <td>SMALLINT</td>
+      <td>INT16</td>
       <td></td>
     </tr>
     <tr>
       <td>INT</td>
       <td>INT</td>
+      <td>INT32</td>
       <td></td>
     </tr>
     <tr>
       <td>BIGINT</td>
       <td>BIGINT</td>
+      <td>INT64</td>
       <td></td>
     </tr>
     <tr>
+      <td>FLOAT</td>
       <td>FLOAT</td>
       <td>FLOAT</td>
       <td></td>
     </tr>
     <tr>
+      <td>DOUBLE</td>
       <td>DOUBLE</td>
       <td>DOUBLE</td>
       <td></td>
@@ -247,9 +321,12 @@ Pipeline 连接器配置项
     <tr>
       <td>DECIMAL(p, s)</td>
       <td>DECIMAL(p, s)</td>
+      <td>BYTES</td>
+      <td>org.apache.kafka.connect.data.Decimal</td>
       <td></td>
     </tr>
     <tr>
+      <td>BOOLEAN</td>
       <td>BOOLEAN</td>
       <td>BOOLEAN</td>
       <td></td>
@@ -257,26 +334,33 @@ Pipeline 连接器配置项
     <tr>
       <td>DATE</td>
       <td>DATE</td>
+      <td>INT32</td>
+      <td>io.debezium.time.Date</td>
       <td></td>
     </tr>
     <tr>
-      <td>TIMESTAMP</td>
-      <td>DATETIME</td>
+      <td>TIMESTAMP(p)</td>
+      <td>TIMESTAMP(p)</td>
+      <td>INT64</td>
+      <td>p <= 3: io.debezium.time.Timestamp <br>p > 3: io.debezium.time.MicroTimestamp</td>
       <td></td>
     </tr>
     <tr>
       <td>TIMESTAMP_LTZ</td>
       <td>TIMESTAMP_LTZ</td>
+      <td>STRING</td>
+      <td>io.debezium.time.ZonedTimestamp</td>
       <td></td>
     </tr>
     <tr>
       <td>CHAR(n)</td>
       <td>CHAR(n)</td>
+      <td>STRING</td>
       <td></td>
     </tr>
     <tr>
       <td>VARCHAR(n)</td>
       <td>VARCHAR(n)</td>
+      <td>STRING</td>
       <td></td>
     </tr>
     </tbody>
diff --git a/docs/content/docs/connectors/pipeline-connectors/kafka.md 
b/docs/content/docs/connectors/pipeline-connectors/kafka.md
index 619ade45c..b9cf96e40 100644
--- a/docs/content/docs/connectors/pipeline-connectors/kafka.md
+++ b/docs/content/docs/connectors/pipeline-connectors/kafka.md
@@ -148,6 +148,13 @@ Pipeline Connector Options
       <td>String</td>
       <td>Custom table mappings for each table from upstream tableId to 
downstream Kafka topic. Each mapping is separated by `;`, separate upstream 
tableId and downstream Kafka topic by `:`, For example, we can set 
`sink.tableId-to-topic.mapping` like 
`mydb.mytable1:topic1;mydb.mytable2:topic2`. </td>
     </tr>
+    <tr>
+      <td>sink.debezium-json.include-schema.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>If set to true, each Debezium record will contain the Debezium schema information. This is only supported when `value.format` is `debezium-json`. </td>
+    </tr>
     </tbody>
 </table>    
 </div>
@@ -178,6 +185,63 @@ An output example is:
   }
 }
 ```
+When `sink.debezium-json.include-schema.enabled` is true, an output example is:
+```json
+{
+  "schema":{
+    "type":"struct",
+    "fields":[
+      {
+        "type":"struct",
+        "fields":[
+          {
+            "type":"string",
+            "optional":true,
+            "field":"col1"
+          },
+          {
+            "type":"string",
+            "optional":true,
+            "field":"col2"
+          }
+        ],
+        "optional":true,
+        "field":"before"
+      },
+      {
+        "type":"struct",
+        "fields":[
+          {
+            "type":"string",
+            "optional":true,
+            "field":"col1"
+          },
+          {
+            "type":"string",
+            "optional":true,
+            "field":"col2"
+          }
+        ],
+        "optional":true,
+        "field":"after"
+      }
+    ],
+    "optional":false
+  },
+  "payload":{
+    "before": null,
+    "after": {
+      "col1": "1",
+      "col2": "1"
+    },
+    "op": "c",
+    "source": {
+      "db": "default_namespace",
+      "table": "table1"
+    }
+  }
+}
+```
 
 #### canal-json
 Refer to [Canal | Apache 
Flink](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/#available-metadata),
 canal-json format contains the `old`,`data`,`type`,`database`,`table`,`pkNames` elements, but `ts` is not included.
@@ -202,12 +266,16 @@ An output example is:
 
 Data Type Mapping
 ----------------
+[Literal type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types): defines the physical storage format of the data (the type field of the Debezium schema)<br>
+[Semantic type](https://debezium.io/documentation/reference/3.1/connectors/mysql.html#mysql-data-types): defines the logical meaning of the data (the name field of the Debezium schema).
 <div class="wy-table-responsive">
 <table class="colwidths-auto docutils">
     <thead>
       <tr>
         <th class="text-left">CDC type</th>
         <th class="text-left">JSON type</th>
+        <th class="text-left">Literal type</th>
+        <th class="text-left">Semantic type</th>
         <th class="text-left" style="width:60%;">NOTE</th>
       </tr>
     </thead>
@@ -215,29 +283,35 @@ Data Type Mapping
     <tr>
       <td>TINYINT</td>
       <td>TINYINT</td>
+      <td>INT16</td>
       <td></td>
     </tr>
     <tr>
       <td>SMALLINT</td>
       <td>SMALLINT</td>
+      <td>INT16</td>
       <td></td>
     </tr>
     <tr>
       <td>INT</td>
       <td>INT</td>
+      <td>INT32</td>
       <td></td>
     </tr>
     <tr>
       <td>BIGINT</td>
       <td>BIGINT</td>
+      <td>INT64</td>
       <td></td>
     </tr>
     <tr>
+      <td>FLOAT</td>
       <td>FLOAT</td>
       <td>FLOAT</td>
       <td></td>
     </tr>
     <tr>
+      <td>DOUBLE</td>
       <td>DOUBLE</td>
       <td>DOUBLE</td>
       <td></td>
@@ -245,9 +319,12 @@ Data Type Mapping
     <tr>
       <td>DECIMAL(p, s)</td>
       <td>DECIMAL(p, s)</td>
+      <td>BYTES</td>
+      <td>org.apache.kafka.connect.data.Decimal</td>
       <td></td>
     </tr>
     <tr>
+      <td>BOOLEAN</td>
       <td>BOOLEAN</td>
       <td>BOOLEAN</td>
       <td></td>
@@ -255,26 +332,34 @@ Data Type Mapping
     <tr>
       <td>DATE</td>
       <td>DATE</td>
+      <td>INT32</td>
+      <td>io.debezium.time.Date</td>
       <td></td>
     </tr>
     <tr>
-      <td>TIMESTAMP</td>
-      <td>TIMESTAMP</td>
+      <td>TIMESTAMP(p)</td>
+      <td>TIMESTAMP(p)</td>
+      <td>INT64</td>
+      <td>p <= 3: io.debezium.time.Timestamp <br>p > 3: io.debezium.time.MicroTimestamp</td>
       <td></td>
     </tr>
     <tr>
       <td>TIMESTAMP_LTZ</td>
       <td>TIMESTAMP_LTZ</td>
+      <td>STRING</td>
+      <td>io.debezium.time.ZonedTimestamp</td>
       <td></td>
     </tr>
     <tr>
       <td>CHAR(n)</td>
       <td>CHAR(n)</td>
+      <td>STRING</td>
       <td></td>
     </tr>
     <tr>
       <td>VARCHAR(n)</td>
       <td>VARCHAR(n)</td>
+      <td>STRING</td>
       <td></td>
     </tr>
     </tbody>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
index 2c4f474ca..dee1f2f46 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
@@ -29,6 +29,7 @@ limitations under the License.
     <artifactId>flink-cdc-pipeline-connector-kafka</artifactId>
 
     <properties>
+        <version.kafka>3.4.0</version.kafka>
     </properties>
 
     <dependencies>
@@ -74,6 +75,25 @@ limitations under the License.
             <version>${flink.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>${debezium.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <version>${version.kafka}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-json</artifactId>
+            <version>${version.kafka}</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -101,6 +121,9 @@ limitations under the License.
                                 <relocation>
                                     <pattern>org.apache.kafka</pattern>
                                     
<shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>
+                                    <excludes>
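+                                        <!-- Keep Kafka Connect's data classes unrelocated; the Debezium schema conversion references them directly. -->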
+                                        
<exclude>org/apache/kafka/connect/data/**</exclude>
+                                    </excludes>
                                 </relocation>
                                 <relocation>
                                     
<pattern>org.apache.flink.connector.kafka</pattern>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
index ec336d1c5..3b55a05ba 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.kafka.json;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.utils.Preconditions;
 import 
org.apache.flink.cdc.connectors.kafka.json.canal.CanalJsonSerializationSchema;
 import 
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonSerializationSchema;
 import 
org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
@@ -30,6 +31,7 @@ import org.apache.flink.formats.json.JsonFormatOptionsUtil;
 
 import java.time.ZoneId;
 
+import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
 import static 
org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER;
 import static 
org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL;
 
@@ -50,6 +52,14 @@ public class ChangeLogJsonFormatFactory {
      */
     public static SerializationSchema<Event> createSerializationSchema(
             ReadableConfig formatOptions, JsonSerializationType type, ZoneId 
zoneId) {
+        boolean isIncludedDebeziumSchema =
+                Boolean.parseBoolean(
+                        
formatOptions.toMap().get(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key()));
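+        // Embedding the Debezium schema only makes sense for the debezium-json format; reject the flag for any other value format.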
+        Preconditions.checkArgument(
+                !(isIncludedDebeziumSchema && 
!type.equals(JsonSerializationType.DEBEZIUM_JSON)),
+                SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key()
+                        + " is only supported when using debezium-json.");
+
         TimestampFormat timestampFormat = 
JsonFormatOptionsUtil.getTimestampFormat(formatOptions);
         JsonFormatOptions.MapNullKeyMode mapNullKeyMode =
                 JsonFormatOptionsUtil.getMapNullKeyMode(formatOptions);
@@ -70,7 +80,8 @@ public class ChangeLogJsonFormatFactory {
                             mapNullKeyLiteral,
                             zoneId,
                             encodeDecimalAsPlainNumber,
-                            ignoreNullFields);
+                            ignoreNullFields,
+                            isIncludedDebeziumSchema);
                 }
             case CANAL_JSON:
                 {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java
new file mode 100644
index 000000000..349dee4b8
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonRowDataSerializationSchema.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.json.debezium;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.JsonParserRowDataDeserializationSchema;
+import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes an object of the Flink internal data structure into JSON
+ * bytes.
+ *
+ * <p>Serializes the input Flink object into a JSON string and converts it 
into <code>byte[]</code>.
+ *
+ * <p>Result <code>byte[]</code> messages can be deserialized using {@link
+ * JsonRowDataDeserializationSchema} or {@link 
JsonParserRowDataDeserializationSchema}.
+ */
+public class DebeziumJsonRowDataSerializationSchema implements 
SerializationSchema<RowData> {
+    private static final long serialVersionUID = 1L;
+
+    /** RowType to generate the runtime converter. */
+    private final RowType rowType;
+
+    /** The converter that converts internal data formats to JsonNode. */
+    private final RowDataToJsonConverters.RowDataToJsonConverter 
runtimeConverter;
+
+    /** Object mapper that is used to create output JSON objects. */
+    private transient ObjectMapper mapper;
+
+    /** Reusable object node. */
+    private transient ObjectNode node;
+
+    /** Timestamp format specification which is used to parse timestamp. */
+    private final TimestampFormat timestampFormat;
+
+    /** The handling mode when serializing null keys for map data. */
+    private final JsonFormatOptions.MapNullKeyMode mapNullKeyMode;
+
+    /** The string literal when handling mode for map null key LITERAL. */
+    private final String mapNullKeyLiteral;
+
+    /** Flag indicating whether to serialize all decimals as plain numbers. */
+    private final boolean encodeDecimalAsPlainNumber;
+
+    /** Flag indicating whether to ignore null fields. */
+    private final boolean ignoreNullFields;
+
+    private final boolean isIncludedDebeziumSchema;
+
+    public DebeziumJsonRowDataSerializationSchema(
+            RowType rowType,
+            TimestampFormat timestampFormat,
+            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
+            String mapNullKeyLiteral,
+            boolean encodeDecimalAsPlainNumber,
+            boolean ignoreNullFields,
+            boolean isIncludedDebeziumSchema) {
+        this.rowType = rowType;
+        this.timestampFormat = timestampFormat;
+        this.mapNullKeyMode = mapNullKeyMode;
+        this.mapNullKeyLiteral = mapNullKeyLiteral;
+        this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+        this.runtimeConverter =
+                
JsonRowDataSerializationSchemaUtils.createRowDataToJsonConverters(
+                                timestampFormat,
+                                mapNullKeyMode,
+                                mapNullKeyLiteral,
+                                ignoreNullFields)
+                        .createConverter(rowType);
+        this.ignoreNullFields = ignoreNullFields;
+        this.isIncludedDebeziumSchema = isIncludedDebeziumSchema;
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {
+        mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(
+                                
JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN,
+                                encodeDecimalAsPlainNumber);
+    }
+
+    @Override
+    public byte[] serialize(RowData row) {
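+        // Reuse the ObjectNode across records, but recreate it when null fields are dropped so stale fields from the previous record do not linger.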
+        if (node == null || ignoreNullFields) {
+            node = mapper.createObjectNode();
+        }
+
+        try {
+            runtimeConverter.convert(mapper, node, row);
+            if (isIncludedDebeziumSchema) {
+                // The schema field is a nested JSON string; asText() returns it as a plain string
+                // without extra escape characters such as "\".
+                String schemaValue = node.get("schema").asText();
+                JsonNode schemaNode = mapper.readTree(schemaValue);
+                node.set("schema", schemaNode);
+                return mapper.writeValueAsBytes(node);
+            }
+            return mapper.writeValueAsBytes(node);
+        } catch (Throwable t) {
+            throw new RuntimeException(String.format("Could not serialize row 
'%s'.", row), t);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DebeziumJsonRowDataSerializationSchema that = 
(DebeziumJsonRowDataSerializationSchema) o;
+        return rowType.equals(that.rowType)
+                && timestampFormat.equals(that.timestampFormat)
+                && mapNullKeyMode.equals(that.mapNullKeyMode)
+                && mapNullKeyLiteral.equals(that.mapNullKeyLiteral)
+                && encodeDecimalAsPlainNumber == 
that.encodeDecimalAsPlainNumber
+                && ignoreNullFields == that.ignoreNullFields
+                && isIncludedDebeziumSchema == that.isIncludedDebeziumSchema;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                rowType,
+                timestampFormat,
+                mapNullKeyMode,
+                mapNullKeyLiteral,
+                encodeDecimalAsPlainNumber,
+                ignoreNullFields,
+                isIncludedDebeziumSchema);
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index 238c2121d..b1945b5c1 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -23,14 +23,15 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.TimestampType;
 import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
-import 
org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonFormatOptions;
-import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
@@ -38,11 +39,33 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import io.debezium.data.Bits;
+import io.debezium.time.Date;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.Timestamp;
+import io.debezium.time.ZonedTimestamp;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+
 import java.time.ZoneId;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.function.BiConsumer;
 
 import static java.lang.String.format;
+import static 
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.AFTER;
+import static 
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.BEFORE;
+import static 
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.OPERATION;
+import static 
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumPayload.SOURCE;
+import static 
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumSource.DATABASE;
+import static 
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumSource.TABLE;
+import static 
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumStruct.PAYLOAD;
+import static 
org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonStruct.DebeziumStruct.SCHEMA;
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 
 /**
@@ -65,6 +88,8 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Even
 
     private transient GenericRowData reuseGenericRowData;
 
+    private transient GenericRowData payloadGenericRowData;
+
     private final TimestampFormat timestampFormat;
 
     private final JsonFormatOptions.MapNullKeyMode mapNullKeyMode;
@@ -75,17 +100,24 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Even
 
     private final boolean ignoreNullFields;
 
+    private final boolean isIncludedDebeziumSchema;
+
     private final ZoneId zoneId;
 
     private InitializationContext context;
 
+    private Map<TableId, String> schemaMap = new HashMap<>();
+
+    JsonConverter jsonConverter;
+
     public DebeziumJsonSerializationSchema(
             TimestampFormat timestampFormat,
             JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
             String mapNullKeyLiteral,
             ZoneId zoneId,
             boolean encodeDecimalAsPlainNumber,
-            boolean ignoreNullFields) {
+            boolean ignoreNullFields,
+            boolean isIncludedDebeziumSchema) {
         this.timestampFormat = timestampFormat;
         this.mapNullKeyMode = mapNullKeyMode;
         this.mapNullKeyLiteral = mapNullKeyLiteral;
@@ -93,11 +125,23 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Even
         this.zoneId = zoneId;
         jsonSerializers = new HashMap<>();
         this.ignoreNullFields = ignoreNullFields;
+        this.isIncludedDebeziumSchema = isIncludedDebeziumSchema;
     }
 
     @Override
     public void open(InitializationContext context) {
-        reuseGenericRowData = new GenericRowData(4);
+        if (isIncludedDebeziumSchema) {
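+            // With schema output enabled the top-level row has two fields (schema, payload); the original 4-field row is nested as the payload.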
+            reuseGenericRowData = new GenericRowData(2);
+            payloadGenericRowData = new GenericRowData(4);
+            reuseGenericRowData.setField(PAYLOAD.getPosition(), 
payloadGenericRowData);
+
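+            // Kafka Connect's JsonConverter is used to render the Connect schema as a JSON string.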
+            this.jsonConverter = new JsonConverter();
+            final HashMap<String, Object> configs = new HashMap<>(2);
+            configs.put(ConverterConfig.TYPE_CONFIG, 
ConverterType.VALUE.getName());
+            jsonConverter.configure(configs);
+        } else {
+            reuseGenericRowData = new GenericRowData(4);
+        }
         this.context = context;
     }
 
@@ -115,16 +159,22 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Even
                                 
jsonSerializers.get(schemaChangeEvent.tableId()).getSchema(),
                                 schemaChangeEvent);
             }
+
+            if (isIncludedDebeziumSchema) {
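+                // Cache the rendered Debezium schema JSON per table; it is rebuilt only when a schema change event arrives.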
+                schemaMap.put(schemaChangeEvent.tableId(), 
convertSchemaToDebeziumSchema(schema));
+            }
             LogicalType rowType =
                     
DataTypeUtils.toFlinkDataType(schema.toRowDataType()).getLogicalType();
-            JsonRowDataSerializationSchema jsonSerializer =
-                    
JsonRowDataSerializationSchemaUtils.createSerializationSchema(
-                            createJsonRowType(fromLogicalToDataType(rowType)),
+            DebeziumJsonRowDataSerializationSchema jsonSerializer =
+                    new DebeziumJsonRowDataSerializationSchema(
+                            createJsonRowType(
+                                    fromLogicalToDataType(rowType), 
isIncludedDebeziumSchema),
                             timestampFormat,
                             mapNullKeyMode,
                             mapNullKeyLiteral,
                             encodeDecimalAsPlainNumber,
-                            ignoreNullFields);
+                            ignoreNullFields,
+                            isIncludedDebeziumSchema);
             try {
                 jsonSerializer.open(context);
             } catch (Exception e) {
@@ -138,81 +188,226 @@ public class DebeziumJsonSerializationSchema implements 
SerializationSchema<Even
         }
 
         DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
-        reuseGenericRowData.setField(
-                3,
-                GenericRowData.of(
-                        
StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
-                        
StringData.fromString(dataChangeEvent.tableId().getTableName())));
+        BiConsumer<DataChangeEvent, GenericRowData> converter;
         try {
             switch (dataChangeEvent.op()) {
                 case INSERT:
-                    reuseGenericRowData.setField(0, null);
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_INSERT);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertInsertEventToRowData;
+                    break;
                 case DELETE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(1, null);
-                    reuseGenericRowData.setField(2, OP_DELETE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertDeleteEventToRowData;
+                    break;
                 case UPDATE:
                 case REPLACE:
-                    reuseGenericRowData.setField(
-                            0,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.before(), false));
-                    reuseGenericRowData.setField(
-                            1,
-                            jsonSerializers
-                                    .get(dataChangeEvent.tableId())
-                                    
.getRowDataFromRecordData(dataChangeEvent.after(), false));
-                    reuseGenericRowData.setField(2, OP_UPDATE);
-                    return jsonSerializers
-                            .get(dataChangeEvent.tableId())
-                            .getSerializationSchema()
-                            .serialize(reuseGenericRowData);
+                    converter = this::convertUpdateEventToRowData;
+                    break;
                 default:
                     throw new UnsupportedOperationException(
                             format(
                                     "Unsupported operation '%s' for 
OperationType.",
                                     dataChangeEvent.op()));
             }
+
+            if (isIncludedDebeziumSchema) {
+                converter.accept(dataChangeEvent, payloadGenericRowData);
+                reuseGenericRowData.setField(
+                        SCHEMA.getPosition(),
+                        
StringData.fromString(schemaMap.get(dataChangeEvent.tableId())));
+            } else {
+                converter.accept(dataChangeEvent, reuseGenericRowData);
+            }
+            return jsonSerializers
+                    .get(dataChangeEvent.tableId())
+                    .getSerializationSchema()
+                    .serialize(reuseGenericRowData);
         } catch (Throwable t) {
             throw new RuntimeException(format("Could not serialize event 
'%s'.", event), t);
         }
     }
 
+    /**
+     * Converts a CDC {@link Schema} to its Debezium schema representation.
+     *
+     * @param schema CDC schema
+     * @return Debezium schema json string
+     */
+    public String convertSchemaToDebeziumSchema(Schema schema) {
+        List<Column> columns = schema.getColumns();
+        SchemaBuilder schemaBuilder = SchemaBuilder.struct();
+        SchemaBuilder beforeBuilder = SchemaBuilder.struct();
+        SchemaBuilder afterBuilder = SchemaBuilder.struct();
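+        // The before and after structs share the same column layout, so every CDC column is added to both.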
+        for (Column column : columns) {
+            SchemaBuilder field = convertCDCDataTypeToDebeziumDataType(column);
+            beforeBuilder.field(column.getName(), field).optional();
+            afterBuilder.field(column.getName(), field).optional();
+        }
+        schemaBuilder.field("before", beforeBuilder);
+        schemaBuilder.field("after", afterBuilder);
+        schemaBuilder.build();
+        return jsonConverter.asJsonSchema(schemaBuilder).toString();
+    }
+
+    private static SchemaBuilder convertCDCDataTypeToDebeziumDataType(Column 
column) {
+        org.apache.flink.cdc.common.types.DataType columnType = 
column.getType();
+        final SchemaBuilder field;
+        switch (columnType.getTypeRoot()) {
+            case TINYINT:
+            case SMALLINT:
+                field = SchemaBuilder.int16();
+                break;
+            case INTEGER:
+                field = SchemaBuilder.int32();
+                break;
+            case BIGINT:
+                field = SchemaBuilder.int64();
+                break;
+            case DECIMAL:
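+                // Connect's Decimal logical type keeps the scale in the schema and carries the precision as a parameter.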
+                final int decimalPrecision = ((DecimalType) 
columnType).getPrecision();
+                final int decimalScale = ((DecimalType) columnType).getScale();
+                field =
+                        Decimal.builder(decimalScale)
+                                .parameter(
+                                        "connect.decimal.precision",
+                                        String.valueOf(decimalPrecision));
+                break;
+            case BOOLEAN:
+                field = SchemaBuilder.bool();
+                break;
+            case FLOAT:
+                field = SchemaBuilder.float32();
+                break;
+            case DOUBLE:
+                field = SchemaBuilder.float64();
+                break;
+            case DATE:
+                field = 
SchemaBuilder.int32().name(Date.SCHEMA_NAME).version(1);
+                break;
+            case TIME_WITHOUT_TIME_ZONE:
+                field = 
SchemaBuilder.int64().name(MicroTime.SCHEMA_NAME).version(1);
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+                int timestampPrecision = ((TimestampType) columnType).getPrecision();
+                if (timestampPrecision > 3) {
+                    field = 
SchemaBuilder.int64().name(MicroTimestamp.SCHEMA_NAME).version(1);
+                } else {
+                    field = 
SchemaBuilder.int64().name(Timestamp.SCHEMA_NAME).version(1);
+                }
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                field = 
SchemaBuilder.string().name(ZonedTimestamp.SCHEMA_NAME).version(1);
+                break;
+            case BINARY:
+            case VARBINARY:
+                field =
+                        SchemaBuilder.bytes()
+                                .name(Bits.LOGICAL_NAME)
+                                .parameter(
+                                        Bits.LENGTH_FIELD,
+                                        Integer.toString(
+                                                
org.apache.flink.cdc.common.types.DataTypes
+                                                        .getLength(columnType)
+                                                        .orElse(0)))
+                                .version(1);
+                break;
+            case CHAR:
+            case VARCHAR:
+            default:
+                field = SchemaBuilder.string();
+        }
+
+        if (columnType.isNullable()) {
+            field.optional();
+        } else {
+            field.required();
+        }
+        if (column.getDefaultValueExpression() != null) {
+            field.defaultValue(column.getDefaultValueExpression());
+        }
+        if (column.getComment() != null) {
+            field.doc(column.getComment());
+        }
+        return field;
+    }
+
+    private void convertInsertEventToRowData(
+            DataChangeEvent dataChangeEvent, GenericRowData genericRowData) {
+        genericRowData.setField(BEFORE.getPosition(), null);
+        genericRowData.setField(
+                AFTER.getPosition(),
+                jsonSerializers
+                        .get(dataChangeEvent.tableId())
+                        .getRowDataFromRecordData(dataChangeEvent.after(), 
false));
+        genericRowData.setField(OPERATION.getPosition(), OP_INSERT);
+        genericRowData.setField(
+                SOURCE.getPosition(),
+                GenericRowData.of(
+                        
StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
+                        
StringData.fromString(dataChangeEvent.tableId().getTableName())));
+    }
+
+    private void convertDeleteEventToRowData(
+            DataChangeEvent dataChangeEvent, GenericRowData genericRowData) {
+        genericRowData.setField(
+                BEFORE.getPosition(),
+                jsonSerializers
+                        .get(dataChangeEvent.tableId())
+                        .getRowDataFromRecordData(dataChangeEvent.before(), 
false));
+        genericRowData.setField(AFTER.getPosition(), null);
+        genericRowData.setField(OPERATION.getPosition(), OP_DELETE);
+        genericRowData.setField(
+                SOURCE.getPosition(),
+                GenericRowData.of(
+                        
StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
+                        
StringData.fromString(dataChangeEvent.tableId().getTableName())));
+    }
+
+    private void convertUpdateEventToRowData(
+            DataChangeEvent dataChangeEvent, GenericRowData genericRowData) {
+        genericRowData.setField(
+                BEFORE.getPosition(),
+                jsonSerializers
+                        .get(dataChangeEvent.tableId())
+                        .getRowDataFromRecordData(dataChangeEvent.before(), 
false));
+        genericRowData.setField(
+                AFTER.getPosition(),
+                jsonSerializers
+                        .get(dataChangeEvent.tableId())
+                        .getRowDataFromRecordData(dataChangeEvent.after(), 
false));
+        genericRowData.setField(OPERATION.getPosition(), OP_UPDATE);
+        genericRowData.setField(
+                SOURCE.getPosition(),
+                GenericRowData.of(
+                        
StringData.fromString(dataChangeEvent.tableId().getSchemaName()),
+                        
StringData.fromString(dataChangeEvent.tableId().getTableName())));
+    }
+
     /**
      * Refer to <a
      * 
href="https://debezium.io/documentation/reference/1.9/connectors/mysql.html";>Debezium
      * docs</a> for more details.
      */
-    private static RowType createJsonRowType(DataType databaseSchema) {
-        return (RowType)
+    private static RowType createJsonRowType(
+            DataType databaseSchema, boolean isIncludedDebeziumSchema) {
+        DataType payloadRowType =
                 DataTypes.ROW(
-                                DataTypes.FIELD("before", databaseSchema),
-                                DataTypes.FIELD("after", databaseSchema),
-                                DataTypes.FIELD("op", DataTypes.STRING()),
-                                DataTypes.FIELD(
-                                        "source",
-                                        DataTypes.ROW(
-                                                DataTypes.FIELD("db", 
DataTypes.STRING()),
-                                                DataTypes.FIELD("table", 
DataTypes.STRING()))))
-                        .getLogicalType();
+                        DataTypes.FIELD(BEFORE.getFieldName(), databaseSchema),
+                        DataTypes.FIELD(AFTER.getFieldName(), databaseSchema),
+                        DataTypes.FIELD(OPERATION.getFieldName(), 
DataTypes.STRING()),
+                        DataTypes.FIELD(
+                                SOURCE.getFieldName(),
+                                DataTypes.ROW(
+                                        DataTypes.FIELD(
+                                                DATABASE.getFieldName(), 
DataTypes.STRING()),
+                                        DataTypes.FIELD(
+                                                TABLE.getFieldName(), 
DataTypes.STRING()))));
+        if (isIncludedDebeziumSchema) {
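+            // With schema output enabled, the serialized row is (schema STRING, payload ROW(...)).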
+            return (RowType)
+                    DataTypes.ROW(
+                                    DataTypes.FIELD(SCHEMA.getFieldName(), 
DataTypes.STRING()),
+                                    DataTypes.FIELD(PAYLOAD.getFieldName(), 
payloadRowType))
+                            .getLogicalType();
+        }
+        return (RowType) payloadRowType.getLogicalType();
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonStruct.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonStruct.java
new file mode 100644
index 000000000..e1c314b9b
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonStruct.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.json.debezium;
+
+/** Field names and positions of the structs that make up a Debezium JSON record (envelope, payload, source). */
+public class DebeziumJsonStruct {
+
+    enum DebeziumStruct {
+        SCHEMA(0, "schema"),
+        PAYLOAD(1, "payload");
+
+        private final int position;
+        private final String fieldName;
+
+        DebeziumStruct(int position, String fieldName) {
+            this.position = position;
+            this.fieldName = fieldName;
+        }
+
+        public int getPosition() {
+            return position;
+        }
+
+        public String getFieldName() {
+            return fieldName;
+        }
+    }
+
+    enum DebeziumPayload {
+        BEFORE(0, "before"),
+        AFTER(1, "after"),
+        OPERATION(2, "op"),
+        SOURCE(3, "source");
+
+        private final int position;
+        private final String fieldName;
+
+        DebeziumPayload(int position, String fieldName) {
+            this.position = position;
+            this.fieldName = fieldName;
+        }
+
+        public int getPosition() {
+            return position;
+        }
+
+        public String getFieldName() {
+            return fieldName;
+        }
+    }
+
+    enum DebeziumSource {
+        DATABASE(0, "db"),
+        TABLE(1, "table");
+
+        private final int position;
+        private final String fieldName;
+
+        DebeziumSource(int position, String fieldName) {
+            this.position = position;
+            this.fieldName = fieldName;
+        }
+
+        public int getPosition() {
+            return position;
+        }
+
+        public String getFieldName() {
+            return fieldName;
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
index 6a232849c..05090b30b 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkFactory.java
@@ -40,6 +40,7 @@ import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PA
 import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.PROPERTIES_PREFIX;
 import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_ADD_TABLEID_TO_HEADER_ENABLED;
 import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_CUSTOM_HEADER;
+import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
 import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING;
 import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.TOPIC;
 import static 
org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.VALUE_FORMAT;
@@ -131,6 +132,7 @@ public class KafkaDataSinkFactory implements 
DataSinkFactory {
         options.add(SINK_CUSTOM_HEADER);
         options.add(KafkaDataSinkOptions.DELIVERY_GUARANTEE);
         options.add(SINK_TABLE_ID_TO_TOPIC_MAPPING);
+        options.add(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED);
         return options;
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
index 1b7bbf9bd..a8eeed8cf 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkOptions.java
@@ -100,4 +100,11 @@ public class KafkaDataSinkOptions {
                                     .text(
                                             ". For example, we can set 
'sink.tableId-to-topic.mappingg' like 
'mydb.mytable1:topic1;mydb.mytable2:topic2'.")
                                     .build());
+
+    public static final ConfigOption<Boolean> 
SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED =
+            key("sink.debezium-json.include-schema.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Optional. If set to true, each Debezium record will contain the Debezium schema information. This is only supported when using the debezium-json format.");
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
index e021e0f1b..0cc850349 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonFormatOptions;
 import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.lang.reflect.Constructor;
@@ -86,6 +87,49 @@ public class JsonRowDataSerializationSchemaUtils {
                 "Failed to find appropriate constructor for 
JsonRowDataSerializationSchema,please check your Flink version is 1.19 or 
1.20.");
     }
 
+    /**
+     * In Flink >= 1.20 the RowDataToJsonConverters constructor takes 4 parameters, while in
+     * Flink < 1.20 it takes 3 parameters.
+     */
+    public static RowDataToJsonConverters createRowDataToJsonConverters(
+            TimestampFormat timestampFormat,
+            JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
+            String mapNullKeyLiteral,
+            boolean ignoreNullFields) {
+        try {
+            Class<?>[] fullParams =
+                    new Class[] {
+                        TimestampFormat.class,
+                        JsonFormatOptions.MapNullKeyMode.class,
+                        String.class,
+                        boolean.class
+                    };
+
+            Object[] fullParamValues =
+                    new Object[] {
+                        timestampFormat, mapNullKeyMode, mapNullKeyLiteral, 
ignoreNullFields
+                    };
+
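+            // Try the 4-argument constructor (Flink >= 1.20) first, then fall back to the 3-argument one (Flink 1.19).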
+            for (int i = fullParams.length; i >= 3; i--) {
+                try {
+                    Constructor<?> constructor =
+                            RowDataToJsonConverters.class.getConstructor(
+                                    Arrays.copyOfRange(fullParams, 0, i));
+
+                    return (RowDataToJsonConverters)
+                            
constructor.newInstance(Arrays.copyOfRange(fullParamValues, 0, i));
+                } catch (NoSuchMethodException ignored) {
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to create RowDataToJsonConverters, please check that your Flink version is 1.19 or 1.20.",
+                    e);
+        }
+        throw new RuntimeException(
+                "Failed to find an appropriate constructor for RowDataToJsonConverters, please check that your Flink version is 1.19 or 1.20.");
+    }
+
     /** flink>=1.20 only has the ENCODE_IGNORE_NULL_FIELDS parameter. */
     public static boolean enableIgnoreNullFields(ReadableConfig formatOptions) 
{
         try {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
index f4826ca80..4c9933bf2 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchemaTest.java
@@ -18,6 +18,9 @@
 package org.apache.flink.cdc.connectors.kafka.json.debezium;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
@@ -40,7 +43,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.math.BigDecimal;
+import java.time.Instant;
 import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.cdc.connectors.kafka.sink.KafkaDataSinkOptions.SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED;
 
 /** Tests for {@link DebeziumJsonSerializationSchema}. */
 class DebeziumJsonSerializationSchemaTest {
@@ -129,4 +138,112 @@ class DebeziumJsonSerializationSchemaTest {
         actual = mapper.readTree(serializationSchema.serialize(updateEvent));
         Assertions.assertThat(actual).isEqualTo(expected);
     }
+
+    @Test
+    public void testSerializeWithSchemaAllDataTypes() throws Exception {
+        ObjectMapper mapper =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, false);
+        Map<String, String> properties = new HashMap<>();
+        properties.put(SINK_DEBEZIUM_JSON_INCLUDE_SCHEMA_ENABLED.key(), "true");
+        Configuration configuration = Configuration.fromMap(properties);
+        SerializationSchema<Event> serializationSchema =
+                ChangeLogJsonFormatFactory.createSerializationSchema(
+                        configuration, JsonSerializationType.DEBEZIUM_JSON, ZoneId.systemDefault());
+        serializationSchema.open(new MockInitializationContext());
+        // create table
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("_boolean", DataTypes.BOOLEAN(), "_boolean comment")
+                        .physicalColumn("_binary", DataTypes.BINARY(3))
+                        .physicalColumn("_varbinary", DataTypes.VARBINARY(10))
+                        .physicalColumn("_bytes", DataTypes.BYTES())
+                        .physicalColumn("_tinyint", DataTypes.TINYINT())
+                        .physicalColumn("_smallint", DataTypes.SMALLINT())
+                        .physicalColumn("_int", DataTypes.INT())
+                        .physicalColumn("_bigint", DataTypes.BIGINT())
+                        .physicalColumn("_float", DataTypes.FLOAT())
+                        .physicalColumn("_double", DataTypes.DOUBLE())
+                        .physicalColumn("_decimal", DataTypes.DECIMAL(6, 3))
+                        .physicalColumn("_char", DataTypes.CHAR(5))
+                        .physicalColumn("_varchar", DataTypes.VARCHAR(10))
+                        .physicalColumn("_string", DataTypes.STRING())
+                        .physicalColumn("_date", DataTypes.DATE())
+                        .physicalColumn("_time", DataTypes.TIME())
+                        .physicalColumn("_time_6", DataTypes.TIME(6))
+                        .physicalColumn("_timestamp", DataTypes.TIMESTAMP())
+                        .physicalColumn("_timestamp_3", DataTypes.TIMESTAMP(3))
+                        .physicalColumn("_timestamp_ltz", DataTypes.TIMESTAMP_LTZ())
+                        .physicalColumn("_timestamp_ltz_3", DataTypes.TIMESTAMP_LTZ(3))
+                        .physicalColumn("pt", DataTypes.STRING())
+                        .primaryKey("pt")
+                        .build();
+
+        RowType rowType =
+                RowType.of(
+                        DataTypes.BOOLEAN(),
+                        DataTypes.BINARY(3),
+                        DataTypes.VARBINARY(10),
+                        DataTypes.BYTES(),
+                        DataTypes.TINYINT(),
+                        DataTypes.SMALLINT(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.FLOAT(),
+                        DataTypes.DOUBLE(),
+                        DataTypes.DECIMAL(6, 3),
+                        DataTypes.CHAR(5),
+                        DataTypes.VARCHAR(10),
+                        DataTypes.STRING(),
+                        DataTypes.DATE(),
+                        DataTypes.TIME(),
+                        DataTypes.TIME(6),
+                        DataTypes.TIMESTAMP(),
+                        DataTypes.TIMESTAMP(3),
+                        DataTypes.TIMESTAMP_LTZ(),
+                        DataTypes.TIMESTAMP_LTZ(3),
+                        DataTypes.STRING());
+
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
+        Assertions.assertThat(serializationSchema.serialize(createTableEvent)).isNull();
+        BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
+        // insert
+        DataChangeEvent insertEvent1 =
+                DataChangeEvent.insertEvent(
+                        TABLE_1,
+                        generator.generate(
+                                new Object[] {
+                                    true,
+                                    new byte[] {1, 2},
+                                    new byte[] {3, 4},
+                                    new byte[] {5, 6, 7},
+                                    (byte) 1,
+                                    (short) 2,
+                                    3,
+                                    4L,
+                                    5.1f,
+                                    6.2,
+                                    DecimalData.fromBigDecimal(new BigDecimal("7.123"), 6, 3),
+                                    BinaryStringData.fromString("test1"),
+                                    BinaryStringData.fromString("test2"),
+                                    BinaryStringData.fromString("test3"),
+                                    100,
+                                    200,
+                                    300,
+                                    TimestampData.fromTimestamp(
+                                            java.sql.Timestamp.valueOf("2023-01-01 00:00:00.000")),
+                                    TimestampData.fromTimestamp(
+                                            java.sql.Timestamp.valueOf("2023-01-01 00:00:00")),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z")),
+                                    LocalZonedTimestampData.fromInstant(
+                                            Instant.parse("2023-01-01T00:00:00.000Z")),
+                                    null
+                                }));
+        JsonNode expected =
+                mapper.readTree(
+                        "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"boolean\",\"optional\":true,\"doc\":\"_boolean comment\",\"field\":\"_boolean\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"3\"},\"field\":\"_binary\"},{\"type\":\"bytes\",\"optional\":true,\"name\":\"io.debezium.data.Bits\",\"version\":1,\"parameters\":{\"length\":\"10\"},\"field\":\"_varbinary\"},{\"t [...]
+        JsonNode actual = mapper.readTree(serializationSchema.serialize(insertEvent1));
+        Assertions.assertThat(actual).isEqualTo(expected);
+    }
 }
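
For context on what the test above asserts, here is a rough, hypothetical consumer-side sketch that splits one schema-bearing debezium-json record into its `schema` and `payload` halves; the plain Jackson `ObjectMapper` and the `valueBytes` argument are assumptions for illustration, not part of the patch:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

class EnvelopeSketch {
    static void inspect(byte[] valueBytes) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode record = mapper.readTree(valueBytes);
        // Field descriptors (type/optional/field) for the before and after rows.
        JsonNode schema = record.get("schema");
        // The familiar debezium-json content: before, after, op and source.
        JsonNode payload = record.get("payload");
        System.out.println(schema.get("type").asText() + " -> op=" + payload.get("op").asText());
    }
}
```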
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
index bdc313b35..e85924558 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
@@ -301,6 +301,82 @@ class MysqlToKafkaE2eITCase extends PipelineTestEnvironment {
                 .containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords));
     }
 
+    @Test
+    public void testSyncWholeDatabaseWithDebeziumJsonHasSchema() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.\\.*\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "\n"
+                                + "sink:\n"
+                                + "  type: kafka\n"
+                                + "  properties.bootstrap.servers: kafka:9092\n"
+                                + "  topic: %s\n"
+                                + "  sink.debezium-json.include-schema.enabled: true\n"
+                                + "\n"
+                                + "pipeline:\n"
+                                + "  parallelism: %d",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        mysqlInventoryDatabase.getDatabaseName(),
+                        topic,
+                        parallelism);
+        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path kafkaCdcJar = TestUtils.getResource("kafka-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, kafkaCdcJar, mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+        List<ConsumerRecord<byte[], byte[]>> collectedRecords = new ArrayList<>();
+        int expectedEventCount = 13;
+        waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
+        List<String> expectedRecords =
+                getExpectedRecords("expectedEvents/mysqlToKafka/debezium-json-with-schema.txt");
+        assertThat(expectedRecords).containsAll(deserializeValues(collectedRecords));
+        LOG.info("Begin incremental reading stage.");
+        // generate binlogs
+        String mysqlJdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/%s",
+                        MYSQL.getHost(),
+                        MYSQL.getDatabasePort(),
+                        mysqlInventoryDatabase.getDatabaseName());
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+                Statement stat = conn.createStatement()) {
+            stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+            stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+            // modify table schema
+            stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
+            stat.execute(
+                    "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110
+            stat.execute(
+                    "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111
+            stat.execute(
+                    "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+            stat.execute("UPDATE products SET weight='5.17' WHERE id=111;");
+            stat.execute("DELETE FROM products WHERE id=111;");
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        expectedEventCount = 20;
+        waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
+        assertThat(expectedRecords)
+                .containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords));
+    }
+
     private void waitUntilSpecificEventCount(
             List<ConsumerRecord<byte[], byte[]>> actualEvent, int expectedCount) throws Exception {
         boolean result = false;
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json-with-schema.txt b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json-with-schema.txt
new file mode 100644
index 000000000..deeed6cb5
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json-with-schema.txt
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"field":"phone_number"}],"optional":true,"field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"}],"optional":true,"field":"before"},{"
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"},{"type":"int32","optional":true,"fiel
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"},{"type":"int32","optional":true,"fiel
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"},{"type":"int32","optional":true,"fiel
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"},{"type":"int32","optional":true,"fiel
 [...]
+{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"default":"flink","field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"float","optional":true,"field":"weight"},{"type":"string","optional":true,"default":"red","field":"enum_c"},{"type":"string","optional":true,"field":"json_c"},{"type":"string","optional":true,"field":"point_c"},{"type":"int32","optional":true,"fiel
 [...]
\ No newline at end of file
