This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new eb340b59de [core] Primary key types should not be changed (#6264)
eb340b59de is described below
commit eb340b59de7c9fb3066e209c1e6a9b88a495cede
Author: tsreaper <[email protected]>
AuthorDate: Wed Sep 17 17:35:46 2025 +0800
[core] Primary key types should not be changed (#6264)
---
.../org/apache/paimon/schema/SchemaManager.java | 33 ++++++++++++++--------
.../apache/paimon/table/SchemaEvolutionTest.java | 16 +++++++++++
.../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 5 ++--
.../FilterPushdownWithSchemaChangeITCase.java | 5 ++--
4 files changed, 43 insertions(+), 16 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index f2f3d89064..cbfe81b3a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -372,7 +372,7 @@ public class SchemaManager implements Serializable {
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
- assertNotUpdatingPrimaryKeys(oldTableSchema, rename.fieldNames(), "rename");
+ assertNotUpdatingPartitionKeys(oldTableSchema, rename.fieldNames(), "rename");
new NestedColumnModifier(rename.fieldNames(), lazyIdentifier) {
@Override
protected void updateLastColumn(
@@ -416,6 +416,7 @@ public class SchemaManager implements Serializable {
}.updateIntermediateColumn(newFields, 0);
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
+ assertNotUpdatingPartitionKeys(oldTableSchema, update.fieldNames(), "update");
assertNotUpdatingPrimaryKeys(oldTableSchema, update.fieldNames(), "update");
updateNestedColumn(
newFields,
@@ -458,11 +459,9 @@ public class SchemaManager implements Serializable {
lazyIdentifier);
} else if (change instanceof UpdateColumnNullability) {
UpdateColumnNullability update = (UpdateColumnNullability) change;
- if (update.fieldNames().length == 1
- && update.newNullability()
- && oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
- throw new UnsupportedOperationException(
- "Cannot change nullability of primary key");
+ if (update.newNullability()) {
+ assertNotUpdatingPrimaryKeys(
+ oldTableSchema, update.fieldNames(), "change nullability of");
}
updateNestedColumn(
newFields,
@@ -839,17 +838,29 @@ public class SchemaManager implements Serializable {
}
}
- private static void assertNotUpdatingPrimaryKeys(
+ private static void assertNotUpdatingPartitionKeys(
TableSchema schema, String[] fieldNames, String operation) {
// partition keys can't be nested columns
if (fieldNames.length > 1) {
return;
}
- String columnToRename = fieldNames[0];
- if (schema.partitionKeys().contains(columnToRename)) {
+ String fieldName = fieldNames[0];
+ if (schema.partitionKeys().contains(fieldName)) {
throw new UnsupportedOperationException(
- String.format(
- "Cannot " + operation + " partition column: [%s]", columnToRename));
+ String.format("Cannot %s partition column: [%s]", operation, fieldName));
+ }
+ }
+
+ private static void assertNotUpdatingPrimaryKeys(
+ TableSchema schema, String[] fieldNames, String operation) {
+ // primary keys can't be nested columns
+ if (fieldNames.length > 1) {
+ return;
+ }
+ String fieldName = fieldNames[0];
+ if (schema.primaryKeys().contains(fieldName)) {
+ throw new UnsupportedOperationException(
+ String.format("Cannot %s primary key", operation));
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index ad540d58b4..8b118e0562 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -227,6 +227,22 @@ public class SchemaEvolutionTest {
assertThat(tableSchema.fields().get(0).type()).isEqualTo(DataTypes.STRING());
}
+ @Test
+ public void testUpdatePrimaryKeyType() throws Exception {
+ Schema schema =
+ Schema.newBuilder()
+ .column("k", DataTypes.INT())
+ .column("v", DataTypes.BIGINT())
+ .primaryKey("k")
+ .build();
+ schemaManager.createTable(schema);
+
+ List<SchemaChange> changes =
+ Collections.singletonList(SchemaChange.updateColumnType("k", DataTypes.STRING()));
+ assertThatThrownBy(() -> schemaManager.commitChanges(changes))
+ .hasMessageContaining("Cannot update primary key");
+ }
+
@Test
public void testRenameField() throws Exception {
Schema schema =
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 4436aa392d..ee64162ad9 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -516,8 +516,9 @@ public class CdcRecordStoreMultiWriteOperatorTest {
// first table
data = new HashMap<>();
data.put("pt", "1");
- data.put("k", "123456789876543211");
+ data.put("k", "2");
data.put("v", "varchar");
+ data.put("v2", "hello");
expected =
CdcMultiplexRecord.fromCdcRecord(
databaseName,
@@ -528,7 +529,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
assertThat(actual).isNull();
schemaManager = new SchemaManager(table1.fileIO(), table1.location());
- schemaManager.commitChanges(SchemaChange.updateColumnType("k", DataTypes.BIGINT()));
+ schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.STRING()));
actual = runner.take();
assertThat(actual).isEqualTo(expected);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
index 73ea8f0ae1..317d574194 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java
@@ -247,13 +247,12 @@ public class FilterPushdownWithSchemaChangeITCase extends CatalogITCaseBase {
sql("INSERT INTO T VALUES ('9', '9'), ('10', '10'), ('11', '11')");
// key filter
- sql("ALTER TABLE T MODIFY (pk INT)");
assertThat(sql("SELECT * FROM T WHERE pk > 9"))
- .containsExactlyInAnyOrder(Row.of(10, "10"), Row.of(11, "11"));
+ .containsExactlyInAnyOrder(Row.of("10", "10"), Row.of("11", "11"));
// value filter
sql("ALTER TABLE T MODIFY (v INT)");
assertThat(sql("SELECT * FROM T WHERE v > 9"))
- .containsExactlyInAnyOrder(Row.of(10, 10), Row.of(11, 11));
+ .containsExactlyInAnyOrder(Row.of("10", 10), Row.of("11", 11));
}
}