This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ee8b1b198 [core][spark] Support updating nested column types in Spark (#4494)
ee8b1b198 is described below
commit ee8b1b198d3c65d68df15cda59dc0a0c31f63888
Author: tsreaper <[email protected]>
AuthorDate: Mon Nov 11 20:32:57 2024 +0800
[core][spark] Support updating nested column types in Spark (#4494)
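The nested path is passed to SchemaChange.updateColumnType as a list of
column names, outermost first. A minimal usage sketch, mirroring the new
SchemaManagerTest case below ("path" stands in for an existing table root):

    import java.util.Arrays;
    import org.apache.paimon.fs.local.LocalFileIO;
    import org.apache.paimon.schema.SchemaChange;
    import org.apache.paimon.schema.SchemaManager;
    import org.apache.paimon.types.DataTypes;

    // Widen nested field v.f2.f1 from INT to BIGINT, keeping its nullability.
    SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
    manager.commitChanges(
            SchemaChange.updateColumnType(
                    Arrays.asList("v", "f2", "f1"), DataTypes.BIGINT(), true));

The existing single-name overloads keep their behaviour by delegating to a
singleton list, and partition columns are still rejected.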
---
.../org/apache/paimon/schema/SchemaChange.java | 25 +++++++----
.../org/apache/paimon/schema/SchemaManager.java | 28 ++++---------
.../org/apache/paimon/catalog/CatalogTestBase.java | 4 +-
.../apache/paimon/schema/SchemaManagerTest.java | 49 ++++++++++++++++++++++
.../cdc/UpdatedDataFieldsProcessFunctionBase.java | 9 ++--
.../java/org/apache/paimon/spark/SparkCatalog.java | 10 +----
.../paimon/spark/SparkSchemaEvolutionITCase.java | 33 +++++++++++++++
7 files changed, 116 insertions(+), 42 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 7e94b0a77..1c1d601bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -83,12 +83,18 @@ public interface SchemaChange extends Serializable {
}
static SchemaChange updateColumnType(String fieldName, DataType newDataType) {
- return new UpdateColumnType(fieldName, newDataType, false);
+ return new UpdateColumnType(Collections.singletonList(fieldName), newDataType, false);
}
static SchemaChange updateColumnType(
String fieldName, DataType newDataType, boolean keepNullability) {
- return new UpdateColumnType(fieldName, newDataType, keepNullability);
+ return new UpdateColumnType(
+ Collections.singletonList(fieldName), newDataType, keepNullability);
+ }
+
+ static SchemaChange updateColumnType(
+ List<String> fieldNames, DataType newDataType, boolean keepNullability) {
+ return new UpdateColumnType(fieldNames, newDataType, keepNullability);
}
static SchemaChange updateColumnNullability(String fieldName, boolean newNullability) {
@@ -357,19 +363,20 @@ public interface SchemaChange extends Serializable {
private static final long serialVersionUID = 1L;
- private final String fieldName;
+ private final List<String> fieldNames;
private final DataType newDataType;
// If true, do not change the target field nullability
private final boolean keepNullability;
- private UpdateColumnType(String fieldName, DataType newDataType, boolean keepNullability) {
- this.fieldName = fieldName;
+ private UpdateColumnType(
+ List<String> fieldNames, DataType newDataType, boolean keepNullability) {
+ this.fieldNames = fieldNames;
this.newDataType = newDataType;
this.keepNullability = keepNullability;
}
- public String fieldName() {
- return fieldName;
+ public List<String> fieldNames() {
+ return fieldNames;
}
public DataType newDataType() {
@@ -389,14 +396,14 @@ public interface SchemaChange extends Serializable {
return false;
}
UpdateColumnType that = (UpdateColumnType) o;
- return Objects.equals(fieldName, that.fieldName)
+ return Objects.equals(fieldNames, that.fieldNames)
&& newDataType.equals(that.newDataType);
}
@Override
public int hashCode() {
int result = Objects.hash(newDataType);
- result = 31 * result + Objects.hashCode(fieldName);
+ result = 31 * result + Objects.hashCode(fieldNames);
return result;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 5ffeca65d..86ed96d5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -319,7 +319,7 @@ public class SchemaManager implements Serializable {
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
- renameColumnValidation(oldTableSchema, rename);
+ assertNotUpdatingPrimaryKeys(oldTableSchema, rename.fieldNames(), "rename");
new NestedColumnModifier(rename.fieldNames().toArray(new String[0])) {
@Override
protected void updateLastColumn(List<DataField> newFields, String fieldName)
@@ -361,15 +361,10 @@ public class SchemaManager implements Serializable {
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
- if (oldTableSchema.partitionKeys().contains(update.fieldName())) {
- throw new IllegalArgumentException(
- String.format(
- "Cannot update partition column [%s] type in the table[%s].",
- update.fieldName(), tableRoot.getName()));
- }
+ assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update");
updateNestedColumn(
newFields,
- new String[] {update.fieldName()},
+ update.fieldNames().toArray(new String[0]),
(field) -> {
DataType targetType = update.newDataType();
if (update.keepNullability()) {
@@ -382,13 +377,6 @@ public class SchemaManager implements Serializable {
String.format(
"Column type %s[%s] cannot be converted to %s without loosing information.",
field.name(), field.type(), targetType));
- AtomicInteger dummyId = new AtomicInteger(0);
- if (dummyId.get() != 0) {
- throw new RuntimeException(
- String.format(
- "Update column to nested row type '%s' is not supported.",
- targetType));
- }
return new DataField(
field.id(), field.name(), targetType, field.description());
});
@@ -594,15 +582,17 @@ public class SchemaManager implements Serializable {
}
}
- private static void renameColumnValidation(TableSchema schema, RenameColumn change) {
+ private static void assertNotUpdatingPrimaryKeys(
+ TableSchema schema, List<String> fieldNames, String operation) {
// partition keys can't be nested columns
- if (change.fieldNames().size() > 1) {
+ if (fieldNames.size() > 1) {
return;
}
- String columnToRename = change.fieldNames().get(0);
+ String columnToRename = fieldNames.get(0);
if (schema.partitionKeys().contains(columnToRename)) {
throw new UnsupportedOperationException(
- String.format("Cannot rename partition column: [%s]", columnToRename));
+ String.format(
+ "Cannot " + operation + " partition column: [%s]", columnToRename));
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 3bba6d562..f130920a7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -675,8 +675,8 @@ public abstract class CatalogTestBase {
false))
.satisfies(
anyCauseMatches(
- IllegalArgumentException.class,
- "Cannot update partition column [dt] type in the table"));
+ UnsupportedOperationException.class,
+ "Cannot update partition column: [dt]"));
}
@Test
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
index 5fb76387e..ac8d4cd91 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java
@@ -663,4 +663,53 @@ public class SchemaManagerTest {
assertThatCode(() -> manager.commitChanges(newNameAlreadyExistRenameColumn))
.hasMessageContaining("Column v.f2.f100 already exists");
}
+
+ @Test
+ public void testUpdateNestedColumnType() throws Exception {
+ RowType innerType =
+ RowType.of(
+ new DataField(4, "f1", DataTypes.INT()),
+ new DataField(5, "f2", DataTypes.BIGINT()));
+ RowType middleType =
+ RowType.of(
+ new DataField(2, "f1", DataTypes.STRING()),
+ new DataField(3, "f2", innerType));
+ RowType outerType =
+ RowType.of(
+ new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType));
+
+ Schema schema =
+ new Schema(
+ outerType.getFields(),
+ Collections.singletonList("k"),
+ Collections.emptyList(),
+ new HashMap<>(),
+ "");
+ SchemaManager manager = new SchemaManager(LocalFileIO.create(), path);
+ manager.createTable(schema);
+
+ SchemaChange updateColumnType =
+ SchemaChange.updateColumnType(
+ Arrays.asList("v", "f2", "f1"), DataTypes.BIGINT(), true);
+ manager.commitChanges(updateColumnType);
+
+ innerType =
+ RowType.of(
+ new DataField(4, "f1", DataTypes.BIGINT()),
+ new DataField(5, "f2", DataTypes.BIGINT()));
+ middleType =
+ RowType.of(
+ new DataField(2, "f1", DataTypes.STRING()),
+ new DataField(3, "f2", innerType));
+ outerType =
+ RowType.of(
+ new DataField(0, "k", DataTypes.INT()), new DataField(1, "v", middleType));
+ assertThat(manager.latest().get().logicalRowType()).isEqualTo(outerType);
+
+ SchemaChange middleColumnNotExistUpdateColumnType =
+ SchemaChange.updateColumnType(
+ Arrays.asList("v", "invalid", "f1"), DataTypes.BIGINT(), true);
+ assertThatCode(() -> manager.commitChanges(middleColumnNotExistUpdateColumnType))
+ .hasMessageContaining("Column v.invalid does not exist");
+ }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
index 77c49e8f3..0e93fdb07 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java
@@ -100,6 +100,9 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
} else if (schemaChange instanceof SchemaChange.UpdateColumnType) {
SchemaChange.UpdateColumnType updateColumnType =
(SchemaChange.UpdateColumnType) schemaChange;
+ Preconditions.checkState(
+ updateColumnType.fieldNames().size() == 1,
+ "Paimon CDC currently does not support nested type schema evolution.");
TableSchema schema =
schemaManager
.latest()
@@ -107,11 +110,11 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
() ->
new RuntimeException(
"Table does not exist. This is unexpected."));
- int idx = schema.fieldNames().indexOf(updateColumnType.fieldName());
+ int idx = schema.fieldNames().indexOf(updateColumnType.fieldNames().get(0));
Preconditions.checkState(
idx >= 0,
"Field name "
- + updateColumnType.fieldName()
+ + updateColumnType.fieldNames().get(0)
+ " does not exist in table. This is unexpected.");
DataType oldType = schema.fields().get(idx).type();
DataType newType = updateColumnType.newDataType();
@@ -123,7 +126,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase<I, O> extends Process
throw new UnsupportedOperationException(
String.format(
"Cannot convert field %s from type %s to %s of Paimon table %s.",
- updateColumnType.fieldName(),
+ updateColumnType.fieldNames().get(0),
oldType,
newType,
identifier.getFullName()));
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 82c8939ea..5fde2c565 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -385,9 +385,8 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
return SchemaChange.dropColumn(Arrays.asList(delete.fieldNames()));
} else if (change instanceof TableChange.UpdateColumnType) {
TableChange.UpdateColumnType update = (TableChange.UpdateColumnType) change;
- validateAlterNestedField(update.fieldNames());
return SchemaChange.updateColumnType(
- update.fieldNames()[0], toPaimonType(update.newDataType()), true);
+ Arrays.asList(update.fieldNames()), toPaimonType(update.newDataType()), true);
} else if (change instanceof TableChange.UpdateColumnNullability) {
TableChange.UpdateColumnNullability update =
(TableChange.UpdateColumnNullability) change;
@@ -449,13 +448,6 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
return schemaBuilder.build();
}
- private void validateAlterNestedField(String[] fieldNames) {
- if (fieldNames.length > 1) {
- throw new UnsupportedOperationException(
- "Alter nested column is not supported: " + Arrays.toString(fieldNames));
- }
- }
-
private void validateAlterProperty(String alterKey) {
if (PRIMARY_KEY_IDENTIFIER.equals(alterKey)) {
throw new UnsupportedOperationException("Alter primary key is not supported");
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index ccae59e88..771ddc628 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -817,4 +817,37 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
.map(Row::toString))
.containsExactlyInAnyOrder("[apple,1]", "[banana,2]");
}
+
+ @ParameterizedTest()
+ @ValueSource(strings = {"orc", "avro", "parquet"})
+ public void testUpdateNestedColumnType(String formatType) {
+ String tableName = "testRenameNestedColumnTable";
+ spark.sql(
+ "CREATE TABLE paimon.default."
+ + tableName
+ " (k INT NOT NULL, v STRUCT<f1: INT, f2: STRUCT<f1: STRING, f2: INT>>) "
+ "TBLPROPERTIES ('bucket' = '1', 'primary-key' = 'k', 'file.format' = '"
+ + formatType
+ + "')");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ " VALUES (1, STRUCT(10, STRUCT('apple', 100))), (2, STRUCT(20, STRUCT('banana', 200)))");
+ assertThat(
+ spark.sql("SELECT v.f2.f2, k FROM paimon.default." + tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[100,1]", "[200,2]");
+
+ spark.sql("ALTER TABLE paimon.default." + tableName + " CHANGE COLUMN v.f2.f2 f2 BIGINT");
+ spark.sql(
+ "INSERT INTO paimon.default."
+ + tableName
+ " VALUES (1, STRUCT(11, STRUCT('APPLE', 101))), (3, STRUCT(31, STRUCT('CHERRY', 3000000000000)))");
+ assertThat(
+ spark.sql("SELECT v.f2.f2, k FROM paimon.default." + tableName)
+ .collectAsList().stream()
+ .map(Row::toString))
+ .containsExactlyInAnyOrder("[101,1]", "[200,2]", "[3000000000000,3]");
+ }
}
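For reference, a rough sketch of how the Spark path reaches the new core API
after this change. The ALTER statement and the field path come from the IT
case above; the TableChange construction below only illustrates what Spark
hands to SparkCatalog#alterTable and is not code from this patch:

    import java.util.Arrays;
    import org.apache.paimon.schema.SchemaChange;
    import org.apache.paimon.types.DataTypes;
    import org.apache.spark.sql.connector.catalog.TableChange;

    // Spark SQL: ALTER TABLE t CHANGE COLUMN v.f2.f2 f2 BIGINT
    TableChange.UpdateColumnType update =
            (TableChange.UpdateColumnType)
                    TableChange.updateColumnType(
                            new String[] {"v", "f2", "f2"},
                            org.apache.spark.sql.types.DataTypes.LongType);

    // SparkCatalog now forwards the whole field path (converting the target
    // type via toPaimonType) instead of throwing
    // "Alter nested column is not supported":
    SchemaChange change =
            SchemaChange.updateColumnType(
                    Arrays.asList(update.fieldNames()), DataTypes.BIGINT(), true);

The Flink CDC path is unchanged in behaviour: it still evolves only top-level
columns, which the new Preconditions check states explicitly.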