This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 36ed2c37 [FLINK-27847] Support rename/drop column in SchemaManager
36ed2c37 is described below

commit 36ed2c37201cabea3779dcb95b9a425f03a80d8e
Author: shammon <zjur...@gmail.com>
AuthorDate: Fri Dec 2 13:40:20 2022 +0800

    [FLINK-27847] Support rename/drop column in SchemaManager
    
    This closes #347
---
 .../table/store/file/schema/SchemaChange.java      |  77 ++++
 .../table/store/file/schema/SchemaManager.java     |  55 ++-
 .../table/store/table/SchemaEvolutionTest.java     |  98 ++++
 .../flink/table/store/spark/SparkCatalog.java      |  10 +
 .../table/store/spark/SimpleTableTestHelper.java   |  15 +-
 .../flink/table/store/spark/SparkReadITCase.java   | 336 +-------------
 .../flink/table/store/spark/SparkReadTestBase.java | 230 ++++++++++
 .../store/spark/SparkSchemaEvolutionITCase.java    | 511 +++++++++++++++++++++
 8 files changed, 988 insertions(+), 344 deletions(-)
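
For context, the new entry points can be exercised roughly as follows (a
minimal sketch; the table path is illustrative, not part of this patch):

    // Apply a rename and a drop through the SchemaManager in this commit.
    SchemaManager schemaManager = new SchemaManager(new Path("/warehouse/default.db/t"));
    schemaManager.commitChanges(
            Arrays.asList(
                    SchemaChange.renameColumn("b", "b2"), // rename column "b" to "b2"
                    SchemaChange.dropColumn("c")));       // then drop column "c"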

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
index 8d8246bf..ebbb87fd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java
@@ -45,6 +45,14 @@ public interface SchemaChange {
         return new AddColumn(fieldName, logicalType, isNullable, comment);
     }
 
+    static SchemaChange renameColumn(String fieldName, String newName) {
+        return new RenameColumn(fieldName, newName);
+    }
+
+    static SchemaChange dropColumn(String fieldName) {
+        return new DropColumn(fieldName);
+    }
+
     static SchemaChange updateColumnType(String fieldName, LogicalType newLogicalType) {
         return new UpdateColumnType(fieldName, newLogicalType);
     }
@@ -178,6 +186,75 @@ public interface SchemaChange {
         }
     }
 
+    /** A SchemaChange to rename a field. */
+    final class RenameColumn implements SchemaChange {
+        private final String fieldName;
+        private final String newName;
+
+        private RenameColumn(String fieldName, String newName) {
+            this.fieldName = fieldName;
+            this.newName = newName;
+        }
+
+        public String fieldName() {
+            return fieldName;
+        }
+
+        public String newName() {
+            return newName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            RenameColumn that = (RenameColumn) o;
+            return Objects.equals(fieldName, that.fieldName)
+                    && Objects.equals(newName, that.newName);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(newName);
+            result = 31 * result + Objects.hashCode(fieldName);
+            return result;
+        }
+    }
+
+    /** A SchemaChange to drop a field. */
+    final class DropColumn implements SchemaChange {
+        private final String fieldName;
+
+        private DropColumn(String fieldName) {
+            this.fieldName = fieldName;
+        }
+
+        public String fieldName() {
+            return fieldName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            DropColumn that = (DropColumn) o;
+            return Objects.equals(fieldName, that.fieldName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(fieldName);
+        }
+    }
+
     /** A SchemaChange to update the field type. */
     final class UpdateColumnType implements SchemaChange {
         private final String fieldName;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
index dc6fa535..c6fa60fe 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java
@@ -24,7 +24,9 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.schema.SchemaChange.AddColumn;
+import org.apache.flink.table.store.file.schema.SchemaChange.DropColumn;
 import org.apache.flink.table.store.file.schema.SchemaChange.RemoveOption;
+import org.apache.flink.table.store.file.schema.SchemaChange.RenameColumn;
 import org.apache.flink.table.store.file.schema.SchemaChange.SetOption;
 import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnComment;
 import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnNullability;
@@ -37,8 +39,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.commons.lang3.StringUtils;
-
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -205,11 +205,7 @@ public class SchemaManager implements Serializable {
                     newOptions.remove(removeOption.key());
                 } else if (change instanceof AddColumn) {
                     AddColumn addColumn = (AddColumn) change;
-                    if (newFields.stream()
-                            .anyMatch(
-                                    f ->
-                                            StringUtils.equalsIgnoreCase(
-                                                    f.name(), addColumn.fieldName()))) {
+                    if (newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()))) {
                         throw new IllegalArgumentException(
                                 String.format(
                                         "The column [%s] exists in the table[%s].",
@@ -221,6 +217,39 @@ public class SchemaManager implements Serializable {
                     newFields.add(
                             new DataField(
                                     id, addColumn.fieldName(), dataType, addColumn.description()));
+                } else if (change instanceof RenameColumn) {
+                    RenameColumn rename = (RenameColumn) change;
+                    validateNotPrimaryAndPartitionKey(schema, rename.fieldName());
+                    if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "The column [%s] exists in the table[%s].",
+                                        rename.newName(), tableRoot));
+                    }
+
+                    updateNestedColumn(
+                            newFields,
+                            new String[] {rename.fieldName()},
+                            0,
+                            (field) ->
+                                    new DataField(
+                                            field.id(),
+                                            rename.newName(),
+                                            field.type(),
+                                            field.description()));
+                } else if (change instanceof DropColumn) {
+                    DropColumn drop = (DropColumn) change;
+                    validateNotPrimaryAndPartitionKey(schema, drop.fieldName());
+                    if (!newFields.removeIf(
+                            f -> f.name().equals(drop.fieldName()))) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "The column [%s] doesn't exist in the table[%s].",
+                                        drop.fieldName(), tableRoot));
+                    }
+                    if (newFields.isEmpty()) {
+                        throw new IllegalArgumentException("Cannot drop all fields in table");
+                    }
                 } else if (change instanceof UpdateColumnType) {
                     UpdateColumnType update = (UpdateColumnType) change;
                     updateColumn(
@@ -300,6 +329,18 @@ public class SchemaManager implements Serializable {
         }
     }
 
+    private void validateNotPrimaryAndPartitionKey(TableSchema schema, String fieldName) {
+        // TODO: support schema evolution for partition and primary keys
+        if (schema.partitionKeys().contains(fieldName)) {
+            throw new UnsupportedOperationException(
+                    String.format("Cannot drop/rename partition key[%s]", fieldName));
+        }
+        if (schema.primaryKeys().contains(fieldName)) {
+            throw new UnsupportedOperationException(
+                    String.format("Cannot drop/rename primary key[%s]", fieldName));
+        }
+    }
+
     private void updateNestedColumn(
             List<DataField> newFields,
             String[] updateFieldNames,
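
For orientation, the validation added above rejects rename/drop of partition
and primary key fields before the field list is touched. A minimal sketch of
the resulting behavior (the field name "dt" is illustrative, not from this
patch):

    // Dropping a partition key fails fast, before any schema change is committed.
    try {
        schemaManager.commitChanges(
                Collections.singletonList(SchemaChange.dropColumn("dt")));
    } catch (UnsupportedOperationException e) {
        // message: "Cannot drop/rename partition key[dt]"
    }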
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
index 590aff22..4525b218 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java
@@ -46,6 +46,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -173,6 +174,103 @@ public class SchemaEvolutionTest {
                                 "f0", new BigIntType(), new IntType()));
     }
 
+    @Test
+    public void testRenameField() throws Exception {
+        UpdateSchema updateSchema =
+                new UpdateSchema(
+                        RowType.of(new IntType(), new BigIntType()),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        schemaManager.commitNewVersion(updateSchema);
+        assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f0", "f1");
+
+        // Rename "f0" to "f01", "f1" to "f0", "f01" to "f1"
+        schemaManager.commitChanges(
+                Collections.singletonList(SchemaChange.renameColumn("f0", "f01")));
+        schemaManager.commitChanges(
+                Collections.singletonList(SchemaChange.renameColumn("f1", "f0")));
+        assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f01", "f0");
+        schemaManager.commitChanges(
+                Collections.singletonList(SchemaChange.renameColumn("f01", "f1")));
+        assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f1", "f0");
+
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(
+                                                SchemaChange.renameColumn("f0", "f1"))))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        String.format("The column [%s] exists in the table[%s].", "f1", tablePath));
+    }
+
+    @Test
+    public void testDropField() throws Exception {
+        UpdateSchema updateSchema =
+                new UpdateSchema(
+                        RowType.of(
+                                new IntType(), new BigIntType(), new IntType(), new BigIntType()),
+                        Collections.singletonList("f0"),
+                        Arrays.asList("f0", "f2"),
+                        new HashMap<>(),
+                        "");
+        schemaManager.commitNewVersion(updateSchema);
+        assertThat(schemaManager.latest().get().fieldNames())
+                .containsExactly("f0", "f1", "f2", "f3");
+
+        schemaManager.commitChanges(Collections.singletonList(SchemaChange.dropColumn("f1")));
+        assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f0", "f2", "f3");
+
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(SchemaChange.dropColumn("f0"))))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(String.format("Cannot drop/rename partition key[%s]", "f0"));
+
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(SchemaChange.dropColumn("f2"))))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(String.format("Cannot drop/rename primary key[%s]", "f2"));
+    }
+
+    @Test
+    public void testDropAllFields() throws Exception {
+        UpdateSchema updateSchema =
+                new UpdateSchema(
+                        RowType.of(new IntType(), new BigIntType()),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        "");
+        schemaManager.commitNewVersion(updateSchema);
+        assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f0", "f1");
+
+        schemaManager.commitChanges(Collections.singletonList(SchemaChange.dropColumn("f0")));
+        assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f1");
+
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(SchemaChange.dropColumn("f100"))))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        String.format(
+                                "The column [%s] doesn't exist in the table[%s].",
+                                "f100", tablePath));
+
+        assertThatThrownBy(
+                        () ->
+                                schemaManager.commitChanges(
+                                        Collections.singletonList(SchemaChange.dropColumn("f1"))))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Cannot drop all fields in table");
+    }
+
     private List<Row> readRecords(FileStoreTable table, Predicate filter) throws IOException {
         RowRowConverter converter =
                 RowRowConverter.create(
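
A note for readers of these tests: Flink's RowType.of(...) assigns default
field names f0, f1, ... in declaration order, which is where the f0..f3
names asserted above come from. For example:

    RowType rowType = RowType.of(new IntType(), new BigIntType());
    // rowType.getFieldNames() -> ["f0", "f1"]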
diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 05ad2e2e..234ddd6b 100644
--- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -41,7 +41,9 @@ import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.connector.catalog.TableChange.AddColumn;
+import org.apache.spark.sql.connector.catalog.TableChange.DeleteColumn;
 import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty;
+import org.apache.spark.sql.connector.catalog.TableChange.RenameColumn;
 import org.apache.spark.sql.connector.catalog.TableChange.SetProperty;
 import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnComment;
 import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnNullability;
@@ -269,6 +271,14 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces {
                     toFlinkType(add.dataType()),
                     add.isNullable(),
                     add.comment());
+        } else if (change instanceof RenameColumn) {
+            RenameColumn rename = (RenameColumn) change;
+            validateAlterNestedField(rename.fieldNames());
+            return SchemaChange.renameColumn(rename.fieldNames()[0], rename.newName());
+        } else if (change instanceof DeleteColumn) {
+            DeleteColumn delete = (DeleteColumn) change;
+            validateAlterNestedField(delete.fieldNames());
+            return SchemaChange.dropColumn(delete.fieldNames()[0]);
         } else if (change instanceof UpdateColumnType) {
             UpdateColumnType update = (UpdateColumnType) change;
             validateAlterNestedField(update.fieldNames());
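
With this mapping in place, standard Spark DDL now routes into the new
SchemaChange factories. A hedged sketch (table name illustrative):

    // Spark SQL that SparkCatalog translates through the branches above:
    spark.sql("ALTER TABLE tablestore.default.t RENAME COLUMN a TO aa"); // -> SchemaChange.renameColumn
    spark.sql("ALTER TABLE tablestore.default.t DROP COLUMN b");         // -> SchemaChange.dropColumn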
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
index 353c16e7..0aa6e8b8 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java
@@ -38,11 +38,18 @@ import java.util.Map;
 /** A simple table test helper to write and commit. */
 public class SimpleTableTestHelper {
 
-    private final TableWrite writer;
-    private final TableCommit commit;
+    private TableWrite writer;
+    private TableCommit commit;
 
     private long commitIdentifier;
 
+    public SimpleTableTestHelper(Path path) throws Exception {
+        Map<String, String> options = new HashMap<>();
+        // orc is shaded, so its classes cannot be resolved in the IDE; use avro instead
+        options.put(CoreOptions.FILE_FORMAT.key(), "avro");
+        createTable(path, options);
+    }
+
     public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
         this(path, rowType, Collections.emptyList(), Collections.emptyList());
     }
@@ -56,6 +63,10 @@ public class SimpleTableTestHelper {
         new SchemaManager(path)
                 .commitNewVersion(
                         new UpdateSchema(rowType, partitionKeys, primaryKeys, options, ""));
+        createTable(path, options);
+    }
+
+    private void createTable(Path path, Map<String, String> options) {
         Configuration conf = Configuration.fromMap(options);
         conf.setString("path", path.toString());
         FileStoreTable table = FileStoreTableFactory.create(conf);
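
The new single-argument constructor lets a test attach to a table whose
schema has already been committed, without re-issuing DDL. A usage sketch
(assuming tablePath points at an existing table):

    SimpleTableTestHelper helper = new SimpleTableTestHelper(tablePath);
    helper.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
    helper.commit();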
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
index 03f6391c..c9c697e8 100644
--- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java
@@ -21,41 +21,26 @@ package org.apache.flink.table.store.spark;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.binary.BinaryStringData;
 import org.apache.flink.table.store.file.schema.ArrayDataType;
 import org.apache.flink.table.store.file.schema.AtomicDataType;
 import org.apache.flink.table.store.file.schema.DataField;
 import org.apache.flink.table.store.file.schema.DataType;
-import org.apache.flink.table.store.file.schema.RowDataType;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
-import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
-import org.apache.flink.table.types.logical.BooleanType;
 import org.apache.flink.table.types.logical.DoubleType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.types.RowKind;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -68,142 +53,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for spark reader. */
-public class SparkReadITCase {
-
-    private static File warehouse = null;
-
-    private static SparkSession spark = null;
-
-    private static Path warehousePath = null;
-
-    private static Path tablePath1;
-
-    private static Path tablePath2;
-
-    @BeforeAll
-    public static void startMetastoreAndSpark() throws Exception {
-        warehouse = Files.createTempFile("warehouse", null).toFile();
-        assertThat(warehouse.delete()).isTrue();
-        warehousePath = new Path("file:" + warehouse);
-        spark = SparkSession.builder().master("local[2]").getOrCreate();
-        spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName());
-        spark.conf().set("spark.sql.catalog.tablestore.warehouse", warehousePath.toString());
-
-        // flink sink
-        tablePath1 = new Path(warehousePath, "default.db/t1");
-        SimpleTableTestHelper testHelper1 = new SimpleTableTestHelper(tablePath1, rowType1());
-        testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
-        testHelper1.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
-        testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
-        testHelper1.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, StringData.fromString("2")));
-        testHelper1.commit();
-
-        // a int not null
-        // b array<varchar> not null
-        // c row<row<double, array<boolean> not null> not null, bigint> not null
-        tablePath2 = new Path(warehousePath, "default.db/t2");
-        SimpleTableTestHelper testHelper2 = new SimpleTableTestHelper(tablePath2, rowType2());
-        testHelper2.write(
-                GenericRowData.of(
-                        1,
-                        new GenericArrayData(
-                                new StringData[] {
-                                    StringData.fromString("AAA"), StringData.fromString("BBB")
-                                }),
-                        GenericRowData.of(
-                                GenericRowData.of(1.0d, new GenericArrayData(new Boolean[] {null})),
-                                1L)));
-        testHelper2.write(
-                GenericRowData.of(
-                        2,
-                        new GenericArrayData(
-                                new StringData[] {
-                                    StringData.fromString("CCC"), StringData.fromString("DDD")
-                                }),
-                        GenericRowData.of(
-                                GenericRowData.of(null, new GenericArrayData(new Boolean[] {true})),
-                                null)));
-        testHelper2.commit();
-
-        testHelper2.write(
-                GenericRowData.of(
-                        3,
-                        new GenericArrayData(new StringData[] {null, null}),
-                        GenericRowData.of(
-                                GenericRowData.of(
-                                        2.0d, new GenericArrayData(new boolean[] {true, false})),
-                                2L)));
-
-        testHelper2.write(
-                GenericRowData.of(
-                        4,
-                        new GenericArrayData(new StringData[] {null, StringData.fromString("EEE")}),
-                        GenericRowData.of(
-                                GenericRowData.of(
-                                        3.0d,
-                                        new GenericArrayData(new Boolean[] {true, false, true})),
-                                3L)));
-        testHelper2.commit();
-    }
-
-    private static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exception {
-        RowType rowType =
-                new RowType(
-                        Arrays.asList(
-                                new RowType.RowField("a", new IntType(false)),
-                                new RowType.RowField("b", new BigIntType()),
-                                new RowType.RowField("c", new VarCharType())));
-        return new SimpleTableTestHelper(tablePath, rowType);
-    }
-
-    private static RowType rowType1() {
-        return new RowType(
-                Arrays.asList(
-                        new RowType.RowField("a", new IntType(false)),
-                        new RowType.RowField("b", new BigIntType()),
-                        new RowType.RowField("c", new VarCharType())));
-    }
-
-    private static RowType rowType2() {
-        return new RowType(
-                Arrays.asList(
-                        new RowType.RowField("a", new IntType(false), "comment about a"),
-                        new RowType.RowField("b", new ArrayType(false, new VarCharType())),
-                        new RowType.RowField(
-                                "c",
-                                new RowType(
-                                        false,
-                                        Arrays.asList(
-                                                new RowType.RowField(
-                                                        "c1",
-                                                        new RowType(
-                                                                false,
-                                                                Arrays.asList(
-                                                                        new RowType.RowField(
-                                                                                "c11",
-                                                                                new DoubleType()),
-                                                                        new RowType.RowField(
-                                                                                "c12",
-                                                                                new ArrayType(
-                                                                                        false,
-                                                                                        new BooleanType()))))),
-                                                new RowType.RowField(
-                                                        "c2",
-                                                        new BigIntType(),
-                                                        "comment about c2"))),
-                                "comment about c")));
-    }
-
-    @AfterAll
-    public static void stopMetastoreAndSpark() throws IOException {
-        if (warehouse != null && warehouse.exists()) {
-            FileUtils.deleteDirectory(warehouse);
-        }
-        if (spark != null) {
-            spark.stop();
-            spark = null;
-        }
-    }
+public class SparkReadITCase extends SparkReadTestBase {
 
     @Test
     public void testNormal() {
@@ -243,98 +93,6 @@ public class SparkReadITCase {
         innerTestNestedTypeFilterPushDown(spark.table("tablestore.default.t2"));
     }
 
-    @Test
-    public void testSetAndRemoveOption() {
-        spark.sql("ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
-
-        Map<String, String> options = schema1().options();
-        assertThat(options).containsEntry("xyc", "unknown1");
-
-        spark.sql("ALTER TABLE tablestore.default.t1 UNSET TBLPROPERTIES('xyc')");
-
-        options = schema1().options();
-        assertThat(options).doesNotContainKey("xyc");
-
-        assertThatThrownBy(
-                        () ->
-                                spark.sql(
-                                        "ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('primary-key' = 'a')"))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Alter primary key is not supported");
-    }
-
-    @Test
-    public void testAddColumn() throws Exception {
-        Path tablePath = new Path(warehousePath, "default.db/testAddColumn");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
-        testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
-        testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
-        testHelper1.commit();
-
-        spark.sql("ALTER TABLE tablestore.default.testAddColumn ADD COLUMN d STRING");
-
-        Dataset<Row> table = spark.table("tablestore.default.testAddColumn");
-        List<Row> results = table.collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1,2,1,null], [5,6,3,null]]");
-
-        results = table.select("a", "c").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
-
-        results = table.groupBy().sum("b").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[8]]");
-    }
-
-    /**
-     * In fact, the table store does not currently support alter column type. In this case, changing
-     * "a" type from int to bigint can run successfully because the underlying orc supports directly
-     * reading int to bigint. At present, we read int value from orc into {@link RowData} according
-     * to the underlying data schema, and then read long from {@link RowData} will cause failure.
-     * TODO: This case needs to be ignored first and will be completely fixed in
-     * https://issues.apache.org/jira/browse/FLINK-27845
-     */
-    @Disabled
-    @Test
-    public void testAlterColumnType() throws Exception {
-        Path tablePath = new Path(warehousePath, "default.db/testAlterColumnType");
-        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
-        testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
-        testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
-        testHelper1.commit();
-
-        spark.sql("ALTER TABLE tablestore.default.testAlterColumnType ALTER COLUMN a TYPE BIGINT");
-        innerTestSimpleType(spark.table("tablestore.default.testAlterColumnType"));
-    }
-
-    @Test
-    public void testAlterTableColumnNullability() {
-        assertThat(fieldIsNullable(getField(schema2(), 0))).isFalse();
-        assertThat(fieldIsNullable(getField(schema2(), 1))).isFalse();
-        assertThat(fieldIsNullable(getField(schema2(), 2))).isFalse();
-        assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isFalse();
-        assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 1))).isTrue();
-        assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 0)))
-                .isTrue();
-        assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
-                .isFalse();
-
-        // note: for Spark, it is illegal to change nullable column to non-nullable
-        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN a DROP NOT NULL");
-        assertThat(fieldIsNullable(getField(schema2(), 0))).isTrue();
-
-        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN b DROP NOT NULL");
-        assertThat(fieldIsNullable(getField(schema2(), 1))).isTrue();
-
-        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c DROP NOT NULL");
-        assertThat(fieldIsNullable(getField(schema2(), 2))).isTrue();
-
-        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 DROP NOT NULL");
-        assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isTrue();
-
-        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 DROP NOT NULL");
-        assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
-                .isTrue();
-    }
-
     @Test
     public void testDefaultNamespace() {
         spark.sql("USE tablestore");
@@ -342,63 +100,6 @@ public class SparkReadITCase {
                 .isEqualTo("[[tablestore,default]]");
     }
 
-    @Test
-    public void testAlterPrimaryKeyNullability() {
-        spark.sql("USE tablestore");
-        spark.sql(
-                "CREATE TABLE default.testAlterPkNullability (\n"
-                        + "a BIGINT,\n"
-                        + "b STRING) USING tablestore\n"
-                        + "COMMENT 'table comment'\n"
-                        + "TBLPROPERTIES ('primary-key' = 'a')");
-        assertThatThrownBy(
-                        () ->
-                                spark.sql(
-                                        "ALTER TABLE default.testAlterPkNullability ALTER COLUMN a DROP NOT NULL"))
-                .getRootCause()
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessageContaining("Cannot change nullability of primary key");
-    }
-
-    @Test
-    public void testAlterTableColumnComment() {
-        assertThat(getField(schema1(), 0).description()).isNull();
-
-        spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'a new comment'");
-        assertThat(getField(schema1(), 0).description()).isEqualTo("a new comment");
-
-        spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'yet another comment'");
-        assertThat(getField(schema1(), 0).description()).isEqualTo("yet another comment");
-
-        assertThat(getField(schema2(), 2).description()).isEqualTo("comment about c");
-        assertThat(getNestedField(getField(schema2(), 2), 0).description()).isNull();
-        assertThat(getNestedField(getField(schema2(), 2), 1).description())
-                .isEqualTo("comment about c2");
-        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description())
-                .isNull();
-        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
-                .isNull();
-
-        spark.sql(
-                "ALTER TABLE tablestore.default.t2 ALTER COLUMN c COMMENT 'yet another comment about c'");
-        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'");
-        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'");
-        spark.sql(
-                "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'");
-        spark.sql(
-                "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'");
-
-        assertThat(getField(schema2(), 2).description()).isEqualTo("yet another comment about c");
-        assertThat(getNestedField(getField(schema2(), 2), 0).description())
-                .isEqualTo("a nested type");
-        assertThat(getNestedField(getField(schema2(), 2), 1).description())
-                .isEqualTo("a bigint type");
-        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description())
-                .isEqualTo("a double type");
-        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
-                .isEqualTo("a boolean array");
-    }
-
     @Test
     public void testCreateTableWithNullablePk() {
         spark.sql("USE tablestore");
@@ -686,25 +387,6 @@ public class SparkReadITCase {
         assertThat(new File(nsPath.toUri())).doesNotExist();
     }
 
-    private TableSchema schema1() {
-        return FileStoreTableFactory.create(tablePath1).schema();
-    }
-
-    private TableSchema schema2() {
-        return FileStoreTableFactory.create(tablePath2).schema();
-    }
-
-    private void innerTestSimpleType(Dataset<Row> dataset) {
-        List<Row> results = dataset.collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
-
-        results = dataset.select("a", "c").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
-
-        results = dataset.groupBy().sum("b").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[8]]");
-    }
-
     private void innerTestNestedType(Dataset<Row> dataset) {
         List<Row> results = dataset.collectAsList();
         assertThat(results.toString())
@@ -771,20 +453,4 @@ public class SparkReadITCase {
                 .isEqualTo(
                         "[[3,WrappedArray(true, false),2], [4,WrappedArray(true, false, true),3]]");
     }
-
-    private boolean fieldIsNullable(DataField field) {
-        return field.type().logicalType().isNullable();
-    }
-
-    private DataField getField(TableSchema schema, int index) {
-        return schema.fields().get(index);
-    }
-
-    private DataField getNestedField(DataField field, int index) {
-        if (field.type() instanceof RowDataType) {
-            RowDataType rowDataType = (RowDataType) field.type();
-            return rowDataType.fields().get(index);
-        }
-        throw new IllegalArgumentException();
-    }
 }
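
The members removed above now live in SparkReadTestBase (added below), so new
ITCases only extend the base class. Schematically (class and test names here
are hypothetical):

    public class MyReadITCase extends SparkReadTestBase {
        @Test
        public void testSimpleRead() {
            // spark, tablePath1/2, schema1()/schema2() are inherited from the base.
            innerTestSimpleType(spark.table("tablestore.default.t1"));
        }
    }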
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
new file mode 100644
index 00000000..15ec0c6f
--- /dev/null
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.RowDataType;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.RowKind;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base class for Spark read tests. */
+public abstract class SparkReadTestBase {
+
+    private static File warehouse = null;
+
+    protected static SparkSession spark = null;
+
+    protected static Path warehousePath = null;
+
+    protected static Path tablePath1;
+
+    protected static Path tablePath2;
+
+    @BeforeAll
+    public static void startMetastoreAndSpark() throws Exception {
+        warehouse = Files.createTempFile("warehouse", null).toFile();
+        assertThat(warehouse.delete()).isTrue();
+        warehousePath = new Path("file:" + warehouse);
+        spark = SparkSession.builder().master("local[2]").getOrCreate();
+        spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName());
+        spark.conf().set("spark.sql.catalog.tablestore.warehouse", warehousePath.toString());
+
+        // flink sink
+        tablePath1 = new Path(warehousePath, "default.db/t1");
+        SimpleTableTestHelper testHelper1 = new SimpleTableTestHelper(tablePath1, rowType1());
+        testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+        testHelper1.write(GenericRowData.of(3, 4L, StringData.fromString("2")));
+        testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+        testHelper1.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, StringData.fromString("2")));
+        testHelper1.commit();
+
+        // a int not null
+        // b array<varchar> not null
+        // c row<row<double, array<boolean> not null> not null, bigint> not null
+        tablePath2 = new Path(warehousePath, "default.db/t2");
+        SimpleTableTestHelper testHelper2 = new SimpleTableTestHelper(tablePath2, rowType2());
+        testHelper2.write(
+                GenericRowData.of(
+                        1,
+                        new GenericArrayData(
+                                new StringData[] {
+                                    StringData.fromString("AAA"), StringData.fromString("BBB")
+                                }),
+                        GenericRowData.of(
+                                GenericRowData.of(1.0d, new GenericArrayData(new Boolean[] {null})),
+                                1L)));
+        testHelper2.write(
+                GenericRowData.of(
+                        2,
+                        new GenericArrayData(
+                                new StringData[] {
+                                    StringData.fromString("CCC"), StringData.fromString("DDD")
+                                }),
+                        GenericRowData.of(
+                                GenericRowData.of(null, new GenericArrayData(new Boolean[] {true})),
+                                null)));
+        testHelper2.commit();
+
+        testHelper2.write(
+                GenericRowData.of(
+                        3,
+                        new GenericArrayData(new StringData[] {null, null}),
+                        GenericRowData.of(
+                                GenericRowData.of(
+                                        2.0d, new GenericArrayData(new boolean[] {true, false})),
+                                2L)));
+
+        testHelper2.write(
+                GenericRowData.of(
+                        4,
+                        new GenericArrayData(new StringData[] {null, StringData.fromString("EEE")}),
+                        GenericRowData.of(
+                                GenericRowData.of(
+                                        3.0d,
+                                        new GenericArrayData(new Boolean[] {true, false, true})),
+                                3L)));
+        testHelper2.commit();
+    }
+
+    protected static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exception {
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new RowType.RowField("a", new IntType(false)),
+                                new RowType.RowField("b", new BigIntType()),
+                                new RowType.RowField("c", new VarCharType())));
+        return new SimpleTableTestHelper(tablePath, rowType);
+    }
+
+    protected static SimpleTableTestHelper createTestHelperWithoutDDL(Path tablePath)
+            throws Exception {
+        return new SimpleTableTestHelper(tablePath);
+    }
+
+    private static RowType rowType1() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("a", new IntType(false)),
+                        new RowType.RowField("b", new BigIntType()),
+                        new RowType.RowField("c", new VarCharType())));
+    }
+
+    private static RowType rowType2() {
+        return new RowType(
+                Arrays.asList(
+                        new RowType.RowField("a", new IntType(false), "comment about a"),
+                        new RowType.RowField("b", new ArrayType(false, new VarCharType())),
+                        new RowType.RowField(
+                                "c",
+                                new RowType(
+                                        false,
+                                        Arrays.asList(
+                                                new RowType.RowField(
+                                                        "c1",
+                                                        new RowType(
+                                                                false,
+                                                                Arrays.asList(
+                                                                        new RowType.RowField(
+                                                                                "c11",
+                                                                                new DoubleType()),
+                                                                        new RowType.RowField(
+                                                                                "c12",
+                                                                                new ArrayType(
+                                                                                        false,
+                                                                                        new BooleanType()))))),
+                                                new RowType.RowField(
+                                                        "c2",
+                                                        new BigIntType(),
+                                                        "comment about c2"))),
+                                "comment about c")));
+    }
+
+    @AfterAll
+    public static void stopMetastoreAndSpark() throws IOException {
+        if (warehouse != null && warehouse.exists()) {
+            FileUtils.deleteDirectory(warehouse);
+        }
+        if (spark != null) {
+            spark.stop();
+            spark = null;
+        }
+    }
+
+    protected void innerTestSimpleType(Dataset<Row> dataset) {
+        List<Row> results = dataset.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+
+        results = dataset.select("a", "c").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
+
+        results = dataset.groupBy().sum("b").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[8]]");
+    }
+
+    protected TableSchema schema1() {
+        return FileStoreTableFactory.create(tablePath1).schema();
+    }
+
+    protected TableSchema schema2() {
+        return FileStoreTableFactory.create(tablePath2).schema();
+    }
+
+    protected boolean fieldIsNullable(DataField field) {
+        return field.type().logicalType().isNullable();
+    }
+
+    protected DataField getField(TableSchema schema, int index) {
+        return schema.fields().get(index);
+    }
+
+    protected DataField getNestedField(DataField field, int index) {
+        if (field.type() instanceof RowDataType) {
+            RowDataType rowDataType = (RowDataType) field.type();
+            return rowDataType.fields().get(index);
+        }
+        throw new IllegalArgumentException();
+    }
+}
diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
new file mode 100644
index 00000000..c23facfe
--- /dev/null
+++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** ITCase for schema evolution in Spark. */
+public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
+
+    @Test
+    public void testSetAndRemoveOption() {
+        spark.sql("ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')");
+
+        Map<String, String> options = schema1().options();
+        assertThat(options).containsEntry("xyc", "unknown1");
+
+        spark.sql("ALTER TABLE tablestore.default.t1 UNSET TBLPROPERTIES('xyc')");
+
+        options = schema1().options();
+        assertThat(options).doesNotContainKey("xyc");
+
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('primary-key' = 'a')"))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Alter primary key is not supported");
+    }
+
+    @Test
+    public void testAddColumn() throws Exception {
+        Path tablePath = new Path(warehousePath, "default.db/testAddColumn");
+        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
+        testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+        testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+        testHelper1.commit();
+
+        spark.sql("ALTER TABLE tablestore.default.testAddColumn ADD COLUMN d STRING");
+
+        Dataset<Row> table = spark.table("tablestore.default.testAddColumn");
+        List<Row> results = table.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,2,1,null], [5,6,3,null]]");
+
+        results = table.select("a", "c").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
+
+        results = table.groupBy().sum("b").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[8]]");
+    }
+
+    @Test
+    public void testRenameColumn() throws Exception {
+        Path tablePath = new Path(warehousePath, "default.db/testRenameColumn");
+        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
+        testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+        testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+        testHelper1.commit();
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE tablestore.default.testRenameColumn").collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenameColumn (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+        Dataset<Row> table1 = spark.table("tablestore.default.testRenameColumn");
+        List<Row> results = table1.select("a", "c").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
+
+        // Rename "a" to "aa"
+        spark.sql("ALTER TABLE tablestore.default.testRenameColumn RENAME COLUMN a to aa");
+        List<Row> afterRename =
+                spark.sql("SHOW CREATE TABLE tablestore.default.testRenameColumn").collectAsList();
+        assertThat(afterRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenameColumn (\n"
+                                + "  `aa` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+        Dataset<Row> table2 = spark.table("tablestore.default.testRenameColumn");
+        results = table2.select("aa", "c").collectAsList();
+        assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
+        assertThatThrownBy(() -> table2.select("a", "c").collectAsList())
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageContaining("cannot resolve '%s' given input columns", "a");
+    }
+
+    @Test
+    public void testRenamePartitionKey() {
+        spark.sql("USE tablestore");
+        spark.sql(
+                "CREATE TABLE default.testRenamePartitionKey (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING) USING tablestore\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a)\n"
+                        + "TBLPROPERTIES ('foo' = 'bar')");
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testRenamePartitionKey")
+                        .collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testRenamePartitionKey (\n"
+                                + "  `a` BIGINT,\n"
+                                + "  `b` STRING)\n"
+                                + "PARTITIONED BY (a)\n"
+                                + "]]");
+
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE 
tablestore.default.testRenamePartitionKey RENAME COLUMN a to aa"))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage(
+                        String.format(
+                                "java.lang.UnsupportedOperationException: 
Cannot drop/rename partition key[%s]",
+                                "a"));
+    }
+
+    @Test
+    public void testDropSingleColumn() throws Exception {
+        Path tablePath = new Path(warehousePath, "default.db/testDropSingleColumn");
+        SimpleTableTestHelper testHelper = createTestHelper(tablePath);
+        testHelper.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+        testHelper.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+        testHelper.commit();
+
+        List<Row> beforeDrop =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testDropSingleColumn")
+                        .collectAsList();
+        assertThat(beforeDrop.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testDropSingleColumn (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+
+        spark.sql("ALTER TABLE tablestore.default.testDropSingleColumn DROP 
COLUMN a");
+
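+        // The dropped column should no longer appear in the schema, and reads should project only [b, c].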
+        List<Row> afterDrop =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testDropSingleColumn")
+                        .collectAsList();
+        assertThat(afterDrop.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testDropSingleColumn (\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+
+        Dataset<Row> table = 
spark.table("tablestore.default.testDropSingleColumn");
+        List<Row> results = table.collectAsList();
+        assertThat(results.toString()).isEqualTo("[[2,1], [6,3]]");
+    }
+
+    @Test
+    public void testDropColumns() throws Exception {
+        Path tablePath = new Path(warehousePath, "default.db/testDropColumns");
+        createTestHelper(tablePath);
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testDropColumns").collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testDropColumns (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + "]]");
+
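+        // Drop two columns with a single ALTER TABLE ... DROP COLUMNS statement.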
+        spark.sql("ALTER TABLE tablestore.default.testDropColumns DROP COLUMNS 
a, b");
+
+        List<Row> afterRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testDropColumns").collectAsList();
+        assertThat(afterRename.toString())
+                .isEqualTo("[[CREATE TABLE testDropColumns (\n" + "  `c` 
STRING)\n" + "]]");
+    }
+
+    @Test
+    public void testDropPartitionKey() {
+        spark.sql("USE tablestore");
+        spark.sql(
+                "CREATE TABLE default.testDropPartitionKey (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING) USING tablestore\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a)\n"
+                        + "TBLPROPERTIES ('foo' = 'bar')");
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testDropPartitionKey")
+                        .collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testDropPartitionKey (\n"
+                                + "  `a` BIGINT,\n"
+                                + "  `b` STRING)\n"
+                                + "PARTITIONED BY (a)\n"
+                                + "]]");
+
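+        // Dropping a partition key is rejected by the SchemaManager.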
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE 
tablestore.default.testDropPartitionKey DROP COLUMN a"))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage(
+                        String.format(
+                                "java.lang.UnsupportedOperationException: 
Cannot drop/rename partition key[%s]",
+                                "a"));
+    }
+
+    @Test
+    public void testDropPrimaryKey() {
+        spark.sql("USE tablestore");
+        spark.sql(
+                "CREATE TABLE default.testDropPrimaryKey (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING) USING tablestore\n"
+                        + "COMMENT 'table comment'\n"
+                        + "PARTITIONED BY (a)\n"
+                        + "TBLPROPERTIES ('primary-key' = 'a, b')");
+
+        List<Row> beforeRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.testDropPrimaryKey")
+                        .collectAsList();
+        assertThat(beforeRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE testDropPrimaryKey (\n"
+                                + "  `a` BIGINT NOT NULL,\n"
+                                + "  `b` STRING NOT NULL)\n"
+                                + "PARTITIONED BY (a)\n"
+                                + "]]");
+
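+        // Dropping a primary key column is rejected by the SchemaManager.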
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE 
tablestore.default.testDropPrimaryKey DROP COLUMN b"))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage(
+                        String.format(
+                                "java.lang.UnsupportedOperationException: 
Cannot drop/rename primary key[%s]",
+                                "b"));
+    }
+
+    /**
+     * The table store does not currently support altering column types. In this case, changing
+     * the type of "a" from int to bigint can run successfully only because the underlying ORC
+     * reader supports reading int directly as bigint. At present we read the int value from ORC
+     * into a {@link RowData} according to the underlying data schema, and then reading a long
+     * from that {@link RowData} causes a failure. TODO: This case is disabled for now and will
+     * be completely fixed in https://issues.apache.org/jira/browse/FLINK-27845.
+     */
+    @Disabled
+    @Test
+    public void testAlterColumnType() throws Exception {
+        Path tablePath = new Path(warehousePath, "default.db/testAlterColumnType");
+        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
+        testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1")));
+        testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3")));
+        testHelper1.commit();
+
+        spark.sql("ALTER TABLE tablestore.default.testAlterColumnType ALTER 
COLUMN a TYPE BIGINT");
+        innerTestSimpleType(spark.table("tablestore.default.testAlterColumnType"));
+    }
+
+    @Test
+    public void testAlterTableColumnNullability() {
+        assertThat(fieldIsNullable(getField(schema2(), 0))).isFalse();
+        assertThat(fieldIsNullable(getField(schema2(), 1))).isFalse();
+        assertThat(fieldIsNullable(getField(schema2(), 2))).isFalse();
+        assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isFalse();
+        assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 1))).isTrue();
+        assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 0)))
+                .isTrue();
+        assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
+                .isFalse();
+
+        // note: for Spark, it is illegal to change a nullable column to non-nullable
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN a DROP NOT 
NULL");
+        assertThat(fieldIsNullable(getField(schema2(), 0))).isTrue();
+
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN b DROP NOT 
NULL");
+        assertThat(fieldIsNullable(getField(schema2(), 1))).isTrue();
+
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c DROP NOT 
NULL");
+        assertThat(fieldIsNullable(getField(schema2(), 2))).isTrue();
+
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 DROP 
NOT NULL");
+        assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isTrue();
+
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 
DROP NOT NULL");
+        assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1)))
+                .isTrue();
+    }
+
+    @Test
+    public void testAlterPrimaryKeyNullability() {
+        spark.sql("USE tablestore");
+        spark.sql(
+                "CREATE TABLE default.testAlterPkNullability (\n"
+                        + "a BIGINT,\n"
+                        + "b STRING) USING tablestore\n"
+                        + "COMMENT 'table comment'\n"
+                        + "TBLPROPERTIES ('primary-key' = 'a')");
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                        "ALTER TABLE 
default.testAlterPkNullability ALTER COLUMN a DROP NOT NULL"))
+                .getRootCause()
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Cannot change nullability of primary 
key");
+    }
+
+    @Test
+    public void testAlterTableColumnComment() {
+        assertThat(getField(schema1(), 0).description()).isNull();
+
+        spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'a 
new comment'");
+        assertThat(getField(schema1(), 0).description()).isEqualTo("a new comment");
+
+        spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 
'yet another comment'");
+        assertThat(getField(schema1(), 0).description()).isEqualTo("yet another comment");
+
+        assertThat(getField(schema2(), 2).description()).isEqualTo("comment about c");
+        assertThat(getNestedField(getField(schema2(), 2), 0).description()).isNull();
+        assertThat(getNestedField(getField(schema2(), 2), 1).description())
+                .isEqualTo("comment about c2");
+        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description())
+                .isNull();
+        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
+                .isNull();
+
+        spark.sql(
+                "ALTER TABLE tablestore.default.t2 ALTER COLUMN c COMMENT 'yet 
another comment about c'");
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 COMMENT 
'a nested type'");
+        spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c2 COMMENT 
'a bigint type'");
+        spark.sql(
+                "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c11 
COMMENT 'a double type'");
+        spark.sql(
+                "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 
COMMENT 'a boolean array'");
+
+        assertThat(getField(schema2(), 2).description()).isEqualTo("yet another comment about c");
+        assertThat(getNestedField(getField(schema2(), 2), 0).description())
+                .isEqualTo("a nested type");
+        assertThat(getNestedField(getField(schema2(), 2), 1).description())
+                .isEqualTo("a bigint type");
+        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description())
+                .isEqualTo("a double type");
+        assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description())
+                .isEqualTo("a boolean array");
+    }
+
+    /**
+     * Test for schema evolution as follows:
+     *
+     * <ul>
+     *   <li>1. Create table with fields ["a", "b", "c"], insert 2 records
+     *   <li>2. Rename "a->aa", "c"->"a", "b"->"c", insert 2 records
+     *   <li>3. Drop fields "aa", "c", insert 2 records
+     *   <li>4. Add new fields "d", "c", "b", insert 2 records
+     * </ul>
+     *
+     * <p>Verify records in table above.
+     */
+    @Test
+    public void testSchemaEvolution() throws Exception {
+        // Create table with fields [a, b, c] and insert 2 records
+        Path tablePath = new Path(warehousePath, "default.db/testSchemaEvolution");
+        SimpleTableTestHelper testHelper1 = createTestHelper(tablePath);
+        testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("3")));
+        testHelper1.write(GenericRowData.of(4, 5L, StringData.fromString("6")));
+        testHelper1.commit();
+        assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
+                .isEqualTo("[[1,2,3], [4,5,6]]");
+        assertThat(
+                        spark.table("tablestore.default.testSchemaEvolution")
+                                .select("a", "b", "c")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[1,2,3], [4,5,6]]");
+        assertThat(
+                        spark.table("tablestore.default.testSchemaEvolution")
+                                .filter("a>1")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[4,5,6]]");
+
+        // Rename "a->aa", "c"->"a", "b"->"c" and the fields are [aa, c, a], 
insert 2 records
+        spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME 
COLUMN a to aa");
+        spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME 
COLUMN c to a");
+        spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME 
COLUMN b to c");
+        SimpleTableTestHelper testHelper2 = createTestHelperWithoutDDL(tablePath);
+        testHelper2.write(GenericRowData.of(7, 8L, StringData.fromString("9")));
+        testHelper2.write(GenericRowData.of(10, 11L, StringData.fromString("12")));
+        testHelper2.commit();
+        assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
+                .isEqualTo("[[1,2,3], [4,5,6], [7,8,9], [10,11,12]]");
+        assertThat(
+                        spark.table("tablestore.default.testSchemaEvolution")
+                                .select("aa", "a", "c")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[1,3,2], [4,6,5], [7,9,8], [10,12,11]]");
+        assertThat(
+                        spark.table("tablestore.default.testSchemaEvolution")
+                                .select("aa", "a", "c")
+                                .filter("aa>4")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[7,9,8], [10,12,11]]");
+
+        // Drop fields "aa", "c" and the fields are [a], insert 2 records
+        spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution DROP 
COLUMNS aa, c");
+        SimpleTableTestHelper testHelper3 = createTestHelperWithoutDDL(tablePath);
+        testHelper3.write(GenericRowData.of(StringData.fromString("13")));
+        testHelper3.write(GenericRowData.of(StringData.fromString("14")));
+        testHelper3.commit();
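+        // Dropped columns also disappear from rows written before the schema change.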
+        assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
+                .isEqualTo("[[3], [6], [9], [12], [13], [14]]");
+        assertThat(
+                        spark.table("tablestore.default.testSchemaEvolution")
+                                .select("a")
+                                .filter("a>10")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[12], [13], [14]]");
+
+        // Add new fields "d", "c", "b" and the fields are [a, d, c, b], insert 2 records
+        spark.sql(
+                "ALTER TABLE tablestore.default.testSchemaEvolution ADD 
COLUMNS (d INT, c INT, b INT)");
+        SimpleTableTestHelper testHelper4 = createTestHelperWithoutDDL(tablePath);
+        testHelper4.write(GenericRowData.of(StringData.fromString("15"), 16, 
17, 18));
+        testHelper4.write(GenericRowData.of(StringData.fromString("19"), 20, 
21, 22));
+        testHelper4.commit();
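+        // Rows written before the ADD COLUMNS return null for the new fields.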
+        assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString())
+                .isEqualTo(
+                        "[[3,null,null,null], "
+                                + "[6,null,null,null], "
+                                + "[9,null,null,null], "
+                                + "[12,null,null,null], "
+                                + "[13,null,null,null], "
+                                + "[14,null,null,null], "
+                                + "[15,16,17,18], "
+                                + "[19,20,21,22]]");
+        assertThat(
+                        spark.table("tablestore.default.testSchemaEvolution")
+                                .filter("a>10")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo(
+                        "[[12,null,null,null], "
+                                + "[13,null,null,null], "
+                                + "[14,null,null,null], "
+                                + "[15,16,17,18], "
+                                + "[19,20,21,22]]");
+        assertThat(
+                        spark.table("tablestore.default.testSchemaEvolution")
+                                .select("a", "b", "c", "d")
+                                .filter("a>10")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo(
+                        "[[12,null,null,null], "
+                                + "[13,null,null,null], "
+                                + "[14,null,null,null], "
+                                + "[15,18,17,16], "
+                                + "[19,22,21,20]]");
+        assertThat(
+                        spark.table("tablestore.default.testSchemaEvolution")
+                                .select("a", "b", "c", "d")
+                                .filter("a>10 and b is not null")
+                                .collectAsList()
+                                .toString())
+                .isEqualTo("[[15,18,17,16], [19,22,21,20]]");
+    }
+}
