This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cb2f6c8e4 [Bug] Support modify non physical column comment (#2548)
cb2f6c8e4 is described below
commit cb2f6c8e4831ea011bd1ad9bba3963c0d55bd600
Author: Aitozi <[email protected]>
AuthorDate: Thu Dec 21 15:04:00 2023 +0800
[Bug] Support modify non physical column comment (#2548)
---
.../java/org/apache/paimon/flink/FlinkCatalog.java | 16 ++++++--
.../apache/paimon/flink/CatalogTableITCase.java | 20 ----------
.../apache/paimon/flink/SchemaChangeITCase.java | 46 ++++++++++++++++++++++
3 files changed, 59 insertions(+), 23 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index e8a0f9635..8ab2d2f36 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -459,10 +460,19 @@ public class FlinkCatalog extends AbstractCatalog {
ResetOption resetOption = (ResetOption) change;
schemaChanges.add(SchemaChange.removeOption(resetOption.getKey()));
return schemaChanges;
- } else {
- throw new UnsupportedOperationException(
- "Change is not supported: " + change.getClass());
+ } else if (change instanceof TableChange.ModifyColumn) {
+ // let non-physical column handle by option
+ if (oldTableNonPhysicalColumnIndex.containsKey(
+ ((TableChange.ModifyColumn) change).getOldColumn().getName())
+ && !(((TableChange.ModifyColumn) change).getNewColumn()
+ instanceof Column.PhysicalColumn)) {
+ return schemaChanges;
+ } else {
+ throw new UnsupportedOperationException(
+ "Change is not supported: " + change.getClass());
+ }
}
+ throw new UnsupportedOperationException("Change is not supported: " + change.getClass());
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index ab798c426..f662086fe 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -754,26 +754,6 @@ public class CatalogTableITCase extends CatalogITCaseBase {
+ "is full-compaction or lookup because it will read duplicated changes."));
}
- @Test
- public void testAlterTableNonPhysicalColumn() {
- sql(
- "CREATE TABLE T (a INT, c ROW < a INT, d INT> METADATA, b INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts)");
- sql("ALTER TABLE T ADD e VARCHAR METADATA");
- sql("ALTER TABLE T DROP c ");
- sql("ALTER TABLE T RENAME e TO ee");
- List<Row> result = sql("SHOW CREATE TABLE T");
- assertThat(result.get(0).toString())
- .contains(
- "CREATE TABLE `PAIMON`.`default`.`T` (\n"
- + " `a` INT,\n"
- + " `b` INT,\n"
- + " `ts` TIMESTAMP(3),\n"
- + " `ee` VARCHAR(2147483647) METADATA,\n"
- + " WATERMARK FOR `ts` AS `ts`\n"
- + ") ")
- .doesNotContain("schema");
- }
-
@Test
public void testShowTableMetadataComment() {
sql("CREATE TABLE T (a INT, name VARCHAR METADATA COMMENT 'header1', b INT)");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 1a6444149..90d4235b2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -940,4 +940,50 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
"+I[b, STRING, true, null, null, null, from column b]",
"+I[c, STRING, true, null, null, null, null]");
}
+
+ @Test
+ public void testAlterTableNonPhysicalColumn() {
+ sql(
+ "CREATE TABLE T (a INT, c ROW < a INT, d INT> METADATA, b INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts)");
+ sql("ALTER TABLE T ADD e VARCHAR METADATA");
+ sql("ALTER TABLE T DROP c ");
+ sql("ALTER TABLE T RENAME e TO ee");
+ List<Row> result = sql("SHOW CREATE TABLE T");
+ assertThat(result.get(0).toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` INT,\n"
+ + " `b` INT,\n"
+ + " `ts` TIMESTAMP(3),\n"
+ + " `ee` VARCHAR(2147483647) METADATA,\n"
+ + " WATERMARK FOR `ts` AS `ts`\n"
+ + ") ")
+ .doesNotContain("schema");
+ }
+
+ @Test
+ public void testAlterTableMetadataComment() {
+ sql("CREATE TABLE T (a INT, name VARCHAR METADATA COMMENT 'header1', b INT)");
+ List<Row> result = sql("SHOW CREATE TABLE T");
+ assertThat(result.get(0).toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` INT,\n"
+ + " `name` VARCHAR(2147483647) METADATA COMMENT 'header1',\n"
+ + " `b` INT\n"
+ + ")")
+ .doesNotContain("schema");
+ sql("ALTER TABLE T MODIFY name VARCHAR METADATA COMMENT 'header2'");
+ result = sql("SHOW CREATE TABLE T");
+ assertThat(result.get(0).toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` INT,\n"
+ + " `name` VARCHAR(2147483647) METADATA COMMENT 'header2',\n"
+ + " `b` INT\n"
+ + ")")
+ .doesNotContain("schema");
+ // change name from non-physical column to physical column is not allowed
+ assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY name VARCHAR COMMENT 'header3'"));
+ }
}