This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 116309cdd3 [hotfix] Refactor codes to topLevelField name in UpdatedDataFieldsProcessFunctionBase
116309cdd3 is described below
commit 116309cdd34dff641558764fbf00ac0c598389c3
Author: JingsongLi <[email protected]>
AuthorDate: Mon Jun 9 16:13:07 2025 +0800
[hotfix] Refactor codes to topLevelField name in UpdatedDataFieldsProcessFunctionBase
---
...MultiTableUpdatedDataFieldsProcessFunction.java | 1 -
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 59 +++++++++-------------
.../java/org/apache/paimon/flink/FlinkCatalog.java | 1 -
3 files changed, 23 insertions(+), 38 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index 503f3593c2..d10f6d9f5e 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -99,7 +99,6 @@ public class MultiTableUpdatedDataFieldsProcessFunction
actualUpdatedDataFields,
updatedSchema.f1.primaryKeys(),
updatedSchema.f1.comment());
-
for (SchemaChange schemaChange : extractSchemaChanges(schemaManager, actualUpdatedSchema)) {
applySchemaChange(schemaManager, schemaChange, tableId, actualUpdatedSchema);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index bb49de2ac1..f512c14e6c 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -35,7 +35,6 @@ import org.apache.paimon.types.FieldIdentifier;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.api.common.functions.OpenContext;
@@ -46,6 +45,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -54,13 +54,15 @@ import java.util.stream.Collectors;
/** Base class for update data fields process function. */
public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends ProcessFunction<I, O> {
+
private static final Logger LOG =
LoggerFactory.getLogger(UpdatedDataFieldsProcessFunctionBase.class);
protected final CatalogLoader catalogLoader;
+ private final TypeMapping typeMapping;
+
protected Catalog catalog;
private boolean caseSensitive;
- private TypeMapping typeMapping;
private static final List<DataTypeRoot> STRING_TYPES =
Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR);
@@ -105,7 +107,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
SchemaManager schemaManager,
SchemaChange schemaChange,
Identifier identifier,
- CdcSchema actualUpdatedSchema)
+ CdcSchema newSchema)
throws Exception {
if (schemaChange instanceof SchemaChange.AddColumn) {
try {
@@ -125,35 +127,17 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
} else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
SchemaChange.UpdateColumnType updateColumnType =
(SchemaChange.UpdateColumnType) schemaChange;
- TableSchema schema =
- schemaManager
- .latest()
- .orElseThrow(
- () ->
- new RuntimeException(
- "Table does not exist.
This is unexpected."));
- int idx = schema.fieldNames().indexOf(updateColumnType.fieldNames()[0]);
- Preconditions.checkState(
- idx >= 0,
- "Field name "
- + updateColumnType.fieldNames()[0]
- + " does not exist in table. This is unexpected.");
- DataType oldType = schema.fields().get(idx).type();
- DataType newType = updateColumnType.newDataType();
-
- // For complex types, extract the full new type from actualUpdatedSchema
- // to preserve type context (e.g., ARRAY<BIGINT> instead of just BIGINT)
- if (actualUpdatedSchema != null) {
- String fieldName = updateColumnType.fieldNames()[0];
- for (DataField field : actualUpdatedSchema.fields()) {
- if (fieldName.equals(field.name())) {
- newType = field.type();
- break;
- }
- }
- }
-
- switch (canConvert(oldType, newType, typeMapping)) {
+ String topLevelFieldName = updateColumnType.fieldNames()[0];
+ TableSchema oldSchema =
+ schemaManager.latestOrThrow("Table does not exist. This is unexpected.");
+ DataType oldTopLevelFieldType =
+ new RowType(oldSchema.fields()).getField(topLevelFieldName).type();
+ DataType newTopLevelFieldType =
+ new RowType(newSchema.fields()).getField(topLevelFieldName).type();
+
+ // For complex types, extract the top level type to check type context (e.g.,
+ // ARRAY<BIGINT> instead of just BIGINT)
+ switch (canConvert(oldTopLevelFieldType, newTopLevelFieldType, typeMapping)) {
case CONVERT:
catalog.alterTable(identifier, schemaChange, false);
break;
@@ -161,9 +145,9 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
throw new UnsupportedOperationException(
String.format(
"Cannot convert field %s from type %s to
%s of Paimon table %s.",
- updateColumnType.fieldNames()[0],
- oldType,
- newType,
+ topLevelFieldName,
+ oldTopLevelFieldType,
+ newTopLevelFieldType,
identifier.getFullName()));
}
} else if (schemaChange instanceof SchemaChange.UpdateColumnComment) {
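For readers following the UpdateColumnType hunk above, here is a minimal standalone sketch (not part of this commit; the class name and the two-field schema are made up, and the DataTypes factory methods are only used for brevity) of how resolving the top-level field by name through RowType#getField keeps the nested type context that canConvert compares, e.g. ARRAY<BIGINT> rather than the element type BIGINT:

    // Illustrative sketch only; the schema and class name below are hypothetical.
    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.util.Arrays;

    public class TopLevelFieldLookupSketch {
        public static void main(String[] args) {
            RowType rowType =
                    new RowType(
                            Arrays.asList(
                                    new DataField(0, "id", DataTypes.BIGINT()),
                                    new DataField(1, "scores", DataTypes.ARRAY(DataTypes.BIGINT()))));
            // getField(name) returns the whole top-level field, so nested types keep
            // their context: ARRAY<BIGINT> here, not just BIGINT.
            DataType topLevelType = rowType.getField("scores").type();
            System.out.println(topLevelType);
        }
    }

A missing field name surfaces from the getField lookup itself, which is presumably why the explicit index lookup and Preconditions check were no longer needed.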
@@ -391,7 +375,10 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
}
// Generate nested column updates if needed
NestedSchemaUtils.generateNestedColumnUpdates(
- Arrays.asList(newFieldName), oldField.type(), newField.type(), result);
+ Collections.singletonList(newFieldName),
+ oldField.type(),
+ newField.type(),
+ result);
// update column comment
if (newField.description() != null) {
result.add(
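One small aside on the generateNestedColumnUpdates hunk above: Collections.singletonList builds an immutable one-element list, while Arrays.asList wraps a varargs array in a fixed-size list; used as a read-only single-element field path the two are interchangeable, as in this tiny illustrative snippet (not from the commit):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class SingleFieldPathSketch {
        public static void main(String[] args) {
            // Both produce the same one-element, effectively read-only field path.
            List<String> viaSingleton = Collections.singletonList("topLevelField");
            List<String> viaAsList = Arrays.asList("topLevelField");
            System.out.println(viaSingleton.equals(viaAsList)); // true
        }
    }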
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index d791d33d89..e7275a9470 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -669,7 +669,6 @@ public class FlinkCatalog extends AbstractCatalog {
org.apache.paimon.types.DataType oldType,
org.apache.paimon.types.DataType newType,
List<SchemaChange> schemaChanges) {
-
NestedSchemaUtils.generateNestedColumnUpdates(fieldNames, oldType, newType, schemaChanges);
}