This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 10b8dfa496 [cdc] support several basic computed-column expressions in
cdc ingestion. (#5620)
10b8dfa496 is described below
commit 10b8dfa496bf3a33b2ea7398b7d175e651a38616
Author: Fantasy-Jay <[email protected]>
AuthorDate: Mon May 19 15:34:41 2025 +0800
[cdc] support several basic computed-column expressions in cdc ingestion.
(#5620)
---
.../shortcodes/generated/other_functions.html | 14 ++-
.../java/org/apache/paimon/utils/StringUtils.java | 14 +++
.../apache/paimon/flink/action/cdc/Expression.java | 120 ++++++++++++++++++++-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 25 +++--
.../src/test/resources/mysql/sync_table_setup.sql | 1 +
5 files changed, 165 insertions(+), 9 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/other_functions.html
b/docs/layouts/shortcodes/generated/other_functions.html
index 2135fc8172..808e0a4c45 100644
--- a/docs/layouts/shortcodes/generated/other_functions.html
+++ b/docs/layouts/shortcodes/generated/other_functions.html
@@ -35,7 +35,7 @@ under the License.
</tr>
<tr>
<td><h5>truncate(column,width)</h5></td>
- <td>truncate column by width. Output type is same with column.If the
column is a STRING, truncate(column,width) will truncate the string to width
characters, namely `value.substring(0, width)`.
+ <td>Truncate column by width. The output type is the same as the column's type. If the column is a STRING, truncate(column,width) will truncate the string to width characters, namely `value.substring(0, width)`.
If the column is an INT or LONG, truncate(column,width) will
truncate the number with the algorithm `v - (((v % W) + W) % W)`. The
`redundant` compute part is to keep the result always positive.
If the column is a DECIMAL, truncate(column,width) will truncate
the decimal with the algorithm: let `scaled_W = decimal(W, scale(v))`, then
return `v - (v % scaled_W)`.</td>
</tr>
@@ -43,5 +43,17 @@ under the License.
<td><h5>cast(value,dataType)</h5></td>
<td>Get a constant value. The output is an atomic type, such as
STRING, INT, BOOLEAN, etc.</td>
</tr>
+ <tr>
+ <td><h5>upper(value)</h5></td>
+ <td>Convert string column to upper case. The input should be a STRING
and the output is a STRING.</td>
+ </tr>
+ <tr>
+ <td><h5>lower(value)</h5></td>
+ <td>Convert string column to lower case. The input should be a STRING
and the output is a STRING.</td>
+ </tr>
+ <tr>
+ <td><h5>trim(value)</h5></td>
+ <td>Remove leading and trailing whitespace from a string column. The input should be a STRING and the output is a STRING.</td>
+ </tr>
</tbody>
</table>
\ No newline at end of file
diff --git a/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
b/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
index 57a14fa4f2..140d0750a5 100644
--- a/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-api/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -530,4 +530,18 @@ public class StringUtils {
}
return value.trim();
}
+
+    /**
+     * Converts the given string to upper case using {@link java.util.Locale#ROOT}.
+     *
+     * <p>Using a fixed locale keeps the result independent of the JVM's default locale; with the
+     * no-arg {@link String#toUpperCase()} a Turkish default locale would, for example, map
+     * {@code 'i'} to a dotted capital I (U+0130) instead of {@code 'I'}.
+     *
+     * @param value the string to convert, may be {@code null}
+     * @return the upper-cased string, or {@code null} if {@code value} is {@code null}
+     */
+    public static String toUpperCase(String value) {
+        if (value == null) {
+            return null;
+        }
+        return value.toUpperCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Converts the given string to lower case using {@link java.util.Locale#ROOT}.
+     *
+     * <p>Using a fixed locale keeps the result independent of the JVM's default locale; with the
+     * no-arg {@link String#toLowerCase()} a Turkish default locale would, for example, map
+     * {@code 'I'} to a dotless small i (U+0131) instead of {@code 'i'}.
+     *
+     * @param value the string to convert, may be {@code null}
+     * @return the lower-cased string, or {@code null} if {@code value} is {@code null}
+     */
+    public static String toLowerCase(String value) {
+        if (value == null) {
+            return null;
+        }
+        return value.toLowerCase(java.util.Locale.ROOT);
+    }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 50bd57da36..087fe15e67 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypeJsonParser;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SerializableSupplier;
@@ -55,6 +56,11 @@ public interface Expression extends Serializable {
/** Compute value from given input. Input and output are serialized to
string. */
String eval(String input);
+
+    /**
+     * Returns the name of this expression, e.g. {@code "upper"}.
+     *
+     * <p>The default implementation returns {@code null}. String functions such as upper, lower
+     * and trim override it, and their shared base class embeds the name in argument-validation
+     * error messages.
+     *
+     * @return the expression name, or {@code null} if the implementation does not provide one
+     */
+    default String name() {
+        return null;
+    }
+
/** Expression function. */
enum ExpressionFunction {
YEAR(
@@ -142,7 +148,34 @@ public interface Expression extends Serializable {
referencedField.literals());
}),
CAST((typeMapping, caseSensitive, args) -> cast(args)),
- NOW((typeMapping, caseSensitive, args) -> new NowExpression());
+ NOW((typeMapping, caseSensitive, args) -> new NowExpression()),
+ UPPER(
+ (typeMapping, caseSensitive, args) -> {
+ ReferencedField referencedField =
+ ReferencedField.checkArgument(typeMapping,
caseSensitive, args);
+ return new UpperExpression(
+ referencedField.field(),
+ referencedField.fieldType(),
+ referencedField.literals());
+ }),
+ LOWER(
+ (typeMapping, caseSensitive, args) -> {
+ ReferencedField referencedField =
+ ReferencedField.checkArgument(typeMapping,
caseSensitive, args);
+ return new LowerExpression(
+ referencedField.field(),
+ referencedField.fieldType(),
+ referencedField.literals());
+ }),
+ TRIM(
+ (typeMapping, caseSensitive, args) -> {
+ ReferencedField referencedField =
+ ReferencedField.checkArgument(typeMapping,
caseSensitive, args);
+ return new TrimExpression(
+ referencedField.field(),
+ referencedField.fieldType(),
+ referencedField.literals());
+ });
public final ExpressionCreator creator;
@@ -627,4 +660,89 @@ public interface Expression extends Serializable {
return DateTimeUtils.formatLocalDateTime(LocalDateTime.now(), 3);
}
}
+
+    /**
+     * Computed-column function {@code upper(field)}: upper-cases the referenced string field by
+     * delegating to {@link StringUtils#toUpperCase(String)}, which maps {@code null} to {@code
+     * null}.
+     */
+    final class UpperExpression extends NoLiteralsStringExpressionBase {
+
+        public UpperExpression(
+                String fieldReference, DataType fieldType, String... literals) {
+            super(fieldReference, fieldType, literals);
+        }
+
+        /**
+         * Returns the function name. It must stay a constant: the base-class constructor calls
+         * this method before this object's own construction has finished.
+         */
+        @Override
+        public String name() {
+            return "upper";
+        }
+
+        @Override
+        public String eval(String input) {
+            final String upperCased = StringUtils.toUpperCase(input);
+            return upperCased;
+        }
+    }
+
+    /**
+     * Computed-column function {@code lower(field)}: lower-cases the referenced string field by
+     * delegating to {@link StringUtils#toLowerCase(String)}, which maps {@code null} to {@code
+     * null}.
+     */
+    final class LowerExpression extends NoLiteralsStringExpressionBase {
+
+        public LowerExpression(
+                String fieldReference, DataType fieldType, String... literals) {
+            super(fieldReference, fieldType, literals);
+        }
+
+        /**
+         * Returns the function name. It must stay a constant: the base-class constructor calls
+         * this method before this object's own construction has finished.
+         */
+        @Override
+        public String name() {
+            return "lower";
+        }
+
+        @Override
+        public String eval(String input) {
+            final String lowerCased = StringUtils.toLowerCase(input);
+            return lowerCased;
+        }
+    }
+
+    /**
+     * Computed-column function {@code trim(field)}: strips leading and trailing whitespace from
+     * the referenced string field by delegating to {@link StringUtils#trim(String)} (which ends
+     * in {@link String#trim()}).
+     */
+    final class TrimExpression extends NoLiteralsStringExpressionBase {
+
+        public TrimExpression(
+                String fieldReference, DataType fieldType, String... literals) {
+            super(fieldReference, fieldType, literals);
+        }
+
+        /**
+         * Returns the function name. It must stay a constant: the base-class constructor calls
+         * this method before this object's own construction has finished.
+         */
+        @Override
+        public String name() {
+            return "trim";
+        }
+
+        @Override
+        public String eval(String input) {
+            final String trimmed = StringUtils.trim(input);
+            return trimmed;
+        }
+    }
+
+    /**
+     * Base class for single-argument string functions such as {@code upper}, {@code lower} and
+     * {@code trim}. The expression references one character-string field, takes no literal
+     * arguments, and always produces a {@code STRING}.
+     *
+     * <p>The constructor calls {@link #name()} while building its validation messages, i.e.
+     * before any subclass constructor has run. Subclasses must therefore return a constant from
+     * {@code name()} and must not read their own fields there.
+     */
+    abstract class NoLiteralsStringExpressionBase implements Expression {
+
+        private final String fieldReference;
+
+        /**
+         * Validates the referenced field type and the (absent) literal arguments.
+         *
+         * @param fieldReference name of the source field this expression reads
+         * @param fieldType type of the referenced field; must belong to the character-string
+         *     family (CHAR or VARCHAR)
+         * @param literals additional literal arguments; must be empty
+         */
+        public NoLiteralsStringExpressionBase(
+                String fieldReference, DataType fieldType, String... literals) {
+            this.fieldReference = fieldReference;
+            DataTypeRoot typeRoot = fieldType.getTypeRoot();
+            // Accept the whole character-string family: CHAR columns carry text just like
+            // VARCHAR ones, and the functions built on this base only operate on the string value.
+            checkArgument(
+                    typeRoot.getFamilies().contains(DataTypeFamily.CHARACTER_STRING),
+                    String.format(
+                            "'%s' expression only supports type root of '%s' or '%s', "
+                                    + "but found '%s'.",
+                            name(), DataTypeRoot.CHAR, DataTypeRoot.VARCHAR, typeRoot));
+            checkArgument(
+                    literals.length == 0,
+                    String.format(
+                            "'%s' expression does not accept literal arguments, but found %s.",
+                            name(), literals.length));
+        }
+
+        /** All functions built on this base produce a {@code STRING}. */
+        @Override
+        public DataType outputType() {
+            return DataTypes.STRING();
+        }
+
+        @Override
+        public String fieldReference() {
+            return fieldReference;
+        }
+    }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 58df6ac36e..6f16609372 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -856,7 +856,10 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
"_substring_date1=substring(_date,2)",
"_substring_date2=substring(_timestamp,5,10)",
"_truncate_date=trUNcate(pk,2)", // test
case-insensitive too
- "_constant=cast(11,INT)");
+ "_constant=cast(11,INT)",
+ "_upper=upper(_value)",
+ "_lower=lower(_value)",
+ "_trim=trim(_value)");
MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig)
@@ -870,9 +873,9 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
try (Statement statement = getStatement()) {
statement.execute("USE " + DATABASE_NAME);
statement.executeUpdate(
- "INSERT INTO test_computed_column VALUES (1,
'2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10')");
+ "INSERT INTO test_computed_column VALUES (1,
'2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10', ' vaLUE ')");
statement.executeUpdate(
- "INSERT INTO test_computed_column VALUES (2,
'2023-03-23', null, null)");
+ "INSERT INTO test_computed_column VALUES (2,
'2023-03-23', null, null, null)");
}
}
@@ -884,6 +887,7 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
DataTypes.DATE(),
DataTypes.TIMESTAMP(0),
DataTypes.TIMESTAMP(0),
+ DataTypes.VARCHAR(10),
DataTypes.INT().notNull(),
DataTypes.INT(),
DataTypes.INT(),
@@ -908,13 +912,17 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.INT().notNull(),
- DataTypes.INT()
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
},
new String[] {
"pk",
"_date",
"_datetime",
"_timestamp",
+ "_value",
"_year_date",
"_year_datetime",
"_year_timestamp",
@@ -939,12 +947,15 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
"_substring_date1",
"_substring_date2",
"_truncate_date",
- "_constant"
+ "_constant",
+ "_upper",
+ "_lower",
+ "_trim"
});
List<String> expected =
Arrays.asList(
- "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10,
2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023,
2022-01-01, 20210915, 23-03-23, 09-15, 0, 11]",
- "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL,
NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL,
NULL, 23-03-23, NULL, 2, 11]");
+ "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10,
vaLUE , 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10,
2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0, 11, VALUE , value , vaLUE]",
+ "+I[2, 19439, NULL, NULL, NULL, 2023, NULL, NULL, 3,
NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023,
NULL, NULL, 23-03-23, NULL, 2, 11, NULL, NULL, NULL]");
waitForResult(expected, table, rowType, Arrays.asList("pk",
"_year_date"));
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index f7c4dd9845..66e0b776d0 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -292,6 +292,7 @@ CREATE TABLE test_computed_column (
_date DATE,
_datetime DATETIME,
_timestamp TIMESTAMP,
+ _value VARCHAR(10),
PRIMARY KEY (pk)
);