This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c7843a1a4c3302fe107860792180526d553d9e73 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Tue Mar 24 10:55:10 2026 +0100 [FLINK-39284][table] Extract test for `CREATE OR ALTER MATERIALIZED TABLE` --- ...erializedTableNodeToOperationConverterTest.java | 203 +------------- ...reateOrAlterMaterializedTableConverterTest.java | 312 +++++++++++++++++++++ 2 files changed, 314 insertions(+), 201 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index dd00b8c8506..32281b83936 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -37,8 +37,6 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableChange; -import org.apache.flink.table.catalog.TableDistribution; -import org.apache.flink.table.catalog.TableDistribution.Kind; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.WatermarkSpec; @@ -53,7 +51,6 @@ import org.apache.flink.table.operations.materializedtable.AlterMaterializedTabl import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation; -import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation; import org.apache.flink.table.planner.utils.TableFunc0; import org.junit.jupiter.api.BeforeEach; @@ -546,7 +543,8 @@ class SqlMaterializedTableNodeToOperationConverterTest + "FROM `builtin`.`default`.`t3` AS `t3`")); assertThat(operation.asSummaryString()) .isEqualTo( - "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS " + + "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + "FROM `builtin`.`default`.`t3` AS `t3`"); // new table only difference schema & definition query with old table. @@ -599,48 +597,6 @@ class SqlMaterializedTableNodeToOperationConverterTest + "FROM `builtin`.`default`.`t3` AS `t3`")); } - @Test - void testAlterMaterializedTableAsQueryWithDefinedSchema() { - String sql = - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (" - + "`a` BIGINT NOT NULL, `b` STRING, `c` INT, `d` STRING, `a1` BIGINT NOT NULL, `f` INT) " - + "AS SELECT a, b, c, d, a as `a1`, 3 as f FROM t3"; - FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery = - (FullAlterMaterializedTableOperation) parse(sql); - - assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges()) - .containsExactly( - // If NOT NULL is defined in schema, it should stay - TableChange.add(Column.physical("a1", DataTypes.BIGINT().notNull())), - TableChange.add(Column.physical("f", DataTypes.INT())), - TableChange.modifyDefinitionQuery( - "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS `f`\nFROM `t3`", - "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`a` AS `a1`, 3 AS `f`\n" - + "FROM `builtin`.`default`.`t3` AS `t3`"), - TableChange.reset("connector"), - TableChange.reset("format")); - } - - @Test - void testAlterMaterializedTableAsQueryWithoutDefinedSchema() { - String sql = - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl " - + "AS SELECT a, b, c, d, a as `a1` FROM t3"; - FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery = - (FullAlterMaterializedTableOperation) parse(sql); - - assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges()) - .containsExactly( - // No explicit schema, so nullable will be used - TableChange.add(Column.physical("a1", DataTypes.BIGINT())), - TableChange.modifyDefinitionQuery( - "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM `t3`", - "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`a` AS `a1`\n" - + "FROM `builtin`.`default`.`t3` AS `t3`"), - TableChange.reset("connector"), - TableChange.reset("format")); - } - @Test void testDropMaterializedTable() { final String sql = "DROP MATERIALIZED TABLE mtbl1"; @@ -694,161 +650,6 @@ class SqlMaterializedTableNodeToOperationConverterTest assertThat(materializedTable.getOrigin()).isEqualTo(expected); } - private static Collection<TestSpec> createOrAlterForExistingMaterializedTableFailedCaseSpecs() { - return List.of( - TestSpec.of( - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n" - + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" - + ")\n" - + "COMMENT 'materialized table comment'\n" - + "PARTITIONED BY (a, d)\n" - + "WITH (\n" - + " 'connector' = 'filesystem', \n" - + " 'format' = 'json'\n" - + ")\n" - + "FRESHNESS = INTERVAL '30' SECOND\n" - + "REFRESH_MODE = CONTINUOUS\n" - + "AS SELECT * FROM t1", - "Changing of REFRESH MODE is unsupported"), - TestSpec.of( - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n" - + " a BIGINT, b INT, c INT, d INT, " - + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" - + ")\n" - + "COMMENT 'materialized table comment'\n" - + "PARTITIONED BY (a, d)\n" - + "WITH (\n" - + " 'connector' = 'filesystem', \n" - + " 'format' = 'json'\n" - + ")\n" - + "FRESHNESS = INTERVAL '30' SECOND\n" - + "REFRESH_MODE = FULL\n" - + "AS SELECT * FROM t1", - "Incompatible types for sink column 'b' at position 2. " - + "The source column has type 'STRING', while the target column has type 'INT'.")); - } - - @Test - void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistException { - final String sql = - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n" - + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" - + ")\n" - + "COMMENT 'materialized table comment'\n" - + "DISTRIBUTED BY HASH (b) INTO 7 BUCKETS\n" - + "PARTITIONED BY (a, d)\n" - + "WITH (\n" - + " 'format' = 'json2'\n" - + ")\n" - + "FRESHNESS = INTERVAL '30' SECOND\n" - + "REFRESH_MODE = FULL\n" - + "AS SELECT a, b, c, d, d as e, cast('123' as string) as f FROM t3"; - Operation operation = parse(sql); - - assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); - - FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; - assertThat(op.getTableChanges()) - .containsExactly( - TableChange.add(Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))), - TableChange.add(Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))), - TableChange.modifyDefinitionQuery( - "SELECT `a`, `b`, `c`, `d`, `d` AS `e`, CAST('123' AS STRING) AS `f`\nFROM `t3`", - "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" - + "FROM `builtin`.`default`.`t3` AS `t3`"), - TableChange.set("format", "json2"), - TableChange.reset("connector"), - TableChange.add(TableDistribution.of(Kind.HASH, 7, List.of("b")))); - assertThat(operation.asSummaryString()) - .isEqualTo( - "CREATE OR ALTER MATERIALIZED TABLE builtin.default.base_mtbl\n" - + " ADD `e` STRING ,\n" - + " ADD `f` STRING ,\n" - + " MODIFY DEFINITION QUERY TO 'SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" - + "FROM `builtin`.`default`.`t3` AS `t3`',\n" - + " SET 'format' = 'json2',\n" - + " RESET 'connector',\n" - + " ADD DISTRIBUTED BY HASH(`b`) INTO 7 BUCKETS"); - - // new table only difference schema & definition query with old table. - CatalogMaterializedTable oldTable = - (CatalogMaterializedTable) - catalog.getTable( - new ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl")); - CatalogMaterializedTable newTable = op.getNewTable(); - - assertThat(newTable.getOptions()).containsExactly(Map.entry("format", "json2")); - assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema()); - assertThat(oldTable.getUnresolvedSchema().getPrimaryKey()) - .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey()); - assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs()) - .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs()); - assertThat(oldTable.getOriginalQuery()).isNotEqualTo(newTable.getOriginalQuery()); - assertThat(oldTable.getExpandedQuery()).isNotEqualTo(newTable.getExpandedQuery()); - assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); - assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode()); - assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus()); - assertThat(oldTable.getSerializedRefreshHandler()) - .isEqualTo(newTable.getSerializedRefreshHandler()); - - List<UnresolvedColumn> addedColumn = - newTable.getUnresolvedSchema().getColumns().stream() - .filter( - column -> - !oldTable.getUnresolvedSchema() - .getColumns() - .contains(column)) - .collect(Collectors.toList()); - // added column should be a nullable column. - assertThat(addedColumn) - .containsExactly( - new UnresolvedPhysicalColumn("e", DataTypes.VARCHAR(Integer.MAX_VALUE)), - new UnresolvedPhysicalColumn("f", DataTypes.VARCHAR(Integer.MAX_VALUE))); - } - - @Test - void testCreateOrAlterMaterializedTableWithDistributionForExistingTable() { - final String sql = - "CREATE OR ALTER MATERIALIZED TABLE base_mtbl_with_metadata (\n" - + " t AS current_timestamp," - + " m STRING METADATA VIRTUAL," - + " m_p STRING METADATA," - + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED," - + " WATERMARK FOR t as current_timestamp - INTERVAL '5' SECOND" - + ")\n" - + "COMMENT 'materialized table comment'\n" - + "DISTRIBUTED BY HASH (a) INTO 5 BUCKETS\n" - + "WITH (\n" - + " 'connector' = 'filesystem', \n" - + " 'format' = 'json'\n" - + ")\n" - + "FRESHNESS = INTERVAL '30' SECOND\n" - + "REFRESH_MODE = FULL\n" - + "AS SELECT t1.* FROM t1"; - Operation operation = parse(sql); - - assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); - - FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; - assertThat(op.getTableChanges()) - .containsExactly( - TableChange.modifyDefinitionQuery( - "SELECT `t1`.*\n" + "FROM `t1`", - "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" - + "FROM `builtin`.`default`.`t1` AS `t1`"), - TableChange.set("connector", "filesystem"), - TableChange.set("format", "json"), - TableChange.modify(TableDistribution.of(Kind.HASH, 5, List.of("a")))); - assertThat(operation.asSummaryString()) - .isEqualTo( - "CREATE OR ALTER MATERIALIZED TABLE builtin.default.base_mtbl_with_metadata\n" - + " MODIFY DEFINITION QUERY TO 'SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" - + "FROM `builtin`.`default`.`t1` AS `t1`',\n" - + " SET 'connector' = 'filesystem',\n" - + " SET 'format' = 'json',\n" - + " MODIFY DISTRIBUTED BY HASH(`a`) INTO 5 BUCKETS"); - } - private static Collection<TestSpec> testDataForCreateAlterMaterializedTableFailedCase() { final Collection<TestSpec> list = new ArrayList<>(); list.addAll(createWithInvalidSchema()); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java new file mode 100644 index 00000000000..2d4940369f1 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.TableDistribution; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; +import org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest + extends SqlNodeToOperationConversionTestBase { + private static final String DEFAULT_MATERIALIZED_TABLE = + "CREATE MATERIALIZED TABLE mt (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'materialized table comment'\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'connector' = 'filesystem', \n" + + " 'format' = 'json'\n" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT * FROM t1"; + + @BeforeEach + void before() throws TableAlreadyExistException, DatabaseNotExistException { + super.before(); + createMaterializedTableInCatalog(DEFAULT_MATERIALIZED_TABLE, "mt"); + } + + @Test + void testAlterMaterializedTableAsQueryWithDefinedSchema() { + String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt (" + + "`a` BIGINT NOT NULL, `b` STRING, `c` INT, `d` STRING, `a1` BIGINT NOT NULL, `f` INT) " + + "AS SELECT a, b, c, d, a as `a1`, 3 as f FROM t1"; + FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery = + (FullAlterMaterializedTableOperation) parse(sql); + + assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges()) + .containsExactly( + // If NOT NULL is defined in schema, it should stay + TableChange.add(Column.physical("a1", DataTypes.BIGINT().notNull())), + TableChange.add(Column.physical("f", DataTypes.INT())), + TableChange.modifyDefinitionQuery( + "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS `f`\nFROM `t1`", + "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`, `t1`.`a` AS `a1`, 3 AS `f`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`"), + TableChange.reset("connector"), + TableChange.reset("format")); + } + + @Test + void testAlterMaterializedTableAsQueryWithoutDefinedSchema() { + String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt " + + "AS SELECT a, b, c, d, a as `a1` FROM t1"; + FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery = + (FullAlterMaterializedTableOperation) parse(sql); + + assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges()) + .containsExactly( + // No explicit schema, so nullable will be used + TableChange.add(Column.physical("a1", DataTypes.BIGINT())), + TableChange.modifyDefinitionQuery( + "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM `t1`", + "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`, `t1`.`a` AS `a1`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`"), + TableChange.reset("connector"), + TableChange.reset("format")); + } + + @ParameterizedTest + @MethodSource("createOrAlterForExistingMaterializedTableFailedCaseSpecs") + void createOrAlterForExistingMaterializedTableFailedCase(TestSpec spec) { + Operation operation = parse(spec.sql); + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + // Will be invoked while operation#execute + assertThatThrownBy(op::getTableChanges) + .isInstanceOf(spec.expectedException) + .hasMessage(spec.errMessage); + } + + private static Collection<TestSpec> createOrAlterForExistingMaterializedTableFailedCaseSpecs() { + return List.of( + TestSpec.of( + "CREATE OR ALTER MATERIALIZED TABLE mt (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "REFRESH_MODE = CONTINUOUS\n" + + "AS SELECT * FROM t1", + "Changing of REFRESH MODE is unsupported"), + TestSpec.of( + "CREATE OR ALTER MATERIALIZED TABLE mt (\n" + + " a BIGINT, b INT, c INT, d INT, " + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "AS SELECT * FROM t1", + "Incompatible types for sink column 'b' at position 2. " + + "The source column has type 'STRING', while the target column has type 'INT'.")); + } + + @Test + void testCreateOrAlterMaterializedTableForExistingTable() throws TableNotExistException { + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt (\n" + + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" + + ")\n" + + "COMMENT 'materialized table comment'\n" + + "DISTRIBUTED BY HASH (b) INTO 7 BUCKETS\n" + + "PARTITIONED BY (a, d)\n" + + "WITH (\n" + + " 'format' = 'json2'\n" + + ")\n" + + "FRESHNESS = INTERVAL '30' SECOND\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT a, b, c, d, d as e, cast('123' as string) as f FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()) + .containsExactly( + TableChange.add(Column.physical("e", DataTypes.VARCHAR(Integer.MAX_VALUE))), + TableChange.add(Column.physical("f", DataTypes.VARCHAR(Integer.MAX_VALUE))), + TableChange.modifyDefinitionQuery( + "SELECT `a`, `b`, `c`, `d`, `d` AS `e`, CAST('123' AS STRING) AS `f`\nFROM `t1`", + "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`, `t1`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`"), + TableChange.set("format", "json2"), + TableChange.reset("connector"), + TableChange.add( + TableDistribution.of( + TableDistribution.Kind.HASH, 7, List.of("b")))); + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt\n" + + " ADD `e` STRING ,\n" + + " ADD `f` STRING ,\n" + + " MODIFY DEFINITION QUERY TO 'SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`, `t1`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`',\n" + + " SET 'format' = 'json2',\n" + + " RESET 'connector',\n" + + " ADD DISTRIBUTED BY HASH(`b`) INTO 7 BUCKETS"); + + // new table only difference schema & definition query with old table. + CatalogMaterializedTable oldTable = + (CatalogMaterializedTable) + catalog.getTable(new ObjectPath(catalogManager.getCurrentDatabase(), "mt")); + CatalogMaterializedTable newTable = op.getNewTable(); + + assertThat(newTable.getOptions()).containsExactly(Map.entry("format", "json2")); + assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema()); + assertThat(oldTable.getUnresolvedSchema().getPrimaryKey()) + .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey()); + assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs()) + .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs()); + assertThat(oldTable.getOriginalQuery()).isNotEqualTo(newTable.getOriginalQuery()); + assertThat(oldTable.getExpandedQuery()).isNotEqualTo(newTable.getExpandedQuery()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode()); + assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus()); + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + + List<Schema.UnresolvedColumn> addedColumn = + newTable.getUnresolvedSchema().getColumns().stream() + .filter( + column -> + !oldTable.getUnresolvedSchema() + .getColumns() + .contains(column)) + .collect(Collectors.toList()); + // added column should be a nullable column. + assertThat(addedColumn) + .containsExactly( + new Schema.UnresolvedPhysicalColumn( + "e", DataTypes.VARCHAR(Integer.MAX_VALUE)), + new Schema.UnresolvedPhysicalColumn( + "f", DataTypes.VARCHAR(Integer.MAX_VALUE))); + } + + @Test + void testCreateOrAlterMaterializedTableWithDistributionForExistingTable() + throws TableAlreadyExistException, DatabaseNotExistException { + final String prepSql = + "CREATE MATERIALIZED TABLE mt2\n" + + "DISTRIBUTED BY HASH (a) INTO 5 BUCKETS\n" + + "AS SELECT t1.* FROM t1"; + createMaterializedTableInCatalog(prepSql, "mt2"); + + final String sql = + "CREATE OR ALTER MATERIALIZED TABLE mt2\n" + + "DISTRIBUTED BY HASH (b) INTO 4 BUCKETS\n" + + "AS SELECT t1.* FROM t1"; + Operation operation = parse(sql); + + assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class); + + FullAlterMaterializedTableOperation op = (FullAlterMaterializedTableOperation) operation; + assertThat(op.getTableChanges()) + .containsExactly( + TableChange.modifyDefinitionQuery( + "SELECT `t1`.*\n" + "FROM `t1`", + "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`"), + TableChange.modify( + TableDistribution.of( + TableDistribution.Kind.HASH, 4, List.of("b")))); + assertThat(operation.asSummaryString()) + .isEqualTo( + "CREATE OR ALTER MATERIALIZED TABLE builtin.default.mt2\n" + + " MODIFY DEFINITION QUERY TO 'SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n" + + "FROM `builtin`.`default`.`t1` AS `t1`',\n" + + " MODIFY DISTRIBUTED BY HASH(`b`) INTO 4 BUCKETS"); + } + + private void createMaterializedTableInCatalog(String sql, String materializedTableName) + throws TableAlreadyExistException, DatabaseNotExistException { + final ObjectPath objectPath = + new ObjectPath(catalogManager.getCurrentDatabase(), materializedTableName); + final CreateMaterializedTableOperation operation = createMaterializedTableOperation(sql); + catalog.createTable(objectPath, operation.getCatalogMaterializedTable(), true); + } + + private CreateMaterializedTableOperation createMaterializedTableOperation(String sql) { + final Operation operation = parse(sql); + assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); + return (CreateMaterializedTableOperation) operation; + } + + private static class TestSpec { + private final String sql; + private final Class<?> expectedException; + private final String errMessage; + private final String expectedSchema; + + private TestSpec(String sql, Class<?> expectedException, String errMessage) { + this.sql = sql; + this.expectedException = expectedException; + this.errMessage = errMessage; + this.expectedSchema = null; + } + + private TestSpec(String sql, String expectedSchema) { + this.sql = sql; + this.expectedException = null; + this.errMessage = null; + this.expectedSchema = expectedSchema; + } + + public static TestSpec of(String sql, Class<?> expectedException, String errMessage) { + return new TestSpec(sql, expectedException, errMessage); + } + + public static TestSpec of(String sql, String errMessage) { + return of(sql, ValidationException.class, errMessage); + } + + public static TestSpec withExpectedSchema(String sql, String expectedSchema) { + return new TestSpec(sql, expectedSchema); + } + + @Override + public String toString() { + return sql; + } + } +}
