This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 10b8dfa496 [cdc] support several basic computed-column expressions in 
cdc ingestion. (#5620)
10b8dfa496 is described below

commit 10b8dfa496bf3a33b2ea7398b7d175e651a38616
Author: Fantasy-Jay <[email protected]>
AuthorDate: Mon May 19 15:34:41 2025 +0800

    [cdc] support several basic computed-column expressions in cdc ingestion. 
(#5620)
---
 .../shortcodes/generated/other_functions.html      |  14 ++-
 .../java/org/apache/paimon/utils/StringUtils.java  |  14 +++
 .../apache/paimon/flink/action/cdc/Expression.java | 120 ++++++++++++++++++++-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |  25 +++--
 .../src/test/resources/mysql/sync_table_setup.sql  |   1 +
 5 files changed, 165 insertions(+), 9 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/other_functions.html 
b/docs/layouts/shortcodes/generated/other_functions.html
index 2135fc8172..808e0a4c45 100644
--- a/docs/layouts/shortcodes/generated/other_functions.html
+++ b/docs/layouts/shortcodes/generated/other_functions.html
@@ -35,7 +35,7 @@ under the License.
     </tr>
     <tr>
         <td><h5>truncate(column,width)</h5></td>
-        <td>truncate column by width. Output type is same with column.If the 
column is a STRING, truncate(column,width) will truncate the string to width 
characters, namely `value.substring(0, width)`.
+        <td>Truncate column by width. Output type is the same with column. If 
the column is a STRING, truncate(column,width) will truncate the string to 
width characters, namely `value.substring(0, width)`.
              If the column is an INT or LONG, truncate(column,width) will 
truncate the number with the algorithm `v - (((v % W) + W) % W)`. The 
`redundant` compute part is to keep the result always positive.
              If the column is a DECIMAL, truncate(column,width) will truncate 
the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then 
return `v - (v % scaled_W)`.</td>
     </tr>
@@ -43,5 +43,17 @@ under the License.
         <td><h5>cast(value,dataType)</h5></td>
         <td>Get a constant value. The output is an atomic type, such as 
STRING, INT, BOOLEAN, etc.</td>
     </tr>
+    <tr>
+        <td><h5>upper(value)</h5></td>
+        <td>Convert string column to upper case. The input should be a STRING 
and the output is a STRING.</td>
+    </tr>
+    <tr>
+        <td><h5>lower(value)</h5></td>
+        <td>Convert string column to lower case. The input should be a STRING 
and the output is a STRING.</td>
+    </tr>
+    <tr>
+        <td><h5>trim(value)</h5></td>
+        <td>Trim string column. The input should be a STRING and the output is 
a STRING.</td>
+    </tr>
     </tbody>
 </table>
\ No newline at end of file
diff --git a/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java 
b/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
index 57a14fa4f2..140d0750a5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -530,4 +530,18 @@ public class StringUtils {
         }
         return value.trim();
     }
+
+    /**
+     * Upper-cases {@code value} locale-independently ({@code Locale.ROOT}); null in, null out.
+     */
+    public static String toUpperCase(String value) {
+        return value == null ? null : value.toUpperCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Lower-cases {@code value} locale-independently ({@code Locale.ROOT}); null in, null out.
+     */
+    public static String toLowerCase(String value) {
+        return value == null ? null : value.toLowerCase(java.util.Locale.ROOT);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 50bd57da36..087fe15e67 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeFamily;
 import org.apache.paimon.types.DataTypeJsonParser;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.SerializableSupplier;
@@ -55,6 +56,11 @@ public interface Expression extends Serializable {
     /** Compute value from given input. Input and output are serialized to 
string. */
     String eval(String input);
 
+    /** Name of this expression, used in error messages; {@code null} unless overridden. */
+    default String name() {
+        return null;
+    }
+
     /** Expression function. */
     enum ExpressionFunction {
         YEAR(
@@ -142,7 +148,34 @@ public interface Expression extends Serializable {
                             referencedField.literals());
                 }),
         CAST((typeMapping, caseSensitive, args) -> cast(args)),
-        NOW((typeMapping, caseSensitive, args) -> new NowExpression());
+        NOW((typeMapping, caseSensitive, args) -> new NowExpression()),
+        UPPER(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return new UpperExpression(
+                            referencedField.field(),
+                            referencedField.fieldType(),
+                            referencedField.literals());
+                }),
+        LOWER(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return new LowerExpression(
+                            referencedField.field(),
+                            referencedField.fieldType(),
+                            referencedField.literals());
+                }),
+        TRIM(
+                (typeMapping, caseSensitive, args) -> {
+                    ReferencedField referencedField =
+                            ReferencedField.checkArgument(typeMapping, 
caseSensitive, args);
+                    return new TrimExpression(
+                            referencedField.field(),
+                            referencedField.fieldType(),
+                            referencedField.literals());
+                });
 
         public final ExpressionCreator creator;
 
@@ -627,4 +660,89 @@ public interface Expression extends Serializable {
             return DateTimeUtils.formatLocalDateTime(LocalDateTime.now(), 3);
         }
     }
+
+    /** Expression {@code upper(field)}: converts the referenced string field to upper case. */
+    final class UpperExpression extends NoLiteralsStringExpressionBase {
+
+        public UpperExpression(String fieldReference, DataType fieldType, 
String... literals) {
+            super(fieldReference, fieldType, literals);
+        }
+
+        @Override
+        public String eval(String input) {
+            return StringUtils.toUpperCase(input);
+        }
+
+        @Override
+        public String name() {
+            return "upper";
+        }
+    }
+
+    /** Expression {@code lower(field)}: converts the referenced string field to lower case. */
+    final class LowerExpression extends NoLiteralsStringExpressionBase {
+
+        public LowerExpression(String fieldReference, DataType fieldType, 
String... literals) {
+            super(fieldReference, fieldType, literals);
+        }
+
+        @Override
+        public String eval(String input) {
+            return StringUtils.toLowerCase(input);
+        }
+
+        @Override
+        public String name() {
+            return "lower";
+        }
+    }
+
+    /** Expression {@code trim(field)}: applies {@code StringUtils.trim} to the referenced field. */
+    final class TrimExpression extends NoLiteralsStringExpressionBase {
+
+        public TrimExpression(String fieldReference, DataType fieldType, 
String... literals) {
+            super(fieldReference, fieldType, literals);
+        }
+
+        @Override
+        public String eval(String input) {
+            return StringUtils.trim(input);
+        }
+
+        @Override
+        public String name() {
+            return "trim";
+        }
+    }
+
+    /** Base for VARCHAR-field expressions that take no literal arguments and output STRING. */
+    abstract class NoLiteralsStringExpressionBase implements Expression {
+
+        private final String fieldReference;
+
+        public NoLiteralsStringExpressionBase(
+                String fieldReference, DataType fieldType, String... literals) 
{
+            this.fieldReference = fieldReference;
+            checkArgument(
+                    fieldType.getTypeRoot() == DataTypeRoot.VARCHAR,
+                    String.format(
+                            "'%s' expression only supports type root of '%s', 
but found '%s'.",
+                            name(), DataTypeRoot.VARCHAR, 
fieldType.getTypeRoot()));
+            checkArgument(
+                    literals.length == 0,
+                    String.format(
+                            "'%s' expression only supports 0 argument, but 
found '%s'.",
+                            name(), literals.length));
+        }
+
+        @Override
+        public DataType outputType() {
+            return DataTypes.STRING();
+        }
+
+        @Override
+        public String fieldReference() {
+            return fieldReference;
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 58df6ac36e..6f16609372 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -856,7 +856,10 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         "_substring_date1=substring(_date,2)",
                         "_substring_date2=substring(_timestamp,5,10)",
                         "_truncate_date=trUNcate(pk,2)", // test 
case-insensitive too
-                        "_constant=cast(11,INT)");
+                        "_constant=cast(11,INT)",
+                        "_upper=upper(_value)",
+                        "_lower=lower(_value)",
+                        "_trim=trim(_value)");
 
         MySqlSyncTableAction action =
                 syncTableActionBuilder(mySqlConfig)
@@ -870,9 +873,9 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
             try (Statement statement = getStatement()) {
                 statement.execute("USE " + DATABASE_NAME);
                 statement.executeUpdate(
-                        "INSERT INTO test_computed_column VALUES (1, 
'2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10')");
+                        "INSERT INTO test_computed_column VALUES (1, 
'2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10', ' vaLUE ')");
                 statement.executeUpdate(
-                        "INSERT INTO test_computed_column VALUES (2, 
'2023-03-23', null, null)");
+                        "INSERT INTO test_computed_column VALUES (2, 
'2023-03-23', null, null, null)");
             }
         }
 
