This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fac896822 [core] update test as rest catalog support alter format table (#5822)
8fac896822 is described below

commit 8fac896822c72de836a00cb612f02dcb04455672
Author: jerry <[email protected]>
AuthorDate: Wed Jul 2 11:40:21 2025 +0800

    [core] update test as rest catalog support alter format table (#5822)
---
 .../org/apache/paimon/schema/SchemaManager.java    |   81 +-
 .../org/apache/paimon/catalog/CatalogTestBase.java | 1127 ++++++++++----------
 .../org/apache/paimon/rest/RESTCatalogServer.java  |   24 +-
 .../org/apache/paimon/rest/RESTCatalogTest.java    |    5 +
 4 files changed, 618 insertions(+), 619 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index a33d4e4f1f..2d8a6052fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -289,7 +289,10 @@ public class SchemaManager implements Serializable {
                                             new Catalog.TableNotExistException(
                                                     identifierFromPath(
                                                             
tableRoot.toString(), true, branch)));
-            TableSchema newTableSchema = generateTableSchema(oldTableSchema, 
changes, hasSnapshots);
+            LazyField<Identifier> lazyIdentifier =
+                    new LazyField<>(() -> 
identifierFromPath(tableRoot.toString(), true, branch));
+            TableSchema newTableSchema =
+                    generateTableSchema(oldTableSchema, changes, hasSnapshots, 
lazyIdentifier);
             try {
                 boolean success = commit(newTableSchema);
                 if (success) {
@@ -301,8 +304,11 @@ public class SchemaManager implements Serializable {
         }
     }
 
-    public TableSchema generateTableSchema(
-            TableSchema oldTableSchema, List<SchemaChange> changes, 
LazyField<Boolean> hasSnapshots)
+    public static TableSchema generateTableSchema(
+            TableSchema oldTableSchema,
+            List<SchemaChange> changes,
+            LazyField<Boolean> hasSnapshots,
+            LazyField<Identifier> lazyIdentifier)
             throws Catalog.ColumnAlreadyExistException, 
Catalog.ColumnNotExistException {
         Map<String, String> oldOptions = new 
HashMap<>(oldTableSchema.options());
         Map<String, String> newOptions = new 
HashMap<>(oldTableSchema.options());
@@ -351,16 +357,15 @@ public class SchemaManager implements Serializable {
                         addColumn.dataType().isNullable(),
                         "Column %s cannot specify NOT NULL in the %s table.",
                         String.join(".", addColumn.fieldNames()),
-                        identifierFromPath(tableRoot.toString(), true, 
branch).getFullName());
+                        lazyIdentifier.get().getFullName());
                 int id = highestFieldId.incrementAndGet();
                 DataType dataType = 
ReassignFieldId.reassign(addColumn.dataType(), highestFieldId);
-
-                new NestedColumnModifier(addColumn.fieldNames()) {
+                new NestedColumnModifier(addColumn.fieldNames(), 
lazyIdentifier) {
                     @Override
                     protected void updateLastColumn(
                             int depth, List<DataField> newFields, String 
fieldName)
                             throws Catalog.ColumnAlreadyExistException {
-                        assertColumnNotExists(newFields, fieldName);
+                        assertColumnNotExists(newFields, fieldName, 
lazyIdentifier);
 
                         DataField dataField =
                                 new DataField(id, fieldName, dataType, 
addColumn.description());
@@ -386,14 +391,14 @@ public class SchemaManager implements Serializable {
             } else if (change instanceof RenameColumn) {
                 RenameColumn rename = (RenameColumn) change;
                 assertNotUpdatingPrimaryKeys(oldTableSchema, 
rename.fieldNames(), "rename");
-                new NestedColumnModifier(rename.fieldNames()) {
+                new NestedColumnModifier(rename.fieldNames(), lazyIdentifier) {
                     @Override
                     protected void updateLastColumn(
                             int depth, List<DataField> newFields, String 
fieldName)
                             throws Catalog.ColumnNotExistException,
                                     Catalog.ColumnAlreadyExistException {
-                        assertColumnExists(newFields, fieldName);
-                        assertColumnNotExists(newFields, rename.newName());
+                        assertColumnExists(newFields, fieldName, 
lazyIdentifier);
+                        assertColumnNotExists(newFields, rename.newName(), 
lazyIdentifier);
                         for (int i = 0; i < newFields.size(); i++) {
                             DataField field = newFields.get(i);
                             if (!field.name().equals(fieldName)) {
@@ -415,12 +420,12 @@ public class SchemaManager implements Serializable {
             } else if (change instanceof DropColumn) {
                 DropColumn drop = (DropColumn) change;
                 dropColumnValidation(oldTableSchema, drop);
-                new NestedColumnModifier(drop.fieldNames()) {
+                new NestedColumnModifier(drop.fieldNames(), lazyIdentifier) {
                     @Override
                     protected void updateLastColumn(
                             int depth, List<DataField> newFields, String 
fieldName)
                             throws Catalog.ColumnNotExistException {
-                        assertColumnExists(newFields, fieldName);
+                        assertColumnExists(newFields, fieldName, 
lazyIdentifier);
                         newFields.removeIf(f -> f.name().equals(fieldName));
                         if (newFields.isEmpty()) {
                             throw new IllegalArgumentException("Cannot drop 
all fields in table");
@@ -467,7 +472,8 @@ public class SchemaManager implements Serializable {
                                             update.fieldNames().length),
                                     field.description(),
                                     field.defaultValue());
-                        });
+                        },
+                        lazyIdentifier);
             } else if (change instanceof UpdateColumnNullability) {
                 UpdateColumnNullability update = (UpdateColumnNullability) 
change;
                 if (update.fieldNames().length == 1
@@ -499,7 +505,8 @@ public class SchemaManager implements Serializable {
                                             update.fieldNames().length),
                                     field.description(),
                                     field.defaultValue());
-                        });
+                        },
+                        lazyIdentifier);
             } else if (change instanceof UpdateColumnComment) {
                 UpdateColumnComment update = (UpdateColumnComment) change;
                 updateNestedColumn(
@@ -511,7 +518,8 @@ public class SchemaManager implements Serializable {
                                         field.name(),
                                         field.type(),
                                         update.newDescription(),
-                                        field.defaultValue()));
+                                        field.defaultValue()),
+                        lazyIdentifier);
             } else if (change instanceof UpdateColumnPosition) {
                 UpdateColumnPosition update = (UpdateColumnPosition) change;
                 SchemaChange.Move move = update.move();
@@ -527,7 +535,8 @@ public class SchemaManager implements Serializable {
                                         field.name(),
                                         field.type(),
                                         field.description(),
-                                        update.newDefaultValue()));
+                                        update.newDefaultValue()),
+                        lazyIdentifier);
             } else {
                 throw new UnsupportedOperationException("Unsupported change: " 
+ change.getClass());
             }
@@ -561,7 +570,7 @@ public class SchemaManager implements Serializable {
     // the maxDepth will be based on updateFieldNames
     // which in the case will be [v, element, value, element],
     // so maxDepth is 4 and return DataType will be INT
-    private DataType getRootType(DataType type, int currDepth, int maxDepth) {
+    private static DataType getRootType(DataType type, int currDepth, int 
maxDepth) {
         if (currDepth == maxDepth - 1) {
             return type;
         }
@@ -579,7 +588,7 @@ public class SchemaManager implements Serializable {
     // ex: ARRAY<MAP<STRING, ARRAY<INT>>> -> ARRAY<MAP<STRING, ARRAY<BIGINT>>>
     // here we only need to update type of ARRAY<INT> to ARRAY<BIGINT> and 
rest of the type
     // remains same. This function achieves this.
-    private DataType getArrayMapTypeWithTargetTypeRoot(
+    private static DataType getArrayMapTypeWithTargetTypeRoot(
             DataType source, DataType target, int currDepth, int maxDepth) {
         if (currDepth == maxDepth - 1) {
             return target;
@@ -607,7 +616,7 @@ public class SchemaManager implements Serializable {
         }
     }
 
-    private void assertNullabilityChange(
+    private static void assertNullabilityChange(
             boolean oldNullability,
             boolean newNullability,
             String fieldName,
@@ -622,7 +631,7 @@ public class SchemaManager implements Serializable {
         }
     }
 
-    public void applyMove(List<DataField> newFields, SchemaChange.Move move) {
+    public static void applyMove(List<DataField> newFields, SchemaChange.Move 
move) {
         Map<String, Integer> map = new HashMap<>();
         for (int i = 0; i < newFields.size(); i++) {
             map.put(newFields.get(i).name(), i);
@@ -671,7 +680,7 @@ public class SchemaManager implements Serializable {
     }
 
     // Utility method to move a field within the list, handling range checks
-    private void moveField(List<DataField> newFields, int fromIndex, int 
toIndex) {
+    private static void moveField(List<DataField> newFields, int fromIndex, 
int toIndex) {
         if (fromIndex < 0 || fromIndex >= newFields.size() || toIndex < 0) {
             return;
         }
@@ -768,12 +777,14 @@ public class SchemaManager implements Serializable {
         }
     }
 
-    private abstract class NestedColumnModifier {
+    private abstract static class NestedColumnModifier {
 
         private final String[] updateFieldNames;
+        private final LazyField<Identifier> identifier;
 
-        private NestedColumnModifier(String[] updateFieldNames) {
+        private NestedColumnModifier(String[] updateFieldNames, 
LazyField<Identifier> identifier) {
             this.updateFieldNames = updateFieldNames;
+            this.identifier = identifier;
         }
 
         private void updateIntermediateColumn(
@@ -816,7 +827,7 @@ public class SchemaManager implements Serializable {
             }
 
             throw new Catalog.ColumnNotExistException(
-                    identifierFromPath(tableRoot.toString(), true, branch),
+                    identifier.get(),
                     String.join(".", 
Arrays.asList(updateFieldNames).subList(0, depth + 1)));
         }
 
@@ -863,7 +874,8 @@ public class SchemaManager implements Serializable {
                 int depth, List<DataField> newFields, String fieldName)
                 throws Catalog.ColumnNotExistException, 
Catalog.ColumnAlreadyExistException;
 
-        protected void assertColumnExists(List<DataField> newFields, String 
fieldName)
+        protected void assertColumnExists(
+                List<DataField> newFields, String fieldName, 
LazyField<Identifier> lazyIdentifier)
                 throws Catalog.ColumnNotExistException {
             for (DataField field : newFields) {
                 if (field.name().equals(fieldName)) {
@@ -871,17 +883,16 @@ public class SchemaManager implements Serializable {
                 }
             }
             throw new Catalog.ColumnNotExistException(
-                    identifierFromPath(tableRoot.toString(), true, branch),
-                    getLastFieldName(fieldName));
+                    lazyIdentifier.get(), getLastFieldName(fieldName));
         }
 
-        protected void assertColumnNotExists(List<DataField> newFields, String 
fieldName)
+        protected void assertColumnNotExists(
+                List<DataField> newFields, String fieldName, 
LazyField<Identifier> lazyIdentifier)
                 throws Catalog.ColumnAlreadyExistException {
             for (DataField field : newFields) {
                 if (field.name().equals(fieldName)) {
                     throw new Catalog.ColumnAlreadyExistException(
-                            identifierFromPath(tableRoot.toString(), true, 
branch),
-                            getLastFieldName(fieldName));
+                            lazyIdentifier.get(), getLastFieldName(fieldName));
                 }
             }
         }
@@ -896,12 +907,13 @@ public class SchemaManager implements Serializable {
         }
     }
 
-    private void updateNestedColumn(
+    private static void updateNestedColumn(
             List<DataField> newFields,
             String[] updateFieldNames,
-            BiFunction<DataField, Integer, DataField> updateFunc)
+            BiFunction<DataField, Integer, DataField> updateFunc,
+            LazyField<Identifier> lazyIdentifier)
             throws Catalog.ColumnNotExistException, 
Catalog.ColumnAlreadyExistException {
-        new NestedColumnModifier(updateFieldNames) {
+        new NestedColumnModifier(updateFieldNames, lazyIdentifier) {
             @Override
             protected void updateLastColumn(int depth, List<DataField> 
newFields, String fieldName)
                     throws Catalog.ColumnNotExistException {
@@ -916,8 +928,7 @@ public class SchemaManager implements Serializable {
                 }
 
                 throw new Catalog.ColumnNotExistException(
-                        identifierFromPath(tableRoot.toString(), true, branch),
-                        String.join(".", updateFieldNames));
+                        lazyIdentifier.get(), String.join(".", 
updateFieldNames));
             }
         }.updateIntermediateColumn(newFields, 0);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 46eaac25c0..2e525a515f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -65,6 +65,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
 import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION;
+import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static 
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
 import static 
org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
@@ -693,626 +694,228 @@ public abstract class CatalogTestBase {
     }
 
     @Test
-    public void testAlterTable() throws Exception {
-        catalog.createDatabase("test_db", false);
+    public void testAlterDataTable() throws Exception {
+        baseAlterTable(Maps.newHashMap());
+    }
 
-        // Alter table adds a new column to an existing table
-        Identifier identifier = Identifier.create("test_db", "test_table");
-        catalog.createTable(
-                identifier,
-                new Schema(
-                        Lists.newArrayList(new DataField(0, "col1", 
DataTypes.STRING())),
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        Maps.newHashMap(),
-                        ""),
-                false);
-        catalog.alterTable(
-                identifier,
-                Lists.newArrayList(
-                        SchemaChange.addColumn("col2", DataTypes.DATE()),
-                        SchemaChange.addColumn("col3", DataTypes.STRING(), 
"col3 field")),
-                false);
-        Table table = catalog.getTable(identifier);
-        assertThat(table.rowType().getFields()).hasSize(3);
-        int index = table.rowType().getFieldIndex("col2");
-        int index2 = table.rowType().getFieldIndex("col3");
-        assertThat(index).isEqualTo(1);
-        assertThat(index2).isEqualTo(2);
-        
assertThat(table.rowType().getTypeAt(index)).isEqualTo(DataTypes.DATE());
-        
assertThat(table.rowType().getTypeAt(index2)).isEqualTo(DataTypes.STRING());
-        
assertThat(table.rowType().getFields().get(2).description()).isEqualTo("col3 
field");
+    @Test
+    public void testAlterMaterializedTable() throws Exception {
+        Map<String, String> initOptions = Maps.newHashMap();
+        initOptions.put(CoreOptions.TYPE.key(), 
TableType.MATERIALIZED_TABLE.toString());
+        baseAlterTable(initOptions);
+    }
 
-        // Alter table throws Exception when table is system table
-        assertThatExceptionOfType(IllegalArgumentException.class)
-                .isThrownBy(
-                        () ->
-                                catalog.alterTable(
-                                        Identifier.create("test_db", 
"$system_table"),
-                                        Lists.newArrayList(
-                                                SchemaChange.addColumn("col2", 
DataTypes.DATE())),
-                                        false))
-                .withMessage(
-                        "Cannot 'alterTable' for system table 
'Identifier{database='test_db', object='$system_table'}', please use data 
table.");
+    @Test
+    public void testView() throws Exception {
+        if (!supportsView()) {
+            return;
+        }
+        Identifier identifier = new Identifier("view_db", "my_view");
+        View view = createView(identifier);
 
-        // Alter table throws TableNotExistException when table does not exist
-        assertThatExceptionOfType(Catalog.TableNotExistException.class)
-                .isThrownBy(
-                        () ->
-                                catalog.alterTable(
-                                        Identifier.create("test_db", 
"non_existing_table"),
-                                        Lists.newArrayList(
-                                                SchemaChange.addColumn("col3", 
DataTypes.INT())),
-                                        false))
-                .withMessage("Table test_db.non_existing_table does not 
exist.");
+        assertThatThrownBy(() -> catalog.createView(identifier, view, false))
+                .isInstanceOf(Catalog.DatabaseNotExistException.class);
 
-        // Alter table adds a column throws ColumnAlreadyExistException when 
column already exists
-        assertThatThrownBy(
-                        () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        Lists.newArrayList(
-                                                SchemaChange.addColumn("col1", 
DataTypes.INT())),
-                                        false))
-                .satisfies(
-                        anyCauseMatches(
-                                Catalog.ColumnAlreadyExistException.class,
-                                "Column col1 already exists in the 
test_db.test_table table."));
+        assertThatThrownBy(() -> 
catalog.listViews(identifier.getDatabaseName()))
+                .isInstanceOf(Catalog.DatabaseNotExistException.class);
 
-        // conflict options
-        assertThatThrownBy(
-                        () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        Lists.newArrayList(
-                                                SchemaChange.setOption(
-                                                        "changelog-producer", 
"input")),
-                                        false))
-                .isInstanceOf(RuntimeException.class)
-                .hasMessageContaining(
-                        "Can not set changelog-producer on table without 
primary keys");
-    }
+        catalog.createDatabase(identifier.getDatabaseName(), false);
 
-    @Test
-    public void testAlterTableRenameColumn() throws Exception {
-        catalog.createDatabase("test_db", false);
+        assertThatThrownBy(() -> catalog.getView(identifier))
+                .isInstanceOf(Catalog.ViewNotExistException.class);
 
-        // Alter table renames a column in an existing table
-        Identifier identifier = Identifier.create("test_db", "test_table");
-        catalog.createTable(
-                identifier,
-                new Schema(
-                        Lists.newArrayList(
-                                new DataField(0, "col1", DataTypes.STRING()),
-                                new DataField(1, "col2", DataTypes.STRING())),
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        Maps.newHashMap(),
-                        ""),
-                false);
-        catalog.alterTable(
-                identifier,
-                Lists.newArrayList(SchemaChange.renameColumn("col1", 
"new_col1")),
-                false);
-        Table table = catalog.getTable(identifier);
+        catalog.createView(identifier, view, false);
 
-        assertThat(table.rowType().getFields()).hasSize(2);
-        assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
-        assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0);
+        View catalogView = catalog.getView(identifier);
+        assertThat(catalogView.fullName()).isEqualTo(view.fullName());
+        assertThat(catalogView.rowType()).isEqualTo(view.rowType());
+        assertThat(catalogView.query()).isEqualTo(view.query());
+        assertThat(catalogView.dialects()).isEqualTo(view.dialects());
+        assertThat(catalogView.comment()).isEqualTo(view.comment());
+        assertThat(catalogView.options()).containsAllEntriesOf(view.options());
 
-        // Alter table renames a new column throws ColumnAlreadyExistException 
when column already
-        // exists
-        assertThatThrownBy(
-                        () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        Lists.newArrayList(
-                                                
SchemaChange.renameColumn("col2", "new_col1")),
-                                        false))
-                .isInstanceOf(Catalog.ColumnAlreadyExistException.class);
+        List<String> views = catalog.listViews(identifier.getDatabaseName());
+        assertThat(views).containsOnly(identifier.getObjectName());
 
-        // Alter table renames a column throws ColumnNotExistException when 
column does not exist
+        catalog.createView(identifier, view, true);
+        assertThatThrownBy(() -> catalog.createView(identifier, view, false))
+                .isInstanceOf(Catalog.ViewAlreadyExistException.class);
+
+        Identifier newIdentifier = new Identifier("view_db", "new_view");
+        catalog.renameView(new Identifier("view_db", "unknown"), 
newIdentifier, true);
         assertThatThrownBy(
                         () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        Lists.newArrayList(
-                                                SchemaChange.renameColumn(
-                                                        "non_existing_col", 
"new_col2")),
-                                        false))
-                .isInstanceOf(Catalog.ColumnNotExistException.class);
+                                catalog.renameView(
+                                        new Identifier("view_db", "unknown"), 
newIdentifier, false))
+                .isInstanceOf(Catalog.ViewNotExistException.class);
+        catalog.renameView(identifier, newIdentifier, false);
+
+        catalog.dropView(newIdentifier, true);
+        assertThatThrownBy(() -> catalog.dropView(newIdentifier, false))
+                .isInstanceOf(Catalog.ViewNotExistException.class);
     }
 
     @Test
-    public void testAlterTableDropColumn() throws Exception {
-        catalog.createDatabase("test_db", false);
+    public void testListViewsPaged() throws Exception {
+        if (!supportsView()) {
+            return;
+        }
 
-        // Alter table drop a column in an existing table
-        Identifier identifier = Identifier.create("test_db", "test_table");
-        catalog.createTable(
-                identifier,
-                new Schema(
-                        Lists.newArrayList(
-                                new DataField(0, "col1", DataTypes.STRING()),
-                                new DataField(1, "col2", DataTypes.STRING())),
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        Maps.newHashMap(),
-                        ""),
-                false);
-        catalog.alterTable(identifier, 
Lists.newArrayList(SchemaChange.dropColumn("col1")), false);
-        Table table = catalog.getTable(identifier);
+        // List views returns an empty list when there are no views in the 
database
+        String databaseName = "views_paged_db";
+        catalog.createDatabase(databaseName, false);
+        PagedList<String> pagedViews = catalog.listViewsPaged(databaseName, 
null, null, null);
+        assertThat(pagedViews.getElements()).isEmpty();
+        assertNull(pagedViews.getNextPageToken());
 
-        assertThat(table.rowType().getFields()).hasSize(1);
-        assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
+        // List views paged returns a list with the names of all views in the 
database in all
+        // catalogs except RestCatalog
+        // even if the maxResults or pageToken is not null
+        View view = buildView(databaseName);
+        String[] viewNames = {"view1", "view2", "view3", "abd", "def", "opr"};
+        for (String viewName : viewNames) {
+            catalog.createView(Identifier.create(databaseName, viewName), 
view, false);
+        }
 
-        // Alter table drop all fields throws Exception
-        assertThatThrownBy(
+        pagedViews = catalog.listViewsPaged(databaseName, null, null, null);
+        assertPagedViews(pagedViews, viewNames);
+
+        int maxResults = 2;
+        pagedViews = catalog.listViewsPaged(databaseName, maxResults, null, 
null);
+        assertPagedViews(pagedViews, viewNames);
+
+        String pageToken = "view1";
+        pagedViews = catalog.listViewsPaged(databaseName, maxResults, 
pageToken, null);
+        assertPagedViews(pagedViews, viewNames);
+
+        maxResults = 8;
+        pagedViews = catalog.listViewsPaged(databaseName, maxResults, null, 
null);
+        assertPagedViews(pagedViews, viewNames);
+
+        pagedViews = catalog.listViewsPaged(databaseName, maxResults, 
pageToken, null);
+        assertPagedViews(pagedViews, viewNames);
+
+        // List views throws DatabaseNotExistException when the database does 
not exist
+        final int finalMaxResults = maxResults;
+        assertThatExceptionOfType(Catalog.DatabaseNotExistException.class)
+                .isThrownBy(
                         () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        
Lists.newArrayList(SchemaChange.dropColumn("col2")),
-                                        false))
-                .satisfies(
-                        anyCauseMatches(
-                                IllegalArgumentException.class, "Cannot drop 
all fields in table"));
+                                catalog.listViewsPaged(
+                                        "non_existing_db", finalMaxResults, 
pageToken, null));
 
-        // Alter table drop a column throws ColumnNotExistException when 
column does not exist
-        assertThatThrownBy(
+        assertThatExceptionOfType(UnsupportedOperationException.class)
+                .isThrownBy(
                         () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        Lists.newArrayList(
-                                                
SchemaChange.dropColumn("non_existing_col")),
-                                        false))
-                .satisfies(
-                        anyCauseMatches(
-                                Catalog.ColumnNotExistException.class,
-                                "Column non_existing_col does not exist in the 
test_db.test_table table."));
+                                catalog.listViewsPaged(
+                                        databaseName, finalMaxResults, 
pageToken, "view%"));
     }
 
     @Test
-    public void testAlterTableUpdateColumnType() throws Exception {
-        catalog.createDatabase("test_db", false);
-
-        // Alter table update a column type in an existing table
-        Identifier identifier = Identifier.create("test_db", "test_table");
-        catalog.createTable(
-                identifier,
-                new Schema(
-                        Lists.newArrayList(
-                                new DataField(0, "dt", DataTypes.STRING()),
-                                new DataField(1, "col1", DataTypes.BIGINT(), 
"col1 field")),
-                        Lists.newArrayList("dt"),
-                        Collections.emptyList(),
-                        Maps.newHashMap(),
-                        ""),
-                false);
-        catalog.alterTable(
-                identifier,
-                Lists.newArrayList(SchemaChange.updateColumnType("col1", 
DataTypes.DOUBLE())),
-                false);
-        Table table = catalog.getTable(identifier);
+    public void testListViewDetailsPaged() throws Exception {
+        if (!supportsView()) {
+            return;
+        }
 
-        assertThat(table.rowType().getFieldIndex("col1")).isEqualTo(1);
-        assertThat(table.rowType().getTypeAt(1)).isEqualTo(DataTypes.DOUBLE());
-        
assertThat(table.rowType().getFields().get(1).description()).isEqualTo("col1 
field");
-
-        // Alter table update a column type throws Exception when column data 
type does not support
-        // cast
-        assertThatThrownBy(
-                        () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        Lists.newArrayList(
-                                                SchemaChange.updateColumnType(
-                                                        "col1", 
DataTypes.DATE())),
-                                        false))
-                .satisfies(
-                        anyCauseMatches(
-                                IllegalStateException.class,
-                                "Column type col1[DOUBLE] cannot be converted 
to DATE without loosing information."));
+        // List view details paged returns an empty list when there are no views in the 
database
+        String databaseName = "view_details_paged_db";
+        catalog.createDatabase(databaseName, false);
+        PagedList<View> pagedViewDetailsPaged =
+                catalog.listViewDetailsPaged(databaseName, null, null, null);
+        assertThat(pagedViewDetailsPaged.getElements()).isEmpty();
+        assertNull(pagedViewDetailsPaged.getNextPageToken());
 
-        // Alter table update a column type throws ColumnNotExistException 
when column does not
-        // exist
-        assertThatThrownBy(
-                        () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        Lists.newArrayList(
-                                                SchemaChange.updateColumnType(
-                                                        "non_existing_col", 
DataTypes.INT())),
-                                        false))
-                .satisfies(
-                        anyCauseMatches(
-                                Catalog.ColumnNotExistException.class,
-                                "Column non_existing_col does not exist in the 
test_db.test_table table."));
-        // Alter table update a column type throws Exception when column is 
partition columns
-        assertThatThrownBy(
-                        () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        Lists.newArrayList(
-                                                SchemaChange.updateColumnType(
-                                                        "dt", 
DataTypes.DATE())),
-                                        false))
-                .satisfies(anyCauseMatches("Cannot update partition column: 
[dt]"));
-    }
+        // List view details paged returns a list with all views in the 
database in all catalogs
+        // except RestCatalog
+        // even if the maxResults or pageToken is not null
+        View view = buildView(databaseName);
+        String[] viewNames = {"view1", "view2", "view3", "abd", "def", "opr"};
+        for (String viewName : viewNames) {
+            catalog.createView(Identifier.create(databaseName, viewName), 
view, false);
+        }
 
-    @Test
-    public void testAlterTableUpdateColumnComment() throws Exception {
-        catalog.createDatabase("test_db", false);
+        pagedViewDetailsPaged = catalog.listViewDetailsPaged(databaseName, 
null, null, null);
+        assertPagedViewDetails(pagedViewDetailsPaged, view, viewNames.length, 
viewNames);
+        assertNull(pagedViewDetailsPaged.getNextPageToken());
 
-        // Alter table update a column comment in an existing table
-        Identifier identifier = Identifier.create("test_db", "test_table");
-        catalog.createTable(
-                identifier,
-                new Schema(
-                        Lists.newArrayList(
-                                new DataField(0, "col1", DataTypes.STRING(), 
"field1"),
-                                new DataField(1, "col2", DataTypes.STRING(), 
"field2"),
-                                new DataField(
-                                        2,
-                                        "col3",
-                                        DataTypes.ROW(
-                                                new DataField(4, "f1", 
DataTypes.STRING(), "f1"),
-                                                new DataField(5, "f2", 
DataTypes.STRING(), "f2"),
-                                                new DataField(6, "f3", 
DataTypes.STRING(), "f3")),
-                                        "field3")),
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        Maps.newHashMap(),
-                        ""),
-                false);
+        int maxResults = 2;
+        pagedViewDetailsPaged = catalog.listViewDetailsPaged(databaseName, 
maxResults, null, null);
+        assertPagedViewDetails(pagedViewDetailsPaged, view, viewNames.length, 
viewNames);
+        assertNull(pagedViewDetailsPaged.getNextPageToken());
 
-        catalog.alterTable(
-                identifier,
-                Lists.newArrayList(SchemaChange.updateColumnComment("col2", 
"col2 field")),
-                false);
+        String pageToken = "view1";
+        pagedViewDetailsPaged =
+                catalog.listViewDetailsPaged(databaseName, maxResults, 
pageToken, null);
+        assertPagedViewDetails(pagedViewDetailsPaged, view, viewNames.length, 
viewNames);
+        assertNull(pagedViewDetailsPaged.getNextPageToken());
 
-        // Update nested column
-        String[] fields = new String[] {"col3", "f1"};
-        catalog.alterTable(
-                identifier,
-                Lists.newArrayList(SchemaChange.updateColumnComment(fields, 
"col3 f1 field")),
-                false);
+        maxResults = 8;
+        pagedViewDetailsPaged = catalog.listViewDetailsPaged(databaseName, 
maxResults, null, null);
+        assertPagedViewDetails(pagedViewDetailsPaged, view, viewNames.length, 
viewNames);
+        assertNull(pagedViewDetailsPaged.getNextPageToken());
 
-        Table table = catalog.getTable(identifier);
-        
assertThat(table.rowType().getFields().get(1).description()).isEqualTo("col2 
field");
-        RowType rowType = (RowType) table.rowType().getFields().get(2).type();
-        assertThat(rowType.getFields().get(0).description()).isEqualTo("col3 
f1 field");
+        pagedViewDetailsPaged =
+                catalog.listViewDetailsPaged(databaseName, maxResults, 
pageToken, null);
+        assertPagedViewDetails(pagedViewDetailsPaged, view, viewNames.length, 
viewNames);
+        assertNull(pagedViewDetailsPaged.getNextPageToken());
 
-        // Alter table update a column comment throws Exception when column 
does not exist
-        assertThatThrownBy(
+        // List view details throws DatabaseNotExistException when the 
database does not exist
+        final int finalMaxResults = maxResults;
+        assertThatExceptionOfType(Catalog.DatabaseNotExistException.class)
+                .isThrownBy(
                         () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        Lists.newArrayList(
-                                                
SchemaChange.updateColumnComment(
-                                                        new String[] 
{"non_existing_col"}, "")),
-                                        false))
-                .satisfies(
-                        anyCauseMatches(
-                                Catalog.ColumnNotExistException.class,
-                                "Column non_existing_col does not exist in the 
test_db.test_table table."));
+                                catalog.listViewDetailsPaged(
+                                        "non_existing_db", finalMaxResults, 
pageToken, null));
     }
 
     @Test
-    public void testAlterTableUpdateColumnNullability() throws Exception {
-        catalog.createDatabase("test_db", false);
-
-        // Alter table update a column nullability in an existing table
-        Identifier identifier = Identifier.create("test_db", "test_table");
-        catalog.createTable(
-                identifier,
-                new Schema(
-                        Lists.newArrayList(
-                                new DataField(0, "col1", DataTypes.STRING(), 
"field1"),
-                                new DataField(1, "col2", DataTypes.STRING(), 
"field2"),
-                                new DataField(
-                                        2,
-                                        "col3",
-                                        DataTypes.ROW(
-                                                new DataField(4, "f1", 
DataTypes.STRING(), "f1"),
-                                                new DataField(5, "f2", 
DataTypes.STRING(), "f2"),
-                                                new DataField(6, "f3", 
DataTypes.STRING(), "f3")),
-                                        "field3")),
-                        Lists.newArrayList("col1"),
-                        Lists.newArrayList("col1", "col2"),
-                        Maps.newHashMap(),
-                        ""),
-                false);
-
-        catalog.alterTable(
-                identifier,
-                Lists.newArrayList(
-                        SchemaChange.setOption(
-                                
CoreOptions.DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL.key(), "false")),
-                false);
-
-        catalog.alterTable(
-                identifier,
-                
Lists.newArrayList(SchemaChange.updateColumnNullability("col1", false)),
-                false);
-
-        // Update nested column
-        String[] fields = new String[] {"col3", "f1"};
-        catalog.alterTable(
-                identifier,
-                
Lists.newArrayList(SchemaChange.updateColumnNullability(fields, false)),
-                false);
+    public void testListViewsPagedGlobally() throws Exception {
+        if (!supportsView()) {
+            return;
+        }
 
-        Table table = catalog.getTable(identifier);
-        
assertThat(table.rowType().getFields().get(0).type().isNullable()).isEqualTo(false);
+        // List view paged globally throws UnsupportedOperationException if 
current catalog does not
+        // supportsListObjectsPaged or does not supportsListByPattern
+        String databaseName = "list_views_paged_globally_db";
+        catalog.createDatabase(databaseName, false);
+        if (!catalog.supportsListObjectsPaged() || 
!catalog.supportsListByPattern()) {
+            Assertions.assertThrows(
+                    UnsupportedOperationException.class,
+                    () -> catalog.listViewsPagedGlobally(databaseName, null, 
null, null));
+        }
 
-        // Alter table update a column nullability throws Exception when 
column does not exist
-        assertThatThrownBy(
-                        () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        Lists.newArrayList(
-                                                
SchemaChange.updateColumnNullability(
-                                                        new String[] 
{"non_existing_col"}, false)),
-                                        false))
-                .satisfies(
-                        anyCauseMatches(
-                                Catalog.ColumnNotExistException.class,
-                                "Column non_existing_col does not exist in the 
test_db.test_table table."));
+        View view = buildView(databaseName);
+        String[] viewNames = {"view1", "view2", "view3", "abd", "def", "opr"};
+        for (String viewName : viewNames) {
+            catalog.createView(Identifier.create(databaseName, viewName), 
view, false);
+        }
 
-        // Alter table update a column nullability throws Exception when 
column is pk columns
-        assertThatThrownBy(
-                        () ->
-                                catalog.alterTable(
-                                        identifier,
-                                        Lists.newArrayList(
-                                                
SchemaChange.updateColumnNullability(
-                                                        new String[] {"col2"}, 
true)),
-                                        false))
-                .satisfies(anyCauseMatches("Cannot change nullability of 
primary key"));
+        if (!catalog.supportsListObjectsPaged() || 
!catalog.supportsListByPattern()) {
+            Assertions.assertThrows(
+                    UnsupportedOperationException.class,
+                    () -> catalog.listViewsPagedGlobally(null, null, null, 
null));
+            Assertions.assertThrows(
+                    UnsupportedOperationException.class,
+                    () -> catalog.listViewsPagedGlobally(databaseName, null, 
null, null));
+            Assertions.assertThrows(
+                    UnsupportedOperationException.class,
+                    () -> catalog.listViewsPagedGlobally(null, null, 100, 
null));
+            Assertions.assertThrows(
+                    UnsupportedOperationException.class,
+                    () -> catalog.listViewsPagedGlobally(databaseName, "abc", 
null, null));
+            Assertions.assertThrows(
+                    UnsupportedOperationException.class,
+                    () -> catalog.listViewsPagedGlobally(databaseName, "abc", 
null, "view"));
+        }
     }
 
     @Test
-    public void testAlterTableUpdateComment() throws Exception {
-        catalog.createDatabase("test_db", false);
-
-        Identifier identifier = Identifier.create("test_db", "test_table");
-        catalog.createTable(
-                identifier,
-                new Schema(
-                        Lists.newArrayList(
-                                new DataField(0, "col1", DataTypes.STRING(), 
"field1"),
-                                new DataField(1, "col2", DataTypes.STRING(), 
"field2")),
-                        Collections.emptyList(),
-                        Collections.emptyList(),
-                        Maps.newHashMap(),
-                        "comment"),
-                false);
+    public void testFormatTable() throws Exception {
+        if (!supportsFormatTable()) {
+            return;
+        }
 
-        catalog.alterTable(
-                identifier, Lists.newArrayList(SchemaChange.updateComment("new 
comment")), false);
-
-        Table table = catalog.getTable(identifier);
-        assertThat(table.comment().isPresent() && 
table.comment().get().equals("new comment"))
-                .isTrue();
-
-        // drop comment
-        catalog.alterTable(identifier, 
Lists.newArrayList(SchemaChange.updateComment(null)), false);
-
-        table = catalog.getTable(identifier);
-        assertThat(table.comment().isPresent()).isFalse();
-    }
-
-    @Test
-    public void testView() throws Exception {
-        if (!supportsView()) {
-            return;
-        }
-        Identifier identifier = new Identifier("view_db", "my_view");
-        View view = createView(identifier);
-
-        assertThatThrownBy(() -> catalog.createView(identifier, view, false))
-                .isInstanceOf(Catalog.DatabaseNotExistException.class);
-
-        assertThatThrownBy(() -> 
catalog.listViews(identifier.getDatabaseName()))
-                .isInstanceOf(Catalog.DatabaseNotExistException.class);
-
-        catalog.createDatabase(identifier.getDatabaseName(), false);
-
-        assertThatThrownBy(() -> catalog.getView(identifier))
-                .isInstanceOf(Catalog.ViewNotExistException.class);
-
-        catalog.createView(identifier, view, false);
-
-        View catalogView = catalog.getView(identifier);
-        assertThat(catalogView.fullName()).isEqualTo(view.fullName());
-        assertThat(catalogView.rowType()).isEqualTo(view.rowType());
-        assertThat(catalogView.query()).isEqualTo(view.query());
-        assertThat(catalogView.dialects()).isEqualTo(view.dialects());
-        assertThat(catalogView.comment()).isEqualTo(view.comment());
-        assertThat(catalogView.options()).containsAllEntriesOf(view.options());
-
-        List<String> views = catalog.listViews(identifier.getDatabaseName());
-        assertThat(views).containsOnly(identifier.getObjectName());
-
-        catalog.createView(identifier, view, true);
-        assertThatThrownBy(() -> catalog.createView(identifier, view, false))
-                .isInstanceOf(Catalog.ViewAlreadyExistException.class);
-
-        Identifier newIdentifier = new Identifier("view_db", "new_view");
-        catalog.renameView(new Identifier("view_db", "unknown"), 
newIdentifier, true);
-        assertThatThrownBy(
-                        () ->
-                                catalog.renameView(
-                                        new Identifier("view_db", "unknown"), 
newIdentifier, false))
-                .isInstanceOf(Catalog.ViewNotExistException.class);
-        catalog.renameView(identifier, newIdentifier, false);
-
-        catalog.dropView(newIdentifier, true);
-        assertThatThrownBy(() -> catalog.dropView(newIdentifier, false))
-                .isInstanceOf(Catalog.ViewNotExistException.class);
-    }
-
-    @Test
-    public void testListViewsPaged() throws Exception {
-        if (!supportsView()) {
-            return;
-        }
-
-        // List views returns an empty list when there are no views in the 
database
-        String databaseName = "views_paged_db";
-        catalog.createDatabase(databaseName, false);
-        PagedList<String> pagedViews = catalog.listViewsPaged(databaseName, 
null, null, null);
-        assertThat(pagedViews.getElements()).isEmpty();
-        assertNull(pagedViews.getNextPageToken());
-
-        // List views paged returns a list with the names of all views in the 
database in all
-        // catalogs except RestCatalog
-        // even if the maxResults or pageToken is not null
-        View view = buildView(databaseName);
-        String[] viewNames = {"view1", "view2", "view3", "abd", "def", "opr"};
-        for (String viewName : viewNames) {
-            catalog.createView(Identifier.create(databaseName, viewName), 
view, false);
-        }
-
-        pagedViews = catalog.listViewsPaged(databaseName, null, null, null);
-        assertPagedViews(pagedViews, viewNames);
-
-        int maxResults = 2;
-        pagedViews = catalog.listViewsPaged(databaseName, maxResults, null, 
null);
-        assertPagedViews(pagedViews, viewNames);
-
-        String pageToken = "view1";
-        pagedViews = catalog.listViewsPaged(databaseName, maxResults, 
pageToken, null);
-        assertPagedViews(pagedViews, viewNames);
-
-        maxResults = 8;
-        pagedViews = catalog.listViewsPaged(databaseName, maxResults, null, 
null);
-        assertPagedViews(pagedViews, viewNames);
-
-        pagedViews = catalog.listViewsPaged(databaseName, maxResults, 
pageToken, null);
-        assertPagedViews(pagedViews, viewNames);
-
-        // List views throws DatabaseNotExistException when the database does 
not exist
-        final int finalMaxResults = maxResults;
-        assertThatExceptionOfType(Catalog.DatabaseNotExistException.class)
-                .isThrownBy(
-                        () ->
-                                catalog.listViewsPaged(
-                                        "non_existing_db", finalMaxResults, 
pageToken, null));
-
-        assertThatExceptionOfType(UnsupportedOperationException.class)
-                .isThrownBy(
-                        () ->
-                                catalog.listViewsPaged(
-                                        databaseName, finalMaxResults, 
pageToken, "view%"));
-    }
-
-    @Test
-    public void testListViewDetailsPaged() throws Exception {
-        if (!supportsView()) {
-            return;
-        }
-
-        // List views returns an empty list when there are no views in the 
database
-        String databaseName = "view_details_paged_db";
-        catalog.createDatabase(databaseName, false);
-        PagedList<View> pagedViewDetailsPaged =
-                catalog.listViewDetailsPaged(databaseName, null, null, null);
-        assertThat(pagedViewDetailsPaged.getElements()).isEmpty();
-        assertNull(pagedViewDetailsPaged.getNextPageToken());
-
-        // List view details paged returns a list with all view in the 
database in all catalogs
-        // except RestCatalog
-        // even if the maxResults or pageToken is not null
-        View view = buildView(databaseName);
-        String[] viewNames = {"view1", "view2", "view3", "abd", "def", "opr"};
-        for (String viewName : viewNames) {
-            catalog.createView(Identifier.create(databaseName, viewName), 
view, false);
-        }
-
-        pagedViewDetailsPaged = catalog.listViewDetailsPaged(databaseName, 
null, null, null);
-        assertPagedViewDetails(pagedViewDetailsPaged, view, viewNames.length, 
viewNames);
-        assertNull(pagedViewDetailsPaged.getNextPageToken());
-
-        int maxResults = 2;
-        pagedViewDetailsPaged = catalog.listViewDetailsPaged(databaseName, 
maxResults, null, null);
-        assertPagedViewDetails(pagedViewDetailsPaged, view, viewNames.length, 
viewNames);
-        assertNull(pagedViewDetailsPaged.getNextPageToken());
-
-        String pageToken = "view1";
-        pagedViewDetailsPaged =
-                catalog.listViewDetailsPaged(databaseName, maxResults, 
pageToken, null);
-        assertPagedViewDetails(pagedViewDetailsPaged, view, viewNames.length, 
viewNames);
-        assertNull(pagedViewDetailsPaged.getNextPageToken());
-
-        maxResults = 8;
-        pagedViewDetailsPaged = catalog.listViewDetailsPaged(databaseName, 
maxResults, null, null);
-        assertPagedViewDetails(pagedViewDetailsPaged, view, viewNames.length, 
viewNames);
-        assertNull(pagedViewDetailsPaged.getNextPageToken());
-
-        pagedViewDetailsPaged =
-                catalog.listViewDetailsPaged(databaseName, maxResults, 
pageToken, null);
-        assertPagedViewDetails(pagedViewDetailsPaged, view, viewNames.length, 
viewNames);
-        assertNull(pagedViewDetailsPaged.getNextPageToken());
-
-        // List view details throws DatabaseNotExistException when the 
database does not exist
-        final int finalMaxResults = maxResults;
-        assertThatExceptionOfType(Catalog.DatabaseNotExistException.class)
-                .isThrownBy(
-                        () ->
-                                catalog.listViewDetailsPaged(
-                                        "non_existing_db", finalMaxResults, 
pageToken, null));
-    }
-
-    @Test
-    public void testListViewsPagedGlobally() throws Exception {
-        if (!supportsView()) {
-            return;
-        }
-
-        // List view paged globally throws UnsupportedOperationException if 
current catalog does not
-        // supportsListObjectsPaged or odes not supportsListByPattern
-        String databaseName = "list_views_paged_globally_db";
-        catalog.createDatabase(databaseName, false);
-        if (!catalog.supportsListObjectsPaged() || 
!catalog.supportsListByPattern()) {
-            Assertions.assertThrows(
-                    UnsupportedOperationException.class,
-                    () -> catalog.listViewsPagedGlobally(databaseName, null, 
null, null));
-        }
-
-        View view = buildView(databaseName);
-        String[] viewNames = {"view1", "view2", "view3", "abd", "def", "opr"};
-        for (String viewName : viewNames) {
-            catalog.createView(Identifier.create(databaseName, viewName), 
view, false);
-        }
-
-        if (!catalog.supportsListObjectsPaged() || 
!catalog.supportsListByPattern()) {
-            Assertions.assertThrows(
-                    UnsupportedOperationException.class,
-                    () -> catalog.listViewsPagedGlobally(null, null, null, 
null));
-            Assertions.assertThrows(
-                    UnsupportedOperationException.class,
-                    () -> catalog.listViewsPagedGlobally(databaseName, null, 
null, null));
-            Assertions.assertThrows(
-                    UnsupportedOperationException.class,
-                    () -> catalog.listViewsPagedGlobally(null, null, 100, 
null));
-            Assertions.assertThrows(
-                    UnsupportedOperationException.class,
-                    () -> catalog.listViewsPagedGlobally(databaseName, "abc", 
null, null));
-            Assertions.assertThrows(
-                    UnsupportedOperationException.class,
-                    () -> catalog.listViewsPagedGlobally(databaseName, "abc", 
null, "view"));
-        }
-    }
-
-    @Test
-    public void testFormatTable() throws Exception {
-        if (!supportsFormatTable()) {
-            return;
-        }
-
-        Identifier identifier = new Identifier("format_db", "my_format");
-        catalog.createDatabase(identifier.getDatabaseName(), false);
+        Identifier identifier = new Identifier("format_db", "my_format");
+        catalog.createDatabase(identifier.getDatabaseName(), false);
 
         // create table
         Schema schema =
@@ -1321,6 +924,7 @@ public abstract class CatalogTestBase {
                         .column("int", DataTypes.INT())
                         .options(getFormatTableOptions())
                         .option("file.format", "csv")
+                        .option("remove-key", "value")
                         .build();
         catalog.createTable(identifier, schema, false);
         assertThat(catalog.listTables(identifier.getDatabaseName()))
@@ -1328,9 +932,13 @@ public abstract class CatalogTestBase {
         
assertThat(catalog.getTable(identifier)).isInstanceOf(FormatTable.class);
 
         // alter table
-        SchemaChange schemaChange = SchemaChange.addColumn("new_col", 
DataTypes.STRING());
-        assertThatThrownBy(() -> catalog.alterTable(identifier, schemaChange, 
false))
-                .hasMessageContaining("Only data table support alter table.");
+        if (supportsAlterFormatTable()) {
+            baseAlterTable(getFormatTableOptions());
+        } else {
+            SchemaChange schemaChange = SchemaChange.addColumn("new_col", 
DataTypes.STRING());
+            assertThatThrownBy(() -> catalog.alterTable(identifier, 
schemaChange, false))
+                    .hasMessageContaining("Only data table support alter 
table.");
+        }
 
         // drop table
         catalog.dropTable(identifier, false);
@@ -1513,6 +1121,10 @@ public abstract class CatalogTestBase {
         return false;
     }
 
+    protected boolean supportsAlterFormatTable() {
+        return false;
+    }
+
     protected boolean supportsView() {
         return false;
     }
@@ -1700,4 +1312,359 @@ public abstract class CatalogTestBase {
         }
         return new ViewImpl(identifier, rowType.getFields(), query, dialects, 
comment, options);
     }
+
+    private void baseAlterTable(Map<String, String> initOptions) throws 
Exception {
+        catalog.createDatabase("test_db", true);
+
+        Identifier identifier = Identifier.create("test_db", "test_table");
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        Lists.newArrayList(new DataField(0, "col1", 
DataTypes.STRING(), "field1")),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        initOptions,
+                        "comment"),
+                false);
+
+        catalog.alterTable(
+                identifier,
+                Lists.newArrayList(
+                        SchemaChange.addColumn("col2", DataTypes.DATE()),
+                        SchemaChange.addColumn("col3", DataTypes.STRING(), 
"col3 field")),
+                false);
+        Table table = catalog.getTable(identifier);
+        assertThat(table.rowType().getFields()).hasSize(3);
+        int index = table.rowType().getFieldIndex("col2");
+        int index2 = table.rowType().getFieldIndex("col3");
+        assertThat(index).isEqualTo(1);
+        assertThat(index2).isEqualTo(2);
+        
assertThat(table.rowType().getTypeAt(index)).isEqualTo(DataTypes.DATE());
+        
assertThat(table.rowType().getTypeAt(index2)).isEqualTo(DataTypes.STRING());
+        
assertThat(table.rowType().getFields().get(2).description()).isEqualTo("col3 
field");
+
+        // Alter table throws Exception when table is system table
+        assertThatExceptionOfType(IllegalArgumentException.class)
+                .isThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        Identifier.create("test_db", 
"$system_table"),
+                                        Lists.newArrayList(
+                                                SchemaChange.addColumn("col2", 
DataTypes.DATE())),
+                                        false))
+                .withMessage(
+                        "Cannot 'alterTable' for system table 
'Identifier{database='test_db', object='$system_table'}', please use data 
table.");
+
+        // Alter table throws TableNotExistException when table does not exist
+        assertThatExceptionOfType(Catalog.TableNotExistException.class)
+                .isThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        Identifier.create("test_db", 
"non_existing_table"),
+                                        Lists.newArrayList(
+                                                SchemaChange.addColumn("col3", 
DataTypes.INT())),
+                                        false))
+                .withMessage("Table test_db.non_existing_table does not 
exist.");
+
+        // Alter table adds a column throws ColumnAlreadyExistException when 
column already exists
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier,
+                                        Lists.newArrayList(
+                                                SchemaChange.addColumn("col1", 
DataTypes.INT())),
+                                        false))
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnAlreadyExistException.class,
+                                "Column col1 already exists in the 
test_db.test_table table."));
+
+        // conflict options
+        if (Options.fromMap(table.options()).get(TYPE) == 
TableType.MATERIALIZED_TABLE
+                || Options.fromMap(table.options()).get(TYPE) == 
TableType.TABLE) {
+            assertThatThrownBy(
+                            () ->
+                                    catalog.alterTable(
+                                            identifier,
+                                            Lists.newArrayList(
+                                                    SchemaChange.setOption(
+                                                            
"changelog-producer", "input")),
+                                            false))
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessageContaining(
+                            "Can not set changelog-producer on table without 
primary keys");
+        }
+
+        catalog.alterTable(
+                identifier, Lists.newArrayList(SchemaChange.updateComment("new 
comment")), false);
+
+        table = catalog.getTable(identifier);
+        assertThat(table.comment().isPresent() && 
table.comment().get().equals("new comment"))
+                .isTrue();
+
+        // Alter table renames a column in an existing table;
+        catalog.alterTable(
+                identifier,
+                Lists.newArrayList(SchemaChange.renameColumn("col1", 
"new_col1")),
+                false);
+        table = catalog.getTable(identifier);
+        assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
+        assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0);
+
+        // Alter table renames a new column throws ColumnAlreadyExistException 
when column already
+        // exists
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier,
+                                        Lists.newArrayList(
+                                                
SchemaChange.renameColumn("col2", "new_col1")),
+                                        false))
+                .isInstanceOf(Catalog.ColumnAlreadyExistException.class);
+
+        // Alter table renames a column throws ColumnNotExistException when 
column does not exist
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier,
+                                        Lists.newArrayList(
+                                                SchemaChange.renameColumn(
+                                                        "non_existing_col", 
"new_col2")),
+                                        false))
+                .isInstanceOf(Catalog.ColumnNotExistException.class);
+        catalog.dropTable(identifier, true);
+
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "col1", DataTypes.STRING()),
+                                new DataField(1, "col2", DataTypes.STRING())),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Maps.newHashMap(),
+                        ""),
+                false);
+        catalog.alterTable(identifier, 
Lists.newArrayList(SchemaChange.dropColumn("col1")), false);
+        table = catalog.getTable(identifier);
+
+        assertThat(table.rowType().getFields()).hasSize(1);
+        assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
+
+        // Alter table drop all fields throws Exception
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier,
+                                        
Lists.newArrayList(SchemaChange.dropColumn("col2")),
+                                        false))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalArgumentException.class, "Cannot drop 
all fields in table"));
+
+        // Alter table drop a column throws ColumnNotExistException when 
column does not exist
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier,
+                                        Lists.newArrayList(
+                                                
SchemaChange.dropColumn("non_existing_col")),
+                                        false))
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnNotExistException.class,
+                                "Column non_existing_col does not exist in the 
test_db.test_table table."));
+
+        // drop comment
+        catalog.alterTable(identifier, 
Lists.newArrayList(SchemaChange.updateComment(null)), false);
+
+        table = catalog.getTable(identifier);
+        assertThat(table.comment().isPresent()).isFalse();
+        catalog.dropTable(identifier, false);
+
+        // Alter table update a column type in an existing table
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "dt", DataTypes.STRING()),
+                                new DataField(1, "col1", DataTypes.BIGINT(), 
"col1 field")),
+                        Lists.newArrayList("dt"),
+                        Collections.emptyList(),
+                        initOptions,
+                        ""),
+                false);
+        catalog.alterTable(
+                identifier,
+                Lists.newArrayList(SchemaChange.updateColumnType("col1", 
DataTypes.DOUBLE())),
+                false);
+        table = catalog.getTable(identifier);
+
+        assertThat(table.rowType().getFieldIndex("col1")).isEqualTo(1);
+        assertThat(table.rowType().getTypeAt(1)).isEqualTo(DataTypes.DOUBLE());
+        
assertThat(table.rowType().getFields().get(1).description()).isEqualTo("col1 
field");
+
+        // Alter table update a column type throws Exception when column data 
type does not support
+        // cast
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier,
+                                        Lists.newArrayList(
+                                                SchemaChange.updateColumnType(
+                                                        "col1", 
DataTypes.DATE())),
+                                        false))
+                .satisfies(
+                        anyCauseMatches(
+                                IllegalStateException.class,
+                                "Column type col1[DOUBLE] cannot be converted 
to DATE without loosing information."));
+
+        // Alter table update a column type throws ColumnNotExistException 
when column does not
+        // exist
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier,
+                                        Lists.newArrayList(
+                                                SchemaChange.updateColumnType(
+                                                        "non_existing_col", 
DataTypes.INT())),
+                                        false))
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnNotExistException.class,
+                                "Column non_existing_col does not exist in the 
test_db.test_table table."));
+        // Alter table update a column type throws Exception when column is 
a partition column
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier,
+                                        Lists.newArrayList(
+                                                SchemaChange.updateColumnType(
+                                                        "dt", 
DataTypes.DATE())),
+                                        false))
+                .satisfies(anyCauseMatches("Cannot update partition column: 
[dt]"));
+        catalog.dropTable(identifier, false);
+
+        // Alter table update a column comment in an existing table
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "col1", DataTypes.STRING(), 
"field1"),
+                                new DataField(1, "col2", DataTypes.STRING(), 
"field2"),
+                                new DataField(
+                                        2,
+                                        "col3",
+                                        DataTypes.ROW(
+                                                new DataField(4, "f1", 
DataTypes.STRING(), "f1"),
+                                                new DataField(5, "f2", 
DataTypes.STRING(), "f2"),
+                                                new DataField(6, "f3", 
DataTypes.STRING(), "f3")),
+                                        "field3")),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        initOptions,
+                        ""),
+                false);
+
+        catalog.alterTable(
+                identifier,
+                Lists.newArrayList(SchemaChange.updateColumnComment("col2", 
"col2 field")),
+                false);
+
+        // Update nested column
+        String[] fields = new String[] {"col3", "f1"};
+        catalog.alterTable(
+                identifier,
+                Lists.newArrayList(SchemaChange.updateColumnComment(fields, 
"col3 f1 field")),
+                false);
+
+        table = catalog.getTable(identifier);
+        
assertThat(table.rowType().getFields().get(1).description()).isEqualTo("col2 
field");
+        RowType rowType = (RowType) table.rowType().getFields().get(2).type();
+        assertThat(rowType.getFields().get(0).description()).isEqualTo("col3 
f1 field");
+
+        // Alter table update a column comment throws Exception when column 
does not exist
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier,
+                                        Lists.newArrayList(
+                                                
SchemaChange.updateColumnComment(
+                                                        new String[] 
{"non_existing_col"}, "")),
+                                        false))
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnNotExistException.class,
+                                "Column non_existing_col does not exist in the 
test_db.test_table table."));
+        catalog.dropTable(identifier, false);
+
+        // Alter table update a column nullability in an existing table
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "col1", DataTypes.STRING(), 
"field1"),
+                                new DataField(1, "col2", DataTypes.STRING(), 
"field2"),
+                                new DataField(
+                                        2,
+                                        "col3",
+                                        DataTypes.ROW(
+                                                new DataField(4, "f1", 
DataTypes.STRING(), "f1"),
+                                                new DataField(5, "f2", 
DataTypes.STRING(), "f2"),
+                                                new DataField(6, "f3", 
DataTypes.STRING(), "f3")),
+                                        "field3")),
+                        Lists.newArrayList("col1"),
+                        Lists.newArrayList("col1", "col2"),
+                        Maps.newHashMap(),
+                        ""),
+                false);
+
+        catalog.alterTable(
+                identifier,
+                Lists.newArrayList(
+                        SchemaChange.setOption(
+                                
CoreOptions.DISABLE_ALTER_COLUMN_NULL_TO_NOT_NULL.key(), "false")),
+                false);
+
+        catalog.alterTable(
+                identifier,
+                
Lists.newArrayList(SchemaChange.updateColumnNullability("col1", false)),
+                false);
+
+        // Update nested column
+        fields = new String[] {"col3", "f1"};
+        catalog.alterTable(
+                identifier,
+                
Lists.newArrayList(SchemaChange.updateColumnNullability(fields, false)),
+                false);
+
+        table = catalog.getTable(identifier);
+        
assertThat(table.rowType().getFields().get(0).type().isNullable()).isEqualTo(false);
+
+        // Alter table update a column nullability throws Exception when 
column does not exist
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier,
+                                        Lists.newArrayList(
+                                                
SchemaChange.updateColumnNullability(
+                                                        new String[] 
{"non_existing_col"}, false)),
+                                        false))
+                .satisfies(
+                        anyCauseMatches(
+                                Catalog.ColumnNotExistException.class,
+                                "Column non_existing_col does not exist in the 
test_db.test_table table."));
+
+        // Alter table update a column nullability throws Exception when 
the column is a primary key column
+        assertThatThrownBy(
+                        () ->
+                                catalog.alterTable(
+                                        identifier,
+                                        Lists.newArrayList(
+                                                
SchemaChange.updateColumnNullability(
+                                                        new String[] {"col2"}, 
true)),
+                                        false))
+                .satisfies(anyCauseMatches("Cannot change nullability of 
primary key"));
+        catalog.dropTable(identifier, false);
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index c46522140a..8dbefb3d4e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -85,6 +85,7 @@ import 
org.apache.paimon.rest.responses.ListViewsGloballyResponse;
 import org.apache.paimon.rest.responses.ListViewsResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
@@ -93,6 +94,7 @@ import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.tag.Tag;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.LazyField;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.view.View;
@@ -1963,11 +1965,25 @@ public class RESTCatalogServer {
                     Catalog.ColumnNotExistException {
         if (tableMetadataStore.containsKey(identifier.getFullName())) {
             TableMetadata tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
-            TableSchema schema = tableMetadata.schema();
-            if (isFormatTable(schema.toSchema())) {
-                throw new UnsupportedOperationException("Only data table 
support alter table.");
-            }
             try {
+                TableSchema schema = tableMetadata.schema();
+                if (isFormatTable(schema.toSchema())) {
+                    TableSchema newSchema =
+                            SchemaManager.generateTableSchema(
+                                    schema,
+                                    changes,
+                                    new LazyField<>(() -> false),
+                                    new LazyField<>(() -> identifier));
+                    TableMetadata newTableMetadata =
+                            createTableMetadata(
+                                    identifier,
+                                    newSchema.id(),
+                                    newSchema.toSchema(),
+                                    tableMetadata.uuid(),
+                                    tableMetadata.isExternal());
+                    tableMetadataStore.put(identifier.getFullName(), 
newTableMetadata);
+                    return;
+                }
                 catalog.alterTable(identifier, changes, false);
                 FileStoreTable table = (FileStoreTable) 
catalog.getTable(identifier);
                 TableSchema newSchema = table.schema();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index c476cbc78b..6db20f6bfd 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -2009,6 +2009,11 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
         return true;
     }
 
+    @Override
+    protected boolean supportsAlterFormatTable() {
+        return true; // RESTCatalogServer handles alterTable for format tables
+    }
+
     @Override
     protected boolean supportPartitions() {
         return true;

Reply via email to