This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 36ed2c37 [FLINK-27847] Support rename/drop column in SchemaManager 36ed2c37 is described below commit 36ed2c37201cabea3779dcb95b9a425f03a80d8e Author: shammon <zjur...@gmail.com> AuthorDate: Fri Dec 2 13:40:20 2022 +0800 [FLINK-27847] Support rename/drop column in SchemaManager This closes #347 --- .../table/store/file/schema/SchemaChange.java | 77 ++++ .../table/store/file/schema/SchemaManager.java | 55 ++- .../table/store/table/SchemaEvolutionTest.java | 98 ++++ .../flink/table/store/spark/SparkCatalog.java | 10 + .../table/store/spark/SimpleTableTestHelper.java | 15 +- .../flink/table/store/spark/SparkReadITCase.java | 336 +------------- .../flink/table/store/spark/SparkReadTestBase.java | 230 ++++++++++ .../store/spark/SparkSchemaEvolutionITCase.java | 511 +++++++++++++++++++++ 8 files changed, 988 insertions(+), 344 deletions(-) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java index 8d8246bf..ebbb87fd 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaChange.java @@ -45,6 +45,14 @@ public interface SchemaChange { return new AddColumn(fieldName, logicalType, isNullable, comment); } + static SchemaChange renameColumn(String fieldName, String newName) { + return new RenameColumn(fieldName, newName); + } + + static SchemaChange dropColumn(String fieldName) { + return new DropColumn(fieldName); + } + static SchemaChange updateColumnType(String fieldName, LogicalType newLogicalType) { return new UpdateColumnType(fieldName, newLogicalType); } @@ -178,6 +186,75 @@ public interface SchemaChange { } } + /** A SchemaChange to rename a field. 
*/ + final class RenameColumn implements SchemaChange { + private final String fieldName; + private final String newName; + + private RenameColumn(String fieldName, String newName) { + this.fieldName = fieldName; + this.newName = newName; + } + + public String fieldName() { + return fieldName; + } + + public String newName() { + return newName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RenameColumn that = (RenameColumn) o; + return Objects.equals(fieldName, that.fieldName) + && Objects.equals(newName, that.newName); + } + + @Override + public int hashCode() { + int result = Objects.hash(newName); + result = 31 * result + Objects.hashCode(fieldName); + return result; + } + } + + /** A SchemaChange to drop a field. */ + final class DropColumn implements SchemaChange { + private final String fieldName; + + private DropColumn(String fieldName) { + this.fieldName = fieldName; + } + + public String fieldName() { + return fieldName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DropColumn that = (DropColumn) o; + return Objects.equals(fieldName, that.fieldName); + } + + @Override + public int hashCode() { + return Objects.hashCode(fieldName); + } + } + /** A SchemaChange to update the field type. 
*/ final class UpdateColumnType implements SchemaChange { private final String fieldName; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java index dc6fa535..c6fa60fe 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaManager.java @@ -24,7 +24,9 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.file.operation.Lock; import org.apache.flink.table.store.file.schema.SchemaChange.AddColumn; +import org.apache.flink.table.store.file.schema.SchemaChange.DropColumn; import org.apache.flink.table.store.file.schema.SchemaChange.RemoveOption; +import org.apache.flink.table.store.file.schema.SchemaChange.RenameColumn; import org.apache.flink.table.store.file.schema.SchemaChange.SetOption; import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnComment; import org.apache.flink.table.store.file.schema.SchemaChange.UpdateColumnNullability; @@ -37,8 +39,6 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.utils.LogicalTypeCasts; import org.apache.flink.util.Preconditions; -import org.apache.commons.lang3.StringUtils; - import javax.annotation.Nullable; import java.io.IOException; @@ -205,11 +205,7 @@ public class SchemaManager implements Serializable { newOptions.remove(removeOption.key()); } else if (change instanceof AddColumn) { AddColumn addColumn = (AddColumn) change; - if (newFields.stream() - .anyMatch( - f -> - StringUtils.equalsIgnoreCase( - f.name(), addColumn.fieldName()))) { + if (newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()))) { throw new IllegalArgumentException( String.format( "The column [%s] exists 
in the table[%s].", @@ -221,6 +217,39 @@ public class SchemaManager implements Serializable { newFields.add( new DataField( id, addColumn.fieldName(), dataType, addColumn.description())); + } else if (change instanceof RenameColumn) { + RenameColumn rename = (RenameColumn) change; + validateNotPrimaryAndPartitionKey(schema, rename.fieldName()); + if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) { + throw new IllegalArgumentException( + String.format( + "The column [%s] exists in the table[%s].", + rename.newName(), tableRoot)); + } + + updateNestedColumn( + newFields, + new String[] {rename.fieldName()}, + 0, + (field) -> + new DataField( + field.id(), + rename.newName(), + field.type(), + field.description())); + } else if (change instanceof DropColumn) { + DropColumn drop = (DropColumn) change; + validateNotPrimaryAndPartitionKey(schema, drop.fieldName()); + if (!newFields.removeIf( + f -> f.name().equals(((DropColumn) change).fieldName()))) { + throw new IllegalArgumentException( + String.format( + "The column [%s] doesn't exist in the table[%s].", + drop.fieldName(), tableRoot)); + } + if (newFields.isEmpty()) { + throw new IllegalArgumentException("Cannot drop all fields in table"); + } } else if (change instanceof UpdateColumnType) { UpdateColumnType update = (UpdateColumnType) change; updateColumn( @@ -300,6 +329,18 @@ public class SchemaManager implements Serializable { } } + private void validateNotPrimaryAndPartitionKey(TableSchema schema, String fieldName) { + /// TODO support partition and primary keys schema evolution + if (schema.partitionKeys().contains(fieldName)) { + throw new UnsupportedOperationException( + String.format("Cannot drop/rename partition key[%s]", fieldName)); + } + if (schema.primaryKeys().contains(fieldName)) { + throw new UnsupportedOperationException( + String.format("Cannot drop/rename primary key[%s]", fieldName)); + } + } + private void updateNestedColumn( List<DataField> newFields, String[] 
updateFieldNames, diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java index 590aff22..4525b218 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/SchemaEvolutionTest.java @@ -46,6 +46,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -173,6 +174,103 @@ public class SchemaEvolutionTest { "f0", new BigIntType(), new IntType())); } + @Test + public void testRenameField() throws Exception { + UpdateSchema updateSchema = + new UpdateSchema( + RowType.of(new IntType(), new BigIntType()), + Collections.emptyList(), + Collections.emptyList(), + new HashMap<>(), + ""); + schemaManager.commitNewVersion(updateSchema); + assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f0", "f1"); + + // Rename "f0" to "f01", "f1" to "f0", "f01" to "f1" + schemaManager.commitChanges( + Collections.singletonList(SchemaChange.renameColumn("f0", "f01"))); + schemaManager.commitChanges( + Collections.singletonList(SchemaChange.renameColumn("f1", "f0"))); + assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f01", "f0"); + schemaManager.commitChanges( + Collections.singletonList(SchemaChange.renameColumn("f01", "f1"))); + assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f1", "f0"); + + assertThatThrownBy( + () -> + schemaManager.commitChanges( + Collections.singletonList( + SchemaChange.renameColumn("f0", "f1")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + String.format("The column [%s] exists in the table[%s].", "f1", tablePath)); + } + + @Test + public void 
testDropField() throws Exception { + UpdateSchema updateSchema = + new UpdateSchema( + RowType.of( + new IntType(), new BigIntType(), new IntType(), new BigIntType()), + Collections.singletonList("f0"), + Arrays.asList("f0", "f2"), + new HashMap<>(), + ""); + schemaManager.commitNewVersion(updateSchema); + assertThat(schemaManager.latest().get().fieldNames()) + .containsExactly("f0", "f1", "f2", "f3"); + + schemaManager.commitChanges(Collections.singletonList(SchemaChange.dropColumn("f1"))); + assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f0", "f2", "f3"); + + assertThatThrownBy( + () -> + schemaManager.commitChanges( + Collections.singletonList(SchemaChange.dropColumn("f0")))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage(String.format("Cannot drop/rename partition key[%s]", "f0")); + + assertThatThrownBy( + () -> + schemaManager.commitChanges( + Collections.singletonList(SchemaChange.dropColumn("f2")))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage(String.format("Cannot drop/rename primary key[%s]", "f2")); + } + + @Test + public void testDropAllFields() throws Exception { + UpdateSchema updateSchema = + new UpdateSchema( + RowType.of(new IntType(), new BigIntType()), + Collections.emptyList(), + Collections.emptyList(), + new HashMap<>(), + ""); + schemaManager.commitNewVersion(updateSchema); + assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f0", "f1"); + + schemaManager.commitChanges(Collections.singletonList(SchemaChange.dropColumn("f0"))); + assertThat(schemaManager.latest().get().fieldNames()).containsExactly("f1"); + + assertThatThrownBy( + () -> + schemaManager.commitChanges( + Collections.singletonList(SchemaChange.dropColumn("f100")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + String.format( + "The column [%s] doesn't exist in the table[%s].", + "f100", tablePath)); + + assertThatThrownBy( + () -> + schemaManager.commitChanges( + 
Collections.singletonList(SchemaChange.dropColumn("f1")))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Cannot drop all fields in table"); + } + private List<Row> readRecords(FileStoreTable table, Predicate filter) throws IOException { RowRowConverter converter = RowRowConverter.create( diff --git a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java index 05ad2e2e..234ddd6b 100644 --- a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java +++ b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java @@ -41,7 +41,9 @@ import org.apache.spark.sql.connector.catalog.Table; import org.apache.spark.sql.connector.catalog.TableCatalog; import org.apache.spark.sql.connector.catalog.TableChange; import org.apache.spark.sql.connector.catalog.TableChange.AddColumn; +import org.apache.spark.sql.connector.catalog.TableChange.DeleteColumn; import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty; +import org.apache.spark.sql.connector.catalog.TableChange.RenameColumn; import org.apache.spark.sql.connector.catalog.TableChange.SetProperty; import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnComment; import org.apache.spark.sql.connector.catalog.TableChange.UpdateColumnNullability; @@ -269,6 +271,14 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces { toFlinkType(add.dataType()), add.isNullable(), add.comment()); + } else if (change instanceof RenameColumn) { + RenameColumn rename = (RenameColumn) change; + validateAlterNestedField(rename.fieldNames()); + return SchemaChange.renameColumn(rename.fieldNames()[0], rename.newName()); + } else if (change instanceof DeleteColumn) { + DeleteColumn delete = (DeleteColumn) change; + validateAlterNestedField(delete.fieldNames()); + return 
SchemaChange.dropColumn(delete.fieldNames()[0]); } else if (change instanceof UpdateColumnType) { UpdateColumnType update = (UpdateColumnType) change; validateAlterNestedField(update.fieldNames()); diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java index 353c16e7..0aa6e8b8 100644 --- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SimpleTableTestHelper.java @@ -38,11 +38,18 @@ import java.util.Map; /** A simple table test helper to write and commit. */ public class SimpleTableTestHelper { - private final TableWrite writer; - private final TableCommit commit; + private TableWrite writer; + private TableCommit commit; private long commitIdentifier; + public SimpleTableTestHelper(Path path) throws Exception { + Map<String, String> options = new HashMap<>(); + // orc is shaded, can not find shaded classes in ide + options.put(CoreOptions.FILE_FORMAT.key(), "avro"); + createTable(path, options); + } + public SimpleTableTestHelper(Path path, RowType rowType) throws Exception { this(path, rowType, Collections.emptyList(), Collections.emptyList()); } @@ -56,6 +63,10 @@ public class SimpleTableTestHelper { new SchemaManager(path) .commitNewVersion( new UpdateSchema(rowType, partitionKeys, primaryKeys, options, "")); + createTable(path, options); + } + + private void createTable(Path path, Map<String, String> options) { Configuration conf = Configuration.fromMap(options); conf.setString("path", path.toString()); FileStoreTable table = FileStoreTableFactory.create(conf); diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java index 
03f6391c..c9c697e8 100644 --- a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadITCase.java @@ -21,41 +21,26 @@ package org.apache.flink.table.store.spark; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.binary.BinaryStringData; import org.apache.flink.table.store.file.schema.ArrayDataType; import org.apache.flink.table.store.file.schema.AtomicDataType; import org.apache.flink.table.store.file.schema.DataField; import org.apache.flink.table.store.file.schema.DataType; -import org.apache.flink.table.store.file.schema.RowDataType; import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.store.table.FileStoreTableFactory; -import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.BigIntType; -import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.table.types.logical.DoubleType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; -import org.apache.flink.types.RowKind; -import org.apache.commons.io.FileUtils; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; 
import org.junit.jupiter.api.Test; import java.io.File; -import java.io.IOException; -import java.nio.file.Files; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -68,142 +53,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** ITCase for spark reader. */ -public class SparkReadITCase { - - private static File warehouse = null; - - private static SparkSession spark = null; - - private static Path warehousePath = null; - - private static Path tablePath1; - - private static Path tablePath2; - - @BeforeAll - public static void startMetastoreAndSpark() throws Exception { - warehouse = Files.createTempFile("warehouse", null).toFile(); - assertThat(warehouse.delete()).isTrue(); - warehousePath = new Path("file:" + warehouse); - spark = SparkSession.builder().master("local[2]").getOrCreate(); - spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName()); - spark.conf().set("spark.sql.catalog.tablestore.warehouse", warehousePath.toString()); - - // flink sink - tablePath1 = new Path(warehousePath, "default.db/t1"); - SimpleTableTestHelper testHelper1 = new SimpleTableTestHelper(tablePath1, rowType1()); - testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1"))); - testHelper1.write(GenericRowData.of(3, 4L, StringData.fromString("2"))); - testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3"))); - testHelper1.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, StringData.fromString("2"))); - testHelper1.commit(); - - // a int not null - // b array<varchar> not null - // c row<row<double, array<boolean> not null> not null, bigint> not null - tablePath2 = new Path(warehousePath, "default.db/t2"); - SimpleTableTestHelper testHelper2 = new SimpleTableTestHelper(tablePath2, rowType2()); - testHelper2.write( - GenericRowData.of( - 1, - new GenericArrayData( - new StringData[] { - StringData.fromString("AAA"), 
StringData.fromString("BBB") - }), - GenericRowData.of( - GenericRowData.of(1.0d, new GenericArrayData(new Boolean[] {null})), - 1L))); - testHelper2.write( - GenericRowData.of( - 2, - new GenericArrayData( - new StringData[] { - StringData.fromString("CCC"), StringData.fromString("DDD") - }), - GenericRowData.of( - GenericRowData.of(null, new GenericArrayData(new Boolean[] {true})), - null))); - testHelper2.commit(); - - testHelper2.write( - GenericRowData.of( - 3, - new GenericArrayData(new StringData[] {null, null}), - GenericRowData.of( - GenericRowData.of( - 2.0d, new GenericArrayData(new boolean[] {true, false})), - 2L))); - - testHelper2.write( - GenericRowData.of( - 4, - new GenericArrayData(new StringData[] {null, StringData.fromString("EEE")}), - GenericRowData.of( - GenericRowData.of( - 3.0d, - new GenericArrayData(new Boolean[] {true, false, true})), - 3L))); - testHelper2.commit(); - } - - private static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exception { - RowType rowType = - new RowType( - Arrays.asList( - new RowType.RowField("a", new IntType(false)), - new RowType.RowField("b", new BigIntType()), - new RowType.RowField("c", new VarCharType()))); - return new SimpleTableTestHelper(tablePath, rowType); - } - - private static RowType rowType1() { - return new RowType( - Arrays.asList( - new RowType.RowField("a", new IntType(false)), - new RowType.RowField("b", new BigIntType()), - new RowType.RowField("c", new VarCharType()))); - } - - private static RowType rowType2() { - return new RowType( - Arrays.asList( - new RowType.RowField("a", new IntType(false), "comment about a"), - new RowType.RowField("b", new ArrayType(false, new VarCharType())), - new RowType.RowField( - "c", - new RowType( - false, - Arrays.asList( - new RowType.RowField( - "c1", - new RowType( - false, - Arrays.asList( - new RowType.RowField( - "c11", - new DoubleType()), - new RowType.RowField( - "c12", - new ArrayType( - false, - new BooleanType()))))), - new 
RowType.RowField( - "c2", - new BigIntType(), - "comment about c2"))), - "comment about c"))); - } - - @AfterAll - public static void stopMetastoreAndSpark() throws IOException { - if (warehouse != null && warehouse.exists()) { - FileUtils.deleteDirectory(warehouse); - } - if (spark != null) { - spark.stop(); - spark = null; - } - } +public class SparkReadITCase extends SparkReadTestBase { @Test public void testNormal() { @@ -243,98 +93,6 @@ public class SparkReadITCase { innerTestNestedTypeFilterPushDown(spark.table("tablestore.default.t2")); } - @Test - public void testSetAndRemoveOption() { - spark.sql("ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')"); - - Map<String, String> options = schema1().options(); - assertThat(options).containsEntry("xyc", "unknown1"); - - spark.sql("ALTER TABLE tablestore.default.t1 UNSET TBLPROPERTIES('xyc')"); - - options = schema1().options(); - assertThat(options).doesNotContainKey("xyc"); - - assertThatThrownBy( - () -> - spark.sql( - "ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('primary-key' = 'a')")) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("Alter primary key is not supported"); - } - - @Test - public void testAddColumn() throws Exception { - Path tablePath = new Path(warehousePath, "default.db/testAddColumn"); - SimpleTableTestHelper testHelper1 = createTestHelper(tablePath); - testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1"))); - testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3"))); - testHelper1.commit(); - - spark.sql("ALTER TABLE tablestore.default.testAddColumn ADD COLUMN d STRING"); - - Dataset<Row> table = spark.table("tablestore.default.testAddColumn"); - List<Row> results = table.collectAsList(); - assertThat(results.toString()).isEqualTo("[[1,2,1,null], [5,6,3,null]]"); - - results = table.select("a", "c").collectAsList(); - assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]"); - - results = 
table.groupBy().sum("b").collectAsList(); - assertThat(results.toString()).isEqualTo("[[8]]"); - } - - /** - * In fact, the table store does not currently support alter column type. In this case, changing - * "a" type from int to bigint can run successfully because the underlying orc supports directly - * reading int to bigint. At present, we read int value from orc into {@link RowData} according - * to the underlying data schema, and then read long from {@link RowData} will cause failure. - * TODO: This case needs to be ignored first and will be completely fixed in - * https://issues.apache.org/jira/browse/FLINK-27845 - */ - @Disabled - @Test - public void testAlterColumnType() throws Exception { - Path tablePath = new Path(warehousePath, "default.db/testAlterColumnType"); - SimpleTableTestHelper testHelper1 = createTestHelper(tablePath); - testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1"))); - testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3"))); - testHelper1.commit(); - - spark.sql("ALTER TABLE tablestore.default.testAlterColumnType ALTER COLUMN a TYPE BIGINT"); - innerTestSimpleType(spark.table("tablestore.default.testAlterColumnType")); - } - - @Test - public void testAlterTableColumnNullability() { - assertThat(fieldIsNullable(getField(schema2(), 0))).isFalse(); - assertThat(fieldIsNullable(getField(schema2(), 1))).isFalse(); - assertThat(fieldIsNullable(getField(schema2(), 2))).isFalse(); - assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isFalse(); - assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 1))).isTrue(); - assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 0))) - .isTrue(); - assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1))) - .isFalse(); - - // note: for Spark, it is illegal to change nullable column to non-nullable - spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN a DROP NOT NULL"); - 
assertThat(fieldIsNullable(getField(schema2(), 0))).isTrue(); - - spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN b DROP NOT NULL"); - assertThat(fieldIsNullable(getField(schema2(), 1))).isTrue(); - - spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c DROP NOT NULL"); - assertThat(fieldIsNullable(getField(schema2(), 2))).isTrue(); - - spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 DROP NOT NULL"); - assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isTrue(); - - spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 DROP NOT NULL"); - assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1))) - .isTrue(); - } - @Test public void testDefaultNamespace() { spark.sql("USE tablestore"); @@ -342,63 +100,6 @@ public class SparkReadITCase { .isEqualTo("[[tablestore,default]]"); } - @Test - public void testAlterPrimaryKeyNullability() { - spark.sql("USE tablestore"); - spark.sql( - "CREATE TABLE default.testAlterPkNullability (\n" - + "a BIGINT,\n" - + "b STRING) USING tablestore\n" - + "COMMENT 'table comment'\n" - + "TBLPROPERTIES ('primary-key' = 'a')"); - assertThatThrownBy( - () -> - spark.sql( - "ALTER TABLE default.testAlterPkNullability ALTER COLUMN a DROP NOT NULL")) - .getRootCause() - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("Cannot change nullability of primary key"); - } - - @Test - public void testAlterTableColumnComment() { - assertThat(getField(schema1(), 0).description()).isNull(); - - spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'a new comment'"); - assertThat(getField(schema1(), 0).description()).isEqualTo("a new comment"); - - spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'yet another comment'"); - assertThat(getField(schema1(), 0).description()).isEqualTo("yet another comment"); - - assertThat(getField(schema2(), 2).description()).isEqualTo("comment about c"); - 
assertThat(getNestedField(getField(schema2(), 2), 0).description()).isNull(); - assertThat(getNestedField(getField(schema2(), 2), 1).description()) - .isEqualTo("comment about c2"); - assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description()) - .isNull(); - assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description()) - .isNull(); - - spark.sql( - "ALTER TABLE tablestore.default.t2 ALTER COLUMN c COMMENT 'yet another comment about c'"); - spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'"); - spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'"); - spark.sql( - "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'"); - spark.sql( - "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'"); - - assertThat(getField(schema2(), 2).description()).isEqualTo("yet another comment about c"); - assertThat(getNestedField(getField(schema2(), 2), 0).description()) - .isEqualTo("a nested type"); - assertThat(getNestedField(getField(schema2(), 2), 1).description()) - .isEqualTo("a bigint type"); - assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description()) - .isEqualTo("a double type"); - assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description()) - .isEqualTo("a boolean array"); - } - @Test public void testCreateTableWithNullablePk() { spark.sql("USE tablestore"); @@ -686,25 +387,6 @@ public class SparkReadITCase { assertThat(new File(nsPath.toUri())).doesNotExist(); } - private TableSchema schema1() { - return FileStoreTableFactory.create(tablePath1).schema(); - } - - private TableSchema schema2() { - return FileStoreTableFactory.create(tablePath2).schema(); - } - - private void innerTestSimpleType(Dataset<Row> dataset) { - List<Row> results = dataset.collectAsList(); - assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]"); - - results = 
dataset.select("a", "c").collectAsList(); - assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]"); - - results = dataset.groupBy().sum("b").collectAsList(); - assertThat(results.toString()).isEqualTo("[[8]]"); - } - private void innerTestNestedType(Dataset<Row> dataset) { List<Row> results = dataset.collectAsList(); assertThat(results.toString()) @@ -771,20 +453,4 @@ public class SparkReadITCase { .isEqualTo( "[[3,WrappedArray(true, false),2], [4,WrappedArray(true, false, true),3]]"); } - - private boolean fieldIsNullable(DataField field) { - return field.type().logicalType().isNullable(); - } - - private DataField getField(TableSchema schema, int index) { - return schema.fields().get(index); - } - - private DataField getNestedField(DataField field, int index) { - if (field.type() instanceof RowDataType) { - RowDataType rowDataType = (RowDataType) field.type(); - return rowDataType.fields().get(index); - } - throw new IllegalArgumentException(); - } } diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java new file mode 100644 index 00000000..15ec0c6f --- /dev/null +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.spark; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.store.file.schema.DataField; +import org.apache.flink.table.store.file.schema.RowDataType; +import org.apache.flink.table.store.file.schema.TableSchema; +import org.apache.flink.table.store.table.FileStoreTableFactory; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.BooleanType; +import org.apache.flink.table.types.logical.DoubleType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.types.RowKind; + +import org.apache.commons.io.FileUtils; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Base tests for spark read. 
*/ +public abstract class SparkReadTestBase { + + /** Local filesystem location of the warehouse; deleted again in {@link #stopMetastoreAndSpark()}. */ private static File warehouse = null; + + /** Spark session created once in {@link #startMetastoreAndSpark()} and used by the tests. */ protected static SparkSession spark = null; + + /** The warehouse location as a {@code file:} path; the {@code tablestore} catalog is configured with it. */ protected static Path warehousePath = null; + + /** Path of fixture table {@code default.db/t1} below the warehouse. */ protected static Path tablePath1; + + /** Path of fixture table {@code default.db/t2} below the warehouse. */ protected static Path tablePath2; + + /** Picks a fresh warehouse location, starts a local Spark session whose {@code tablestore} catalog points at it, and writes the fixture tables t1 (three rows, one deleted again) and t2 (four rows in two commits). */ @BeforeAll + public static void startMetastoreAndSpark() throws Exception { + warehouse = Files.createTempFile("warehouse", null).toFile(); + /* Only the unique name is kept: the placeholder file itself is deleted right away. */ assertThat(warehouse.delete()).isTrue(); + warehousePath = new Path("file:" + warehouse); + spark = SparkSession.builder().master("local[2]").getOrCreate(); + spark.conf().set("spark.sql.catalog.tablestore", SparkCatalog.class.getName()); + spark.conf().set("spark.sql.catalog.tablestore.warehouse", warehousePath.toString()); + + // flink sink + tablePath1 = new Path(warehousePath, "default.db/t1"); + SimpleTableTestHelper testHelper1 = new SimpleTableTestHelper(tablePath1, rowType1()); + testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1"))); + testHelper1.write(GenericRowData.of(3, 4L, StringData.fromString("2"))); + testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3"))); + testHelper1.write(GenericRowData.ofKind(RowKind.DELETE, 3, 4L, StringData.fromString("2"))); + testHelper1.commit(); + + // a int not null + // b array<varchar> not null + // c row<row<double, array<boolean> not null> not null, bigint> not null + tablePath2 = new Path(warehousePath, "default.db/t2"); + SimpleTableTestHelper testHelper2 = new SimpleTableTestHelper(tablePath2, rowType2()); + testHelper2.write( + GenericRowData.of( + 1, + new GenericArrayData( + new StringData[] { + StringData.fromString("AAA"), StringData.fromString("BBB") + }), + GenericRowData.of( + GenericRowData.of(1.0d, new GenericArrayData(new Boolean[] {null})), + 1L))); + testHelper2.write( + GenericRowData.of( + 2, + new GenericArrayData( + new StringData[] { + StringData.fromString("CCC"), StringData.fromString("DDD") + }), + GenericRowData.of( + GenericRowData.of(null, new 
GenericArrayData(new Boolean[] {true})), + null))); + testHelper2.commit(); + + testHelper2.write( + GenericRowData.of( + 3, + new GenericArrayData(new StringData[] {null, null}), + GenericRowData.of( + GenericRowData.of( + 2.0d, new GenericArrayData(new boolean[] {true, false})), + 2L))); + + testHelper2.write( + GenericRowData.of( + 4, + new GenericArrayData(new StringData[] {null, StringData.fromString("EEE")}), + GenericRowData.of( + GenericRowData.of( + 3.0d, + new GenericArrayData(new Boolean[] {true, false, true})), + 3L))); + testHelper2.commit(); + } + + /** Creates a {@link SimpleTableTestHelper} for {@code tablePath} with the row type (a INT NOT NULL, b BIGINT, c VARCHAR), i.e. the same fields as {@link #rowType1()}. */ protected static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exception { + RowType rowType = + new RowType( + Arrays.asList( + new RowType.RowField("a", new IntType(false)), + new RowType.RowField("b", new BigIntType()), + new RowType.RowField("c", new VarCharType()))); + return new SimpleTableTestHelper(tablePath, rowType); + } + + /** Creates a {@link SimpleTableTestHelper} for {@code tablePath} without supplying a row type. NOTE(review): presumably meant for a table that already exists - confirm against SimpleTableTestHelper. */ protected static SimpleTableTestHelper createTestHelperWithoutDDL(Path tablePath) + throws Exception { + return new SimpleTableTestHelper(tablePath); + } + + /** Row type of fixture table t1: a INT NOT NULL, b BIGINT, c VARCHAR. */ private static RowType rowType1() { + return new RowType( + Arrays.asList( + new RowType.RowField("a", new IntType(false)), + new RowType.RowField("b", new BigIntType()), + new RowType.RowField("c", new VarCharType()))); + } + + /** Row type of fixture table t2: a INT NOT NULL, b non-null array of VARCHAR, c non-null row of (c1 non-null row of (c11 DOUBLE, c12 non-null array of BOOLEAN), c2 BIGINT); a, c and c2 carry comments. */ private static RowType rowType2() { + return new RowType( + Arrays.asList( + new RowType.RowField("a", new IntType(false), "comment about a"), + new RowType.RowField("b", new ArrayType(false, new VarCharType())), + new RowType.RowField( + "c", + new RowType( + false, + Arrays.asList( + new RowType.RowField( + "c1", + new RowType( + false, + Arrays.asList( + new RowType.RowField( + "c11", + new DoubleType()), + new RowType.RowField( + "c12", + new ArrayType( + false, + new BooleanType()))))), + new RowType.RowField( + "c2", + new BigIntType(), + "comment about c2"))), + "comment about c"))); + } + + /** Deletes the warehouse directory if it exists, then stops and clears the Spark session. */ @AfterAll + public static void stopMetastoreAndSpark() throws IOException { + if 
(warehouse != null && warehouse.exists()) { + FileUtils.deleteDirectory(warehouse); + } + if (spark != null) { + spark.stop(); + spark = null; + } + } + + /** Asserts that {@code dataset} holds the rows [1,2,1] and [5,6,3], that the projection on (a, c) matches, and that sum(b) is 8. */ protected void innerTestSimpleType(Dataset<Row> dataset) { + List<Row> results = dataset.collectAsList(); + assertThat(results.toString()).isEqualTo("[[1,2,1], [5,6,3]]"); + + results = dataset.select("a", "c").collectAsList(); + assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]"); + + results = dataset.groupBy().sum("b").collectAsList(); + assertThat(results.toString()).isEqualTo("[[8]]"); + } + + /** Loads the schema of table t1 from {@link #tablePath1}; every call creates the table object anew. */ protected TableSchema schema1() { + return FileStoreTableFactory.create(tablePath1).schema(); + } + + /** Loads the schema of table t2 from {@link #tablePath2}; every call creates the table object anew. */ protected TableSchema schema2() { + return FileStoreTableFactory.create(tablePath2).schema(); + } + + /** Returns whether the logical type of {@code field} is nullable. */ protected boolean fieldIsNullable(DataField field) { + return field.type().logicalType().isNullable(); + } + + /** Returns the top-level field of {@code schema} at position {@code index}. */ protected DataField getField(TableSchema schema, int index) { + return schema.fields().get(index); + } + + /** Returns the child at position {@code index} of a row-typed {@code field}; throws IllegalArgumentException when the field is not of a row type. */ protected DataField getNestedField(DataField field, int index) { + if (field.type() instanceof RowDataType) { + RowDataType rowDataType = (RowDataType) field.type(); + return rowDataType.fields().get(index); + } + throw new IllegalArgumentException(); + } +} diff --git a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java new file mode 100644 index 00000000..c23facfe --- /dev/null +++ b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.spark; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; + +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** ITCase for schema evolution in spark. 
*/ +public class SparkSchemaEvolutionITCase extends SparkReadTestBase { + + /** Verifies that SET / UNSET TBLPROPERTIES on t1 is reflected in the schema options and that setting 'primary-key' fails with UnsupportedOperationException. */ @Test + public void testSetAndRemoveOption() { + spark.sql("ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('xyc' 'unknown1')"); + + Map<String, String> options = schema1().options(); + assertThat(options).containsEntry("xyc", "unknown1"); + + spark.sql("ALTER TABLE tablestore.default.t1 UNSET TBLPROPERTIES('xyc')"); + + options = schema1().options(); + assertThat(options).doesNotContainKey("xyc"); + + assertThatThrownBy( + () -> + spark.sql( + "ALTER TABLE tablestore.default.t1 SET TBLPROPERTIES('primary-key' = 'a')")) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Alter primary key is not supported"); + } + + /** Verifies that after ADD COLUMN the previously written rows read back with null in the new column, and that projection and aggregation on the old columns still give the same results. */ @Test + public void testAddColumn() throws Exception { + Path tablePath = new Path(warehousePath, "default.db/testAddColumn"); + SimpleTableTestHelper testHelper1 = createTestHelper(tablePath); + testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1"))); + testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3"))); + testHelper1.commit(); + + spark.sql("ALTER TABLE tablestore.default.testAddColumn ADD COLUMN d STRING"); + + Dataset<Row> table = spark.table("tablestore.default.testAddColumn"); + List<Row> results = table.collectAsList(); + assertThat(results.toString()).isEqualTo("[[1,2,1,null], [5,6,3,null]]"); + + results = table.select("a", "c").collectAsList(); + assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]"); + + results = table.groupBy().sum("b").collectAsList(); + assertThat(results.toString()).isEqualTo("[[8]]"); + } + + /** Verifies that RENAME COLUMN shows up in SHOW CREATE TABLE, that existing rows are readable under the new name, and that selecting the old name raises AnalysisException. */ @Test + public void testRenameColumn() throws Exception { + Path tablePath = new Path(warehousePath, "default.db/testRenameColumn"); + SimpleTableTestHelper testHelper1 = createTestHelper(tablePath); + testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1"))); + testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3"))); + testHelper1.commit(); + + List<Row> 
beforeRename = + spark.sql("SHOW CREATE TABLE tablestore.default.testRenameColumn").collectAsList(); + assertThat(beforeRename.toString()) + .isEqualTo( + "[[CREATE TABLE testRenameColumn (\n" + + " `a` INT NOT NULL,\n" + + " `b` BIGINT,\n" + + " `c` STRING)\n" + + "]]"); + Dataset<Row> table1 = spark.table("tablestore.default.testRenameColumn"); + List<Row> results = table1.select("a", "c").collectAsList(); + assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]"); + + // Rename "a" to "aa" + spark.sql("ALTER TABLE tablestore.default.testRenameColumn RENAME COLUMN a to aa"); + List<Row> afterRename = + spark.sql("SHOW CREATE TABLE tablestore.default.testRenameColumn").collectAsList(); + assertThat(afterRename.toString()) + .isEqualTo( + "[[CREATE TABLE testRenameColumn (\n" + + " `aa` INT NOT NULL,\n" + + " `b` BIGINT,\n" + + " `c` STRING)\n" + + "]]"); + Dataset<Row> table2 = spark.table("tablestore.default.testRenameColumn"); + results = table2.select("aa", "c").collectAsList(); + assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]"); + assertThatThrownBy(() -> table2.select("a", "c").collectAsList()) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("cannot resolve '%s' given input columns", "a"); + } + + /** Verifies that renaming the partition key column of a table created through Spark SQL is rejected. */ @Test + public void testRenamePartitionKey() { + spark.sql("USE tablestore"); + spark.sql( + "CREATE TABLE default.testRenamePartitionKey (\n" + + "a BIGINT,\n" + + "b STRING) USING tablestore\n" + + "COMMENT 'table comment'\n" + + "PARTITIONED BY (a)\n" + + "TBLPROPERTIES ('foo' = 'bar')"); + + List<Row> beforeRename = + spark.sql("SHOW CREATE TABLE tablestore.default.testRenamePartitionKey") + .collectAsList(); + assertThat(beforeRename.toString()) + .isEqualTo( + "[[CREATE TABLE testRenamePartitionKey (\n" + + " `a` BIGINT,\n" + + " `b` STRING)\n" + + "PARTITIONED BY (a)\n" + + "]]"); + + assertThatThrownBy( + () -> + spark.sql( + "ALTER TABLE tablestore.default.testRenamePartitionKey RENAME COLUMN a to aa")) + 
.isInstanceOf(RuntimeException.class) + .hasMessage( + String.format( + "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[%s]", + "a")); + } + + /** Verifies that DROP COLUMN removes the column from SHOW CREATE TABLE and from the rows read back. */ @Test + public void testDropSingleColumn() throws Exception { + Path tablePath = new Path(warehousePath, "default.db/testDropSingleColumn"); + SimpleTableTestHelper testHelper = createTestHelper(tablePath); + testHelper.write(GenericRowData.of(1, 2L, StringData.fromString("1"))); + testHelper.write(GenericRowData.of(5, 6L, StringData.fromString("3"))); + testHelper.commit(); + + List<Row> beforeDrop = + spark.sql("SHOW CREATE TABLE tablestore.default.testDropSingleColumn") + .collectAsList(); + assertThat(beforeDrop.toString()) + .isEqualTo( + "[[CREATE TABLE testDropSingleColumn (\n" + + " `a` INT NOT NULL,\n" + + " `b` BIGINT,\n" + + " `c` STRING)\n" + + "]]"); + + spark.sql("ALTER TABLE tablestore.default.testDropSingleColumn DROP COLUMN a"); + + List<Row> afterDrop = + spark.sql("SHOW CREATE TABLE tablestore.default.testDropSingleColumn") + .collectAsList(); + assertThat(afterDrop.toString()) + .isEqualTo( + "[[CREATE TABLE testDropSingleColumn (\n" + + " `b` BIGINT,\n" + + " `c` STRING)\n" + + "]]"); + + Dataset<Row> table = spark.table("tablestore.default.testDropSingleColumn"); + List<Row> results = table.collectAsList(); + assertThat(results.toString()).isEqualTo("[[2,1], [6,3]]"); + } + + /** Verifies that DROP COLUMNS with two columns leaves only the remaining column in SHOW CREATE TABLE. NOTE(review): the locals beforeRename / afterRename hold the DDL before / after the drop; the names look copied from the rename tests. */ @Test + public void testDropColumns() throws Exception { + Path tablePath = new Path(warehousePath, "default.db/testDropColumns"); + /* The returned helper is not used. NOTE(review): presumably constructing it creates the table - confirm against SimpleTableTestHelper. */ createTestHelper(tablePath); + + List<Row> beforeRename = + spark.sql("SHOW CREATE TABLE tablestore.default.testDropColumns").collectAsList(); + assertThat(beforeRename.toString()) + .isEqualTo( + "[[CREATE TABLE testDropColumns (\n" + + " `a` INT NOT NULL,\n" + + " `b` BIGINT,\n" + + " `c` STRING)\n" + + "]]"); + + spark.sql("ALTER TABLE tablestore.default.testDropColumns DROP COLUMNS a, b"); + + List<Row> afterRename = + spark.sql("SHOW CREATE TABLE 
tablestore.default.testDropColumns").collectAsList(); + assertThat(afterRename.toString()) + .isEqualTo("[[CREATE TABLE testDropColumns (\n" + " `c` STRING)\n" + "]]"); + } + + /** Verifies that dropping the partition key column is rejected. NOTE(review): the local beforeRename holds the DDL before the attempted drop. */ @Test + public void testDropPartitionKey() { + spark.sql("USE tablestore"); + spark.sql( + "CREATE TABLE default.testDropPartitionKey (\n" + + "a BIGINT,\n" + + "b STRING) USING tablestore\n" + + "COMMENT 'table comment'\n" + + "PARTITIONED BY (a)\n" + + "TBLPROPERTIES ('foo' = 'bar')"); + + List<Row> beforeRename = + spark.sql("SHOW CREATE TABLE tablestore.default.testDropPartitionKey") + .collectAsList(); + assertThat(beforeRename.toString()) + .isEqualTo( + "[[CREATE TABLE testDropPartitionKey (\n" + + " `a` BIGINT,\n" + + " `b` STRING)\n" + + "PARTITIONED BY (a)\n" + + "]]"); + + assertThatThrownBy( + () -> + spark.sql( + "ALTER TABLE tablestore.default.testDropPartitionKey DROP COLUMN a")) + .isInstanceOf(RuntimeException.class) + .hasMessage( + String.format( + "java.lang.UnsupportedOperationException: Cannot drop/rename partition key[%s]", + "a")); + } + + /** Verifies that dropping a primary key column (b, which is not the partition key) is rejected. NOTE(review): the local beforeRename holds the DDL before the attempted drop. */ @Test + public void testDropPrimaryKey() { + spark.sql("USE tablestore"); + spark.sql( + "CREATE TABLE default.testDropPrimaryKey (\n" + + "a BIGINT,\n" + + "b STRING) USING tablestore\n" + + "COMMENT 'table comment'\n" + + "PARTITIONED BY (a)\n" + + "TBLPROPERTIES ('primary-key' = 'a, b')"); + + List<Row> beforeRename = + spark.sql("SHOW CREATE TABLE tablestore.default.testDropPrimaryKey") + .collectAsList(); + assertThat(beforeRename.toString()) + .isEqualTo( + "[[CREATE TABLE testDropPrimaryKey (\n" + + " `a` BIGINT NOT NULL,\n" + + " `b` STRING NOT NULL)\n" + + "PARTITIONED BY (a)\n" + + "]]"); + + assertThatThrownBy( + () -> + spark.sql( + "ALTER TABLE tablestore.default.testDropPrimaryKey DROP COLUMN b")) + .isInstanceOf(RuntimeException.class) + .hasMessage( + String.format( + "java.lang.UnsupportedOperationException: Cannot drop/rename primary key[%s]", + "b")); + } + + /** + * In fact, the table store does not currently 
support alter column type. In this case, changing + * "a" type from int to bigint can run successfully because the underlying orc supports directly + * reading int to bigint. At present, we read int value from orc into {@link RowData} according + * to the underlying data schema, and then read long from {@link RowData} will cause failure. + * TODO: This case needs to be ignored first and will be completely fixed in + * https://issues.apache.org/jira/browse/FLINK-27845 + */ + @Disabled + @Test + public void testAlterColumnType() throws Exception { + Path tablePath = new Path(warehousePath, "default.db/testAlterColumnType"); + SimpleTableTestHelper testHelper1 = createTestHelper(tablePath); + testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("1"))); + testHelper1.write(GenericRowData.of(5, 6L, StringData.fromString("3"))); + testHelper1.commit(); + + spark.sql("ALTER TABLE tablestore.default.testAlterColumnType ALTER COLUMN a TYPE BIGINT"); + innerTestSimpleType(spark.table("tablestore.default.testAlterColumnType")); + } + + /** Verifies the initial nullability of the fields of t2 and that ALTER COLUMN ... DROP NOT NULL makes top-level and nested columns nullable. */ @Test + public void testAlterTableColumnNullability() { + assertThat(fieldIsNullable(getField(schema2(), 0))).isFalse(); + assertThat(fieldIsNullable(getField(schema2(), 1))).isFalse(); + assertThat(fieldIsNullable(getField(schema2(), 2))).isFalse(); + assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isFalse(); + assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 1))).isTrue(); + assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 0))) + .isTrue(); + assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1))) + .isFalse(); + + // note: for Spark, it is illegal to change nullable column to non-nullable + spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN a DROP NOT NULL"); + assertThat(fieldIsNullable(getField(schema2(), 0))).isTrue(); + + spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN b DROP NOT NULL"); + 
assertThat(fieldIsNullable(getField(schema2(), 1))).isTrue(); + + spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c DROP NOT NULL"); + assertThat(fieldIsNullable(getField(schema2(), 2))).isTrue(); + + spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 DROP NOT NULL"); + assertThat(fieldIsNullable(getNestedField(getField(schema2(), 2), 0))).isTrue(); + + spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 DROP NOT NULL"); + assertThat(fieldIsNullable(getNestedField(getNestedField(getField(schema2(), 2), 0), 1))) + .isTrue(); + } + + /** Verifies that DROP NOT NULL on a primary key column fails with an UnsupportedOperationException as root cause. */ @Test + public void testAlterPrimaryKeyNullability() { + spark.sql("USE tablestore"); + spark.sql( + "CREATE TABLE default.testAlterPkNullability (\n" + + "a BIGINT,\n" + + "b STRING) USING tablestore\n" + + "COMMENT 'table comment'\n" + + "TBLPROPERTIES ('primary-key' = 'a')"); + assertThatThrownBy( + () -> + spark.sql( + "ALTER TABLE default.testAlterPkNullability ALTER COLUMN a DROP NOT NULL")) + .getRootCause() + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("Cannot change nullability of primary key"); + } + + /** Verifies that ALTER COLUMN ... COMMENT sets and replaces the description of a top-level field of t1 and of top-level and nested fields of t2. */ @Test + public void testAlterTableColumnComment() { + assertThat(getField(schema1(), 0).description()).isNull(); + + spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'a new comment'"); + assertThat(getField(schema1(), 0).description()).isEqualTo("a new comment"); + + spark.sql("ALTER TABLE tablestore.default.t1 ALTER COLUMN a COMMENT 'yet another comment'"); + assertThat(getField(schema1(), 0).description()).isEqualTo("yet another comment"); + + assertThat(getField(schema2(), 2).description()).isEqualTo("comment about c"); + assertThat(getNestedField(getField(schema2(), 2), 0).description()).isNull(); + assertThat(getNestedField(getField(schema2(), 2), 1).description()) + .isEqualTo("comment about c2"); + assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description()) + .isNull(); + 
assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description()) + .isNull(); + + spark.sql( + "ALTER TABLE tablestore.default.t2 ALTER COLUMN c COMMENT 'yet another comment about c'"); + spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1 COMMENT 'a nested type'"); + spark.sql("ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c2 COMMENT 'a bigint type'"); + spark.sql( + "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c11 COMMENT 'a double type'"); + spark.sql( + "ALTER TABLE tablestore.default.t2 ALTER COLUMN c.c1.c12 COMMENT 'a boolean array'"); + + assertThat(getField(schema2(), 2).description()).isEqualTo("yet another comment about c"); + assertThat(getNestedField(getField(schema2(), 2), 0).description()) + .isEqualTo("a nested type"); + assertThat(getNestedField(getField(schema2(), 2), 1).description()) + .isEqualTo("a bigint type"); + assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 0).description()) + .isEqualTo("a double type"); + assertThat(getNestedField(getNestedField(getField(schema2(), 2), 0), 1).description()) + .isEqualTo("a boolean array"); + } + + /** + * Test for schema evolution as follows: + * + * <ul> + * <li>1. Create table with fields ["a", "b", "c"], insert 2 records + * <li>2. Rename "a->aa", "c"->"a", "b"->"c", insert 2 records + * <li>3. Drop fields "aa", "c", insert 2 records + * <li>4. Add new fields "d", "c", "b", insert 2 records + * </ul> + * + * <p>Verify records in table above. 
+ */ + @Test + public void testSchemaEvolution() throws Exception { + // Create table with fields [a, b, c] and insert 2 records + Path tablePath = new Path(warehousePath, "default.db/testSchemaEvolution"); + SimpleTableTestHelper testHelper1 = createTestHelper(tablePath); + testHelper1.write(GenericRowData.of(1, 2L, StringData.fromString("3"))); + testHelper1.write(GenericRowData.of(4, 5L, StringData.fromString("6"))); + testHelper1.commit(); + assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString()) + .isEqualTo("[[1,2,3], [4,5,6]]"); + assertThat( + spark.table("tablestore.default.testSchemaEvolution") + .select("a", "b", "c") + .collectAsList() + .toString()) + .isEqualTo("[[1,2,3], [4,5,6]]"); + assertThat( + spark.table("tablestore.default.testSchemaEvolution") + .filter("a>1") + .collectAsList() + .toString()) + .isEqualTo("[[4,5,6]]"); + + // Rename "a->aa", "c"->"a", "b"->"c" and the fields are [aa, c, a], insert 2 records + spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME COLUMN a to aa"); + spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME COLUMN c to a"); + spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution RENAME COLUMN b to c"); + SimpleTableTestHelper testHelper2 = createTestHelperWithoutDDL(tablePath); + testHelper2.write(GenericRowData.of(7, 8L, StringData.fromString("9"))); + testHelper2.write(GenericRowData.of(10, 11L, StringData.fromString("12"))); + testHelper2.commit(); + assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString()) + .isEqualTo("[[1,2,3], [4,5,6], [7,8,9], [10,11,12]]"); + assertThat( + spark.table("tablestore.default.testSchemaEvolution") + .select("aa", "a", "c") + .collectAsList() + .toString()) + .isEqualTo("[[1,3,2], [4,6,5], [7,9,8], [10,12,11]]"); + assertThat( + spark.table("tablestore.default.testSchemaEvolution") + .select("aa", "a", "c") + .filter("aa>4") + .collectAsList() + .toString()) + 
.isEqualTo("[[7,9,8], [10,12,11]]"); + + // Drop fields "aa", "c" and the fields are [a], insert 2 records + spark.sql("ALTER TABLE tablestore.default.testSchemaEvolution DROP COLUMNS aa, c"); + SimpleTableTestHelper testHelper3 = createTestHelperWithoutDDL(tablePath); + testHelper3.write(GenericRowData.of(StringData.fromString("13"))); + testHelper3.write(GenericRowData.of(StringData.fromString("14"))); + testHelper3.commit(); + assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString()) + .isEqualTo("[[3], [6], [9], [12], [13], [14]]"); + assertThat( + spark.table("tablestore.default.testSchemaEvolution") + .select("a") + .filter("a>10") + .collectAsList() + .toString()) + .isEqualTo("[[12], [13], [14]]"); + + // Add new fields "d", "c", "b" and the fields are [a, d, c, b], insert 2 records + spark.sql( + "ALTER TABLE tablestore.default.testSchemaEvolution ADD COLUMNS (d INT, c INT, b INT)"); + SimpleTableTestHelper testHelper4 = createTestHelperWithoutDDL(tablePath); + testHelper4.write(GenericRowData.of(StringData.fromString("15"), 16, 17, 18)); + testHelper4.write(GenericRowData.of(StringData.fromString("19"), 20, 21, 22)); + testHelper4.commit(); + assertThat(spark.table("tablestore.default.testSchemaEvolution").collectAsList().toString()) + .isEqualTo( + "[[3,null,null,null], " + + "[6,null,null,null], " + + "[9,null,null,null], " + + "[12,null,null,null], " + + "[13,null,null,null], " + + "[14,null,null,null], " + + "[15,16,17,18], " + + "[19,20,21,22]]"); + assertThat( + spark.table("tablestore.default.testSchemaEvolution") + .filter("a>10") + .collectAsList() + .toString()) + .isEqualTo( + "[[12,null,null,null], " + + "[13,null,null,null], " + + "[14,null,null,null], " + + "[15,16,17,18], " + + "[19,20,21,22]]"); + assertThat( + spark.table("tablestore.default.testSchemaEvolution") + .select("a", "b", "c", "d") + .filter("a>10") + .collectAsList() + .toString()) + .isEqualTo( + "[[12,null,null,null], " + + 
"[13,null,null,null], " + + "[14,null,null,null], " + + "[15,18,17,16], " + + "[19,22,21,20]]"); + assertThat( + spark.table("tablestore.default.testSchemaEvolution") + .select("a", "b", "c", "d") + .filter("a>10 and b is not null") + .collectAsList() + .toString()) + .isEqualTo("[[15,18,17,16], [19,22,21,20]]"); + } +}