This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 08f60af2c8 [hotfix] Fix test failure in
UpdatedDataFieldsProcessFunctionBase
08f60af2c8 is described below
commit 08f60af2c8f54560e76b3166bbe678100d490b5e
Author: JingsongLi <[email protected]>
AuthorDate: Tue Jun 10 13:14:50 2025 +0800
[hotfix] Fix test failure in UpdatedDataFieldsProcessFunctionBase
---
.../sink/cdc/UpdatedDataFieldsProcessFunctionBase.java | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index f512c14e6c..6f74a346e9 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -130,10 +130,8 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
String topLevelFieldName = updateColumnType.fieldNames()[0];
TableSchema oldSchema =
schemaManager.latestOrThrow("Table does not exist. This is
unexpected.");
- DataType oldTopLevelFieldType =
- new
RowType(oldSchema.fields()).getField(topLevelFieldName).type();
- DataType newTopLevelFieldType =
- new
RowType(newSchema.fields()).getField(topLevelFieldName).type();
+ DataType oldTopLevelFieldType =
findTopLevelType(oldSchema.fields(), topLevelFieldName);
+ DataType newTopLevelFieldType =
findTopLevelType(newSchema.fields(), topLevelFieldName);
// For complex types, extract the top level type to check type
context (e.g.,
// ARRAY<BIGINT> instead of just BIGINT)
@@ -163,6 +161,15 @@ public abstract class
UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
}
}
+ private DataType findTopLevelType(List<DataField> fields, String name) {
+ for (DataField field : fields) {
+ if (caseSensitive ? name.equals(field.name()) :
name.equalsIgnoreCase(field.name())) {
+ return field.type();
+ }
+ }
+ throw new RuntimeException("Cannot find top level type " + name);
+ }
+
public static ConvertAction canConvert(
DataType oldType, DataType newType, TypeMapping typeMapping) {
if (oldType.equalsIgnoreNullable(newType)) {