This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 926c381024 [cdc] Support computed column referring to each other while sync_table (#5972)
926c381024 is described below
commit 926c381024260c0f5bbcbf3d696d0362f661d23b
Author: JackeyLee007 <[email protected]>
AuthorDate: Tue Aug 5 10:01:44 2025 +0800
    [cdc] Support computed column referring to each other while sync_table (#5972)
---
.../paimon/flink/action/cdc/ComputedColumn.java | 14 --
.../flink/action/cdc/ComputedColumnUtils.java | 145 ++++++-------------
.../apache/paimon/flink/action/cdc/Expression.java | 154 ++++-----------------
.../action/cdc/format/AbstractRecordParser.java | 14 +-
.../flink/action/cdc/mysql/MySqlRecordParser.java | 35 ++++-
.../action/cdc/postgres/PostgresRecordParser.java | 17 ++-
.../paimon/flink/action/cdc/utils/DfsSort.java | 103 ++++++++++++++
.../apache/paimon/flink/sink/cdc/CdcSchema.java | 5 +-
.../flink/action/cdc/ComputedColumnUtilsTest.java | 41 +++---
.../cdc/mysql/MySqlSyncTableActionITCase.java | 64 +++++++++
.../flink/action/cdc/utils/DfsSortTestTest.java | 55 ++++++++
.../src/test/resources/mysql/sync_table_setup.sql | 9 ++
12 files changed, 366 insertions(+), 290 deletions(-)
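With this change, a computed column defined for sync_table may reference another computed column, as long as the references form a DAG: definitions are topologically sorted before evaluation, so they may be passed in any order. For example (a sketch mirroring the new integration test below; the column names come from its setup table):

    List<String> computedColumnDefs =
            Arrays.asList(
                    "_lower_of_upper=lower(_upper)",      // refers to another computed column
                    "_upper=upper(_value)",               // refers to a physical column
                    "_trim_lower=trim(_lower_of_upper)"); // chained computed reference

Note that _lower_of_upper is declared before the _upper column it depends on.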
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
index dddb909cd7..5e9041a120 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumn.java
@@ -53,11 +53,6 @@ public class ComputedColumn implements Serializable {
return expression.fieldReference();
}
- @Nullable
- public DataType fieldReferenceType() {
- return expression.fieldReferenceType();
- }
-
    /** Compute column's value from given argument. Return null if input is null. */
@Nullable
public String eval(@Nullable String input) {
@@ -66,13 +61,4 @@ public class ComputedColumn implements Serializable {
}
return expression.eval(input);
}
-
-    /** Compute column's value from given argument. Return null if input is null. */
- @Nullable
- public String eval(@Nullable String input, DataType inputType) {
- if (fieldReference() != null && input == null) {
- return null;
- }
- return expression.eval(input, inputType);
- }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
index 28fbef9455..7bd9dd561d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtils.java
@@ -18,22 +18,19 @@
package org.apache.paimon.flink.action.cdc;
-import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.flink.action.cdc.utils.DfsSort;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.Preconditions;
+import org.apache.flink.api.java.tuple.Tuple2;
+
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/** Utility methods for {@link ComputedColumn}, such as build. */
public class ComputedColumnUtils {
@@ -50,16 +47,44 @@ public class ComputedColumnUtils {
.collect(
                                Collectors.toMap(DataField::name, DataField::type, (v1, v2) -> v2));
+ // sort computed column args by dependencies
+ LinkedHashMap<String, Tuple2<String, String[]>> sortedArgs =
+ sortComputedColumnArgs(computedColumnArgs, caseSensitive);
+
List<ComputedColumn> computedColumns = new ArrayList<>();
- for (String columnArg : computedColumnArgs) {
- String[] kv = columnArg.split("=");
+        for (Map.Entry<String, Tuple2<String, String[]>> columnArg : sortedArgs.entrySet()) {
+ String columnName = columnArg.getKey().trim();
+ String exprName = columnArg.getValue().f0.trim();
+ String[] args = columnArg.getValue().f1;
+
+            Expression expr = Expression.create(typeMapping, caseSensitive, exprName, args);
+ ComputedColumn cmpColumn = new ComputedColumn(columnName, expr);
+ computedColumns.add(new ComputedColumn(columnName, expr));
+
+            // remember the column type for later reference by other computed columns
+ typeMapping.put(columnName, cmpColumn.columnType());
+ }
+
+ return computedColumns;
+ }
+
+    private static LinkedHashMap<String, Tuple2<String, String[]>> sortComputedColumnArgs(
+ List<String> computedColumnArgs, boolean caseSensitive) {
+ List<String> argList =
+ computedColumnArgs.stream()
+ .map(x -> caseSensitive ? x : x.toUpperCase())
+ .collect(Collectors.toList());
+
+        LinkedHashMap<String, Tuple2<String, String[]>> eqMap = new LinkedHashMap<>();
+ LinkedHashMap<String, String> refMap = new LinkedHashMap<>();
+ for (String arg : argList) {
+ String[] kv = arg.split("=");
if (kv.length != 2) {
throw new IllegalArgumentException(
String.format(
"Invalid computed column argument: %s. Please
use format 'column-name=expr-name(args, ...)'.",
- columnArg));
+ arg));
}
- String columnName = kv[0].trim();
String expression = kv[1].trim();
// parse expression
int left = expression.indexOf('(');
@@ -69,101 +94,21 @@ public class ComputedColumnUtils {
String.format(
"Invalid expression: %s. Please use format
'expr-name(args, ...)'.",
expression));
-
String exprName = expression.substring(0, left);
String[] args = expression.substring(left + 1, right).split(",");
-            checkArgument(args.length >= 1, "Computed column needs at least one argument.");
-
- computedColumns.add(
- new ComputedColumn(
- columnName,
-                            Expression.create(typeMapping, caseSensitive, exprName, args)));
- }
-
- return sortComputedColumns(computedColumns);
- }
-
- @VisibleForTesting
-    public static List<ComputedColumn> sortComputedColumns(List<ComputedColumn> columns) {
- Set<String> columnNames = new HashSet<>();
- for (ComputedColumn col : columns) {
- columnNames.add(col.columnName());
- }
-        // For simple processing, no reference or referring to another computed column, means
- // independent
- List<ComputedColumn> independent = new ArrayList<>();
- List<ComputedColumn> dependent = new ArrayList<>();
-
- for (ComputedColumn col : columns) {
-            if (col.fieldReference() == null || !columnNames.contains(col.fieldReference())) {
- independent.add(col);
- } else {
- dependent.add(col);
- }
- }
-
- // Sort dependent columns with topological sort
- Map<String, ComputedColumn> columnMap = new HashMap<>();
- Map<String, Set<String>> reverseDependencies = new HashMap<>();
-
- for (ComputedColumn col : dependent) {
- columnMap.put(col.columnName(), col);
- reverseDependencies
-                    .computeIfAbsent(col.fieldReference(), k -> new HashSet<>())
- .add(col.columnName());
- }
-
- List<ComputedColumn> sortedDependent = new ArrayList<>();
- Set<String> visited = new HashSet<>();
- Set<String> tempMark = new HashSet<>(); // For cycle detection
-
- for (ComputedColumn col : dependent) {
- if (!visited.contains(col.columnName())) {
- dfs(
- col.columnName(),
- reverseDependencies,
- columnMap,
- sortedDependent,
- visited,
- tempMark);
- }
+ // args[0] may be empty string, eg. "cal_col=now()"
+ eqMap.put(kv[0].trim(), Tuple2.of(exprName, args));
+ refMap.put(kv[0].trim(), args[0].trim());
}
- Collections.reverse(sortedDependent);
+ List<String> sortedKeys = DfsSort.sortKeys(refMap);
- // Independent should precede dependent
- List<ComputedColumn> result = new ArrayList<>();
- result.addAll(independent);
- result.addAll(sortedDependent);
-
- return result;
- }
-
- private static void dfs(
- String node,
- Map<String, Set<String>> reverseDependencies,
- Map<String, ComputedColumn> columnMap,
- List<ComputedColumn> sorted,
- Set<String> visited,
- Set<String> tempMark) {
- if (tempMark.contains(node)) {
- throw new IllegalArgumentException("Cycle detected: " + node);
- }
- if (visited.contains(node)) {
- return;
+ LinkedHashMap<String, Tuple2<String, String[]>> sortedMap =
+ new LinkedHashMap<>(refMap.size());
+ for (String key : sortedKeys) {
+ sortedMap.put(key, eqMap.get(key));
}
-
- tempMark.add(node);
- ComputedColumn current = columnMap.get(node);
-
- // Process the dependencies
-        for (String dependent : reverseDependencies.getOrDefault(node, Collections.emptySet())) {
-            dfs(dependent, reverseDependencies, columnMap, sorted, visited, tempMark);
- }
-
- tempMark.remove(node);
- visited.add(node);
- sorted.add(current);
+ return sortedMap;
}
}
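The ordering step above reduces each computed column to the first argument of its expression and delegates to the new DfsSort utility (added later in this commit). A minimal self-contained sketch of the resulting behavior, mirroring the definitions used in the updated unit test (the class name is hypothetical):

    import org.apache.paimon.flink.action.cdc.utils.DfsSort;

    import java.util.LinkedHashMap;
    import java.util.List;

    public class SortSketch {
        public static void main(String[] args) {
            // Each computed column maps to the first argument it references.
            // Values that are not themselves keys (physical columns, empty args)
            // impose no ordering constraint.
            LinkedHashMap<String, String> refs = new LinkedHashMap<>();
            refs.put("A", "B");             // A=substring(B, 1)
            refs.put("B", "ExistedColumn"); // B=substring(ExistedColumn, 1)
            refs.put("C", "");              // C=now()
            List<String> order = DfsSort.sortKeys(refs);
            System.out.println(order);      // [B, C, A] -> B is built before A
        }
    }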
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 9fdc4606a0..087fe15e67 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -24,7 +24,6 @@ import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypeJsonParser;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.StringUtils;
@@ -51,18 +50,12 @@ public interface Expression extends Serializable {
/** Return name of referenced field. */
String fieldReference();
- /** Return {@link DataType} of referenced field. */
- DataType fieldReferenceType();
-
/** Return {@link DataType} of computed value. */
DataType outputType();
    /** Compute value from given input. Input and output are serialized to string. */
String eval(String input);
-    /** Compute value from given input. Input and output are serialized to string. */
- String eval(String input, DataType inputType);
-
/** Return name of this expression. */
default String name() {
return null;
@@ -73,7 +66,7 @@ public interface Expression extends Serializable {
YEAR(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -83,7 +76,7 @@ public interface Expression extends Serializable {
MONTH(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -93,7 +86,7 @@ public interface Expression extends Serializable {
DAY(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -103,7 +96,7 @@ public interface Expression extends Serializable {
HOUR(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -113,7 +106,7 @@ public interface Expression extends Serializable {
MINUTE(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -123,7 +116,7 @@ public interface Expression extends Serializable {
SECOND(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return TemporalToIntConverter.create(
referencedField.field(),
referencedField.fieldType(),
@@ -133,7 +126,7 @@ public interface Expression extends Serializable {
DATE_FORMAT(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return DateFormat.create(
referencedField.field(),
referencedField.fieldType(),
@@ -142,13 +135,13 @@ public interface Expression extends Serializable {
SUBSTRING(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
                    return substring(referencedField.field(), referencedField.literals());
}),
TRUNCATE(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return truncate(
referencedField.field(),
referencedField.fieldType(),
@@ -159,7 +152,7 @@ public interface Expression extends Serializable {
UPPER(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return new UpperExpression(
referencedField.field(),
referencedField.fieldType(),
@@ -168,7 +161,7 @@ public interface Expression extends Serializable {
LOWER(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return new LowerExpression(
referencedField.field(),
referencedField.fieldType(),
@@ -177,7 +170,7 @@ public interface Expression extends Serializable {
TRIM(
(typeMapping, caseSensitive, args) -> {
ReferencedField referencedField =
-                            ReferencedField.create(typeMapping, caseSensitive, args);
+                            ReferencedField.checkArgument(typeMapping, caseSensitive, args);
return new TrimExpression(
referencedField.field(),
referencedField.fieldType(),
@@ -215,16 +208,16 @@ public interface Expression extends Serializable {
/** Referenced field in expression input parameters. */
class ReferencedField {
private final String field;
- @Nullable private final DataType fieldType;
+ private final DataType fieldType;
private final String[] literals;
-        private ReferencedField(String field, @Nullable DataType fieldType, String[] literals) {
+        private ReferencedField(String field, DataType fieldType, String[] literals) {
this.field = field;
this.fieldType = fieldType;
this.literals = literals;
}
- public static ReferencedField create(
+ public static ReferencedField checkArgument(
                Map<String, DataType> typeMapping, boolean caseSensitive, String... args) {
String referencedField = args[0].trim();
String[] literals =
@@ -233,13 +226,11 @@ public interface Expression extends Serializable {
                    StringUtils.toLowerCaseIfNeed(referencedField, caseSensitive);
DataType fieldType =
- typeMapping.isEmpty()
- ? null
- : checkNotNull(
- typeMapping.get(referencedFieldCheckForm),
- String.format(
-                                            "Referenced field '%s' is not in given fields: %s.",
-                                            referencedFieldCheckForm, typeMapping.keySet()));
+ checkNotNull(
+ typeMapping.get(referencedFieldCheckForm),
+ String.format(
+                            "Referenced field '%s' is not in given fields: %s.",
+                            referencedFieldCheckForm, typeMapping.keySet()));
return new ReferencedField(referencedField, fieldType, literals);
}
@@ -335,22 +326,16 @@ public interface Expression extends Serializable {
        private static final List<Integer> SUPPORTED_PRECISION = Arrays.asList(0, 3, 6, 9);
private final String fieldReference;
- @Nullable private DataType fieldReferenceType;
- @Nullable private Integer precision;
+ @Nullable private final Integer precision;
private transient Function<LocalDateTime, T> converter;
private TemporalExpressionBase(
-                String fieldReference, @Nullable DataType fieldType, @Nullable Integer precision) {
+                String fieldReference, DataType fieldType, @Nullable Integer precision) {
this.fieldReference = fieldReference;
- this.fieldReferenceType = fieldType;
// when the input is INTEGER_NUMERIC, the precision must be set
- if (fieldType != null
- && fieldType
- .getTypeRoot()
- .getFamilies()
- .contains(DataTypeFamily.INTEGER_NUMERIC)
+            if (fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)
&& precision == null) {
precision = 0;
}
@@ -369,11 +354,6 @@ public interface Expression extends Serializable {
return fieldReference;
}
- @Override
- public DataType fieldReferenceType() {
- return fieldReferenceType;
- }
-
/** If not, this must be overridden! */
@Override
public DataType outputType() {
@@ -390,21 +370,6 @@ public interface Expression extends Serializable {
return String.valueOf(result);
}
- @Override
- public String eval(String input, DataType inputType) {
- if (this.fieldReferenceType == null) {
- this.fieldReferenceType = inputType;
-
- // when the input is INTEGER_NUMERIC, the precision must be set
-            if (inputType.getTypeRoot().getFamilies().contains(DataTypeFamily.INTEGER_NUMERIC)
- && precision == null) {
- this.precision = 0;
- }
- }
-
- return eval(input);
- }
-
private LocalDateTime toLocalDateTime(String input) {
if (precision == null) {
return DateTimeUtils.toLocalDateTime(input, 9);
@@ -460,7 +425,7 @@ public interface Expression extends Serializable {
private static TemporalToIntConverter create(
String fieldReference,
- @Nullable DataType fieldType,
+ DataType fieldType,
                SerializableSupplier<Function<LocalDateTime, Integer>> converterSupplier,
String... literals) {
checkArgument(
@@ -539,11 +504,6 @@ public interface Expression extends Serializable {
return fieldReference;
}
- @Override
- public DataType fieldReferenceType() {
- return new VarCharType();
- }
-
@Override
public DataType outputType() {
return DataTypes.STRING();
@@ -564,11 +524,6 @@ public interface Expression extends Serializable {
input, beginInclusive, endExclusive));
}
}
-
- @Override
- public String eval(String input, DataType inputType) {
- return eval(input);
- }
}
/** Truncate numeric/decimal/string value. */
@@ -577,11 +532,11 @@ public interface Expression extends Serializable {
private final String fieldReference;
- @Nullable private DataType fieldType;
+ private final DataType fieldType;
private final int width;
-        TruncateComputer(String fieldReference, @Nullable DataType fieldType, String literal) {
+        TruncateComputer(String fieldReference, DataType fieldType, String literal) {
this.fieldReference = fieldReference;
this.fieldType = fieldType;
try {
@@ -599,11 +554,6 @@ public interface Expression extends Serializable {
return fieldReference;
}
- @Override
- public DataType fieldReferenceType() {
- return fieldType;
- }
-
@Override
public DataType outputType() {
return fieldType;
@@ -638,14 +588,6 @@ public interface Expression extends Serializable {
}
}
- @Override
- public String eval(String input, DataType inputType) {
- if (this.fieldType == null) {
- this.fieldType = inputType;
- }
- return eval(input);
- }
-
private short truncateShort(int width, short value) {
return (short) (value - (((value % width) + width) % width));
}
@@ -690,11 +632,6 @@ public interface Expression extends Serializable {
return null;
}
- @Override
- public DataType fieldReferenceType() {
- return null;
- }
-
@Override
public DataType outputType() {
return dataType;
@@ -704,11 +641,6 @@ public interface Expression extends Serializable {
public String eval(String input) {
return value;
}
-
- @Override
- public String eval(String input, DataType inputType) {
- return value;
- }
}
/** Get current timestamp. */
@@ -718,11 +650,6 @@ public interface Expression extends Serializable {
return null;
}
- @Override
- public DataType fieldReferenceType() {
- return null;
- }
-
@Override
public DataType outputType() {
return DataTypes.TIMESTAMP(3);
@@ -732,11 +659,6 @@ public interface Expression extends Serializable {
public String eval(String input) {
return DateTimeUtils.formatLocalDateTime(LocalDateTime.now(), 3);
}
-
- @Override
- public String eval(String input, DataType inputType) {
- return eval(input);
- }
}
/** Convert string to upper case. */
@@ -797,14 +719,12 @@ public interface Expression extends Serializable {
abstract class NoLiteralsStringExpressionBase implements Expression {
private final String fieldReference;
- @Nullable protected DataType fieldReferenceType;
public NoLiteralsStringExpressionBase(
-                String fieldReference, @Nullable DataType fieldType, String... literals) {
+                String fieldReference, DataType fieldType, String... literals) {
this.fieldReference = fieldReference;
- this.fieldReferenceType = fieldType;
checkArgument(
-                    fieldType == null || fieldType.getTypeRoot() == DataTypeRoot.VARCHAR,
+ fieldType.getTypeRoot() == DataTypeRoot.VARCHAR,
String.format(
"'%s' expression only supports type root of '%s',
but found '%s'.",
name(), DataTypeRoot.VARCHAR,
fieldType.getTypeRoot()));
@@ -824,23 +744,5 @@ public interface Expression extends Serializable {
public String fieldReference() {
return fieldReference;
}
-
- @Override
- public DataType fieldReferenceType() {
- return fieldReferenceType;
- }
-
- @Override
- public String eval(String input, DataType inputType) {
- if (this.fieldReferenceType == null) {
- checkArgument(
- inputType.getTypeRoot() == DataTypeRoot.VARCHAR,
- String.format(
-                                "'%s' expression only supports type root of '%s', but found '%s'.",
-                                name(), DataTypeRoot.VARCHAR, inputType.getTypeRoot()));
- this.fieldReferenceType = inputType;
- }
- return eval(input);
- }
}
}
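A consequence of dropping fieldReferenceType() and the two-argument eval is that the referenced field's type must already be present in the type mapping when an expression is built; the builder therefore registers each computed column's output type before creating the next expression. A rough fragment under that assumption (column names and the varargs call shape are illustrative, not taken from the patch):

    Map<String, DataType> typeMapping = new HashMap<>();
    typeMapping.put("_value", DataTypes.VARCHAR(10));

    Expression upper = Expression.create(typeMapping, true, "upper", "_value");
    // make _upper resolvable by expressions created afterwards
    typeMapping.put("_upper", upper.outputType());

    Expression lower = Expression.create(typeMapping, true, "lower", "_upper");
    lower.eval("SOME VALUE"); // -> "some value"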
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
index 8008cd7ca9..85442067b9 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/AbstractRecordParser.java
@@ -25,7 +25,6 @@ import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcSchema;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -104,17 +103,8 @@ public abstract class AbstractRecordParser
Map<String, String> rowData, CdcSchema.Builder schemaBuilder) {
computedColumns.forEach(
computedColumn -> {
- String result;
- if (computedColumn.fieldReference() != null
- && computedColumn.fieldReferenceType() == null) {
- DataType inputType =
-                                schemaBuilder.getFieldType(computedColumn.fieldReference());
-                        result =
-                                computedColumn.eval(
-                                        rowData.get(computedColumn.fieldReference()), inputType);
- } else {
-                        result = computedColumn.eval(rowData.get(computedColumn.fieldReference()));
- }
+ String result =
+                            computedColumn.eval(rowData.get(computedColumn.fieldReference()));
rowData.put(computedColumn.columnName(), result);
                    schemaBuilder.column(computedColumn.columnName(), computedColumn.columnType());
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 0fffdf0b98..6c8f2ae324 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -32,6 +32,7 @@ import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -235,21 +236,40 @@ public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord, RichC
        Map<String, DebeziumEvent.Field> fields = schema.beforeAndAfterFields();
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
+
LinkedHashMap<String, String> resultMap = new LinkedHashMap<>();
+
        for (Map.Entry<String, DebeziumEvent.Field> field : fields.entrySet()) {
String fieldName = field.getKey();
- String mySqlType = field.getValue().type();
+ String debeziumType = field.getValue().type();
+ String className = field.getValue().name();
+
+ // record the field data type for computed columns reference
+ JsonNode parametersNode = field.getValue().parameters();
+ Map<String, String> parametersMap =
+ isNull(parametersNode)
+ ? Collections.emptyMap()
+ : JsonSerdeUtil.convertValue(
+ parametersNode,
+                                new TypeReference<HashMap<String, String>>() {});
+
+ DataType paimonDataType =
+                    DebeziumSchemaUtils.toDataType(debeziumType, className, parametersMap);
+ schemaBuilder.column(fieldName, paimonDataType);
+
JsonNode objectValue = recordRow.get(fieldName);
if (isNull(objectValue)) {
continue;
}
- String className = field.getValue().name();
String oldValue = objectValue.asText();
String newValue =
DebeziumSchemaUtils.transformRawValue(
oldValue,
- mySqlType,
+ debeziumType,
className,
typeMapping,
objectValue,
@@ -259,9 +279,12 @@ public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord, RichC
// generate values of computed columns
for (ComputedColumn computedColumn : computedColumns) {
- resultMap.put(
- computedColumn.columnName(),
-                    computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
+ String refName = computedColumn.fieldReference();
+
+            resultMap.put(computedColumn.columnName(), computedColumn.eval(resultMap.get(refName)));
+
+            // remember the computed column data type for later reference by other computed columns
+            schemaBuilder.column(computedColumn.columnName(), computedColumn.columnType());
}
for (CdcMetadataConverter metadataConverter : metadataConverters) {
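In other words, the MySQL parser now converts every Debezium field to its Paimon type up front and records it in the schema builder, so a computed column can resolve the type of any physical column as well as of computed columns evaluated before it. Roughly, per field (the literal "int32" and the null class name are assumptions about Debezium's wire format for a plain MySQL INT):

    Map<String, String> parameters = Collections.emptyMap();
    DataType paimonType = DebeziumSchemaUtils.toDataType("int32", null, parameters);
    schemaBuilder.column("pk", paimonType);
    // later: schemaBuilder.getFieldType("pk") -> INT (or null if unknown)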
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
index af114cb920..c2565c1f2d 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java
@@ -121,7 +121,7 @@ public class PostgresRecordParser
extractRecords().forEach(out::collect);
}
- private CdcSchema extractSchema(DebeziumEvent.Field schema) {
+    private CdcSchema extractSchema(DebeziumEvent.Field schema, CdcSchema.Builder schemaBuilder) {
Map<String, DebeziumEvent.Field> afterFields = schema.afterFields();
Preconditions.checkArgument(
!afterFields.isEmpty(),
@@ -129,7 +129,6 @@ public class PostgresRecordParser
+ "Please make sure that `includeSchema` is true "
+ "in the JsonDebeziumDeserializationSchema you created");
- CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
afterFields.forEach(
(key, value) -> {
DataType dataType = extractFieldType(value);
@@ -207,15 +206,16 @@ public class PostgresRecordParser
private List<RichCdcMultiplexRecord> extractRecords() {
List<RichCdcMultiplexRecord> records = new ArrayList<>();
+ CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
+ CdcSchema schema = extractSchema(root.schema(), schemaBuilder);
- Map<String, String> before = extractRow(root.payload().before());
+        Map<String, String> before = extractRow(root.payload().before(), schemaBuilder);
if (!before.isEmpty()) {
records.add(createRecord(RowKind.DELETE, before));
}
- Map<String, String> after = extractRow(root.payload().after());
+        Map<String, String> after = extractRow(root.payload().after(), schemaBuilder);
if (!after.isEmpty()) {
- CdcSchema schema = extractSchema(root.schema());
records.add(
new RichCdcMultiplexRecord(
databaseName,
@@ -227,7 +227,7 @@ public class PostgresRecordParser
return records;
}
- private Map<String, String> extractRow(JsonNode recordRow) {
+    private Map<String, String> extractRow(JsonNode recordRow, CdcSchema.Builder schemaBuilder) {
if (isNull(recordRow)) {
return new HashMap<>();
}
@@ -346,9 +346,8 @@ public class PostgresRecordParser
// generate values of computed columns
for (ComputedColumn computedColumn : computedColumns) {
- resultMap.put(
- computedColumn.columnName(),
-                    computedColumn.eval(resultMap.get(computedColumn.fieldReference())));
+            String refName = computedColumn.fieldReference();
+            resultMap.put(computedColumn.columnName(), computedColumn.eval(resultMap.get(refName)));
}
for (CdcMetadataConverter metadataConverter : metadataConverters) {
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/utils/DfsSort.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/utils/DfsSort.java
new file mode 100644
index 0000000000..555c64ee2a
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/utils/DfsSort.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.utils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** DFS sort algorithm for topological sorting of a DAG (Directed Acyclic Graph). */
+public class DfsSort {
+ public static <K> LinkedHashMap<K, K> sort(LinkedHashMap<K, K> depMap) {
+ List<K> sortedKeys = sortKeys(depMap);
+ LinkedHashMap<K, K> sortedMap = new LinkedHashMap<>();
+ for (K key : sortedKeys) {
+ sortedMap.put(key, depMap.get(key));
+ }
+ return sortedMap;
+ }
+
+ public static <K> List<K> sortKeys(LinkedHashMap<K, K> depMap) {
+
+ Map<K, Set<K>> revMap = new LinkedHashMap<>();
+
+ List<K> noDeps = new ArrayList<>();
+
+ for (Map.Entry<K, K> entry : depMap.entrySet()) {
+ K key = entry.getKey();
+ K val = entry.getValue();
+
+ if (val == null || !depMap.containsKey(val)) {
+ noDeps.add(key);
+ } else {
+ revMap.computeIfAbsent(val, k -> new HashSet<>()).add(key);
+ }
+ }
+
+ List<K> sorted = new ArrayList<>();
+
+ Set<K> visited = new HashSet<>();
+ Set<K> tempMark = new HashSet<>(); // for cycle reference detection
+
+ for (Map.Entry<K, K> entry : depMap.entrySet()) {
+ K key = entry.getKey();
+ K val = entry.getValue();
+ if (val == null || !depMap.containsKey(val)) {
+ continue;
+ }
+
+ if (!visited.contains(key)) {
+ dfs(key, revMap, sorted, visited, tempMark);
+ }
+ }
+
+ Collections.reverse(noDeps);
+
+ sorted.addAll(noDeps);
+
+ Collections.reverse(sorted);
+
+ return sorted;
+ }
+
+ private static <K> void dfs(
+            K node, Map<K, Set<K>> revMap, List<K> sorted, Set<K> visited, Set<K> tempMark) {
+ if (tempMark.contains(node)) {
+ throw new IllegalArgumentException("Cycle detected: " + node);
+ }
+ if (visited.contains(node)) {
+ return;
+ }
+
+ tempMark.add(node);
+
+ // Process the dependencies
+ for (K dependent : revMap.getOrDefault(node, Collections.emptySet())) {
+ dfs(dependent, revMap, sorted, visited, tempMark);
+ }
+
+ tempMark.remove(node);
+ visited.add(node);
+ sorted.add(node);
+ }
+}
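For reference, DfsSort fails fast on cyclic definitions instead of emitting a partial order; a two-element cycle is enough to trigger it (a sketch):

    LinkedHashMap<String, String> deps = new LinkedHashMap<>();
    deps.put("x", "y");
    deps.put("y", "x");
    DfsSort.sort(deps); // throws IllegalArgumentException: "Cycle detected: x"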
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
index 7a615fdbc8..229dcc69bb 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSchema.java
@@ -184,10 +184,7 @@ public class CdcSchema implements Serializable {
/** Returns the data type of the specified field. */
public DataType getFieldType(String fieldName) {
DataField field = columns.get(fieldName);
- if (field == null) {
-            throw new IllegalArgumentException("Field " + fieldName + " not found in schema.");
- }
- return field.type();
+ return field == null ? null : field.type();
}
/** Returns an instance of an unresolved {@link CdcSchema}. */
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
index 5ab3f48817..d04f91053b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/ComputedColumnUtilsTest.java
@@ -18,44 +18,47 @@
package org.apache.paimon.flink.action.cdc;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
-import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.sortComputedColumns;
+import static org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
/** Test for ComputedColumnUtils. */
public class ComputedColumnUtilsTest {
+
@Test
- public void testSortComputedColumns() {
- List<ComputedColumn> columns =
+ public void testComputedColumns() {
+ List<String> calColArgs =
Arrays.asList(
-                        new ComputedColumn("A", Expression.substring("B", "1")),
-                        new ComputedColumn("B", Expression.substring("ExistedColumn", "1")),
-                        new ComputedColumn("C", Expression.cast("No Reference")),
-                        new ComputedColumn("D", Expression.substring("A", "1")),
-                        new ComputedColumn("E", Expression.substring("C", "1")));
+ "A=substring(B, 1)",
+ "B=substring(ExistedColumn,1)",
+ "C=now()",
+ "D=substring(A, 1)",
+ "E=substring(C,1)");
+ List<DataField> physicalFields =
+                Arrays.asList(new DataField(1, "ExistedColumn", DataTypes.STRING()));
+        List<ComputedColumn> columns = buildComputedColumns(calColArgs, physicalFields);
- List<ComputedColumn> sortedColumns = sortComputedColumns(columns);
assertEquals(
Arrays.asList("B", "C", "E", "A", "D"),
- sortedColumns.stream()
- .map(ComputedColumn::columnName)
- .collect(Collectors.toList()));
+                columns.stream().map(ComputedColumn::columnName).collect(Collectors.toList()));
}
@Test
public void testCycleReference() {
- List<ComputedColumn> columns =
- Arrays.asList(
-                        new ComputedColumn("A", Expression.substring("B", "1")),
-                        new ComputedColumn("B", Expression.substring("C", "1")),
-                        new ComputedColumn("C", Expression.substring("A", "1")));
-
-        assertThrows(IllegalArgumentException.class, () -> sortComputedColumns(columns));
+ List<String> calColArgs =
+                Arrays.asList("A=substring(B, 1)", "B=substring(C, 1)", "C=substring(A, 1)");
+ assertThrows(
+ IllegalArgumentException.class,
+                () -> buildComputedColumns(calColArgs, Collections.emptyList()));
}
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index b48a1c79cf..1200629351 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -1131,6 +1131,70 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
epochSecond * 1_000_000_000 + nano));
}
+ @Test
+ @Timeout(60)
+ public void testComputedColumnsCrossReference() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "test_computed_column2");
+
+ List<String> computedColumnDefs =
+ Arrays.asList(
+ "_lower_of_upper=lower(_upper)",
+ "_upper=upper(_value)",
+ "_trim_lower=trim(_lower_of_upper)",
+ "_constant=cast(11,INT)");
+
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withPrimaryKeys("pk")
+ .withComputedColumnArgs(computedColumnDefs)
+ .build();
+ runActionWithDefaultEnv(action);
+
+ try (Statement statement = getStatement()) {
+ statement.execute("USE " + DATABASE_NAME);
+ statement.executeUpdate(
+                    "INSERT INTO test_computed_column2 VALUES (1, '2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10', ' vaLUE ')");
+ statement.executeUpdate(
+                    "INSERT INTO test_computed_column2 VALUES (2, '2023-03-23', null, null, null)");
+ }
+
+ FileStoreTable table = getFileStoreTable();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.DATE(),
+ DataTypes.TIMESTAMP(0),
+ DataTypes.TIMESTAMP(0),
+ DataTypes.VARCHAR(10),
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING()
+ },
+ new String[] {
+ "pk",
+ "_date",
+ "_datetime",
+ "_timestamp",
+ "_value",
+ "_upper",
+ "_constant",
+ "_lower_of_upper",
+ "_trim_lower"
+ });
+ List<String> expected =
+ Arrays.asList(
+ // sort according to reference
+
+                        "+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10,  vaLUE ,  VALUE , 11,  value , value]",
+                        "+I[2, 19439, NULL, NULL, NULL, NULL, 11, NULL, NULL]");
+
+ waitForResult(expected, table, rowType, Arrays.asList("pk"));
+ }
+
@Test
@Timeout(60)
public void testSyncShards() throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/utils/DfsSortTestTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/utils/DfsSortTestTest.java
new file mode 100644
index 0000000000..180474d2cf
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/utils/DfsSortTestTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Test for {@link DfsSort}. */
+public class DfsSortTestTest {
+ @Test
+ public void testSortKeys() {
+ LinkedHashMap<String, String> refs = new LinkedHashMap<>();
+ refs.put("A", "B");
+ refs.put("B", "O");
+ refs.put("C", null);
+ refs.put("D", "A");
+ refs.put("E", "C");
+ refs.put("F", "");
+
+ List<String> sorted = DfsSort.sortKeys(refs);
+ assertEquals(Arrays.asList("B", "C", "F", "E", "A", "D"), sorted);
+ }
+
+ @Test
+ public void testCycleReference() {
+ LinkedHashMap<String, String> refs = new LinkedHashMap<>();
+ refs.put("A", "B");
+ refs.put("B", "C");
+ refs.put("C", "A");
+
+ assertThrows(IllegalArgumentException.class, () -> DfsSort.sort(refs));
+ }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index ae0186cf70..a3b42f9440 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -296,6 +296,15 @@ CREATE TABLE test_computed_column (
PRIMARY KEY (pk)
);
+CREATE TABLE test_computed_column2 (
+ pk INT,
+ _date DATE,
+ _datetime DATETIME,
+ _timestamp TIMESTAMP,
+ _value VARCHAR(10),
+ PRIMARY KEY (pk)
+);
+
CREATE TABLE test_time_to_int_epoch (
pk INT,
_second_val0 INT,