This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c7843a1a4c3302fe107860792180526d553d9e73
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Tue Mar 24 10:55:10 2026 +0100

    [FLINK-39284][table] Extract test for `CREATE OR ALTER MATERIALIZED TABLE`
---
 ...erializedTableNodeToOperationConverterTest.java | 203 +-------------
 ...reateOrAlterMaterializedTableConverterTest.java | 312 +++++++++++++++++++++
 2 files changed, 314 insertions(+), 201 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index dd00b8c8506..32281b83936 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -37,8 +37,6 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
-import org.apache.flink.table.catalog.TableDistribution;
-import org.apache.flink.table.catalog.TableDistribution.Kind;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.WatermarkSpec;
@@ -53,7 +51,6 @@ import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTabl
 import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
 import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
 import 
org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
-import 
org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
 import org.apache.flink.table.planner.utils.TableFunc0;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -546,7 +543,8 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                         + "FROM `builtin`.`default`.`t3` AS 
`t3`"));
         assertThat(operation.asSummaryString())
                 .isEqualTo(
-                        "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS 
SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS 
STRING) AS `f`\n"
+                        "ALTER MATERIALIZED TABLE builtin.default.base_mtbl AS 
"
+                                + "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
                                 + "FROM `builtin`.`default`.`t3` AS `t3`");
 
         // new table only difference schema & definition query with old table.
@@ -599,48 +597,6 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                         + "FROM `builtin`.`default`.`t3` AS 
`t3`"));
     }
 
