This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 23d4c683c9 [cdc] mysql cdc supports table comment changes. (#5258)
23d4c683c9 is described below

commit 23d4c683c9dbe7efc204cbf6a8d183cd09522b1e
Author: Kerwin <[email protected]>
AuthorDate: Tue Mar 18 18:27:05 2025 +0800

    [cdc] mysql cdc supports table comment changes. (#5258)
---
 docs/content/cdc-ingestion/mysql-cdc.md            |   5 +-
 .../cdc/format/AbstractJsonRecordParser.java       |  18 +-
 .../action/cdc/format/AbstractRecordParser.java    |  28 +--
 .../cdc/format/aliyun/AliyunRecordParser.java      |   8 +-
 .../action/cdc/format/canal/CanalRecordParser.java |  10 +-
 .../format/debezium/DebeziumAvroRecordParser.java  |  14 +-
 .../format/debezium/DebeziumBsonRecordParser.java  |   8 +-
 .../format/debezium/DebeziumJsonRecordParser.java  |  10 +-
 .../mongodb/strategy/Mongo4VersionStrategy.java    |  16 +-
 .../cdc/mongodb/strategy/MongoVersionStrategy.java |  26 ++-
 .../flink/action/cdc/mysql/MySqlRecordParser.java  |  33 +--
 .../cdc/mysql/format/DebeziumEventUtils.java       |  49 ++++-
 .../action/cdc/postgres/PostgresRecordParser.java  |  23 +-
 .../cdc/CdcDynamicTableParsingProcessFunction.java |  25 +--
 .../cdc/CdcMultiTableParsingProcessFunction.java   |  26 +--
 .../flink/sink/cdc/CdcParsingProcessFunction.java  |  20 +-
 .../apache/paimon/flink/sink/cdc/CdcSchema.java    | 187 ++++++++++++++++
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      |   2 +-
 .../apache/paimon/flink/sink/cdc/EventParser.java  |   2 +-
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |   4 +-
 ...MultiTableUpdatedDataFieldsProcessFunction.java |  16 +-
 .../flink/sink/cdc/NewTableSchemaBuilder.java      |   8 +-
 .../flink/sink/cdc/RichCdcMultiplexRecord.java     |  45 ++--
 .../cdc/RichCdcMultiplexRecordEventParser.java     |   9 +-
 .../paimon/flink/sink/cdc/RichCdcRecord.java       |  33 ++-
 .../paimon/flink/sink/cdc/RichEventParser.java     |  20 +-
 .../sink/cdc/UpdatedDataFieldsProcessFunction.java |  21 +-
 .../cdc/UpdatedDataFieldsProcessFunctionBase.java  |  15 +-
 .../flink/action/cdc/SchemaEvolutionTest.java      | 243 +++++++++++----------
 .../action/cdc/SyncDatabaseActionBaseTest.java     |  27 +--
 .../debezium/DebeziumBsonRecordParserTest.java     |  24 +-
 .../kafka/KafkaCanalSyncDatabaseActionITCase.java  |   3 +-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  51 +++--
 .../flink/sink/cdc/CdcRecordSerializeITCase.java   |  19 +-
 .../apache/paimon/flink/sink/cdc/TestCdcEvent.java |  17 +-
 .../paimon/flink/sink/cdc/TestCdcEventParser.java  |   5 +-
 .../apache/paimon/flink/sink/cdc/TestTable.java    |  21 +-
 37 files changed, 639 insertions(+), 452 deletions(-)

diff --git a/docs/content/cdc-ingestion/mysql-cdc.md 
b/docs/content/cdc-ingestion/mysql-cdc.md
index 1632ef4c81..01b306470a 100644
--- a/docs/content/cdc-ingestion/mysql-cdc.md
+++ b/docs/content/cdc-ingestion/mysql-cdc.md
@@ -264,8 +264,11 @@ to avoid potential name conflict.
 ## FAQ
 
 1. Chinese characters in records ingested from MySQL are garbled.
+
 * Try to set `env.java.opts: -Dfile.encoding=UTF-8` in `flink-conf.yaml` (Flink version < 1.19) or `config.yaml` (Flink version >= 1.19)
 (the option was renamed to `env.java.opts.all` since Flink 1.17).
 
-2. Synchronize MySQL Table comment.
+2. Synchronize MySQL table and column comments.
+
+* To synchronize the MySQL create table comment to the Paimon table, configure `--mysql_conf jdbc.properties.useInformationSchema=true`.
+* To synchronize MySQL alter table or column comment changes to the Paimon table, configure `--mysql_conf debezium.include.schema.comments=true`.
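
For reference, the two options above are plain key-value configs. A minimal Java sketch of how they pair up (the keys are verbatim from the docs; the class name and printing are illustrative only, not part of Paimon):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class MySqlCommentSyncConf {
        public static void main(String[] args) {
            Map<String, String> mySqlConf = new LinkedHashMap<>();
            // propagates CREATE TABLE comments when the Paimon table is first created
            mySqlConf.put("jdbc.properties.useInformationSchema", "true");
            // propagates ALTER TABLE / column comment changes during ongoing sync
            mySqlConf.put("debezium.include.schema.comments", "true");
            mySqlConf.forEach((k, v) -> System.out.println("--mysql_conf " + k + "=" + v));
        }
    }
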
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
index 1d775e9044..76289aa355 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractJsonRecordParser.java
@@ -21,10 +21,10 @@ package org.apache.paimon.flink.action.cdc.format;
 import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.TypeUtils;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -76,13 +76,13 @@ public abstract class AbstractJsonRecordParser extends 
AbstractRecordParser {
     protected abstract String dataField();
 
     // use STRING type by default when we cannot get the original data types (most cases)
-    protected void fillDefaultTypes(JsonNode record, RowType.Builder 
rowTypeBuilder) {
+    protected void fillDefaultTypes(JsonNode record, CdcSchema.Builder 
schemaBuilder) {
         record.fieldNames()
-                .forEachRemaining(name -> rowTypeBuilder.field(name, 
DataTypes.STRING()));
+                .forEachRemaining(name -> schemaBuilder.column(name, 
DataTypes.STRING()));
     }
 
-    protected Map<String, String> extractRowData(JsonNode record, 
RowType.Builder rowTypeBuilder) {
-        fillDefaultTypes(record, rowTypeBuilder);
+    protected Map<String, String> extractRowData(JsonNode record, 
CdcSchema.Builder schemaBuilder) {
+        fillDefaultTypes(record, schemaBuilder);
         Map<String, Object> recordMap =
                 convertValue(record, new TypeReference<Map<String, Object>>() 
{});
         Map<String, String> rowData =
@@ -103,7 +103,7 @@ public abstract class AbstractJsonRecordParser extends 
AbstractRecordParser {
                                             }
                                             return 
Objects.toString(entry.getValue());
                                         }));
-        evalComputedColumns(rowData, rowTypeBuilder);
+        evalComputedColumns(rowData, schemaBuilder);
         return rowData;
     }
 
@@ -121,9 +121,9 @@ public abstract class AbstractJsonRecordParser extends 
AbstractRecordParser {
 
     protected void processRecord(
             JsonNode jsonNode, RowKind rowKind, List<RichCdcMultiplexRecord> 
records) {
-        RowType.Builder rowTypeBuilder = RowType.builder();
-        Map<String, String> rowData = this.extractRowData(jsonNode, 
rowTypeBuilder);
-        records.add(createRecord(rowKind, rowData, 
rowTypeBuilder.build().getFields()));
+        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
+        Map<String, String> rowData = this.extractRowData(jsonNode, 
schemaBuilder);
+        records.add(createRecord(rowKind, rowData, schemaBuilder));
     }
 
     protected JsonNode mergeOldRecord(JsonNode data, JsonNode oldNode) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
index 8b8946a99a..1834444afa 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
@@ -22,11 +22,10 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
@@ -71,20 +70,7 @@ public abstract class AbstractRecordParser
             }
 
             Optional<RichCdcMultiplexRecord> recordOpt = 
extractRecords().stream().findFirst();
-            if (!recordOpt.isPresent()) {
-                return null;
-            }
-
-            Schema.Builder builder = Schema.newBuilder();
-            recordOpt
-                    .get()
-                    .fields()
-                    .forEach(
-                            field ->
-                                    builder.column(
-                                            field.name(), field.type(), 
field.description()));
-            builder.primaryKey(extractPrimaryKeys());
-            return builder.build();
+            return 
recordOpt.map(RichCdcMultiplexRecord::buildSchema).orElse(null);
         } catch (Exception e) {
             logInvalidSourceRecord(record);
             throw e;
@@ -114,24 +100,24 @@ public abstract class AbstractRecordParser
 
     /** Generates values for computed columns. */
     protected void evalComputedColumns(
-            Map<String, String> rowData, RowType.Builder rowTypeBuilder) {
+            Map<String, String> rowData, CdcSchema.Builder schemaBuilder) {
         computedColumns.forEach(
                 computedColumn -> {
                     rowData.put(
                             computedColumn.columnName(),
                             
computedColumn.eval(rowData.get(computedColumn.fieldReference())));
-                    rowTypeBuilder.field(computedColumn.columnName(), 
computedColumn.columnType());
+                    schemaBuilder.column(computedColumn.columnName(), 
computedColumn.columnType());
                 });
     }
 
     /** Handle case sensitivity here. */
     protected RichCdcMultiplexRecord createRecord(
-            RowKind rowKind, Map<String, String> data, List<DataField> 
paimonFields) {
+            RowKind rowKind, Map<String, String> data, CdcSchema.Builder 
schemaBuilder) {
+        schemaBuilder.primaryKey(extractPrimaryKeys());
         return new RichCdcMultiplexRecord(
                 getDatabaseName(),
                 getTableName(),
-                paimonFields,
-                extractPrimaryKeys(),
+                schemaBuilder.build(),
                 new CdcRecord(rowKind, data));
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
index 9525093af5..e14e4ab4b7 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunRecordParser.java
@@ -22,10 +22,10 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
@@ -166,7 +166,7 @@ public class AliyunRecordParser extends 
AbstractJsonRecordParser {
     }
 
     @Override
-    protected Map<String, String> extractRowData(JsonNode record, 
RowType.Builder rowTypeBuilder) {
+    protected Map<String, String> extractRowData(JsonNode record, 
CdcSchema.Builder schemaBuilder) {
 
         Map<String, Object> recordMap =
                 JsonSerdeUtil.convertValue(record, new 
TypeReference<Map<String, Object>>() {});
@@ -184,14 +184,14 @@ public class AliyunRecordParser extends 
AbstractJsonRecordParser {
             Tuple3<String, Integer, Integer> typeInfo = 
MySqlTypeUtils.getTypeInfo(originalType);
             DataType paimonDataType =
                     MySqlTypeUtils.toDataType(typeInfo.f0, typeInfo.f1, 
typeInfo.f2, typeMapping);
-            rowTypeBuilder.field(originalName, paimonDataType);
+            schemaBuilder.column(originalName, paimonDataType);
         }
 
         for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
             rowData.put(entry.getKey(), Objects.toString(entry.getValue(), 
null));
         }
 
-        evalComputedColumns(rowData, rowTypeBuilder);
+        evalComputedColumns(rowData, schemaBuilder);
         return rowData;
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
index 170cea4d7f..5864396564 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/canal/CanalRecordParser.java
@@ -22,10 +22,10 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
@@ -152,7 +152,7 @@ public class CanalRecordParser extends 
AbstractJsonRecordParser {
     }
 
     @Override
-    protected Map<String, String> extractRowData(JsonNode record, 
RowType.Builder rowTypeBuilder) {
+    protected Map<String, String> extractRowData(JsonNode record, 
CdcSchema.Builder schemaBuilder) {
         LinkedHashMap<String, String> originalFieldTypes = 
tryExtractOriginalFieldTypes();
         Map<String, Object> recordMap =
                 JsonSerdeUtil.convertValue(record, new 
TypeReference<Map<String, Object>>() {});
@@ -167,20 +167,20 @@ public class CanalRecordParser extends 
AbstractJsonRecordParser {
                 DataType paimonDataType =
                         MySqlTypeUtils.toDataType(
                                 typeInfo.f0, typeInfo.f1, typeInfo.f2, 
typeMapping);
-                rowTypeBuilder.field(originalName, paimonDataType);
+                schemaBuilder.column(originalName, paimonDataType);
 
                 String fieldValue = Objects.toString(recordMap.get(originalName), null);
                 String newValue = transformValue(fieldValue, typeInfo.f0, originalType);
                 rowData.put(originalName, newValue);
             }
         } else {
-            fillDefaultTypes(record, rowTypeBuilder);
+            fillDefaultTypes(record, schemaBuilder);
             for (Map.Entry<String, Object> entry : recordMap.entrySet()) {
                 rowData.put(entry.getKey(), Objects.toString(entry.getValue(), 
null));
             }
         }
 
-        evalComputedColumns(rowData, rowTypeBuilder);
+        evalComputedColumns(rowData, schemaBuilder);
         return rowData;
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
index f89183d6d3..7c3763a604 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumAvroRecordParser.java
@@ -22,9 +22,9 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.AbstractRecordParser;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -113,9 +113,9 @@ public class DebeziumAvroRecordParser extends 
AbstractRecordParser {
 
     private void processRecord(
             GenericRecord record, RowKind rowKind, 
List<RichCdcMultiplexRecord> records) {
-        RowType.Builder rowTypeBuilder = RowType.builder();
-        Map<String, String> rowData = this.extractRowData(record, 
rowTypeBuilder);
-        records.add(createRecord(rowKind, rowData, 
rowTypeBuilder.build().getFields()));
+        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
+        Map<String, String> rowData = this.extractRowData(record, 
schemaBuilder);
+        records.add(createRecord(rowKind, rowData, schemaBuilder));
     }
 
     @Override
@@ -128,7 +128,7 @@ public class DebeziumAvroRecordParser extends 
AbstractRecordParser {
     }
 
     private Map<String, String> extractRowData(
-            GenericRecord record, RowType.Builder rowTypeBuilder) {
+            GenericRecord record, CdcSchema.Builder schemaBuilder) {
         Schema payloadSchema = sanitizedSchema(record.getSchema());
 
         LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
@@ -155,10 +155,10 @@ public class DebeziumAvroRecordParser extends 
AbstractRecordParser {
                             record.get(fieldName),
                             ZoneOffset.UTC);
             resultMap.put(fieldName, transformed);
-            rowTypeBuilder.field(fieldName, avroToPaimonDataType(schema));
+            schemaBuilder.column(fieldName, avroToPaimonDataType(schema));
         }
 
-        evalComputedColumns(resultMap, rowTypeBuilder);
+        evalComputedColumns(resultMap, schemaBuilder);
         return resultMap;
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
index 397575c8e5..5c13170638 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
@@ -22,10 +22,10 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mongodb.BsonValueConvertor;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.TypeUtils;
 
@@ -121,7 +121,7 @@ public class DebeziumBsonRecordParser extends 
DebeziumJsonRecordParser {
     }
 
     @Override
-    protected Map<String, String> extractRowData(JsonNode record, 
RowType.Builder rowTypeBuilder) {
+    protected Map<String, String> extractRowData(JsonNode record, 
CdcSchema.Builder schemaBuilder) {
         // bson record should be a string
         Preconditions.checkArgument(
                 record.isTextual(),
@@ -133,10 +133,10 @@ public class DebeziumBsonRecordParser extends 
DebeziumJsonRecordParser {
         for (Map.Entry<String, BsonValue> entry : document.entrySet()) {
             String fieldName = entry.getKey();
             resultMap.put(fieldName, 
toJsonString(BsonValueConvertor.convert(entry.getValue())));
-            rowTypeBuilder.field(fieldName, DataTypes.STRING());
+            schemaBuilder.column(fieldName, DataTypes.STRING());
         }
 
-        evalComputedColumns(resultMap, rowTypeBuilder);
+        evalComputedColumns(resultMap, schemaBuilder);
 
         return resultMap;
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
index 6e06e3adca..19156fb916 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java
@@ -22,9 +22,9 @@ import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
@@ -181,9 +181,9 @@ public class DebeziumJsonRecordParser extends 
AbstractJsonRecordParser {
     }
 
     @Override
-    protected Map<String, String> extractRowData(JsonNode record, 
RowType.Builder rowTypeBuilder) {
+    protected Map<String, String> extractRowData(JsonNode record, 
CdcSchema.Builder schemaBuilder) {
         if (!hasSchema) {
-            return super.extractRowData(record, rowTypeBuilder);
+            return super.extractRowData(record, schemaBuilder);
         }
 
         Map<String, Object> recordMap =
@@ -205,13 +205,13 @@ public class DebeziumJsonRecordParser extends 
AbstractJsonRecordParser {
                             ZoneOffset.UTC);
             resultMap.put(fieldName, transformed);
 
-            rowTypeBuilder.field(
+            schemaBuilder.column(
                     fieldName,
                     DebeziumSchemaUtils.toDataType(
                             debeziumType, className, 
parameters.get(fieldName)));
         }
 
-        evalComputedColumns(resultMap, rowTypeBuilder);
+        evalComputedColumns(resultMap, schemaBuilder);
 
         return resultMap;
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
index 450eacaefe..5f9538d2fc 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/Mongo4VersionStrategy.java
@@ -20,10 +20,9 @@ package org.apache.paimon.flink.action.cdc.mongodb.strategy;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -123,16 +122,11 @@ public class Mongo4VersionStrategy implements 
MongoVersionStrategy {
      */
     private RichCdcMultiplexRecord processRecord(JsonNode fullDocument, 
RowKind rowKind)
             throws JsonProcessingException {
-        RowType.Builder rowTypeBuilder = RowType.builder();
+        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
         Map<String, String> record =
-                getExtractRow(fullDocument, rowTypeBuilder, computedColumns, 
mongodbConfig);
-        List<DataField> fields = rowTypeBuilder.build().getFields();
-
+                getExtractRow(fullDocument, schemaBuilder, computedColumns, 
mongodbConfig);
+        schemaBuilder.primaryKey(extractPrimaryKeys());
         return new RichCdcMultiplexRecord(
-                databaseName,
-                collection,
-                fields,
-                extractPrimaryKeys(),
-                new CdcRecord(rowKind, record));
+                databaseName, collection, schemaBuilder.build(), new 
CdcRecord(rowKind, record));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
index df288a4150..baea947276 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java
@@ -20,9 +20,9 @@ package org.apache.paimon.flink.action.cdc.mongodb.strategy;
 
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.mongodb.SchemaAcquisitionMode;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -71,14 +71,14 @@ public interface MongoVersionStrategy {
      * Determines the extraction mode and retrieves the row accordingly.
      *
      * @param jsonNode The JsonNode representing the MongoDB document.
-     * @param rowTypeBuilder row type builder.
+     * @param schemaBuilder the schema builder to collect field types.
      * @param mongodbConfig Configuration for the MongoDB connection.
      * @return A map representing the extracted row.
      * @throws JsonProcessingException If there's an error during JSON 
processing.
      */
     default Map<String, String> getExtractRow(
             JsonNode jsonNode,
-            RowType.Builder rowTypeBuilder,
+            CdcSchema.Builder schemaBuilder,
             List<ComputedColumn> computedColumns,
             Configuration mongodbConfig)
             throws JsonProcessingException {
@@ -104,9 +104,9 @@ public interface MongoVersionStrategy {
                         mongodbConfig.get(PARSER_PATH),
                         mongodbConfig.get(FIELD_NAME),
                         computedColumns,
-                        rowTypeBuilder);
+                        schemaBuilder);
             case DYNAMIC:
-                return parseAndTypeJsonRow(document.toString(), 
rowTypeBuilder, computedColumns);
+                return parseAndTypeJsonRow(document.toString(), schemaBuilder, 
computedColumns);
             default:
                 throw new RuntimeException("Unsupported extraction mode: " + 
mode);
         }
@@ -114,9 +114,11 @@ public interface MongoVersionStrategy {
 
     /** Parses and types a JSON row based on the given parameters. */
     default Map<String, String> parseAndTypeJsonRow(
-            String evaluate, RowType.Builder rowTypeBuilder, 
List<ComputedColumn> computedColumns) {
+            String evaluate,
+            CdcSchema.Builder schemaBuilder,
+            List<ComputedColumn> computedColumns) {
         Map<String, String> parsedRow = JsonSerdeUtil.parseJsonMap(evaluate, 
String.class);
-        return processParsedData(parsedRow, rowTypeBuilder, computedColumns);
+        return processParsedData(parsedRow, schemaBuilder, computedColumns);
     }
 
     /** Parses fields from a JSON record based on the given parameters. */
@@ -125,7 +127,7 @@ public interface MongoVersionStrategy {
             String fieldPaths,
             String fieldNames,
             List<ComputedColumn> computedColumns,
-            RowType.Builder rowTypeBuilder) {
+            CdcSchema.Builder schemaBuilder) {
         String[] columnNames = fieldNames.split(",");
         String[] parseNames = fieldPaths.split(",");
         Map<String, String> parsedRow = new HashMap<>();
@@ -135,20 +137,20 @@ public interface MongoVersionStrategy {
             parsedRow.put(columnNames[i], 
Optional.ofNullable(evaluate).orElse("{}"));
         }
 
-        return processParsedData(parsedRow, rowTypeBuilder, computedColumns);
+        return processParsedData(parsedRow, schemaBuilder, computedColumns);
     }
 
     /** Processes the parsed data to generate the result map and update field 
types. */
     static Map<String, String> processParsedData(
             Map<String, String> parsedRow,
-            RowType.Builder rowTypeBuilder,
+            CdcSchema.Builder schemaBuilder,
             List<ComputedColumn> computedColumns) {
         int initialCapacity = parsedRow.size() + computedColumns.size();
         Map<String, String> resultMap = new HashMap<>(initialCapacity);
 
         parsedRow.forEach(
                 (column, value) -> {
-                    rowTypeBuilder.field(column, DataTypes.STRING());
+                    schemaBuilder.column(column, DataTypes.STRING());
                     resultMap.put(column, value);
                 });
         computedColumns.forEach(
@@ -158,7 +160,7 @@ public interface MongoVersionStrategy {
                     String computedValue = 
computedColumn.eval(parsedRow.get(fieldReference));
 
                     resultMap.put(columnName, computedValue);
-                    rowTypeBuilder.field(columnName, 
computedColumn.columnType());
+                    schemaBuilder.column(columnName, 
computedColumn.columnType());
                 });
         return resultMap;
     }
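
The static helper above can be exercised on its own; a minimal sketch, assuming it is invoked standalone (types and signature taken from the diff, sample values illustrative):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.flink.action.cdc.mongodb.strategy.MongoVersionStrategy;
    import org.apache.paimon.flink.sink.cdc.CdcSchema;

    public class ProcessParsedDataSketch {
        public static void main(String[] args) {
            // every parsed Mongo column is registered on the builder as STRING,
            // since dynamic documents carry no declared types
            Map<String, String> parsedRow = new HashMap<>();
            parsedRow.put("_id", "1");
            parsedRow.put("name", "paimon");

            CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
            Map<String, String> row =
                    MongoVersionStrategy.processParsedData(
                            parsedRow, schemaBuilder, Collections.emptyList());
            System.out.println(row + " -> " + schemaBuilder.build());
        }
    }
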
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 26579e718f..0fffdf0b98 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -25,11 +25,10 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
 import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
@@ -165,17 +164,15 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
 
         Table table = tableChange.getTable();
 
-        List<DataField> fields = extractFields(table);
-        List<String> primaryKeys = table.primaryKeyColumnNames();
+        CdcSchema schema = extractSchema(table);
 
-        // TODO : add table comment and column comment when we upgrade flink 
cdc to 2.4
         return Collections.singletonList(
                 new RichCdcMultiplexRecord(
-                        databaseName, currentTable, fields, primaryKeys, 
CdcRecord.emptyRecord()));
+                        databaseName, currentTable, schema, 
CdcRecord.emptyRecord()));
     }
 
-    private List<DataField> extractFields(Table table) {
-        RowType.Builder rowType = RowType.builder();
+    private CdcSchema extractSchema(Table table) {
+        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
         List<Column> columns = table.columns();
 
         for (Column column : columns) {
@@ -189,12 +186,20 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
 
             // add column comment when debezium schema comments are enabled
             if (isDebeziumSchemaCommentsEnabled) {
-                rowType.field(column.name(), dataType, column.comment());
+                schemaBuilder.column(column.name(), dataType, 
column.comment());
             } else {
-                rowType.field(column.name(), dataType);
+                schemaBuilder.column(column.name(), dataType);
             }
         }
-        return rowType.build().getFields();
+
+        schemaBuilder.primaryKey(table.primaryKeyColumnNames());
+
+        // add table comment when debezium schema comments are enabled
+        if (isDebeziumSchemaCommentsEnabled) {
+            schemaBuilder.comment(table.comment());
+        }
+
+        return schemaBuilder.build();
     }
 
     private List<RichCdcMultiplexRecord> extractRecords() {
@@ -270,10 +275,6 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
 
     protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, 
String> data) {
         return new RichCdcMultiplexRecord(
-                databaseName,
-                currentTable,
-                Collections.emptyList(),
-                Collections.emptyList(),
-                new CdcRecord(rowKind, data));
+                databaseName, currentTable, null, new CdcRecord(rowKind, 
data));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
index c03b050fa2..fff189cfc0 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/format/DebeziumEventUtils.java
@@ -19,9 +19,13 @@
 package org.apache.paimon.flink.action.cdc.mysql.format;
 
 import io.debezium.document.Array;
+import io.debezium.document.Document;
 import io.debezium.document.DocumentReader;
+import io.debezium.relational.Table;
 import io.debezium.relational.history.HistoryRecord;
 import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import io.debezium.relational.history.TableChanges.TableChangeType;
 import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
 
 import java.io.IOException;
@@ -30,8 +34,6 @@ import java.io.IOException;
 public class DebeziumEventUtils {
 
     private static final DocumentReader DOCUMENT_READER = 
DocumentReader.defaultReader();
-    private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
-            new FlinkJsonTableChangeSerializer();
 
     public static HistoryRecord getHistoryRecord(String historyRecordStr) 
throws IOException {
         return new HistoryRecord(DOCUMENT_READER.read(historyRecordStr));
@@ -39,7 +41,46 @@ public class DebeziumEventUtils {
 
     public static TableChanges getTableChanges(String historyRecordStr) throws 
IOException {
         HistoryRecord historyRecord = getHistoryRecord(historyRecordStr);
-        Array tableChanges = 
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
-        return TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+        Array tableChangesDocument =
+                
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
+        return deserialize(tableChangesDocument, true);
+    }
+
+    /**
+     * Copied from {@link FlinkJsonTableChangeSerializer#deserialize}, with an extra step to
+     * supplement the table comment. TODO: remove this method once comment handling is added to
+     * {@link FlinkJsonTableChangeSerializer}.
+     */
+    private static TableChanges deserialize(Array array, boolean 
useCatalogBeforeSchema) {
+        TableChanges tableChanges = new TableChanges();
+
+        for (Array.Entry entry : array) {
+            Document document = entry.getValue().asDocument();
+            TableChange change =
+                    FlinkJsonTableChangeSerializer.fromDocument(document, 
useCatalogBeforeSchema);
+
+            if (change.getType() == TableChangeType.CREATE) {
+                // tableChanges.create(change.getTable());
+                tableChanges.create(supplementTableComment(document, 
change.getTable()));
+            } else if (change.getType() == TableChangeType.ALTER) {
+                // tableChanges.alter(change.getTable());
+                tableChanges.alter(supplementTableComment(document, 
change.getTable()));
+            } else if (change.getType() == TableChangeType.DROP) {
+                tableChanges.drop(change.getTable());
+            }
+        }
+
+        return tableChanges;
+    }
+
+    private static Table supplementTableComment(Document document, Table 
table) {
+        if (table.comment() != null) {
+            return table;
+        }
+        String comment = document.getString("comment");
+        if (comment != null) {
+            return table.edit().setComment(comment).create();
+        }
+        return table;
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index 07156823b3..af114cb920 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -24,12 +24,11 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.mysql.format.DebeziumEvent;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
@@ -62,7 +61,6 @@ import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Base64;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -123,7 +121,7 @@ public class PostgresRecordParser
         extractRecords().forEach(out::collect);
     }
 
-    private List<DataField> extractFields(DebeziumEvent.Field schema) {
+    private CdcSchema extractSchema(DebeziumEvent.Field schema) {
         Map<String, DebeziumEvent.Field> afterFields = schema.afterFields();
         Preconditions.checkArgument(
                 !afterFields.isEmpty(),
@@ -131,7 +129,7 @@ public class PostgresRecordParser
                         + "Please make sure that `includeSchema` is true "
                         + "in the JsonDebeziumDeserializationSchema you 
created");
 
-        RowType.Builder rowType = RowType.builder();
+        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
         afterFields.forEach(
                 (key, value) -> {
                     DataType dataType = extractFieldType(value);
@@ -139,9 +137,9 @@ public class PostgresRecordParser
                             dataType.copy(
                                     typeMapping.containsMode(TO_NULLABLE) || 
value.optional());
 
-                    rowType.field(key, dataType);
+                    schemaBuilder.column(key, dataType);
                 });
-        return rowType.build().getFields();
+        return schemaBuilder.build();
     }
 
     /**
@@ -217,13 +215,12 @@ public class PostgresRecordParser
 
         Map<String, String> after = extractRow(root.payload().after());
         if (!after.isEmpty()) {
-            List<DataField> fields = extractFields(root.schema());
+            CdcSchema schema = extractSchema(root.schema());
             records.add(
                     new RichCdcMultiplexRecord(
                             databaseName,
                             currentTable,
-                            fields,
-                            Collections.emptyList(),
+                            schema,
                             new CdcRecord(RowKind.INSERT, after)));
         }
 
@@ -365,10 +362,6 @@ public class PostgresRecordParser
 
     protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, 
String> data) {
         return new RichCdcMultiplexRecord(
-                databaseName,
-                currentTable,
-                Collections.emptyList(),
-                Collections.emptyList(),
-                new CdcRecord(rowKind, data));
+                databaseName, currentTable, null, new CdcRecord(rowKind, 
data));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
index 4efcf1207e..d56963731f 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicTableParsingProcessFunction.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink.cdc;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.types.DataField;
 
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeinfo.TypeHint;
@@ -34,13 +33,11 @@ import org.apache.flink.util.OutputTag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
 /**
- * A {@link ProcessFunction} to parse CDC change event to either a list of 
{@link DataField}s or
- * {@link CdcRecord} and send them to different side outputs according to 
table name. This process
- * function will capture newly added tables when syncing entire database and 
in cases where the
- * newly added tables are including by attesting table filters.
+ * A {@link ProcessFunction} to parse a CDC change event into either a {@link CdcSchema} or a
+ * {@link CdcRecord} and send them to different side outputs according to the table name. This
+ * process function will capture newly added tables when syncing an entire database, provided the
+ * newly added tables are included by the configured table filters.
  *
  * <p>This {@link ProcessFunction} can handle records for different tables at 
the same time.
  *
@@ -54,12 +51,10 @@ public class CdcDynamicTableParsingProcessFunction<T> 
extends ProcessFunction<T,
     public static final OutputTag<CdcMultiplexRecord> DYNAMIC_OUTPUT_TAG =
             new OutputTag<>("paimon-dynamic-table", 
TypeInformation.of(CdcMultiplexRecord.class));
 
-    public static final OutputTag<Tuple2<Identifier, List<DataField>>>
-            DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG =
-                    new OutputTag<>(
-                            "paimon-dynamic-table-schema-change",
-                            TypeInformation.of(
-                                    new TypeHint<Tuple2<Identifier, 
List<DataField>>>() {}));
+    public static final OutputTag<Tuple2<Identifier, CdcSchema>> 
DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG =
+            new OutputTag<>(
+                    "paimon-dynamic-table-schema-change",
+                    TypeInformation.of(new TypeHint<Tuple2<Identifier, 
CdcSchema>>() {}));
 
     private final EventParser.Factory<T> parserFactory;
     private final String database;
@@ -117,8 +112,8 @@ public class CdcDynamicTableParsingProcessFunction<T> 
extends ProcessFunction<T,
                             }
                         });
 
-        List<DataField> schemaChange = parser.parseSchemaChange();
-        if (!schemaChange.isEmpty()) {
+        CdcSchema schemaChange = parser.parseSchemaChange();
+        if (schemaChange != null) {
             context.output(
                     DYNAMIC_SCHEMA_CHANGE_OUTPUT_TAG,
                     Tuple2.of(Identifier.create(database, tableName), 
schemaChange));
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
index 4c5e0600bb..899ca54465 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcMultiTableParsingProcessFunction.java
@@ -18,23 +18,19 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.types.DataField;
-
 import org.apache.flink.api.common.functions.OpenContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 /**
- * A {@link ProcessFunction} to parse CDC change event to either a list of 
{@link DataField}s or
- * {@link CdcRecord} and send them to different side outputs according to 
table name.
+ * A {@link ProcessFunction} to parse a CDC change event into either a {@link CdcSchema} or a
+ * {@link CdcRecord} and send them to different side outputs according to the table name.
  *
  * <p>This {@link ProcessFunction} can handle records for different tables at 
the same time.
  *
@@ -45,7 +41,7 @@ public class CdcMultiTableParsingProcessFunction<T> extends 
ProcessFunction<T, V
     private final EventParser.Factory<T> parserFactory;
 
     private transient EventParser<T> parser;
-    private transient Map<String, OutputTag<List<DataField>>> 
updatedDataFieldsOutputTags;
+    private transient Map<String, OutputTag<CdcSchema>> schemaChangeOutputTags;
     private transient Map<String, OutputTag<CdcRecord>> recordOutputTags;
 
     public CdcMultiTableParsingProcessFunction(EventParser.Factory<T> 
parserFactory) {
@@ -64,7 +60,7 @@ public class CdcMultiTableParsingProcessFunction<T> extends 
ProcessFunction<T, V
      */
     public void open(Configuration parameters) throws Exception {
         parser = parserFactory.create();
-        updatedDataFieldsOutputTags = new HashMap<>();
+        schemaChangeOutputTags = new HashMap<>();
         recordOutputTags = new HashMap<>();
     }
 
@@ -72,22 +68,22 @@ public class CdcMultiTableParsingProcessFunction<T> extends 
ProcessFunction<T, V
     public void processElement(T raw, Context context, Collector<Void> 
collector) throws Exception {
         parser.setRawEvent(raw);
         String tableName = parser.parseTableName();
-        List<DataField> schemaChange = parser.parseSchemaChange();
-        if (!schemaChange.isEmpty()) {
+        CdcSchema schemaChange = parser.parseSchemaChange();
+        if (schemaChange != null) {
             context.output(getUpdatedDataFieldsOutputTag(tableName), 
schemaChange);
         }
         parser.parseRecords()
                 .forEach(record -> 
context.output(getRecordOutputTag(tableName), record));
     }
 
-    private OutputTag<List<DataField>> getUpdatedDataFieldsOutputTag(String 
tableName) {
-        return updatedDataFieldsOutputTags.computeIfAbsent(
-                tableName, 
CdcMultiTableParsingProcessFunction::createUpdatedDataFieldsOutputTag);
+    private OutputTag<CdcSchema> getUpdatedDataFieldsOutputTag(String 
tableName) {
+        return schemaChangeOutputTags.computeIfAbsent(
+                tableName, CdcMultiTableParsingProcessFunction::createSchemaChangeOutputTag);
     }
 
-    public static OutputTag<List<DataField>> 
createUpdatedDataFieldsOutputTag(String tableName) {
+    public static OutputTag<CdcSchema> createSchemaChangeOutputTag(String tableName) {
         return new OutputTag<>(
-                "new-data-field-list-" + tableName, new 
ListTypeInfo<>(DataField.class));
+                "table-schema-change-" + tableName, 
TypeInformation.of(CdcSchema.class));
     }
 
     private OutputTag<CdcRecord> getRecordOutputTag(String tableName) {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
index eec228f3c0..9e267eda24 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcParsingProcessFunction.java
@@ -18,20 +18,16 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.types.DataField;
-
 import org.apache.flink.api.common.functions.OpenContext;
-import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
-import java.util.List;
-
 /**
- * A {@link ProcessFunction} to parse CDC change event to either a list of 
{@link DataField}s or
- * {@link CdcRecord} and send them to different downstreams.
+ * A {@link ProcessFunction} to parse CDC change event to either a {@link 
CdcSchema} or {@link
+ * CdcRecord} and send them to different downstreams.
  *
  * <p>This {@link ProcessFunction} can only handle records for a single 
constant table. To handle
  * records for different tables, see {@link 
CdcMultiTableParsingProcessFunction}.
@@ -40,8 +36,8 @@ import java.util.List;
  */
 public class CdcParsingProcessFunction<T> extends ProcessFunction<T, 
CdcRecord> {
 
-    public static final OutputTag<List<DataField>> 
NEW_DATA_FIELD_LIST_OUTPUT_TAG =
-            new OutputTag<>("new-data-field-list", new 
ListTypeInfo<>(DataField.class));
+    public static final OutputTag<CdcSchema> SCHEMA_CHANGE_OUTPUT_TAG =
+            new OutputTag<>("table-schema-change", 
TypeInformation.of(CdcSchema.class));
 
     private final EventParser.Factory<T> parserFactory;
 
@@ -69,9 +65,9 @@ public class CdcParsingProcessFunction<T> extends 
ProcessFunction<T, CdcRecord>
     public void processElement(T raw, Context context, Collector<CdcRecord> 
collector)
             throws Exception {
         parser.setRawEvent(raw);
-        List<DataField> schemaChange = parser.parseSchemaChange();
-        if (!schemaChange.isEmpty()) {
-            context.output(NEW_DATA_FIELD_LIST_OUTPUT_TAG, schemaChange);
+        CdcSchema schemaChange = parser.parseSchemaChange();
+        if (schemaChange != null) {
+            context.output(SCHEMA_CHANGE_OUTPUT_TAG, schemaChange);
         }
         parser.parseRecords().forEach(collector::collect);
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
new file mode 100644
index 0000000000..b5199d3a0b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.ReassignFieldId;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** A schema change message from the CDC source. */
+public class CdcSchema implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<DataField> fields;
+
+    private final List<String> primaryKeys;
+
+    @Nullable private final String comment;
+
+    public CdcSchema(List<DataField> fields, List<String> primaryKeys, 
@Nullable String comment) {
+        this.fields = fields;
+        this.primaryKeys = primaryKeys;
+        this.comment = comment;
+    }
+
+    public List<DataField> fields() {
+        return fields;
+    }
+
+    public List<String> primaryKeys() {
+        return primaryKeys;
+    }
+
+    @Nullable
+    public String comment() {
+        return comment;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CdcSchema that = (CdcSchema) o;
+        return Objects.equals(fields, that.fields)
+                && Objects.equals(primaryKeys, that.primaryKeys)
+                && Objects.equals(comment, that.comment);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fields, primaryKeys, comment);
+    }
+
+    @Override
+    public String toString() {
+        return "Schema{"
+                + "fields="
+                + fields
+                + ", primaryKeys="
+                + primaryKeys
+                + ", comment="
+                + comment
+                + '}';
+    }
+
+    /** Creates a builder for configuring and creating instances of {@link CdcSchema}. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    /** A builder for constructing an immutable but still unresolved {@link 
CdcSchema}. */
+    public static final class Builder {
+
+        private final List<DataField> columns = new ArrayList<>();
+
+        private List<String> primaryKeys = new ArrayList<>();
+
+        @Nullable private String comment;
+
+        private final AtomicInteger highestFieldId = new AtomicInteger(-1);
+
+        public int getHighestFieldId() {
+            return highestFieldId.get();
+        }
+
+        /**
+         * Declares a column that is appended to this schema.
+         *
+         * @param dataField data field
+         */
+        public Builder column(DataField dataField) {
+            Preconditions.checkNotNull(dataField, "Data field must not be null.");
+            Preconditions.checkNotNull(dataField.name(), "Column name must not be null.");
+            Preconditions.checkNotNull(dataField.type(), "Data type must not be null.");
+            columns.add(dataField);
+            return this;
+        }
+
+        /**
+         * Declares a column that is appended to this schema.
+         *
+         * @param columnName column name
+         * @param dataType data type of the column
+         */
+        public Builder column(String columnName, DataType dataType) {
+            return column(columnName, dataType, null);
+        }
+
+        /**
+         * Declares a column that is appended to this schema.
+         *
+         * @param columnName column name
+         * @param dataType data type of the column
+         * @param description description of the column
+         */
+        public Builder column(String columnName, DataType dataType, @Nullable String description) {
+            Preconditions.checkNotNull(columnName, "Column name must not be null.");
+            Preconditions.checkNotNull(dataType, "Data type must not be null.");
+
+            int id = highestFieldId.incrementAndGet();
+            DataType reassignDataType = ReassignFieldId.reassign(dataType, highestFieldId);
+            columns.add(new DataField(id, columnName, reassignDataType, description));
+            return this;
+        }
+
+        /**
+         * Declares a primary key constraint for a set of given columns. A primary key uniquely
+         * identifies a row in a table. None of the columns in a primary key can be nullable.
+         *
+         * @param columnNames columns that form a unique primary key
+         */
+        public Builder primaryKey(String... columnNames) {
+            return primaryKey(Arrays.asList(columnNames));
+        }
+
+        /**
+         * Declares a primary key constraint for a set of given columns. A primary key uniquely
+         * identifies a row in a table. None of the columns in a primary key can be nullable.
+         *
+         * @param columnNames columns that form a unique primary key
+         */
+        public Builder primaryKey(List<String> columnNames) {
+            this.primaryKeys = new ArrayList<>(columnNames);
+            return this;
+        }
+
+        /** Declares the table comment. */
+        public Builder comment(@Nullable String comment) {
+            this.comment = comment;
+            return this;
+        }
+
+        /** Returns an instance of an unresolved {@link CdcSchema}. */
+        public CdcSchema build() {
+            return new CdcSchema(columns, primaryKeys, comment);
+        }
+    }
+}
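
For context, a minimal sketch of how the new builder is meant to be used; the
column names, types, and comment text here are illustrative, not from this
patch. Field ids are assigned by the builder itself (0, 1, ...), so callers no
longer have to thread an AtomicInteger through:

    // Illustrative usage of the CdcSchema builder introduced above.
    CdcSchema schema =
            CdcSchema.newBuilder()
                    .column("pt", DataTypes.INT().notNull(), "primary")
                    .column("v1", DataTypes.VARCHAR(10), "v1")
                    .primaryKey("pt")
                    .comment("a table comment synced from the source")
                    .build();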
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 7edde41f5b..7b24ff8138 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -112,7 +112,7 @@ public class CdcSinkBuilder<T> {
 
         DataStream<Void> schemaChangeProcessFunction =
                 SingleOutputStreamOperatorUtils.getSideOutput(
-                                parsed, CdcParsingProcessFunction.NEW_DATA_FIELD_LIST_OUTPUT_TAG)
+                                parsed, CdcParsingProcessFunction.SCHEMA_CHANGE_OUTPUT_TAG)
                         .process(
                                 new UpdatedDataFieldsProcessFunction(
                                         new SchemaManager(dataTable.fileIO(), dataTable.location()),
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
index c0e4810128..a7ce1c2cf5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/EventParser.java
@@ -47,7 +47,7 @@ public interface EventParser<T> {
      *
      * @return empty if there is no schema change
      */
-    List<DataField> parseSchemaChange();
+    CdcSchema parseSchemaChange();
 
     /**
      * Parse records from event.
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index b15a17a679..196b9e4350 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -222,8 +222,8 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
             DataStream<Void> schemaChangeProcessFunction =
                     SingleOutputStreamOperatorUtils.getSideOutput(
                                     parsed,
-                                    CdcMultiTableParsingProcessFunction
-                                            .createUpdatedDataFieldsOutputTag(table.name()))
+                                    CdcMultiTableParsingProcessFunction.createSchameChangeOutputTag(
+                                            table.name()))
                             .process(
                                     new UpdatedDataFieldsProcessFunction(
                                             new SchemaManager(table.fileIO(), table.location()),
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 71f71b2412..cc8118f6ca 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -25,7 +25,6 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataField;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -34,19 +33,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
 /**
- * A {@link ProcessFunction} to handle schema changes. New schema is represented by a list of {@link
- * DataField}s.
+ * A {@link ProcessFunction} to handle schema changes. New schema is represented by a {@link
+ * CdcSchema}.
  *
  * <p>NOTE: To avoid concurrent schema changes, the parallelism of this {@link ProcessFunction} must
  * be 1.
  */
 public class MultiTableUpdatedDataFieldsProcessFunction
-        extends UpdatedDataFieldsProcessFunctionBase<Tuple2<Identifier, List<DataField>>, Void> {
+        extends UpdatedDataFieldsProcessFunctionBase<Tuple2<Identifier, CdcSchema>, Void> {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(MultiTableUpdatedDataFieldsProcessFunction.class);
@@ -60,11 +58,9 @@ public class MultiTableUpdatedDataFieldsProcessFunction
 
     @Override
     public void processElement(
-            Tuple2<Identifier, List<DataField>> updatedDataFields,
-            Context context,
-            Collector<Void> collector)
+            Tuple2<Identifier, CdcSchema> updatedSchema, Context context, Collector<Void> collector)
             throws Exception {
-        Identifier tableId = updatedDataFields.f0;
+        Identifier tableId = updatedSchema.f0;
         SchemaManager schemaManager =
                 schemaManagers.computeIfAbsent(
                         tableId,
@@ -82,7 +78,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
             LOG.error("Failed to get schema manager for table " + tableId);
         } else {
             for (SchemaChange schemaChange :
-                    extractSchemaChanges(schemaManager, updatedDataFields.f1)) {
+                    extractSchemaChanges(schemaManager, updatedSchema.f1)) {
                 applySchemaChange(schemaManager, schemaChange, tableId);
             }
         }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
index ce1bcc47db..0d4a74ac2f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/NewTableSchemaBuilder.java
@@ -59,13 +59,7 @@ public class NewTableSchemaBuilder implements Serializable {
     }
 
     public Optional<Schema> build(RichCdcMultiplexRecord record) {
-        Schema sourceSchema =
-                new Schema(
-                        record.fields(),
-                        Collections.emptyList(),
-                        record.primaryKeys(),
-                        Collections.emptyMap(),
-                        null);
+        Schema sourceSchema = record.buildSchema();
         List<String> specifiedPartitionKeys = new ArrayList<>();
 
         List<String> partitionKeyMultipleList = partitionKeyMultiple.get(record.tableName());
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
index 1a01f0273d..5417cd9561 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecord.java
@@ -18,13 +18,12 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.types.DataField;
+import org.apache.paimon.schema.Schema;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collections;
 import java.util.Objects;
 
 /** Compared to {@link CdcMultiplexRecord}, this contains schema information. */
@@ -34,25 +33,17 @@ public class RichCdcMultiplexRecord implements Serializable {
 
     @Nullable private final String databaseName;
     @Nullable private final String tableName;
-    private final List<DataField> fields;
-    private final List<String> primaryKeys;
+    private final CdcSchema cdcSchema;
     private final CdcRecord cdcRecord;
 
     public RichCdcMultiplexRecord(
             @Nullable String databaseName,
             @Nullable String tableName,
-            List<DataField> fields,
-            List<String> primaryKeys,
+            @Nullable CdcSchema cdcSchema,
             CdcRecord cdcRecord) {
         this.databaseName = databaseName;
         this.tableName = tableName;
-        // This class can not be deserialized by kryoSerializer,
-        // Throw an exception message `com.esotericsoftware.kryo.KryoException:
-        // java.lang.UnsupportedOperationException` ,
-        // because fields and primaryKeys is an
-        // unmodifiableList. So we need to ensure that List is a modifiable list.
-        this.fields = new ArrayList<>(fields);
-        this.primaryKeys = new ArrayList<>(primaryKeys);
+        this.cdcSchema = cdcSchema == null ? CdcSchema.newBuilder().build() : cdcSchema;
         this.cdcRecord = cdcRecord;
     }
 
@@ -66,21 +57,26 @@ public class RichCdcMultiplexRecord implements Serializable {
         return tableName;
     }
 
-    public List<DataField> fields() {
-        return fields;
+    public CdcSchema cdcSchema() {
+        return cdcSchema;
     }
 
-    public List<String> primaryKeys() {
-        return primaryKeys;
+    public Schema buildSchema() {
+        return new Schema(
+                cdcSchema.fields(),
+                Collections.emptyList(),
+                cdcSchema.primaryKeys(),
+                Collections.emptyMap(),
+                cdcSchema.comment());
     }
 
     public RichCdcRecord toRichCdcRecord() {
-        return new RichCdcRecord(cdcRecord, fields);
+        return new RichCdcRecord(cdcRecord, cdcSchema);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(databaseName, tableName, fields, primaryKeys, cdcRecord);
+        return Objects.hash(databaseName, tableName, cdcSchema, cdcRecord);
     }
 
     @Override
@@ -94,8 +90,7 @@ public class RichCdcMultiplexRecord implements Serializable {
         RichCdcMultiplexRecord that = (RichCdcMultiplexRecord) o;
         return Objects.equals(databaseName, that.databaseName)
                 && Objects.equals(tableName, that.tableName)
-                && Objects.equals(fields, that.fields)
-                && Objects.equals(primaryKeys, that.primaryKeys)
+                && Objects.equals(cdcSchema, that.cdcSchema)
                 && Objects.equals(cdcRecord, that.cdcRecord);
     }
 
@@ -106,10 +101,8 @@ public class RichCdcMultiplexRecord implements Serializable {
                 + databaseName
                 + ", tableName="
                 + tableName
-                + ", fields="
-                + fields
-                + ", primaryKeys="
-                + primaryKeys
+                + ", cdcSchema="
+                + cdcSchema
                 + ", cdcRecord="
                 + cdcRecord
                 + '}';
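
A quick sketch of the new conversion path; database/table names and the row
value below are illustrative, not from this patch:

    // Illustrative only: carry a CdcSchema inside the multiplex record.
    CdcSchema schema =
            CdcSchema.newBuilder().column("k", DataTypes.STRING(), null).primaryKey("k").build();
    Map<String, String> row = new HashMap<>();
    row.put("k", "1");
    RichCdcMultiplexRecord record =
            new RichCdcMultiplexRecord("db", "tbl", schema, new CdcRecord(RowKind.INSERT, row));
    // buildSchema() maps the fields, primary keys, and table comment onto a
    // Paimon Schema; partition keys and table options are left empty here.
    Schema paimonSchema = record.buildSchema();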
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index 47367c4234..4ae4da6706 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink.cdc;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.cdc.TableNameConverter;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataField;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,10 +111,8 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
     }
 
     @Override
-    public List<DataField> parseSchemaChange() {
-        return shouldSynchronizeCurrentTable
-                ? currentParser.parseSchemaChange()
-                : Collections.emptyList();
+    public CdcSchema parseSchemaChange() {
+        return shouldSynchronizeCurrentTable ? currentParser.parseSchemaChange() : null;
     }
 
     @Override
@@ -205,7 +202,7 @@ public class RichCdcMultiplexRecordEventParser implements EventParser<RichCdcMul
 
     private boolean shouldCreateCurrentTable() {
         return shouldSynchronizeCurrentTable
-                && !record.fields().isEmpty()
+                && !record.cdcSchema().fields().isEmpty()
                 && createdTables.add(parseTableName());
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
index 04b86fea56..73b156f0fd 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcRecord.java
@@ -19,19 +19,15 @@
 package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.annotation.Experimental;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /** A change message contains schema and data. */
 @Experimental
@@ -40,11 +36,11 @@ public class RichCdcRecord implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final CdcRecord cdcRecord;
-    private final List<DataField> fields;
+    private final CdcSchema cdcSchema;
 
-    public RichCdcRecord(CdcRecord cdcRecord, List<DataField> fields) {
+    public RichCdcRecord(CdcRecord cdcRecord, CdcSchema cdcSchema) {
         this.cdcRecord = cdcRecord;
-        this.fields = fields;
+        this.cdcSchema = cdcSchema;
     }
 
     public boolean hasPayload() {
@@ -55,8 +51,8 @@ public class RichCdcRecord implements Serializable {
         return cdcRecord.kind();
     }
 
-    public List<DataField> fields() {
-        return fields;
+    public CdcSchema cdcSchema() {
+        return cdcSchema;
     }
 
     public CdcRecord toCdcRecord() {
@@ -72,34 +68,32 @@ public class RichCdcRecord implements Serializable {
             return false;
         }
         RichCdcRecord that = (RichCdcRecord) o;
-        return cdcRecord == that.cdcRecord && Objects.equals(fields, that.fields);
+        return cdcRecord == that.cdcRecord && Objects.equals(cdcSchema, that.cdcSchema);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(cdcRecord, fields);
+        return Objects.hash(cdcRecord, cdcSchema);
     }
 
     @Override
     public String toString() {
-        return "{" + "cdcRecord=" + cdcRecord + ", fields=" + fields + '}';
+        return "{" + "cdcRecord=" + cdcRecord + ", cdcSchema=" + cdcSchema + 
'}';
     }
 
     public static Builder builder(RowKind kind) {
-        return new Builder(kind, new AtomicInteger(-1));
+        return new Builder(kind);
     }
 
     /** Builder for {@link RichCdcRecord}. */
     public static class Builder {
 
         private final RowKind kind;
-        private final AtomicInteger fieldId;
-        private final List<DataField> fields = new ArrayList<>();
+        private final CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
         private final Map<String, String> data = new HashMap<>();
 
-        public Builder(RowKind kind, AtomicInteger fieldId) {
+        public Builder(RowKind kind) {
             this.kind = kind;
-            this.fieldId = fieldId;
         }
 
         public Builder field(String name, DataType type, String value) {
@@ -108,13 +102,14 @@ public class RichCdcRecord implements Serializable {
 
         public Builder field(
                 String name, DataType type, String value, @Nullable String description) {
-            fields.add(new DataField(fieldId.incrementAndGet(), name, type, description));
+            schemaBuilder.column(name, type, description);
             data.put(name, value);
             return this;
         }
 
         public RichCdcRecord build() {
-            return new RichCdcRecord(new CdcRecord(kind, data), fields);
+
+            return new RichCdcRecord(new CdcRecord(kind, data), schemaBuilder.build());
         }
     }
 }
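
For context, a minimal sketch of the builder after this change; the field
names and values are illustrative, not from this patch. The AtomicInteger
bookkeeping is gone because ids now come from the embedded CdcSchema.Builder:

    // Illustrative only: the builder delegates field-id assignment to CdcSchema.Builder.
    RichCdcRecord record =
            RichCdcRecord.builder(RowKind.INSERT)
                    .field("id", DataTypes.INT(), "1")
                    .field("name", DataTypes.VARCHAR(32), "paimon", "display name")
                    .build();
    CdcSchema schema = record.cdcSchema(); // ids 0 and 1, assigned by the schema builder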
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
index 01a002214e..ba6ecbf4af 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink.cdc;
 
 import org.apache.paimon.types.DataField;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -32,15 +31,19 @@ public class RichEventParser implements EventParser<RichCdcRecord> {
 
     private final LinkedHashMap<String, DataField> previousDataFields = new LinkedHashMap<>();
 
+    private String previousComment;
+
     @Override
     public void setRawEvent(RichCdcRecord rawEvent) {
         this.record = rawEvent;
     }
 
     @Override
-    public List<DataField> parseSchemaChange() {
-        List<DataField> change = new ArrayList<>();
-        record.fields()
+    public CdcSchema parseSchemaChange() {
+        CdcSchema.Builder change = CdcSchema.newBuilder();
+        CdcSchema recordedSchema = record.cdcSchema();
+        recordedSchema
+                .fields()
                 .forEach(
                         dataField -> {
                             DataField previous = previousDataFields.get(dataField.name());
@@ -49,10 +52,15 @@ public class RichEventParser implements EventParser<RichCdcRecord> {
                             // so the comparison should not include the ID.
                             if (!DataField.dataFieldEqualsIgnoreId(previous, dataField)) {
                                 previousDataFields.put(dataField.name(), dataField);
-                                change.add(dataField);
+                                change.column(dataField);
                             }
                         });
-        return change;
+
+        if (recordedSchema.comment() != null && !recordedSchema.comment().equals(previousComment)) {
+            previousComment = recordedSchema.comment();
+            change.comment(recordedSchema.comment());
+        }
+        return change.build();
     }
 
     @Override
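
The parser is stateful and only emits what changed since the previous event.
A sketch of the expected behavior (the event variable is hypothetical):

    // Illustrative only: parseSchemaChange() emits deltas, not full schemas.
    RichEventParser parser = new RichEventParser();

    parser.setRawEvent(eventWithColumnsAndComment);    // hypothetical RichCdcRecord
    CdcSchema first = parser.parseSchemaChange();      // all new fields plus the comment

    parser.setRawEvent(eventWithColumnsAndComment);    // identical schema again
    CdcSchema second = parser.parseSchemaChange();     // empty: nothing changed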
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 93e22f1e62..363d747d3b 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -38,14 +38,14 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * A {@link ProcessFunction} to handle schema changes. New schema is represented by a list of {@link
- * DataField}s.
+ * A {@link ProcessFunction} to handle schema changes. New schema is represented by a {@link
+ * CdcSchema}.
  *
  * <p>NOTE: To avoid concurrent schema changes, the parallelism of this {@link ProcessFunction} must
  * be 1.
  */
 public class UpdatedDataFieldsProcessFunction
-        extends UpdatedDataFieldsProcessFunctionBase<List<DataField>, Void> {
+        extends UpdatedDataFieldsProcessFunctionBase<CdcSchema, Void> {
 
     private final SchemaManager schemaManager;
 
@@ -65,20 +65,23 @@ public class UpdatedDataFieldsProcessFunction
     }
 
     @Override
-    public void processElement(
-            List<DataField> updatedDataFields, Context context, Collector<Void> collector)
+    public void processElement(CdcSchema updatedSchema, Context context, Collector<Void> collector)
             throws Exception {
         List<DataField> actualUpdatedDataFields =
-                updatedDataFields.stream()
+                updatedSchema.fields().stream()
                         .filter(
                                 dataField ->
                                         !latestDataFieldContain(new FieldIdentifier(dataField)))
                         .collect(Collectors.toList());
-        if (CollectionUtils.isEmpty(actualUpdatedDataFields)) {
+        if (CollectionUtils.isEmpty(actualUpdatedDataFields) && updatedSchema.comment() == null) {
             return;
         }
-        for (SchemaChange schemaChange :
-                extractSchemaChanges(schemaManager, actualUpdatedDataFields)) {
+        CdcSchema actualUpdatedSchema =
+                new CdcSchema(
+                        actualUpdatedDataFields,
+                        updatedSchema.primaryKeys(),
+                        updatedSchema.comment());
+        for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, actualUpdatedSchema)) {
             applySchemaChange(schemaManager, schemaChange, identifier);
         }
         /*
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index e544848036..657b025bf5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -148,6 +148,8 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
             }
         } else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
             catalog.alterTable(identifier, schemaChange, false);
+        } else if (schemaChange instanceof SchemaChange.UpdateComment) {
+            catalog.alterTable(identifier, schemaChange, false);
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported schema change class "
@@ -219,8 +221,9 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
     }
 
     protected List<SchemaChange> extractSchemaChanges(
-            SchemaManager schemaManager, List<DataField> updatedDataFields) {
-        RowType oldRowType = schemaManager.latest().get().logicalRowType();
+            SchemaManager schemaManager, CdcSchema updatedSchema) {
+        TableSchema oldTableSchema = schemaManager.latest().get();
+        RowType oldRowType = oldTableSchema.logicalRowType();
         Map<String, DataField> oldFields = new HashMap<>();
         for (DataField oldField : oldRowType.getFields()) {
             oldFields.put(oldField.name(), oldField);
@@ -232,7 +235,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                                 TypeMapping.TypeMappingMode.DECIMAL_NO_CHANGE);
 
         List<SchemaChange> result = new ArrayList<>();
-        for (DataField newField : updatedDataFields) {
+        for (DataField newField : updatedSchema.fields()) {
             String newFieldName = StringUtils.toLowerCaseIfNeed(newField.name(), caseSensitive);
             if (oldFields.containsKey(newFieldName)) {
                 DataField oldField = oldFields.get(newFieldName);
@@ -268,6 +271,12 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
                                 newFieldName, newField.type(), newField.description(), null));
             }
         }
+
+        if (updatedSchema.comment() != null
+                && !updatedSchema.comment().equals(oldTableSchema.comment())) {
+            // update table comment
+            result.add(SchemaChange.updateComment(updatedSchema.comment()));
+        }
         return result;
     }
 
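With this hunk, a table-comment change rides along with column changes. A
sketch of the input and the resulting SchemaChange (the comment text is
illustrative):

    // Illustrative only: a CdcSchema whose comment differs from the stored one.
    CdcSchema updated =
            CdcSchema.newBuilder()
                    .column("v1", DataTypes.VARCHAR(20), "v1")
                    .comment("synced table comment")
                    .build();
    // If "synced table comment" differs from the current TableSchema comment,
    // extractSchemaChanges(schemaManager, updated) now also yields
    // SchemaChange.updateComment("synced table comment"), which
    // applySchemaChange(...) forwards to catalog.alterTable(...).
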
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
index 8d071150e0..8167441fb6 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SchemaEvolutionTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -35,7 +36,6 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.types.BigIntType;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.DoubleType;
@@ -53,123 +53,128 @@ import java.util.List;
 /** Used to test schema evolution related logic. */
 public class SchemaEvolutionTest extends TableTestBase {
 
-    private static List<List<DataField>> prepareData() {
-        List<DataField> upField1 =
-                Arrays.asList(
-                        new DataField(0, "col_0", new VarCharType(), "test description."),
-                        new DataField(1, "col_1", new IntType(), "test description."),
-                        new DataField(2, "col_2", new IntType(), "test description."),
-                        new DataField(3, "col_3", new VarCharType(), "Someone's desc."),
-                        new DataField(4, "col_4", new VarCharType(), "Someone's desc."),
-                        new DataField(5, "col_5", new VarCharType(), "Someone's desc."),
-                        new DataField(6, "col_6", new DecimalType(), "Someone's desc."),
-                        new DataField(7, "col_7", new VarCharType(), "Someone's desc."),
-                        new DataField(8, "col_8", new VarCharType(), "Someone's desc."),
-                        new DataField(9, "col_9", new VarCharType(), "Someone's desc."),
-                        new DataField(10, "col_10", new VarCharType(), "Someone's desc."),
-                        new DataField(11, "col_11", new VarCharType(), "Someone's desc."),
-                        new DataField(12, "col_12", new DoubleType(), "Someone's desc."),
-                        new DataField(13, "col_13", new VarCharType(), "Someone's desc."),
-                        new DataField(14, "col_14", new VarCharType(), "Someone's desc."),
-                        new DataField(15, "col_15", new VarCharType(), "Someone's desc."),
-                        new DataField(16, "col_16", new VarCharType(), "Someone's desc."),
-                        new DataField(17, "col_17", new VarCharType(), "Someone's desc."),
-                        new DataField(18, "col_18", new VarCharType(), "Someone's desc."),
-                        new DataField(19, "col_19", new VarCharType(), "Someone's desc."),
-                        new DataField(20, "col_20", new VarCharType(), "Someone's desc."));
-        List<DataField> upField2 =
-                Arrays.asList(
-                        new DataField(0, "col_0", new VarCharType(), "test description."),
-                        new DataField(1, "col_1", new BigIntType(), "test description."),
-                        new DataField(2, "col_2", new IntType(), "test description."),
-                        new DataField(3, "col_3", new VarCharType(), "Someone's desc."),
-                        new DataField(4, "col_4", new VarCharType(), "Someone's desc."),
-                        new DataField(5, "col_5", new VarCharType(), "Someone's desc."),
-                        new DataField(6, "col_6", new DecimalType(), "Someone's desc."),
-                        new DataField(7, "col_7", new VarCharType(), "Someone's desc."),
-                        new DataField(8, "col_8", new VarCharType(), "Someone's desc."),
-                        new DataField(9, "col_9", new VarCharType(), "Someone's desc."),
-                        new DataField(10, "col_10", new VarCharType(), "Someone's desc."),
-                        new DataField(11, "col_11", new VarCharType(), "Someone's desc."),
-                        new DataField(12, "col_12", new DoubleType(), "Someone's desc."),
-                        new DataField(13, "col_13", new VarCharType(), "Someone's desc."),
-                        new DataField(14, "col_14", new VarCharType(), "Someone's desc."),
-                        new DataField(15, "col_15", new VarCharType(), "Someone's desc."),
-                        new DataField(16, "col_16", new VarCharType(), "Someone's desc."),
-                        new DataField(17, "col_17", new VarCharType(), "Someone's desc."),
-                        new DataField(18, "col_18", new VarCharType(), "Someone's desc."),
-                        new DataField(19, "col_19", new VarCharType(), "Someone's desc."),
-                        new DataField(20, "col_20", new VarCharType(), "Someone's desc."));
-        List<DataField> upField3 =
-                Arrays.asList(
-                        new DataField(0, "col_0", new VarCharType(), "test description."),
-                        new DataField(1, "col_1", new BigIntType(), "test description."),
-                        new DataField(2, "col_2", new IntType(), "test description 2."),
-                        new DataField(3, "col_3", new VarCharType(), "Someone's desc."),
-                        new DataField(4, "col_4", new VarCharType(), "Someone's desc."),
-                        new DataField(5, "col_5", new VarCharType(), "Someone's desc."),
-                        new DataField(6, "col_6", new DecimalType(), "Someone's desc."),
-                        new DataField(7, "col_7", new VarCharType(), "Someone's desc."),
-                        new DataField(8, "col_8", new VarCharType(), "Someone's desc."),
-                        new DataField(9, "col_9", new VarCharType(), "Someone's desc."),
-                        new DataField(10, "col_10", new VarCharType(), "Someone's desc."),
-                        new DataField(11, "col_11", new VarCharType(), "Someone's desc."),
-                        new DataField(12, "col_12", new DoubleType(), "Someone's desc."),
-                        new DataField(13, "col_13", new VarCharType(), "Someone's desc."),
-                        new DataField(14, "col_14", new VarCharType(), "Someone's desc."),
-                        new DataField(15, "col_15", new VarCharType(), "Someone's desc."),
-                        new DataField(16, "col_16", new VarCharType(), "Someone's desc."),
-                        new DataField(17, "col_17", new VarCharType(), "Someone's desc."),
-                        new DataField(18, "col_18", new VarCharType(), "Someone's desc."),
-                        new DataField(19, "col_19", new VarCharType(), "Someone's desc."),
-                        new DataField(20, "col_20", new VarCharType(), "Someone's desc."));
-        List<DataField> upField4 =
-                Arrays.asList(
-                        new DataField(0, "col_0", new VarCharType(), "test description."),
-                        new DataField(1, "col_1", new BigIntType(), "test description."),
-                        new DataField(2, "col_2", new IntType(), "test description."),
-                        new DataField(3, "col_3_1", new VarCharType(), "Someone's desc."),
-                        new DataField(4, "col_4", new VarCharType(), "Someone's desc."),
-                        new DataField(5, "col_5", new VarCharType(), "Someone's desc."),
-                        new DataField(6, "col_6", new DecimalType(), "Someone's desc."),
-                        new DataField(7, "col_7", new VarCharType(), "Someone's desc."),
-                        new DataField(8, "col_8", new VarCharType(), "Someone's desc."),
-                        new DataField(9, "col_9", new VarCharType(), "Someone's desc."),
-                        new DataField(10, "col_10", new VarCharType(), "Someone's desc."),
-                        new DataField(11, "col_11", new VarCharType(), "Someone's desc."),
-                        new DataField(12, "col_12", new DoubleType(), "Someone's desc."),
-                        new DataField(13, "col_13", new VarCharType(), "Someone's desc."),
-                        new DataField(14, "col_14", new VarCharType(), "Someone's desc."),
-                        new DataField(15, "col_15", new VarCharType(), "Someone's desc."),
-                        new DataField(16, "col_16", new VarCharType(), "Someone's desc."),
-                        new DataField(17, "col_17", new VarCharType(), "Someone's desc."),
-                        new DataField(18, "col_18", new VarCharType(), "Someone's desc."),
-                        new DataField(19, "col_19", new VarCharType(), "Someone's desc."),
-                        new DataField(20, "col_20", new VarCharType(), "Someone's desc."));
-        List<DataField> upField5 =
-                Arrays.asList(
-                        new DataField(0, "col_0", new VarCharType(), "test description."),
-                        new DataField(1, "col_1", new BigIntType(), "test description."),
-                        new DataField(2, "col_2_1", new BigIntType(), "test description 2."),
-                        new DataField(3, "col_3", new VarCharType(), "Someone's desc."),
-                        new DataField(4, "col_4", new VarCharType(), "Someone's desc."),
-                        new DataField(5, "col_5", new VarCharType(), "Someone's desc."),
-                        new DataField(6, "col_6", new DecimalType(), "Someone's desc."),
-                        new DataField(7, "col_7", new VarCharType(), "Someone's desc."),
-                        new DataField(8, "col_8", new VarCharType(), "Someone's desc."),
-                        new DataField(9, "col_9", new VarCharType(), "Someone's desc."),
-                        new DataField(10, "col_10", new VarCharType(), "Someone's desc."),
-                        new DataField(11, "col_11", new VarCharType(), "Someone's desc."),
-                        new DataField(12, "col_12", new DoubleType(), "Someone's desc."),
-                        new DataField(13, "col_13", new VarCharType(), "Someone's desc."),
-                        new DataField(14, "col_14", new VarCharType(), "Someone's desc."),
-                        new DataField(15, "col_15", new VarCharType(), "Someone's desc."),
-                        new DataField(16, "col_16", new VarCharType(), "Someone's desc."),
-                        new DataField(17, "col_17", new VarCharType(), "Someone's desc."),
-                        new DataField(18, "col_18", new VarCharType(), "Someone's desc."),
-                        new DataField(19, "col_19", new VarCharType(), "Someone's desc."),
-                        new DataField(20, "col_20", new VarCharType(), "Someone's desc."));
-        return Arrays.asList(upField1, upField2, upField3, upField4, upField5);
+    private static List<CdcSchema> prepareData() {
+        CdcSchema upSchema1 =
+                CdcSchema.newBuilder()
+                        .column("col_0", new VarCharType(), "test 
description.")
+                        .column("col_1", new IntType(), "test description.")
+                        .column("col_2", new IntType(), "test description.")
+                        .column("col_3", new VarCharType(), "Someone's desc.")
+                        .column("col_4", new VarCharType(), "Someone's desc.")
+                        .column("col_5", new VarCharType(), "Someone's desc.")
+                        .column("col_6", new DecimalType(), "Someone's desc.")
+                        .column("col_7", new VarCharType(), "Someone's desc.")
+                        .column("col_8", new VarCharType(), "Someone's desc.")
+                        .column("col_9", new VarCharType(), "Someone's desc.")
+                        .column("col_10", new VarCharType(), "Someone's desc.")
+                        .column("col_11", new VarCharType(), "Someone's desc.")
+                        .column("col_12", new DoubleType(), "Someone's desc.")
+                        .column("col_13", new VarCharType(), "Someone's desc.")
+                        .column("col_14", new VarCharType(), "Someone's desc.")
+                        .column("col_15", new VarCharType(), "Someone's desc.")
+                        .column("col_16", new VarCharType(), "Someone's desc.")
+                        .column("col_17", new VarCharType(), "Someone's desc.")
+                        .column("col_18", new VarCharType(), "Someone's desc.")
+                        .column("col_19", new VarCharType(), "Someone's desc.")
+                        .column("col_20", new VarCharType(), "Someone's desc.")
+                        .build();
+        CdcSchema upSchema2 =
+                CdcSchema.newBuilder()
+                        .column("col_0", new VarCharType(), "test 
description.")
+                        .column("col_1", new BigIntType(), "test description.")
+                        .column("col_2", new IntType(), "test description.")
+                        .column("col_3", new VarCharType(), "Someone's desc.")
+                        .column("col_4", new VarCharType(), "Someone's desc.")
+                        .column("col_5", new VarCharType(), "Someone's desc.")
+                        .column("col_6", new DecimalType(), "Someone's desc.")
+                        .column("col_7", new VarCharType(), "Someone's desc.")
+                        .column("col_8", new VarCharType(), "Someone's desc.")
+                        .column("col_9", new VarCharType(), "Someone's desc.")
+                        .column("col_10", new VarCharType(), "Someone's desc.")
+                        .column("col_11", new VarCharType(), "Someone's desc.")
+                        .column("col_12", new DoubleType(), "Someone's desc.")
+                        .column("col_13", new VarCharType(), "Someone's desc.")
+                        .column("col_14", new VarCharType(), "Someone's desc.")
+                        .column("col_15", new VarCharType(), "Someone's desc.")
+                        .column("col_16", new VarCharType(), "Someone's desc.")
+                        .column("col_17", new VarCharType(), "Someone's desc.")
+                        .column("col_18", new VarCharType(), "Someone's desc.")
+                        .column("col_19", new VarCharType(), "Someone's desc.")
+                        .column("col_20", new VarCharType(), "Someone's desc.")
+                        .build();
+        CdcSchema upSchema3 =
+                CdcSchema.newBuilder()
+                        .column("col_0", new VarCharType(), "test 
description.")
+                        .column("col_1", new BigIntType(), "test description.")
+                        .column("col_2", new IntType(), "test description 2.")
+                        .column("col_3", new VarCharType(), "Someone's desc.")
+                        .column("col_4", new VarCharType(), "Someone's desc.")
+                        .column("col_5", new VarCharType(), "Someone's desc.")
+                        .column("col_6", new DecimalType(), "Someone's desc.")
+                        .column("col_7", new VarCharType(), "Someone's desc.")
+                        .column("col_8", new VarCharType(), "Someone's desc.")
+                        .column("col_9", new VarCharType(), "Someone's desc.")
+                        .column("col_10", new VarCharType(), "Someone's desc.")
+                        .column("col_11", new VarCharType(), "Someone's desc.")
+                        .column("col_12", new DoubleType(), "Someone's desc.")
+                        .column("col_13", new VarCharType(), "Someone's desc.")
+                        .column("col_14", new VarCharType(), "Someone's desc.")
+                        .column("col_15", new VarCharType(), "Someone's desc.")
+                        .column("col_16", new VarCharType(), "Someone's desc.")
+                        .column("col_17", new VarCharType(), "Someone's desc.")
+                        .column("col_18", new VarCharType(), "Someone's desc.")
+                        .column("col_19", new VarCharType(), "Someone's desc.")
+                        .column("col_20", new VarCharType(), "Someone's desc.")
+                        .build();
+        CdcSchema upSchema4 =
+                CdcSchema.newBuilder()
+                        .column("col_0", new VarCharType(), "test 
description.")
+                        .column("col_1", new BigIntType(), "test description.")
+                        .column("col_2", new IntType(), "test description.")
+                        .column("col_3_1", new VarCharType(), "Someone's 
desc.")
+                        .column("col_4", new VarCharType(), "Someone's desc.")
+                        .column("col_5", new VarCharType(), "Someone's desc.")
+                        .column("col_6", new DecimalType(), "Someone's desc.")
+                        .column("col_7", new VarCharType(), "Someone's desc.")
+                        .column("col_8", new VarCharType(), "Someone's desc.")
+                        .column("col_9", new VarCharType(), "Someone's desc.")
+                        .column("col_10", new VarCharType(), "Someone's desc.")
+                        .column("col_11", new VarCharType(), "Someone's desc.")
+                        .column("col_12", new DoubleType(), "Someone's desc.")
+                        .column("col_13", new VarCharType(), "Someone's desc.")
+                        .column("col_14", new VarCharType(), "Someone's desc.")
+                        .column("col_15", new VarCharType(), "Someone's desc.")
+                        .column("col_16", new VarCharType(), "Someone's desc.")
+                        .column("col_17", new VarCharType(), "Someone's desc.")
+                        .column("col_18", new VarCharType(), "Someone's desc.")
+                        .column("col_19", new VarCharType(), "Someone's desc.")
+                        .column("col_20", new VarCharType(), "Someone's desc.")
+                        .build();
+        CdcSchema upSchema5 =
+                CdcSchema.newBuilder()
+                        .column("col_0", new VarCharType(), "test 
description.")
+                        .column("col_1", new BigIntType(), "test description.")
+                        .column("col_2_1", new BigIntType(), "test description 
2.")
+                        .column("col_3", new VarCharType(), "Someone's desc.")
+                        .column("col_4", new VarCharType(), "Someone's desc.")
+                        .column("col_5", new VarCharType(), "Someone's desc.")
+                        .column("col_6", new DecimalType(), "Someone's desc.")
+                        .column("col_7", new VarCharType(), "Someone's desc.")
+                        .column("col_8", new VarCharType(), "Someone's desc.")
+                        .column("col_9", new VarCharType(), "Someone's desc.")
+                        .column("col_10", new VarCharType(), "Someone's desc.")
+                        .column("col_11", new VarCharType(), "Someone's desc.")
+                        .column("col_12", new DoubleType(), "Someone's desc.")
+                        .column("col_13", new VarCharType(), "Someone's desc.")
+                        .column("col_14", new VarCharType(), "Someone's desc.")
+                        .column("col_15", new VarCharType(), "Someone's desc.")
+                        .column("col_16", new VarCharType(), "Someone's desc.")
+                        .column("col_17", new VarCharType(), "Someone's desc.")
+                        .column("col_18", new VarCharType(), "Someone's desc.")
+                        .column("col_19", new VarCharType(), "Someone's desc.")
+                        .column("col_20", new VarCharType(), "Someone's desc.")
+                        .build();
+        return Arrays.asList(upSchema1, upSchema2, upSchema3, upSchema4, upSchema5);
     }
 
     private FileStoreTable table;
@@ -199,7 +204,7 @@ public class SchemaEvolutionTest extends TableTestBase {
     @Test
     public void testSchemaEvolution() throws Exception {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        DataStream<List<DataField>> upDataFieldStream = env.fromCollection(prepareData());
+        DataStream<CdcSchema> upDataFieldStream = env.fromCollection(prepareData());
         Options options = new Options();
         options.set("warehouse", tempPath.toString());
         final CatalogLoader catalogLoader = () -> FlinkCatalogFactory.createPaimonCatalog(options);
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
index 6ebfdb7550..7d8d83ca2e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBaseTest.java
@@ -32,7 +32,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -72,25 +71,13 @@ public class SyncDatabaseActionBaseTest {
         rawData.put("field", "value");
 
         CdcRecord cdcData = new CdcRecord(RowKind.INSERT, rawData);
-        whiteAnyDbCdcRecord =
-                new RichCdcMultiplexRecord(
-                        ANY_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
-        blackAnyDbCdcRecord =
-                new RichCdcMultiplexRecord(
-                        ANY_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData);
-        whiteCdcRecord =
-                new RichCdcMultiplexRecord(
-                        WHITE_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
-        blackCdcRecord =
-                new RichCdcMultiplexRecord(
-                        BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
-
-        whiteDbBlackTblCdcRecord =
-                new RichCdcMultiplexRecord(
-                        WHITE_DB, BLACK_TBL, Arrays.asList(), Arrays.asList(), cdcData);
-        blackDbWhiteTblCdcRecord =
-                new RichCdcMultiplexRecord(
-                        BLACK_DB, WHITE_TBL, Arrays.asList(), Arrays.asList(), cdcData);
+        whiteAnyDbCdcRecord = new RichCdcMultiplexRecord(ANY_DB, WHITE_TBL, null, cdcData);
+        blackAnyDbCdcRecord = new RichCdcMultiplexRecord(ANY_DB, BLACK_TBL, null, cdcData);
+        whiteCdcRecord = new RichCdcMultiplexRecord(WHITE_DB, WHITE_TBL, null, cdcData);
+        blackCdcRecord = new RichCdcMultiplexRecord(BLACK_DB, WHITE_TBL, null, cdcData);
+
+        whiteDbBlackTblCdcRecord = new RichCdcMultiplexRecord(WHITE_DB, BLACK_TBL, null, cdcData);
+        blackDbWhiteTblCdcRecord = new RichCdcMultiplexRecord(BLACK_DB, WHITE_TBL, null, cdcData);
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
index 78d3bcc3de..17277d5d7d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParserTest.java
@@ -23,10 +23,10 @@ import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
 import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
+import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.StringUtils;
 
@@ -40,8 +40,6 @@ import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.URL;
 import java.nio.file.Files;
@@ -56,21 +54,20 @@ import java.util.Map;
 /** Test for DebeziumBsonRecordParser. */
 public class DebeziumBsonRecordParserTest {
 
-    private static final Logger log = LoggerFactory.getLogger(DebeziumBsonRecordParserTest.class);
-    private static List<CdcSourceRecord> insertList = new ArrayList<>();
-    private static List<CdcSourceRecord> updateList = new ArrayList<>();
-    private static List<CdcSourceRecord> deleteList = new ArrayList<>();
+    private static final List<CdcSourceRecord> insertList = new ArrayList<>();
+    private static final List<CdcSourceRecord> updateList = new ArrayList<>();
+    private static final List<CdcSourceRecord> deleteList = new ArrayList<>();
 
-    private static ArrayList<CdcSourceRecord> bsonRecords = new ArrayList<>();
-    private static ArrayList<CdcSourceRecord> jsonRecords = new ArrayList<>();
+    private static final ArrayList<CdcSourceRecord> bsonRecords = new ArrayList<>();
+    private static final ArrayList<CdcSourceRecord> jsonRecords = new ArrayList<>();
 
-    private static Map<String, String> keyEvent = new HashMap<>();
+    private static final Map<String, String> keyEvent = new HashMap<>();
 
     private static KafkaDeserializationSchema<CdcSourceRecord> kafkaDeserializationSchema = null;
 
-    private static Map<String, String> beforeEvent = new HashMap<>();
+    private static final Map<String, String> beforeEvent = new HashMap<>();
 
-    private static Map<String, String> afterEvent = new HashMap<>();
+    private static final Map<String, String> afterEvent = new HashMap<>();
 
     @BeforeAll
     public static void beforeAll() throws Exception {
@@ -241,7 +238,8 @@ public class DebeziumBsonRecordParserTest {
 
             JsonNode bsonTextNode =
                    new TextNode(JsonSerdeUtil.writeValueAsString(bsonRecord.getValue()));
-            Map<String, String> resultMap = parser.extractRowData(bsonTextNode, RowType.builder());
+            Map<String, String> resultMap =
+                    parser.extractRowData(bsonTextNode, CdcSchema.newBuilder());
 
             ObjectNode expectNode = (ObjectNode) jsonRecord.getValue();
 
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 60aa70c34b..767400d475 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -585,8 +585,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
                                 IllegalArgumentException.class,
                                 "Cannot synchronize record when database name 
or table name is unknown. "
                                         + "Invalid record is:\n"
-                                        + "{databaseName=null, tableName=null, 
fields=[`k` STRING, `v0` STRING, `v1` STRING], "
-                                        + "primaryKeys=[], cdcRecord=+I 
{v0=five, k=5, v1=50}}"));
+                                        + "{databaseName=null, tableName=null, 
cdcSchema=Schema{fields=[`k` STRING, `v0` STRING, `v1` STRING], primaryKeys=[], 
comment=null}, cdcRecord=+I {v0=five, k=5, v1=50}}"));
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index aa7d0199bc..58df6ac36e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -21,15 +21,16 @@ package org.apache.paimon.flink.action.cdc.mysql;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommonTestUtils;
-import org.apache.paimon.utils.JsonSerdeUtil;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestartStrategyOptions;
@@ -91,21 +92,26 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         .build();
         runActionWithDefaultEnv(action);
 
-        checkTableSchema(
-                "[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
-                        + "{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"_id\"},"
-                        + "{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .comment("")
+                        .column("pt", DataTypes.INT().notNull(), "primary")
+                        .column("_id", DataTypes.INT().notNull(), "_id")
+                        .column("v1", DataTypes.VARCHAR(10), "v1")
+                        .build();
+        checkTableSchema(expectedSchema);
 
         try (Statement statement = getStatement()) {
             testSchemaEvolutionImpl(statement);
         }
     }
 
-    private void checkTableSchema(String excepted) throws Exception {
-
+    private void checkTableSchema(Schema expectedSchema) throws Exception {
         FileStoreTable table = getFileStoreTable();
 
-        assertThat(JsonSerdeUtil.toFlatJson(table.schema().fields())).isEqualTo(excepted);
+        TableSchema schema = table.schema();
+        assertThat(schema.fields()).isEqualTo(expectedSchema.fields());
+        assertThat(schema.comment()).isEqualTo(expectedSchema.comment());
     }
 
     private void testSchemaEvolutionImpl(Statement statement) throws Exception {
@@ -267,11 +273,15 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
                         .build();
         runActionWithDefaultEnv(action);
 
-        checkTableSchema(
-                "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
-                        + "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"},"
-                        + "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"},"
-                        + "{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\",\"description\":\"v3\"}]");
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .comment("")
+                        .column("_id", DataTypes.INT().notNull(), "primary")
+                        .column("v1", DataTypes.VARCHAR(10), "v1")
+                        .column("v2", DataTypes.INT(), "v2")
+                        .column("v3", DataTypes.VARCHAR(10), "v3")
+                        .build();
+        checkTableSchema(expectedSchema);
 
         try (Statement statement = getStatement()) {
             testSchemaEvolutionMultipleImpl(statement);
@@ -387,6 +397,9 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
         FileStoreTable table = getFileStoreTable();
         assertThat(table.comment()).hasValue("schema_evolution_comment");
         statement.executeUpdate("USE " + DATABASE_NAME);
+        // alter table comment
+        statement.executeUpdate("ALTER TABLE schema_evolution_comment COMMENT 
'table_comment_new'");
+
         statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES 
(1, 'one')");
 
         RowType rowType =
@@ -414,10 +427,14 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
         expected = Arrays.asList("+I[1, one, NULL]", "+I[2, two, NULL]", "+I[3, three, 30]");
         waitForResult(expected, table, rowType, primaryKeys);
 
-        checkTableSchema(
-                "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
-                        + "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(20)\",\"description\":\"v1-new\"},"
-                        + "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"}]");
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .comment("table_comment_new")
+                        .column("_id", DataTypes.INT().notNull(), "primary")
+                        .column("v1", DataTypes.VARCHAR(20), "v1-new")
+                        .column("v2", DataTypes.INT(), "v2")
+                        .build();
+        checkTableSchema(expectedSchema);
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
index b202ca53c9..57ca081d5c 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordSerializeITCase.java
@@ -40,7 +40,6 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,20 +55,19 @@ public class CdcRecordSerializeITCase {
     public void testCdcRecordKryoSerialize() throws Exception {
         KryoSerializer<RichCdcMultiplexRecord> kr =
                 createFlinkKryoSerializer(RichCdcMultiplexRecord.class);
-        RowType.Builder rowType = RowType.builder();
-        rowType.field("id", new BigIntType());
-        rowType.field("name", new VarCharType());
-        rowType.field("pt", new VarCharType());
-        // this is an unmodifiable list.
-        List<DataField> fields = rowType.build().getFields();
-        List<String> primaryKeys = Collections.singletonList("id");
+        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
+        schemaBuilder.column("id", new BigIntType());
+        schemaBuilder.column("name", new VarCharType());
+        schemaBuilder.column("pt", new VarCharType());
+        schemaBuilder.primaryKey("id");
+        CdcSchema schema = schemaBuilder.build();
         Map<String, String> recordData = new HashMap<>();
         recordData.put("id", "1");
         recordData.put("name", "HunterXHunter");
         recordData.put("pt", "2024-06-28");
         CdcRecord cdcRecord = new CdcRecord(RowKind.INSERT, recordData);
         RichCdcMultiplexRecord serializeRecord =
-                new RichCdcMultiplexRecord("default", "T", fields, primaryKeys, cdcRecord);
+                new RichCdcMultiplexRecord("default", "T", schema, cdcRecord);
 
         TestOutputView outputView = new TestOutputView();
         kr.serialize(serializeRecord, outputView);
@@ -77,8 +75,7 @@
         assertThat(deserializeRecord.toRichCdcRecord().toCdcRecord()).isEqualTo(cdcRecord);
         assertThat(deserializeRecord.databaseName()).isEqualTo("default");
         assertThat(deserializeRecord.tableName()).isEqualTo("T");
-        assertThat(deserializeRecord.primaryKeys()).isEqualTo(primaryKeys);
-        assertThat(deserializeRecord.fields()).isEqualTo(fields);
+        assertThat(deserializeRecord.cdcSchema()).isEqualTo(schema);
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
index 01d766edf1..5b964ebe6b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEvent.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.types.DataField;
-
 import java.io.Serializable;
 import java.util.List;
 
@@ -29,20 +27,20 @@ public class TestCdcEvent implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final String tableName;
-    private final List<DataField> updatedDataFields;
+    private final CdcSchema cdcSchema;
     private final List<CdcRecord> records;
     private final int keyHash;
 
-    public TestCdcEvent(String tableName, List<DataField> updatedDataFields) {
+    public TestCdcEvent(String tableName, CdcSchema updatedSchema) {
         this.tableName = tableName;
-        this.updatedDataFields = updatedDataFields;
+        this.cdcSchema = updatedSchema;
         this.records = null;
         this.keyHash = 0;
     }
 
     public TestCdcEvent(String tableName, List<CdcRecord> records, int keyHash) {
         this.tableName = tableName;
-        this.updatedDataFields = null;
+        this.cdcSchema = null;
         this.records = records;
         this.keyHash = keyHash;
     }
@@ -51,8 +49,8 @@ public class TestCdcEvent implements Serializable {
         return tableName;
     }
 
-    public List<DataField> updatedDataFields() {
-        return updatedDataFields;
+    public CdcSchema cdcSchema() {
+        return cdcSchema;
     }
 
     public List<CdcRecord> records() {
@@ -67,7 +65,6 @@ public class TestCdcEvent implements Serializable {
     @Override
     public String toString() {
         return String.format(
-                "{tableName = %s, updatedDataFields = %s, records = %s}",
-                tableName, updatedDataFields, records);
+                "{tableName = %s, schema = %s, records = %s}", tableName, cdcSchema, records);
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
index 204f8536d9..269907168e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestCdcEventParser.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.ObjectUtils;
 
 import java.util.Collections;
@@ -40,8 +39,8 @@ public class TestCdcEventParser implements EventParser<TestCdcEvent> {
     }
 
     @Override
-    public List<DataField> parseSchemaChange() {
-        return ObjectUtils.coalesce(raw.updatedDataFields(), Collections.emptyList());
+    public CdcSchema parseSchemaChange() {
+        return raw.cdcSchema();
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
index 6a38c1c265..2601e4fc3b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/TestTable.java
@@ -112,7 +112,7 @@ public class TestTable {
                     fieldNames.add(newName);
                     isBigInt.add(false);
                 }
-                events.add(new TestCdcEvent(tableName, currentDataFieldList(fieldNames, isBigInt)));
+                events.add(new TestCdcEvent(tableName, currentSchema(fieldNames, isBigInt)));
             } else {
                 Map<String, String> data = new HashMap<>();
                 int key = random.nextInt(numKeys);
@@ -158,23 +158,22 @@ public class TestTable {
         }
     }
 
-    private List<DataField> currentDataFieldList(List<String> fieldNames, List<Boolean> isBigInt) {
-        List<DataField> fields = new ArrayList<>();
+    private CdcSchema currentSchema(List<String> fieldNames, List<Boolean> isBigInt) {
+        CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
 
         // pt
-        fields.add(initialRowType.getFields().get(0));
+        DataField ptField = initialRowType.getFields().get(0);
+        schemaBuilder.column(ptField.name(), ptField.type(), ptField.description());
         // k
-        fields.add(initialRowType.getFields().get(1));
+        DataField pkField = initialRowType.getFields().get(1);
+        schemaBuilder.column(pkField.name(), pkField.type(), pkField.description());
 
         for (int i = 0; i < fieldNames.size(); i++) {
-            fields.add(
-                    new DataField(
-                            2 + i,
-                            fieldNames.get(i),
-                            isBigInt.get(i) ? DataTypes.BIGINT() : DataTypes.INT()));
+            schemaBuilder.column(
+                    fieldNames.get(i), isBigInt.get(i) ? DataTypes.BIGINT() : DataTypes.INT());
         }
 
-        return fields;
+        return schemaBuilder.build();
     }
 
     public RowType initialRowType() {

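For readers tracking the API change in this diff: a minimal, illustrative sketch (not part of the commit) of building the new CdcSchema. It uses only the builder methods exercised by the tests above (newBuilder, column with an optional description, primaryKey, build); the class name and printed output are placeholders.

    import org.apache.paimon.flink.sink.cdc.CdcSchema;
    import org.apache.paimon.types.DataTypes;

    public class CdcSchemaSketch {
        public static void main(String[] args) {
            // Columns and the primary key now travel together as one CdcSchema,
            // replacing the previous List<DataField> + List<String> pair.
            CdcSchema schema =
                    CdcSchema.newBuilder()
                            .column("id", DataTypes.BIGINT())
                            .column("name", DataTypes.VARCHAR(10), "name column")
                            .primaryKey("id")
                            .build();
            System.out.println(schema);
        }
    }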