This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cb2f6c8e4 [Bug] Support modify non physical column comment (#2548)
cb2f6c8e4 is described below

commit cb2f6c8e4831ea011bd1ad9bba3963c0d55bd600
Author: Aitozi <[email protected]>
AuthorDate: Thu Dec 21 15:04:00 2023 +0800

    [Bug] Support modify non physical column comment (#2548)
---
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 16 ++++++--
 .../apache/paimon/flink/CatalogTableITCase.java    | 20 ----------
 .../apache/paimon/flink/SchemaChangeITCase.java    | 46 ++++++++++++++++++++++
 3 files changed, 59 insertions(+), 23 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index e8a0f9635..8ab2d2f36 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -48,6 +48,7 @@ import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -459,10 +460,19 @@ public class FlinkCatalog extends AbstractCatalog {
             ResetOption resetOption = (ResetOption) change;
             schemaChanges.add(SchemaChange.removeOption(resetOption.getKey()));
             return schemaChanges;
-        } else {
-            throw new UnsupportedOperationException(
-                    "Change is not supported: " + change.getClass());
+        } else if (change instanceof TableChange.ModifyColumn) {
+            // let non-physical column handle by option
+            if (oldTableNonPhysicalColumnIndex.containsKey(
+                            ((TableChange.ModifyColumn) change).getOldColumn().getName())
+                    && !(((TableChange.ModifyColumn) change).getNewColumn()
+                            instanceof Column.PhysicalColumn)) {
+                return schemaChanges;
+            } else {
+                throw new UnsupportedOperationException(
+                        "Change is not supported: " + change.getClass());
+            }
         }
+        throw new UnsupportedOperationException("Change is not supported: " + change.getClass());
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index ab798c426..f662086fe 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -754,26 +754,6 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                                         + "is full-compaction or lookup because it will read duplicated changes."));
     }
 
-    @Test
-    public void testAlterTableNonPhysicalColumn() {
-        sql(
-                "CREATE TABLE T (a INT,  c ROW < a INT, d INT> METADATA, b INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts)");
-        sql("ALTER TABLE T ADD e VARCHAR METADATA");
-        sql("ALTER TABLE T DROP c ");
-        sql("ALTER TABLE T RENAME e TO ee");
-        List<Row> result = sql("SHOW CREATE TABLE T");
-        assertThat(result.get(0).toString())
-                .contains(
-                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
-                                + "  `a` INT,\n"
-                                + "  `b` INT,\n"
-                                + "  `ts` TIMESTAMP(3),\n"
-                                + "  `ee` VARCHAR(2147483647) METADATA,\n"
-                                + "  WATERMARK FOR `ts` AS `ts`\n"
-                                + ") ")
-                .doesNotContain("schema");
-    }
-
     @Test
     public void testShowTableMetadataComment() {
         sql("CREATE TABLE T (a INT, name VARCHAR METADATA COMMENT 'header1', b INT)");
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 1a6444149..90d4235b2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -940,4 +940,50 @@ public class SchemaChangeITCase extends CatalogITCaseBase {
                         "+I[b, STRING, true, null, null, null, from column b]",
                         "+I[c, STRING, true, null, null, null, null]");
     }
+
+    @Test
+    public void testAlterTableNonPhysicalColumn() {
+        sql(
+                "CREATE TABLE T (a INT,  c ROW < a INT, d INT> METADATA, b INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts)");
+        sql("ALTER TABLE T ADD e VARCHAR METADATA");
+        sql("ALTER TABLE T DROP c ");
+        sql("ALTER TABLE T RENAME e TO ee");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.get(0).toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` INT,\n"
+                                + "  `b` INT,\n"
+                                + "  `ts` TIMESTAMP(3),\n"
+                                + "  `ee` VARCHAR(2147483647) METADATA,\n"
+                                + "  WATERMARK FOR `ts` AS `ts`\n"
+                                + ") ")
+                .doesNotContain("schema");
+    }
+
+    @Test
+    public void testAlterTableMetadataComment() {
+        sql("CREATE TABLE T (a INT, name VARCHAR METADATA COMMENT 'header1', b INT)");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.get(0).toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` INT,\n"
+                                + "  `name` VARCHAR(2147483647) METADATA COMMENT 'header1',\n"
+                                + "  `b` INT\n"
+                                + ")")
+                .doesNotContain("schema");
+        sql("ALTER TABLE T MODIFY name VARCHAR METADATA COMMENT 'header2'");
+        result = sql("SHOW CREATE TABLE T");
+        assertThat(result.get(0).toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` INT,\n"
+                                + "  `name` VARCHAR(2147483647) METADATA COMMENT 'header2',\n"
+                                + "  `b` INT\n"
+                                + ")")
+                .doesNotContain("schema");
+        // change name from non-physical column to physical column is not allowed
+        assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY name VARCHAR COMMENT 'header3'"));
+    }
 }

Reply via email to