@@ -884,6 +887,7 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             DataTypes.DATE(),
                             DataTypes.TIMESTAMP(0),
                             DataTypes.TIMESTAMP(0),
+                            DataTypes.VARCHAR(10),
                             DataTypes.INT().notNull(),
                             DataTypes.INT(),
                             DataTypes.INT(),
@@ -908,13 +912,17 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             DataTypes.STRING(),
                             DataTypes.STRING(),
                             DataTypes.INT().notNull(),
-                            DataTypes.INT()
+                            DataTypes.INT(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
                         },
                         new String[] {
                             "pk",
                             "_date",
                             "_datetime",
                             "_timestamp",
+                            "_value",
                             "_year_date",
                             "_year_datetime",
                             "_year_timestamp",
@@ -939,12 +947,15 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                             "_substring_date1",
                             "_substring_date2",
                             "_truncate_date",
-                            "_constant"
+                            "_constant",
+                            "_upper",
+                            "_lower",
+                            "_trim"
                         });
         List<String> expected =
                 Arrays.asList(
-                        "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 
2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 
2022-01-01, 20210915, 23-03-23, 09-15, 0, 11]",
-                        "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, 
NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, 
NULL, 23-03-23, NULL, 2, 11]");
+                        "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10,  
vaLUE , 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 
2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0, 11,  VALUE ,  value , vaLUE]",
+                        "+I[2, 19439, NULL, NULL, NULL, 2023, NULL, NULL, 3, 
NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, 
NULL, NULL, 23-03-23, NULL, 2, 11, NULL, NULL, NULL]");
         waitForResult(expected, table, rowType, Arrays.asList("pk", 
"_year_date"));
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index f7c4dd9845..66e0b776d0 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -292,6 +292,7 @@ CREATE TABLE test_computed_column (
     _date DATE,
     _datetime DATETIME,
     _timestamp TIMESTAMP,
+    _value VARCHAR(10),
     PRIMARY KEY (pk)
 );
 

Reply via email to