-    @Test
-    void testAlterMaterializedTableAsQueryWithDefinedSchema() {
-        String sql =
-                "CREATE OR ALTER MATERIALIZED TABLE base_mtbl ("
-                        + "`a` BIGINT NOT NULL, `b` STRING, `c` INT, `d` 
STRING, `a1` BIGINT NOT NULL, `f` INT) "
-                        + "AS SELECT a, b, c, d, a as `a1`, 3 as f FROM t3";
-        FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery =
-                (FullAlterMaterializedTableOperation) parse(sql);
-
-        assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
-                .containsExactly(
-                        // If NOT NULL is defined in schema, it should stay
-                        TableChange.add(Column.physical("a1", 
DataTypes.BIGINT().notNull())),
-                        TableChange.add(Column.physical("f", DataTypes.INT())),
-                        TableChange.modifyDefinitionQuery(
-                                "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS 
`f`\nFROM `t3`",
-                                "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`a` AS `a1`, 3 AS `f`\n"
-                                        + "FROM `builtin`.`default`.`t3` AS 
`t3`"),
-                        TableChange.reset("connector"),
-                        TableChange.reset("format"));
-    }
-
-    @Test
-    void testAlterMaterializedTableAsQueryWithoutDefinedSchema() {
-        String sql =
-                "CREATE OR ALTER MATERIALIZED TABLE base_mtbl "
-                        + "AS SELECT a, b, c, d, a as `a1` FROM t3";
-        FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery =
-                (FullAlterMaterializedTableOperation) parse(sql);
-
-        assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
-                .containsExactly(
-                        // No explicit schema, so nullable will be used
-                        TableChange.add(Column.physical("a1", 
DataTypes.BIGINT())),
-                        TableChange.modifyDefinitionQuery(
-                                "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM 
`t3`",
-                                "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`a` AS `a1`\n"
-                                        + "FROM `builtin`.`default`.`t3` AS 
`t3`"),
-                        TableChange.reset("connector"),
-                        TableChange.reset("format"));
-    }
-
     @Test
     void testDropMaterializedTable() {
         final String sql = "DROP MATERIALIZED TABLE mtbl1";
@@ -694,161 +650,6 @@ class SqlMaterializedTableNodeToOperationConverterTest
         assertThat(materializedTable.getOrigin()).isEqualTo(expected);
     }
 
-    private static Collection<TestSpec> 
createOrAlterForExistingMaterializedTableFailedCaseSpecs() {
-        return List.of(
-                TestSpec.of(
-                        "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
-                                + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT 
ENFORCED"
-                                + ")\n"
-                                + "COMMENT 'materialized table comment'\n"
-                                + "PARTITIONED BY (a, d)\n"
-                                + "WITH (\n"
-                                + "  'connector' = 'filesystem', \n"
-                                + "  'format' = 'json'\n"
-                                + ")\n"
-                                + "FRESHNESS = INTERVAL '30' SECOND\n"
-                                + "REFRESH_MODE = CONTINUOUS\n"
-                                + "AS SELECT * FROM t1",
-                        "Changing of REFRESH MODE is unsupported"),
-                TestSpec.of(
-                        "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
-                                + "   a BIGINT, b INT, c INT, d INT, "
-                                + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT 
ENFORCED"
-                                + ")\n"
-                                + "COMMENT 'materialized table comment'\n"
-                                + "PARTITIONED BY (a, d)\n"
-                                + "WITH (\n"
-                                + "  'connector' = 'filesystem', \n"
-                                + "  'format' = 'json'\n"
-                                + ")\n"
-                                + "FRESHNESS = INTERVAL '30' SECOND\n"
-                                + "REFRESH_MODE = FULL\n"
-                                + "AS SELECT * FROM t1",
-                        "Incompatible types for sink column 'b' at position 2. 
"
-                                + "The source column has type 'STRING', while 
the target column has type 'INT'."));
-    }
-
-    @Test
-    void testCreateOrAlterMaterializedTableForExistingTable() throws 
TableNotExistException {
-        final String sql =
-                "CREATE OR ALTER MATERIALIZED TABLE base_mtbl (\n"
-                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
-                        + ")\n"
-                        + "COMMENT 'materialized table comment'\n"
-                        + "DISTRIBUTED BY HASH (b) INTO 7 BUCKETS\n"
-                        + "PARTITIONED BY (a, d)\n"
-                        + "WITH (\n"
-                        + "  'format' = 'json2'\n"
-                        + ")\n"
-                        + "FRESHNESS = INTERVAL '30' SECOND\n"
-                        + "REFRESH_MODE = FULL\n"
-                        + "AS SELECT a, b, c, d, d as e, cast('123' as string) 
as f FROM t3";
-        Operation operation = parse(sql);
-
-        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
-
-        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
-        assertThat(op.getTableChanges())
-                .containsExactly(
-                        TableChange.add(Column.physical("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
-                        TableChange.add(Column.physical("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
-                        TableChange.modifyDefinitionQuery(
-                                "SELECT `a`, `b`, `c`, `d`, `d` AS `e`, 
CAST('123' AS STRING) AS `f`\nFROM `t3`",
-                                "SELECT `t3`.`a`, `t3`.`b`, `t3`.`c`, 
`t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
-                                        + "FROM `builtin`.`default`.`t3` AS 
`t3`"),
-                        TableChange.set("format", "json2"),
-                        TableChange.reset("connector"),
-                        TableChange.add(TableDistribution.of(Kind.HASH, 7, 
List.of("b"))));
-        assertThat(operation.asSummaryString())
-                .isEqualTo(
-                        "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.base_mtbl\n"
-                                + "  ADD `e` STRING ,\n"
-                                + "  ADD `f` STRING ,\n"
-                                + " MODIFY DEFINITION QUERY TO 'SELECT 
`t3`.`a`, `t3`.`b`, `t3`.`c`, `t3`.`d`, `t3`.`d` AS `e`, CAST('123' AS STRING) 
AS `f`\n"
-                                + "FROM `builtin`.`default`.`t3` AS `t3`',\n"
-                                + "  SET 'format' = 'json2',\n"
-                                + "  RESET 'connector',\n"
-                                + "  ADD DISTRIBUTED BY HASH(`b`) INTO 7 
BUCKETS");
-
-        // new table only difference schema & definition query with old table.
-        CatalogMaterializedTable oldTable =
-                (CatalogMaterializedTable)
-                        catalog.getTable(
-                                new 
ObjectPath(catalogManager.getCurrentDatabase(), "base_mtbl"));
-        CatalogMaterializedTable newTable = op.getNewTable();
-
-        assertThat(newTable.getOptions()).containsExactly(Map.entry("format", 
"json2"));
-        
assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
-        assertThat(oldTable.getUnresolvedSchema().getPrimaryKey())
-                .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey());
-        assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs())
-                .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs());
-        
assertThat(oldTable.getOriginalQuery()).isNotEqualTo(newTable.getOriginalQuery());
-        
assertThat(oldTable.getExpandedQuery()).isNotEqualTo(newTable.getExpandedQuery());
-        
assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
-        
assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode());
-        
assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus());
-        assertThat(oldTable.getSerializedRefreshHandler())
-                .isEqualTo(newTable.getSerializedRefreshHandler());
-
-        List<UnresolvedColumn> addedColumn =
-                newTable.getUnresolvedSchema().getColumns().stream()
-                        .filter(
-                                column ->
-                                        !oldTable.getUnresolvedSchema()
-                                                .getColumns()
-                                                .contains(column))
-                        .collect(Collectors.toList());
-        // added column should be a nullable column.
-        assertThat(addedColumn)
-                .containsExactly(
-                        new UnresolvedPhysicalColumn("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE)),
-                        new UnresolvedPhysicalColumn("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE)));
-    }
-
-    @Test
-    void testCreateOrAlterMaterializedTableWithDistributionForExistingTable() {
-        final String sql =
-                "CREATE OR ALTER MATERIALIZED TABLE base_mtbl_with_metadata 
(\n"
-                        + "   t AS current_timestamp,"
-                        + "   m STRING METADATA VIRTUAL,"
-                        + "   m_p STRING METADATA,"
-                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED,"
-                        + "   WATERMARK FOR t as current_timestamp - INTERVAL 
'5' SECOND"
-                        + ")\n"
-                        + "COMMENT 'materialized table comment'\n"
-                        + "DISTRIBUTED BY HASH (a) INTO 5 BUCKETS\n"
-                        + "WITH (\n"
-                        + "  'connector' = 'filesystem', \n"
-                        + "  'format' = 'json'\n"
-                        + ")\n"
-                        + "FRESHNESS = INTERVAL '30' SECOND\n"
-                        + "REFRESH_MODE = FULL\n"
-                        + "AS SELECT t1.* FROM t1";
-        Operation operation = parse(sql);
-
-        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
-
-        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
-        assertThat(op.getTableChanges())
-                .containsExactly(
-                        TableChange.modifyDefinitionQuery(
-                                "SELECT `t1`.*\n" + "FROM `t1`",
-                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
-                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`"),
-                        TableChange.set("connector", "filesystem"),
-                        TableChange.set("format", "json"),
-                        TableChange.modify(TableDistribution.of(Kind.HASH, 5, 
List.of("a"))));
-        assertThat(operation.asSummaryString())
-                .isEqualTo(
-                        "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.base_mtbl_with_metadata\n"
-                                + " MODIFY DEFINITION QUERY TO 'SELECT 
`t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
-                                + "FROM `builtin`.`default`.`t1` AS `t1`',\n"
-                                + "  SET 'connector' = 'filesystem',\n"
-                                + "  SET 'format' = 'json',\n"
-                                + "  MODIFY DISTRIBUTED BY HASH(`a`) INTO 5 
BUCKETS");
-    }
-
     private static Collection<TestSpec> 
testDataForCreateAlterMaterializedTableFailedCase() {
         final Collection<TestSpec> list = new ArrayList<>();
         list.addAll(createWithInvalidSchema());
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
new file mode 100644
index 00000000000..2d4940369f1
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableDistribution;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.operations.Operation;
+import 
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import 
org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class SqlNodeToOperationSqlCreateOrAlterMaterializedTableConverterTest
+        extends SqlNodeToOperationConversionTestBase {
+    private static final String DEFAULT_MATERIALIZED_TABLE =
+            "CREATE MATERIALIZED TABLE mt (\n"
+                    + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                    + ")\n"
+                    + "COMMENT 'materialized table comment'\n"
+                    + "PARTITIONED BY (a, d)\n"
+                    + "WITH (\n"
+                    + "  'connector' = 'filesystem', \n"
+                    + "  'format' = 'json'\n"
+                    + ")\n"
+                    + "FRESHNESS = INTERVAL '30' SECOND\n"
+                    + "REFRESH_MODE = FULL\n"
+                    + "AS SELECT * FROM t1";
+
+    @BeforeEach
+    void before() throws TableAlreadyExistException, DatabaseNotExistException 
{
+        super.before();
+        createMaterializedTableInCatalog(DEFAULT_MATERIALIZED_TABLE, "mt");
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryWithDefinedSchema() {
+        String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mt ("
+                        + "`a` BIGINT NOT NULL, `b` STRING, `c` INT, `d` 
STRING, `a1` BIGINT NOT NULL, `f` INT) "
+                        + "AS SELECT a, b, c, d, a as `a1`, 3 as f FROM t1";
+        FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery =
+                (FullAlterMaterializedTableOperation) parse(sql);
+
+        assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
+                .containsExactly(
+                        // If NOT NULL is defined in schema, it should stay
+                        TableChange.add(Column.physical("a1", 
DataTypes.BIGINT().notNull())),
+                        TableChange.add(Column.physical("f", DataTypes.INT())),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`, 3 AS 
`f`\nFROM `t1`",
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`, `t1`.`a` AS `a1`, 3 AS `f`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`"),
+                        TableChange.reset("connector"),
+                        TableChange.reset("format"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryWithoutDefinedSchema() {
+        String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mt "
+                        + "AS SELECT a, b, c, d, a as `a1` FROM t1";
+        FullAlterMaterializedTableOperation sqlAlterMaterializedTableAsQuery =
+                (FullAlterMaterializedTableOperation) parse(sql);
+
+        assertThat(sqlAlterMaterializedTableAsQuery.getTableChanges())
+                .containsExactly(
+                        // No explicit schema, so nullable will be used
+                        TableChange.add(Column.physical("a1", 
DataTypes.BIGINT())),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `a`, `b`, `c`, `d`, `a` AS `a1`\nFROM 
`t1`",
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`, `t1`.`a` AS `a1`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`"),
+                        TableChange.reset("connector"),
+                        TableChange.reset("format"));
+    }
+
+    @ParameterizedTest
+    @MethodSource("createOrAlterForExistingMaterializedTableFailedCaseSpecs")
+    void createOrAlterForExistingMaterializedTableFailedCase(TestSpec spec) {
+        Operation operation = parse(spec.sql);
+        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
+        // Will be invoked while operation#execute
+        assertThatThrownBy(op::getTableChanges)
+                .isInstanceOf(spec.expectedException)
+                .hasMessage(spec.errMessage);
+    }
+
+    private static Collection<TestSpec> 
createOrAlterForExistingMaterializedTableFailedCaseSpecs() {
+        return List.of(
+                TestSpec.of(
+                        "CREATE OR ALTER MATERIALIZED TABLE mt (\n"
+                                + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT 
ENFORCED"
+                                + ")\n"
+                                + "REFRESH_MODE = CONTINUOUS\n"
+                                + "AS SELECT * FROM t1",
+                        "Changing of REFRESH MODE is unsupported"),
+                TestSpec.of(
+                        "CREATE OR ALTER MATERIALIZED TABLE mt (\n"
+                                + "   a BIGINT, b INT, c INT, d INT, "
+                                + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT 
ENFORCED"
+                                + ")\n"
+                                + "AS SELECT * FROM t1",
+                        "Incompatible types for sink column 'b' at position 2. 
"
+                                + "The source column has type 'STRING', while 
the target column has type 'INT'."));
+    }
+
+    @Test
+    void testCreateOrAlterMaterializedTableForExistingTable() throws 
TableNotExistException {
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mt (\n"
+                        + "   CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
+                        + ")\n"
+                        + "COMMENT 'materialized table comment'\n"
+                        + "DISTRIBUTED BY HASH (b) INTO 7 BUCKETS\n"
+                        + "PARTITIONED BY (a, d)\n"
+                        + "WITH (\n"
+                        + "  'format' = 'json2'\n"
+                        + ")\n"
+                        + "FRESHNESS = INTERVAL '30' SECOND\n"
+                        + "REFRESH_MODE = FULL\n"
+                        + "AS SELECT a, b, c, d, d as e, cast('123' as string) 
as f FROM t1";
+        Operation operation = parse(sql);
+
+        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
+        assertThat(op.getTableChanges())
+                .containsExactly(
+                        TableChange.add(Column.physical("e", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                        TableChange.add(Column.physical("f", 
DataTypes.VARCHAR(Integer.MAX_VALUE))),
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `a`, `b`, `c`, `d`, `d` AS `e`, 
CAST('123' AS STRING) AS `f`\nFROM `t1`",
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`, `t1`.`d` AS `e`, CAST('123' AS STRING) AS `f`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`"),
+                        TableChange.set("format", "json2"),
+                        TableChange.reset("connector"),
+                        TableChange.add(
+                                TableDistribution.of(
+                                        TableDistribution.Kind.HASH, 7, 
List.of("b"))));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.mt\n"
+                                + "  ADD `e` STRING ,\n"
+                                + "  ADD `f` STRING ,\n"
+                                + " MODIFY DEFINITION QUERY TO 'SELECT 
`t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`, `t1`.`d` AS `e`, CAST('123' AS STRING) 
AS `f`\n"
+                                + "FROM `builtin`.`default`.`t1` AS `t1`',\n"
+                                + "  SET 'format' = 'json2',\n"
+                                + "  RESET 'connector',\n"
+                                + "  ADD DISTRIBUTED BY HASH(`b`) INTO 7 
BUCKETS");
+
+        // new table only difference schema & definition query with old table.
+        CatalogMaterializedTable oldTable =
+                (CatalogMaterializedTable)
+                        catalog.getTable(new 
ObjectPath(catalogManager.getCurrentDatabase(), "mt"));
+        CatalogMaterializedTable newTable = op.getNewTable();
+
+        assertThat(newTable.getOptions()).containsExactly(Map.entry("format", 
"json2"));
+        
assertThat(oldTable.getUnresolvedSchema()).isNotEqualTo(newTable.getUnresolvedSchema());
+        assertThat(oldTable.getUnresolvedSchema().getPrimaryKey())
+                .isEqualTo(newTable.getUnresolvedSchema().getPrimaryKey());
+        assertThat(oldTable.getUnresolvedSchema().getWatermarkSpecs())
+                .isEqualTo(newTable.getUnresolvedSchema().getWatermarkSpecs());
+        
assertThat(oldTable.getOriginalQuery()).isNotEqualTo(newTable.getOriginalQuery());
+        
assertThat(oldTable.getExpandedQuery()).isNotEqualTo(newTable.getExpandedQuery());
+        
assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+        
assertThat(oldTable.getRefreshMode()).isEqualTo(newTable.getRefreshMode());
+        
assertThat(oldTable.getRefreshStatus()).isEqualTo(newTable.getRefreshStatus());
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+
+        List<Schema.UnresolvedColumn> addedColumn =
+                newTable.getUnresolvedSchema().getColumns().stream()
+                        .filter(
+                                column ->
+                                        !oldTable.getUnresolvedSchema()
+                                                .getColumns()
+                                                .contains(column))
+                        .collect(Collectors.toList());
+        // added column should be a nullable column.
+        assertThat(addedColumn)
+                .containsExactly(
+                        new Schema.UnresolvedPhysicalColumn(
+                                "e", DataTypes.VARCHAR(Integer.MAX_VALUE)),
+                        new Schema.UnresolvedPhysicalColumn(
+                                "f", DataTypes.VARCHAR(Integer.MAX_VALUE)));
+    }
+
+    @Test
+    void testCreateOrAlterMaterializedTableWithDistributionForExistingTable()
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        final String prepSql =
+                "CREATE MATERIALIZED TABLE mt2\n"
+                        + "DISTRIBUTED BY HASH (a) INTO 5 BUCKETS\n"
+                        + "AS SELECT t1.* FROM t1";
+        createMaterializedTableInCatalog(prepSql, "mt2");
+
+        final String sql =
+                "CREATE OR ALTER MATERIALIZED TABLE mt2\n"
+                        + "DISTRIBUTED BY HASH (b) INTO 4 BUCKETS\n"
+                        + "AS SELECT t1.* FROM t1";
+        Operation operation = parse(sql);
+
+        
assertThat(operation).isInstanceOf(FullAlterMaterializedTableOperation.class);
+
+        FullAlterMaterializedTableOperation op = 
(FullAlterMaterializedTableOperation) operation;
+        assertThat(op.getTableChanges())
+                .containsExactly(
+                        TableChange.modifyDefinitionQuery(
+                                "SELECT `t1`.*\n" + "FROM `t1`",
+                                "SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, 
`t1`.`d`\n"
+                                        + "FROM `builtin`.`default`.`t1` AS 
`t1`"),
+                        TableChange.modify(
+                                TableDistribution.of(
+                                        TableDistribution.Kind.HASH, 4, 
List.of("b"))));
+        assertThat(operation.asSummaryString())
+                .isEqualTo(
+                        "CREATE OR ALTER MATERIALIZED TABLE 
builtin.default.mt2\n"
+                                + " MODIFY DEFINITION QUERY TO 'SELECT 
`t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+                                + "FROM `builtin`.`default`.`t1` AS `t1`',\n"
+                                + "  MODIFY DISTRIBUTED BY HASH(`b`) INTO 4 
BUCKETS");
+    }
+
+    private void createMaterializedTableInCatalog(String sql, String 
materializedTableName)
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        final ObjectPath objectPath =
+                new ObjectPath(catalogManager.getCurrentDatabase(), 
materializedTableName);
+        final CreateMaterializedTableOperation operation = 
createMaterializedTableOperation(sql);
+        catalog.createTable(objectPath, 
operation.getCatalogMaterializedTable(), true);
+    }
+
+    private CreateMaterializedTableOperation 
createMaterializedTableOperation(String sql) {
+        final Operation operation = parse(sql);
+        
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
+        return (CreateMaterializedTableOperation) operation;
+    }
+
+    private static class TestSpec {
+        private final String sql;
+        private final Class<?> expectedException;
+        private final String errMessage;
+        private final String expectedSchema;
+
+        private TestSpec(String sql, Class<?> expectedException, String 
errMessage) {
+            this.sql = sql;
+            this.expectedException = expectedException;
+            this.errMessage = errMessage;
+            this.expectedSchema = null;
+        }
+
+        private TestSpec(String sql, String expectedSchema) {
+            this.sql = sql;
+            this.expectedException = null;
+            this.errMessage = null;
+            this.expectedSchema = expectedSchema;
+        }
+
+        public static TestSpec of(String sql, Class<?> expectedException, 
String errMessage) {
+            return new TestSpec(sql, expectedException, errMessage);
+        }
+
+        public static TestSpec of(String sql, String errMessage) {
+            return of(sql, ValidationException.class, errMessage);
+        }
+
+        public static TestSpec withExpectedSchema(String sql, String 
expectedSchema) {
+            return new TestSpec(sql, expectedSchema);
+        }
+
+        @Override
+        public String toString() {
+            return sql;
+        }
+    }
+}

Reply via email to