This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ef9e214468 [cdc] Support computed columns when sync_database (#5816)
ef9e214468 is described below

commit ef9e214468d55b98dbe95d8e2e9c569b3e32ccfe
Author: JackeyLee007 <[email protected]>
AuthorDate: Tue Jul 1 16:21:35 2025 +0800

    [cdc] Support computed columns when sync_database (#5816)
---
 docs/content/cdc-ingestion/kafka-cdc.md            |   4 +-
 .../shortcodes/generated/kafka_sync_database.html  |   2 +-
 .../paimon/flink/action/cdc/ComputedColumn.java    |  14 ++
 .../flink/action/cdc/ComputedColumnUtils.java      |  91 +++++++++++-
 .../apache/paimon/flink/action/cdc/Expression.java | 154 +++++++++++++++++----
 .../action/cdc/format/AbstractRecordParser.java    |  17 ++-
 .../apache/paimon/flink/sink/cdc/CdcSchema.java    |  20 ++-
 .../flink/action/cdc/ComputedColumnUtilsTest.java  |  61 ++++++++
 .../kafka/KafkaCanalSyncDatabaseActionITCase.java  |  26 +++-
 9 files changed, 345 insertions(+), 44 deletions(-)

diff --git a/docs/content/cdc-ingestion/kafka-cdc.md 
b/docs/content/cdc-ingestion/kafka-cdc.md
index dfad34fad3..7ca3b2728c 100644
--- a/docs/content/cdc-ingestion/kafka-cdc.md
+++ b/docs/content/cdc-ingestion/kafka-cdc.md
@@ -214,6 +214,7 @@ To use this feature through `flink run`, run the following 
shell command.
     [--type_mapping to-string] \
     [--partition_keys <partition_keys>] \
     [--primary_keys <primary-keys>] \
+    [--computed_column <'column-name=expr-name(args[, ...])'> 
[--computed_column ...]] \
     [--kafka_conf <kafka-source-conf> [--kafka_conf <kafka-source-conf> ...]] \
     [--catalog_conf <paimon-catalog-conf> [--catalog_conf 
<paimon-catalog-conf> ...]] \
     [--table_conf <paimon-table-sink-conf> [--table_conf 
<paimon-table-sink-conf> ...]]
@@ -244,7 +245,8 @@ Synchronization from one Kafka topic to Paimon database.
     --catalog_conf uri=thrift://hive-metastore:9083 \
     --table_conf bucket=4 \
     --table_conf changelog-producer=input \
-    --table_conf sink.parallelism=4
+    --table_conf sink.parallelism=4 \
+    --computed_column 'pt=date_format(event_tm, yyyyMMdd)'
 ```
 
 Synchronization from multiple Kafka topics to Paimon database.
diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html 
b/docs/layouts/shortcodes/generated/kafka_sync_database.html
index d24b7cf25a..4b6ee3e38d 100644
--- a/docs/layouts/shortcodes/generated/kafka_sync_database.html
+++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html
@@ -92,7 +92,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>--computed_column</h5></td>
-        <td>The definitions of computed columns. The argument field is from 
Kafka topic's table field name. See <a 
href="../overview/#computed-functions">here</a> for a complete list of 
configurations. </td>
+        <td>The definitions of computed columns. The argument field is from 
Kafka topic's table field name. See <a 
href="../overview/#computed-functions">here</a> for a complete list of 
configurations. NOTICE: It returns null if the referenced column does not exist 
in the source table.</td>
     </tr>
     <tr>
         <td><h5>--eager_init</h5></td>
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
index 5e9041a120..dddb909cd7 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
@@ -53,6 +53,11 @@ public class ComputedColumn implements Serializable {
         return expression.fieldReference();
     }
 
+    @Nullable
+    public DataType fieldReferenceType() {
+        return expression.fieldReferenceType();
+    }
+
     /** Compute column's value from given argument. Return null if input is 
null. */
     @Nullable
     public String eval(@Nullable String input) {
@@ -61,4 +66,13 @@ public class ComputedColumn implements Serializable {
         }
         return expression.eval(input);
     }
+
+    /** Compute column's value from given argument, using {@code inputType} to
+     * resolve the referenced field's type when it was not known at creation
+     * time. Return null if input is null. */
+    @Nullable
+    public String eval(@Nullable String input, DataType inputType) {
+        if (fieldReference() != null && input == null) {
+            return null;
+        }
+        return expression.eval(input, inputType);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
index 24ca0599bf..28fbef9455 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
@@ -18,13 +18,18 @@
 
 package org.apache.paimon.flink.action.cdc;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.Preconditions;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -75,6 +80,90 @@ public class ComputedColumnUtils {
                             Expression.create(typeMapping, caseSensitive, 
exprName, args)));
         }
 
-        return computedColumns;
+        return sortComputedColumns(computedColumns);
+    }
+
+    @VisibleForTesting
+    public static List<ComputedColumn> 
sortComputedColumns(List<ComputedColumn> columns) {
+        Set<String> columnNames = new HashSet<>();
+        for (ComputedColumn col : columns) {
+            columnNames.add(col.columnName());
+        }
+
+        // For simple processing: a column is independent if it has no field
+        // reference, or its reference does not point at another computed column.
+        List<ComputedColumn> independent = new ArrayList<>();
+        List<ComputedColumn> dependent = new ArrayList<>();
+
+        for (ComputedColumn col : columns) {
+            if (col.fieldReference() == null || 
!columnNames.contains(col.fieldReference())) {
+                independent.add(col);
+            } else {
+                dependent.add(col);
+            }
+        }
+
+        // Sort dependent columns with topological sort
+        Map<String, ComputedColumn> columnMap = new HashMap<>();
+        Map<String, Set<String>> reverseDependencies = new HashMap<>();
+
+        for (ComputedColumn col : dependent) {
+            columnMap.put(col.columnName(), col);
+            reverseDependencies
+                    .computeIfAbsent(col.fieldReference(), k -> new 
HashSet<>())
+                    .add(col.columnName());
+        }
+
+        List<ComputedColumn> sortedDependent = new ArrayList<>();
+        Set<String> visited = new HashSet<>();
+        Set<String> tempMark = new HashSet<>(); // For cycle detection
+
+        for (ComputedColumn col : dependent) {
+            if (!visited.contains(col.columnName())) {
+                dfs(
+                        col.columnName(),
+                        reverseDependencies,
+                        columnMap,
+                        sortedDependent,
+                        visited,
+                        tempMark);
+            }
+        }
+
+        Collections.reverse(sortedDependent);
+
+        // Independent should precede dependent
+        List<ComputedColumn> result = new ArrayList<>();
+        result.addAll(independent);
+        result.addAll(sortedDependent);
+
+        return result;
+    }
+
+    private static void dfs(
+            String node,
+            Map<String, Set<String>> reverseDependencies,
+            Map<String, ComputedColumn> columnMap,
+            List<ComputedColumn> sorted,
+            Set<String> visited,
+            Set<String> tempMark) {
+        if (tempMark.contains(node)) {
+            throw new IllegalArgumentException("Cycle detected: " + node);
+        }
+        if (visited.contains(node)) {
+            return;
+        }
+
+        tempMark.add(node);
+        ComputedColumn current = columnMap.get(node);
+
+        // Process the dependencies
+        for (String dependent : reverseDependencies.getOrDefault(node, 
Collections.emptySet())) {
+            dfs(dependent, reverseDependencies, columnMap, sorted, visited, 
tempMark);
+        }
+
+        tempMark.remove(node);
+        visited.add(node);
+        sorted.add(current);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 087fe15e67..9fdc4606a0 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -24,6 +24,7 @@ import org.apache.paimon.types.DataTypeFamily;
 import org.apache.paimon.types.DataTypeJsonParser;
 import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.SerializableSupplier;
 import org.apache.paimon.utils.StringUtils;
@@ -50,12 +51,18 @@ public interface Expression extends Serializable {
     /** Return name of referenced field. */
     String fieldReference();
 
+    /** Return {@link DataType} of referenced field. */
+    DataType fieldReferenceType();
+
     /** Return {@link DataType} of computed value. */
     DataType outputType();
 
     /** Compute value from given input. Input and output are serialized to 
string. */
     String eval(String input);
 
+    /** Compute value from given input. Input and output are serialized to 
string. */
+    String eval(String input, DataType inputType);
+
     /** Return name of this expression. */
     default String name() {
         return null;
@@ -66,7 +73,7 @@ public interface Expression extends Serializable {
         YEAR(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -76,7 +83,7 @@ public interface Expression extends Serializable {
         MONTH(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -86,7 +93,7 @@ public interface Expression extends Serializable {
         DAY(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -96,7 +103,7 @@ public interface Expression extends Serializable {
         HOUR(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -106,7 +113,7 @@ public interface Expression extends Serializable {
         MINUTE(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -116,7 +123,7 @@ public interface Expression extends Serializable {
         SECOND(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return TemporalToIntConverter.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -126,7 +133,7 @@ public interface Expression extends Serializable {
         DATE_FORMAT(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return DateFormat.create(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -135,13 +142,13 @@ public interface Expression extends Serializable {
         SUBSTRING(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return substring(referencedField.field(), 
referencedField.literals());
                 }),
         TRUNCATE(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return truncate(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -152,7 +159,7 @@ public interface Expression extends Serializable {
         UPPER(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return new UpperExpression(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -161,7 +168,7 @@ public interface Expression extends Serializable {
         LOWER(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return new LowerExpression(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -170,7 +177,7 @@ public interface Expression extends Serializable {
         TRIM(
                 (typeMapping, caseSensitive, args) -> {
                     ReferencedField referencedField =
-                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                            ReferencedField.create(typeMapping, caseSensitive, 
args);
                     return new TrimExpression(
                             referencedField.field(),
                             referencedField.fieldType(),
@@ -208,16 +215,16 @@ public interface Expression extends Serializable {
     /** Referenced field in expression input parameters. */
     class ReferencedField {
         private final String field;
-        private final DataType fieldType;
+        @Nullable private final DataType fieldType;
         private final String[] literals;
 
-        private ReferencedField(String field, DataType fieldType, String[] 
literals) {
+        private ReferencedField(String field, @Nullable DataType fieldType, 
String[] literals) {
             this.field = field;
             this.fieldType = fieldType;
             this.literals = literals;
         }
 
-        public static ReferencedField checkArgument(
+        public static ReferencedField create(
                 Map<String, DataType> typeMapping, boolean caseSensitive, 
String... args) {
             String referencedField = args[0].trim();
             String[] literals =
@@ -226,11 +233,13 @@ public interface Expression extends Serializable {
                     StringUtils.toLowerCaseIfNeed(referencedField, 
caseSensitive);
 
             DataType fieldType =
-                    checkNotNull(
-                            typeMapping.get(referencedFieldCheckForm),
-                            String.format(
-                                    "Referenced field '%s' is not in given 
fields: %s.",
-                                    referencedFieldCheckForm, 
typeMapping.keySet()));
+                    typeMapping.isEmpty()
+                            ? null
+                            : checkNotNull(
+                                    typeMapping.get(referencedFieldCheckForm),
+                                    String.format(
+                                            "Referenced field '%s' is not in 
given fields: %s.",
+                                            referencedFieldCheckForm, 
typeMapping.keySet()));
             return new ReferencedField(referencedField, fieldType, literals);
         }
 
@@ -326,16 +335,22 @@ public interface Expression extends Serializable {
         private static final List<Integer> SUPPORTED_PRECISION = 
Arrays.asList(0, 3, 6, 9);
 
         private final String fieldReference;
-        @Nullable private final Integer precision;
+        @Nullable private DataType fieldReferenceType;
+        @Nullable private Integer precision;
 
         private transient Function<LocalDateTime, T> converter;
 
         private TemporalExpressionBase(
-                String fieldReference, DataType fieldType, @Nullable Integer 
precision) {
+                String fieldReference, @Nullable DataType fieldType, @Nullable 
Integer precision) {
             this.fieldReference = fieldReference;
+            this.fieldReferenceType = fieldType;
 
             // when the input is INTEGER_NUMERIC, the precision must be set
-            if 
(fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)
+            if (fieldType != null
+                    && fieldType
+                            .getTypeRoot()
+                            .getFamilies()
+                            .contains(DataTypeFamily.INTEGER_NUMERIC)
                     && precision == null) {
                 precision = 0;
             }
@@ -354,6 +369,11 @@ public interface Expression extends Serializable {
             return fieldReference;
         }
 
+        @Override
+        public DataType fieldReferenceType() {
+            return fieldReferenceType;
+        }
+
         /** If not, this must be overridden! */
         @Override
         public DataType outputType() {
@@ -370,6 +390,21 @@ public interface Expression extends Serializable {
             return String.valueOf(result);
         }
 
+        @Override
+        public String eval(String input, DataType inputType) {
+            if (this.fieldReferenceType == null) {
+                this.fieldReferenceType = inputType;
+
+                // when the input is INTEGER_NUMERIC, the precision must be set
+                if 
(inputType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)
+                        && precision == null) {
+                    this.precision = 0;
+                }
+            }
+
+            return eval(input);
+        }
+
         private LocalDateTime toLocalDateTime(String input) {
             if (precision == null) {
                 return DateTimeUtils.toLocalDateTime(input, 9);
@@ -425,7 +460,7 @@ public interface Expression extends Serializable {
 
         private static TemporalToIntConverter create(
                 String fieldReference,
-                DataType fieldType,
+                @Nullable DataType fieldType,
                 SerializableSupplier<Function<LocalDateTime, Integer>> 
converterSupplier,
                 String... literals) {
             checkArgument(
@@ -504,6 +539,11 @@ public interface Expression extends Serializable {
             return fieldReference;
         }
 
+        @Override
+        public DataType fieldReferenceType() {
+            return new VarCharType();
+        }
+
         @Override
         public DataType outputType() {
             return DataTypes.STRING();
@@ -524,6 +564,11 @@ public interface Expression extends Serializable {
                                 input, beginInclusive, endExclusive));
             }
         }
+
+        @Override
+        public String eval(String input, DataType inputType) {
+            return eval(input);
+        }
     }
 
     /** Truncate numeric/decimal/string value. */
@@ -532,11 +577,11 @@ public interface Expression extends Serializable {
 
         private final String fieldReference;
 
-        private final DataType fieldType;
+        @Nullable private DataType fieldType;
 
         private final int width;
 
-        TruncateComputer(String fieldReference, DataType fieldType, String 
literal) {
+        TruncateComputer(String fieldReference, @Nullable DataType fieldType, 
String literal) {
             this.fieldReference = fieldReference;
             this.fieldType = fieldType;
             try {
@@ -554,6 +599,11 @@ public interface Expression extends Serializable {
             return fieldReference;
         }
 
+        @Override
+        public DataType fieldReferenceType() {
+            return fieldType;
+        }
+
         @Override
         public DataType outputType() {
             return fieldType;
@@ -588,6 +638,14 @@ public interface Expression extends Serializable {
             }
         }
 
+        @Override
+        public String eval(String input, DataType inputType) {
+            if (this.fieldType == null) {
+                this.fieldType = inputType;
+            }
+            return eval(input);
+        }
+
         private short truncateShort(int width, short value) {
             return (short) (value - (((value % width) + width) % width));
         }
@@ -632,6 +690,11 @@ public interface Expression extends Serializable {
             return null;
         }
 
+        @Override
+        public DataType fieldReferenceType() {
+            return null;
+        }
+
         @Override
         public DataType outputType() {
             return dataType;
@@ -641,6 +704,11 @@ public interface Expression extends Serializable {
         public String eval(String input) {
             return value;
         }
+
+        @Override
+        public String eval(String input, DataType inputType) {
+            return value;
+        }
     }
 
     /** Get current timestamp. */
@@ -650,6 +718,11 @@ public interface Expression extends Serializable {
             return null;
         }
 
+        @Override
+        public DataType fieldReferenceType() {
+            return null;
+        }
+
         @Override
         public DataType outputType() {
             return DataTypes.TIMESTAMP(3);
@@ -659,6 +732,11 @@ public interface Expression extends Serializable {
         public String eval(String input) {
             return DateTimeUtils.formatLocalDateTime(LocalDateTime.now(), 3);
         }
+
+        @Override
+        public String eval(String input, DataType inputType) {
+            return eval(input);
+        }
     }
 
     /** Convert string to upper case. */
@@ -719,12 +797,14 @@ public interface Expression extends Serializable {
     abstract class NoLiteralsStringExpressionBase implements Expression {
 
         private final String fieldReference;
+        @Nullable protected DataType fieldReferenceType;
 
         public NoLiteralsStringExpressionBase(
-                String fieldReference, DataType fieldType, String... literals) 
{
+                String fieldReference, @Nullable DataType fieldType, String... 
literals) {
             this.fieldReference = fieldReference;
+            this.fieldReferenceType = fieldType;
             checkArgument(
-                    fieldType.getTypeRoot() == DataTypeRoot.VARCHAR,
+                    fieldType == null || fieldType.getTypeRoot() == 
DataTypeRoot.VARCHAR,
                     String.format(
                             "'%s' expression only supports type root of '%s', 
but found '%s'.",
                             name(), DataTypeRoot.VARCHAR, 
fieldType.getTypeRoot()));
@@ -744,5 +824,23 @@ public interface Expression extends Serializable {
         public String fieldReference() {
             return fieldReference;
         }
+
+        @Override
+        public DataType fieldReferenceType() {
+            return fieldReferenceType;
+        }
+
+        @Override
+        public String eval(String input, DataType inputType) {
+            if (this.fieldReferenceType == null) {
+                checkArgument(
+                        inputType.getTypeRoot() == DataTypeRoot.VARCHAR,
+                        String.format(
+                                "'%s' expression only supports type root of 
'%s', but found '%s'.",
+                                name(), DataTypeRoot.VARCHAR, 
inputType.getTypeRoot()));
+                this.fieldReferenceType = inputType;
+            }
+            return eval(input);
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
index 1834444afa..8008cd7ca9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.CdcSchema;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -103,9 +104,19 @@ public abstract class AbstractRecordParser
             Map<String, String> rowData, CdcSchema.Builder schemaBuilder) {
         computedColumns.forEach(
                 computedColumn -> {
-                    rowData.put(
-                            computedColumn.columnName(),
-                            
computedColumn.eval(rowData.get(computedColumn.fieldReference())));
+                    String result;
+                    if (computedColumn.fieldReference() != null
+                            && computedColumn.fieldReferenceType() == null) {
+                        DataType inputType =
+                                
schemaBuilder.getFieldType(computedColumn.fieldReference());
+                        result =
+                                computedColumn.eval(
+                                        
rowData.get(computedColumn.fieldReference()), inputType);
+                    } else {
+                        result = 
computedColumn.eval(rowData.get(computedColumn.fieldReference()));
+                    }
+
+                    rowData.put(computedColumn.columnName(), result);
                     schemaBuilder.column(computedColumn.columnName(), 
computedColumn.columnType());
                 });
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
index b5199d3a0b..7a615fdbc8 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
@@ -28,7 +28,9 @@ import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -100,7 +102,7 @@ public class CdcSchema implements Serializable {
     /** A builder for constructing an immutable but still unresolved {@link 
CdcSchema}. */
     public static final class Builder {
 
-        private final List<DataField> columns = new ArrayList<>();
+        private final Map<String, DataField> columns = new LinkedHashMap<>();
 
         private List<String> primaryKeys = new ArrayList<>();
 
@@ -121,7 +123,7 @@ public class CdcSchema implements Serializable {
             Preconditions.checkNotNull(dataField, "Data field must not be 
null.");
             Preconditions.checkNotNull(dataField.name(), "Column name must not 
be null.");
             Preconditions.checkNotNull(dataField.type(), "Data type must not 
be null.");
-            columns.add(dataField);
+            columns.put(dataField.name(), dataField);
             return this;
         }
 
@@ -148,7 +150,7 @@ public class CdcSchema implements Serializable {
 
             int id = highestFieldId.incrementAndGet();
             DataType reassignDataType = ReassignFieldId.reassign(dataType, 
highestFieldId);
-            columns.add(new DataField(id, columnName, reassignDataType, 
description));
+            columns.put(columnName, new DataField(id, columnName, 
reassignDataType, description));
             return this;
         }
 
@@ -179,9 +181,19 @@ public class CdcSchema implements Serializable {
             return this;
         }
 
+        /** Returns the data type of the specified field. */
+        public DataType getFieldType(String fieldName) {
+            DataField field = columns.get(fieldName);
+            if (field == null) {
+                throw new IllegalArgumentException("Field " + fieldName + " 
not found in schema.");
+            }
+            return field.type();
+        }
+
         /** Returns an instance of an unresolved {@link CdcSchema}. */
         public CdcSchema build() {
-            return new CdcSchema(columns, primaryKeys, comment);
+            List<DataField> fields = new ArrayList<>(columns.values());
+            return new CdcSchema(fields, primaryKeys, comment);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
new file mode 100644
index 0000000000..5ab3f48817
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.sortComputedColumns;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for ComputedColumnUtils. */
+public class ComputedColumnUtilsTest {
+    @Test
+    public void testSortComputedColumns() {
+        List<ComputedColumn> columns =
+                Arrays.asList(
+                        new ComputedColumn("A", Expression.substring("B", 
"1")),
+                        new ComputedColumn("B", 
Expression.substring("ExistedColumn", "1")),
+                        new ComputedColumn("C", Expression.cast("No 
Reference")),
+                        new ComputedColumn("D", Expression.substring("A", 
"1")),
+                        new ComputedColumn("E", Expression.substring("C", 
"1")));
+
+        List<ComputedColumn> sortedColumns = sortComputedColumns(columns);
+        assertEquals(
+                Arrays.asList("B", "C", "E", "A", "D"),
+                sortedColumns.stream()
+                        .map(ComputedColumn::columnName)
+                        .collect(Collectors.toList()));
+    }
+
+    @Test
+    public void testCycleReference() {
+        List<ComputedColumn> columns =
+                Arrays.asList(
+                        new ComputedColumn("A", Expression.substring("B", 
"1")),
+                        new ComputedColumn("B", Expression.substring("C", 
"1")),
+                        new ComputedColumn("C", Expression.substring("A", 
"1")));
+
+        assertThrows(IllegalArgumentException.class, () -> 
sortComputedColumns(columns));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 53daeebc25..5bd1ffbfa7 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.kafka;
 
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.options.CatalogOptions;
@@ -31,6 +32,7 @@ import org.junit.jupiter.api.Timeout;
 
 import javax.annotation.Nullable;
 
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -660,7 +662,10 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
                         .withTableConfig(getBasicTableConfig())
                         .withPrimaryKeys("k")
                         .withComputedColumnArgs(
-                                Arrays.asList("etl_create_time=now()", 
"etl_update_time=now()"))
+                                Arrays.asList(
+                                        "etl_create_time=now()",
+                                        "etl_update_time=now()",
+                                        
"pt=date_format(etl_update_time,yyyy-MM-dd)"))
                         .build();
         runActionWithDefaultEnv(action);
 
@@ -675,15 +680,16 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
                             DataTypes.INT().notNull(),
                             DataTypes.VARCHAR(10),
                             DataTypes.TIMESTAMP(3),
-                            DataTypes.TIMESTAMP(3)
+                            DataTypes.TIMESTAMP(3),
+                            DataTypes.STRING()
                         },
-                        new String[] {"k", "v1", "etl_create_time", 
"etl_update_time"});
+                        new String[] {"k", "v1", "etl_create_time", 
"etl_update_time", "pt"});
 
         // INSERT
         waitForResult(
                 true,
                 Collections.singletonList(
-                        "\\+I\\[1, A, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+                        "\\+I\\[1, A, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, \\d{4}-\\d{2}-\\d{2}\\]"),
                 table1,
                 rowType1,
                 Collections.singletonList("k"));
@@ -691,9 +697,13 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         List<InternalRow> data = getData("t1");
         Timestamp createTime1 = data.get(0).getTimestamp(2, 3);
         Timestamp updateTime1 = data.get(0).getTimestamp(3, 3);
+        BinaryString pt1 = data.get(0).getString(4);
+
+        DateTimeFormatter ptFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
 
         
assertThat(createTime1.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
         
assertThat(updateTime1.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+        
assertThat(updateTime1.toLocalDateTime().format(ptFormatter)).isEqualTo(pt1.toString());
 
         Thread.sleep(1000);
 
@@ -702,7 +712,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         waitForResult(
                 true,
                 Collections.singletonList(
-                        "\\+I\\[1, B, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+                        "\\+I\\[1, B, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, \\d{4}-\\d{2}-\\d{2}\\]"),
                 table1,
                 rowType1,
                 Collections.singletonList("k"));
@@ -710,10 +720,12 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         data = getData("t1");
         Timestamp createTime2 = data.get(0).getTimestamp(2, 3);
         Timestamp updateTime2 = data.get(0).getTimestamp(3, 3);
+        BinaryString pt2 = data.get(0).getString(4);
 
         
assertThat(createTime2.toLocalDateTime()).isAfter(createTime1.toLocalDateTime());
         
assertThat(updateTime2.toLocalDateTime()).isAfter(updateTime1.toLocalDateTime());
         
assertThat(updateTime2.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+        
assertThat(updateTime2.toLocalDateTime().format(ptFormatter)).isEqualTo(pt2.toString());
 
         Thread.sleep(1000);
 
@@ -722,7 +734,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         waitForResult(
                 true,
                 Collections.singletonList(
-                        "\\+I\\[1, C, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+                        "\\+I\\[1, C, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, 
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}, \\d{4}-\\d{2}-\\d{2}\\]"),
                 table1,
                 rowType1,
                 Collections.singletonList("k"));
@@ -730,10 +742,12 @@ public class KafkaCanalSyncDatabaseActionITCase extends 
KafkaActionITCaseBase {
         data = getData("t1");
         Timestamp createTime3 = data.get(0).getTimestamp(2, 3);
         Timestamp updateTime3 = data.get(0).getTimestamp(3, 3);
+        BinaryString pt3 = data.get(0).getString(4);
 
         
assertThat(createTime3.toLocalDateTime()).isAfter(createTime1.toLocalDateTime());
         
assertThat(updateTime3.toLocalDateTime()).isAfter(updateTime2.toLocalDateTime());
         
assertThat(updateTime3.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+        
assertThat(updateTime3.toLocalDateTime().format(ptFormatter)).isEqualTo(pt3.toString());
     }
 
     @Test


Reply via email to