AHeise commented on code in PR #27594:
URL: https://github.com/apache/flink/pull/27594#discussion_r2872931934


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableChangeOperation.java:
##########
@@ -112,47 +64,20 @@ public List<TableChange> getTableChanges() {
 
     public AlterMaterializedTableChangeOperation copyAsTableChangeOperation() {
         return new AlterMaterializedTableChangeOperation(
-                tableIdentifier, tableChanges, oldTable, 
materializedTableWithAppliedChanges);
+                tableIdentifier, tableChanges, oldTable, newTable, 
validationErrors);
     }
 
-    public CatalogMaterializedTable getMaterializedTableWithAppliedChanges() {
-        // The only case when materializedTableWithAppliedChanges is not null 
from the beginning
-        // is copyAsTableChangeOperation where it copies already evaluated 
materialized table
-        if (oldTable == null || materializedTableWithAppliedChanges != null) {
-            return materializedTableWithAppliedChanges;
+    public CatalogMaterializedTable getNewTable() {
+        if (validationErrors.isEmpty()) {
+            return newTable;
         }
-
-        ChangeContext changeContext = new ChangeContext(oldTable);
-        changeContext.applyTableChanges(tableChanges);
-
-        materializedTableWithAppliedChanges =
-                CatalogMaterializedTable.newBuilder()
-                        .schema(changeContext.retrieveSchema())
-                        .comment(oldTable.getComment())
-                        .partitionKeys(oldTable.getPartitionKeys())
-                        .options(oldTable.getOptions())
-                        .originalQuery(changeContext.originalQuery)
-                        .expandedQuery(changeContext.expandedQuery)
-                        .distribution(changeContext.distribution)
-                        .freshness(oldTable.getDefinitionFreshness())
-                        .logicalRefreshMode(oldTable.getLogicalRefreshMode())
-                        .refreshMode(oldTable.getRefreshMode())
-                        .refreshStatus(changeContext.refreshStatus)
-                        
.refreshHandlerDescription(changeContext.refreshHandlerDesc)
-                        
.serializedRefreshHandler(changeContext.refreshHandlerBytes)
-                        .build();
-
-        return materializedTableWithAppliedChanges;
+        throw new ValidationException(validationErrors.get(0));

Review Comment:
   Would it make sense to join all errors into one big string?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeUtils.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableDistribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Utils for materialized table change calculation. */
+@Internal
+public class MaterializedTableChangeUtils {
+
+    public static List<TableChange> buildTableChanges(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        List<TableChange> tableChanges = new 
ArrayList<>(calculateSchemaChange(oldTable, newTable));
+
+        // Distribution
+        tableChanges.addAll(calculateDistributionChange(oldTable, newTable));
+
+        // Options
+        tableChanges.addAll(calculateOptionsChange(oldTable, newTable));
+
+        // Query
+        tableChanges.addAll(calculateQueryChange(oldTable, newTable));
+        return tableChanges;
+    }
+
+    public static List<TableChange> buildSchemaTableChanges(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        final List<Column> oldColumns = oldSchema.getColumns();
+        // Schema retrieved from query doesn't count existing non persisted 
columns
+        final List<Column> newColumns =
+                enrichWithOldNonPersistedColumns(oldSchema, 
newSchema).getColumns();
+
+        final List<Column> oldPersistedColumns = new ArrayList<>();
+        final Map<String, Tuple2<Column, Integer>> oldColumnSet = new 
HashMap<>();
+        for (int i = 0; i < oldColumns.size(); i++) {
+            Column column = oldColumns.get(i);
+            if (column.isPersisted()) {
+                oldPersistedColumns.add(oldColumns.get(i));
+            }

Review Comment:
   Let me recap how I understand the treatment of non-physical columns.
   
   Let's say I have
   
   ```
   CREATE MATERIALIZED TABLE
   (
     physical1,
     metadata1,
     physical2,
     virtual1,
     physical3,
     computed1,
     physical4
   )
   ...
   ```
   
   Then, all changes to non-physical columns will not end up changing the 
storage.
   
   So if we remove metadata1 (non-virtual), the physical format will change, we 
get a bunch of column position changes, and we will fail later.
   
   If virtual1 or computed1 is removed, the physical format is stable, and 
thus we do not get any column position changes.
   
   So far, so solid.
   
   But when we swap virtual1 with physical3, we wouldn't get any TableChange. 
However, we get a schema change when we add a non-persisted column... I want to 
avoid inconsistencies.
   
   I wonder if we shouldn't just treat all columns the same, generate 
respective TableChanges and then leave it to the catalog to decide to ignore 
them or not.
   
   



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeUtils.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableDistribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Utils for materialized table change calculation. */
+@Internal
+public class MaterializedTableChangeUtils {
+
+    public static List<TableChange> buildTableChanges(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        List<TableChange> tableChanges = new 
ArrayList<>(calculateSchemaChange(oldTable, newTable));
+
+        // Distribution
+        tableChanges.addAll(calculateDistributionChange(oldTable, newTable));
+
+        // Options
+        tableChanges.addAll(calculateOptionsChange(oldTable, newTable));
+
+        // Query
+        tableChanges.addAll(calculateQueryChange(oldTable, newTable));
+        return tableChanges;
+    }
+
+    public static List<TableChange> buildSchemaTableChanges(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        final List<Column> oldColumns = oldSchema.getColumns();
+        // Schema retrieved from query doesn't count existing non persisted 
columns
+        final List<Column> newColumns =
+                enrichWithOldNonPersistedColumns(oldSchema, 
newSchema).getColumns();
+
+        final List<Column> oldPersistedColumns = new ArrayList<>();
+        final Map<String, Tuple2<Column, Integer>> oldColumnSet = new 
HashMap<>();
+        for (int i = 0; i < oldColumns.size(); i++) {
+            Column column = oldColumns.get(i);
+            if (column.isPersisted()) {
+                oldPersistedColumns.add(oldColumns.get(i));
+            }
+            oldColumnSet.put(column.getName(), Tuple2.of(oldColumns.get(i), 
i));
+        }
+
+        if (oldPersistedColumns.equals(newSchema.getColumns())) {
+            // No changes for persisted columns
+            return new ArrayList<>();
+        }
+
+        List<TableChange> changes = new ArrayList<>();
+        for (int i = 0; i < newColumns.size(); i++) {
+            Column newColumn = newColumns.get(i);
+            Tuple2<Column, Integer> oldColumnToPosition = 
oldColumnSet.get(newColumn.getName());
+
+            if (oldColumnToPosition == null) {
+                
changes.add(TableChange.add(newColumn.copy(newColumn.getDataType())));
+                continue;
+            }
+
+            // Check if position changed
+            applyPositionChanges(newColumns, oldColumnToPosition, i, changes);
+
+            Column oldColumn = oldColumnToPosition.f0;
+            // Check if column changed
+            // Note: it could be unchanged while the position is changed
+            if (oldColumn.equals(newColumn)) {
+                // no changes
+                continue;
+            }
+
+            // Check if kind changed
+            if (oldColumn.getClass() != newColumn.getClass()) {
+                changes.add(TableChange.dropColumn(oldColumn.getName()));
+                
changes.add(TableChange.add(newColumn.copy(newColumn.getDataType())));
+                continue;
+            }
+
+            // Check if comment is changed
+            if (!Objects.equals(
+                    oldColumn.getComment().orElse(null), 
newColumn.getComment().orElse(null))) {
+                changes.add(
+                        TableChange.modifyColumnComment(
+                                oldColumn, 
newColumn.getComment().orElse(null)));
+            }
+
+            // Check if physical column type changed
+            if (oldColumn.isPhysical()
+                    && newColumn.isPhysical()
+                    && 
!oldColumn.getDataType().equals(newColumn.getDataType())) {
+                changes.add(
+                        TableChange.modifyPhysicalColumnType(oldColumn, 
newColumn.getDataType()));
+            }
+
+            // Check if metadata fields changed
+            if (oldColumn instanceof Column.MetadataColumn) {
+                applyMetadataColumnChanges(
+                        (Column.MetadataColumn) oldColumn,
+                        (Column.MetadataColumn) newColumn,
+                        changes);
+            }
+
+            // Check if computed expression changed
+            if (oldColumn instanceof Column.ComputedColumn) {
+                applyComputedColumnChanges(
+                        (Column.ComputedColumn) oldColumn,
+                        (Column.ComputedColumn) newColumn,
+                        changes);
+            }
+        }
+
+        for (Column newColumn : newColumns) {
+            oldColumnSet.remove(newColumn.getName());
+        }
+
+        for (Map.Entry<String, Tuple2<Column, Integer>> entry : 
oldColumnSet.entrySet()) {
+            changes.add(TableChange.dropColumn(entry.getKey()));
+        }
+
+        return changes;
+    }
+
+    private static ResolvedSchema enrichWithOldNonPersistedColumns(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        final List<Integer> nonPersistedColumnIndexes = 
getNonPersistedColumnIndexes(oldSchema);
+        if (nonPersistedColumnIndexes.isEmpty()) {
+            return newSchema;
+        }
+
+        final Column[] newColumns =
+                new Column[newSchema.getColumnCount() + 
nonPersistedColumnIndexes.size()];
+        int nextFreePosition = 0;
+
+        // Preserve initial positions of non persisted columns
+        // It will allow to not generate extra column position changes
+        for (int index : nonPersistedColumnIndexes) {
+            newColumns[index] = oldSchema.getColumn(index).get();
+            if (index == nextFreePosition) {
+                nextFreePosition++;
+            }
+        }
+
+        for (int i = 0; i < newSchema.getColumnCount(); i++) {
+            newColumns[nextFreePosition] = newSchema.getColumn(i).get();
+            while (nextFreePosition < newColumns.length && 
newColumns[nextFreePosition] != null) {
+                nextFreePosition++;
+            }
+        }
+
+        return ResolvedSchema.of(newColumns);
+    }
+
+    private static List<Integer> getNonPersistedColumnIndexes(ResolvedSchema 
schema) {
+        final List<Integer> nonPersistedColumnIndexes = new ArrayList<>();
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            if (!schema.getColumn(i).get().isPersisted()) {
+                nonPersistedColumnIndexes.add(i);
+            }
+        }
+        return nonPersistedColumnIndexes;
+    }
+
+    private static void applyComputedColumnChanges(
+            Column.ComputedColumn oldColumn,
+            Column.ComputedColumn newColumn,
+            List<TableChange> changes) {
+        if (!oldColumn
+                        .getExpression()
+                        .asSerializableString()
+                        
.equals(newColumn.getExpression().asSerializableString())
+                && !Objects.equals(
+                        oldColumn.explainExtras().orElse(null),
+                        newColumn.explainExtras().orElse(null))) {
+            // for now there is no dedicated table change
+            changes.add(TableChange.dropColumn(oldColumn.getName()));
+            changes.add(TableChange.add(newColumn));
+        }
+    }
+
+    private static void applyMetadataColumnChanges(
+            Column.MetadataColumn oldColumn,
+            Column.MetadataColumn newColumn,
+            List<TableChange> changes) {
+        if (oldColumn.isVirtual() != newColumn.isVirtual()
+                || !Objects.equals(
+                        oldColumn.getMetadataKey().orElse(null),
+                        newColumn.getMetadataKey().orElse(null))) {
+            // for now there is no dedicated table change
+            changes.add(TableChange.dropColumn(oldColumn.getName()));
+            changes.add(TableChange.add(newColumn));
+        }
+    }
+
+    private static void applyPositionChanges(
+            List<Column> newColumns,
+            Tuple2<Column, Integer> oldColumnToPosition,
+            int currentPosition,
+            List<TableChange> changes) {
+        Column oldColumn = oldColumnToPosition.f0;
+        int oldPosition = oldColumnToPosition.f1;
+        if (oldPosition != currentPosition) {
+            TableChange.ColumnPosition position =
+                    currentPosition == 0
+                            ? TableChange.ColumnPosition.first()
+                            : TableChange.ColumnPosition.after(
+                                    newColumns.get(currentPosition - 
1).getName());
+            changes.add(TableChange.modifyColumnPosition(oldColumn, position));
+        }
+    }
+
+    private static List<TableChange> calculateSchemaChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        ResolvedSchema oldSchema = oldTable.getResolvedSchema();
+        ResolvedSchema newSchema = newTable.getResolvedSchema();
+        return MaterializedTableChangeUtils.buildSchemaTableChanges(oldSchema, 
newSchema);
+    }
+
+    private static List<TableChange> calculateDistributionChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        Optional<TableDistribution> oldTableDistribution = 
oldTable.getDistribution();
+        Optional<TableDistribution> newTableDistribution = 
newTable.getDistribution();
+        if (oldTableDistribution.isEmpty() && newTableDistribution.isEmpty()) {
+            return List.of();
+        }
+
+        if (oldTableDistribution.isPresent()
+                && newTableDistribution.isPresent()
+                && 
oldTableDistribution.get().equals(newTableDistribution.get())) {
+            return List.of();
+        }
+
+        if (oldTableDistribution.isPresent() && 
newTableDistribution.isEmpty()) {
+            return List.of(TableChange.dropDistribution());
+        }
+
+        if (oldTableDistribution.isEmpty()) {
+            return List.of(TableChange.add(newTableDistribution.get()));
+        }
+
+        return List.of(TableChange.modify(newTableDistribution.get()));
+    }
+
+    private static List<TableChange> calculateQueryChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        String oldQuery = oldTable.getOriginalQuery();
+        String newQuery = newTable.getOriginalQuery();
+        if (!oldQuery.equals(newQuery)) {
+            return List.of(
+                    TableChange.modifyDefinitionQuery(newQuery, 
newTable.getExpandedQuery()));
+        }
+        return List.of();
+    }
+
+    private static List<TableChange> calculateOptionsChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        Map<String, String> oldOptions = new HashMap<>(oldTable.getOptions());
+        Map<String, String> newOptions = new HashMap<>(newTable.getOptions());
+        if (oldOptions.equals(newOptions)) {

Review Comment:
   You can do the check before copying.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/FullAlterMaterializedTableOperation.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+
+import java.util.List;
+
+/**
+ * Operation for CREATE OR ALTER MATERIALIZED TABLE ... in case materialized 
table is present and
+ * full materialized table changes should be calculated.
+ */
+@Internal
+public class FullAlterMaterializedTableOperation extends 
AlterMaterializedTableChangeOperation {
+
+    public FullAlterMaterializedTableOperation(
+            final ResolvedCatalogMaterializedTable oldTable,
+            final ResolvedCatalogMaterializedTable newTable,
+            ObjectIdentifier tableIdentifier) {
+        super(
+                tableIdentifier,
+                MaterializedTableChangeUtils.buildTableChanges(oldTable, 
newTable),

Review Comment:
   I'd propose to build this lazily on access of the tableChanges.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -39,8 +39,10 @@ public class AlterMaterializedTableAsQueryOperation extends 
AlterMaterializedTab
     public AlterMaterializedTableAsQueryOperation(
             ObjectIdentifier tableIdentifier,
             List<TableChange> tableChanges,
-            CatalogMaterializedTable oldTable) {
-        super(tableIdentifier, tableChanges, oldTable);
+            CatalogMaterializedTable oldTable,
+            CatalogMaterializedTable newTable,

Review Comment:
   We can build that table lazily on access of getNewTable()



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeUtils.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableDistribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Utils for materialized table change calculation. */
+@Internal
+public class MaterializedTableChangeUtils {
+
+    public static List<TableChange> buildTableChanges(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        List<TableChange> tableChanges = new 
ArrayList<>(calculateSchemaChange(oldTable, newTable));
+
+        // Distribution
+        tableChanges.addAll(calculateDistributionChange(oldTable, newTable));
+
+        // Options
+        tableChanges.addAll(calculateOptionsChange(oldTable, newTable));
+
+        // Query
+        tableChanges.addAll(calculateQueryChange(oldTable, newTable));
+        return tableChanges;
+    }
+
+    public static List<TableChange> buildSchemaTableChanges(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        final List<Column> oldColumns = oldSchema.getColumns();
+        // Schema retrieved from query doesn't count existing non persisted 
columns
+        final List<Column> newColumns =
+                enrichWithOldNonPersistedColumns(oldSchema, 
newSchema).getColumns();
+
+        final List<Column> oldPersistedColumns = new ArrayList<>();
+        final Map<String, Tuple2<Column, Integer>> oldColumnSet = new 
HashMap<>();
+        for (int i = 0; i < oldColumns.size(); i++) {
+            Column column = oldColumns.get(i);
+            if (column.isPersisted()) {
+                oldPersistedColumns.add(oldColumns.get(i));
+            }
+            oldColumnSet.put(column.getName(), Tuple2.of(oldColumns.get(i), 
i));
+        }
+
+        if (oldPersistedColumns.equals(newSchema.getColumns())) {
+            // No changes for persisted columns
+            return new ArrayList<>();
+        }
+
+        List<TableChange> changes = new ArrayList<>();
+        for (int i = 0; i < newColumns.size(); i++) {
+            Column newColumn = newColumns.get(i);
+            Tuple2<Column, Integer> oldColumnToPosition = 
oldColumnSet.get(newColumn.getName());
+
+            if (oldColumnToPosition == null) {
+                
changes.add(TableChange.add(newColumn.copy(newColumn.getDataType())));
+                continue;
+            }
+
+            // Check if position changed
+            applyPositionChanges(newColumns, oldColumnToPosition, i, changes);
+
+            Column oldColumn = oldColumnToPosition.f0;
+            // Check if column changed
+            // Note: it could be unchanged while the position is changed
+            if (oldColumn.equals(newColumn)) {
+                // no changes
+                continue;
+            }
+
+            // Check if kind changed
+            if (oldColumn.getClass() != newColumn.getClass()) {
+                changes.add(TableChange.dropColumn(oldColumn.getName()));
+                
changes.add(TableChange.add(newColumn.copy(newColumn.getDataType())));
+                continue;
+            }
+
+            // Check if comment is changed
+            if (!Objects.equals(
+                    oldColumn.getComment().orElse(null), 
newColumn.getComment().orElse(null))) {
+                changes.add(
+                        TableChange.modifyColumnComment(
+                                oldColumn, 
newColumn.getComment().orElse(null)));
+            }
+
+            // Check if physical column type changed
+            if (oldColumn.isPhysical()
+                    && newColumn.isPhysical()
+                    && 
!oldColumn.getDataType().equals(newColumn.getDataType())) {
+                changes.add(
+                        TableChange.modifyPhysicalColumnType(oldColumn, 
newColumn.getDataType()));
+            }
+
+            // Check if metadata fields changed
+            if (oldColumn instanceof Column.MetadataColumn) {
+                applyMetadataColumnChanges(
+                        (Column.MetadataColumn) oldColumn,
+                        (Column.MetadataColumn) newColumn,
+                        changes);
+            }
+
+            // Check if computed expression changed
+            if (oldColumn instanceof Column.ComputedColumn) {
+                applyComputedColumnChanges(
+                        (Column.ComputedColumn) oldColumn,
+                        (Column.ComputedColumn) newColumn,
+                        changes);
+            }
+        }
+
+        for (Column newColumn : newColumns) {
+            oldColumnSet.remove(newColumn.getName());
+        }
+
+        for (Map.Entry<String, Tuple2<Column, Integer>> entry : 
oldColumnSet.entrySet()) {
+            changes.add(TableChange.dropColumn(entry.getKey()));
+        }
+
+        return changes;
+    }
+
+    private static ResolvedSchema enrichWithOldNonPersistedColumns(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        final List<Integer> nonPersistedColumnIndexes = 
getNonPersistedColumnIndexes(oldSchema);
+        if (nonPersistedColumnIndexes.isEmpty()) {
+            return newSchema;
+        }
+
+        final Column[] newColumns =
+                new Column[newSchema.getColumnCount() + 
nonPersistedColumnIndexes.size()];
+        int nextFreePosition = 0;
+
+        // Preserve initial positions of non persisted columns
+        // It will allow to not generate extra column position changes
+        for (int index : nonPersistedColumnIndexes) {
+            newColumns[index] = oldSchema.getColumn(index).get();
+            if (index == nextFreePosition) {
+                nextFreePosition++;
+            }
+        }
+
+        for (int i = 0; i < newSchema.getColumnCount(); i++) {
+            newColumns[nextFreePosition] = newSchema.getColumn(i).get();
+            while (nextFreePosition < newColumns.length && 
newColumns[nextFreePosition] != null) {
+                nextFreePosition++;
+            }
+        }
+
+        return ResolvedSchema.of(newColumns);
+    }
+
+    private static List<Integer> getNonPersistedColumnIndexes(ResolvedSchema 
schema) {
+        final List<Integer> nonPersistedColumnIndexes = new ArrayList<>();
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            if (!schema.getColumn(i).get().isPersisted()) {
+                nonPersistedColumnIndexes.add(i);
+            }
+        }
+        return nonPersistedColumnIndexes;
+    }
+
+    private static void applyComputedColumnChanges(
+            Column.ComputedColumn oldColumn,
+            Column.ComputedColumn newColumn,
+            List<TableChange> changes) {
+        if (!oldColumn
+                        .getExpression()
+                        .asSerializableString()
+                        
.equals(newColumn.getExpression().asSerializableString())
+                && !Objects.equals(
+                        oldColumn.explainExtras().orElse(null),
+                        newColumn.explainExtras().orElse(null))) {
+            // for now there is no dedicated table change
+            changes.add(TableChange.dropColumn(oldColumn.getName()));
+            changes.add(TableChange.add(newColumn));
+        }
+    }
+
+    private static void applyMetadataColumnChanges(
+            Column.MetadataColumn oldColumn,
+            Column.MetadataColumn newColumn,
+            List<TableChange> changes) {
+        if (oldColumn.isVirtual() != newColumn.isVirtual()
+                || !Objects.equals(
+                        oldColumn.getMetadataKey().orElse(null),
+                        newColumn.getMetadataKey().orElse(null))) {
+            // for now there is no dedicated table change
+            changes.add(TableChange.dropColumn(oldColumn.getName()));
+            changes.add(TableChange.add(newColumn));
+        }
+    }
+
+    private static void applyPositionChanges(
+            List<Column> newColumns,
+            Tuple2<Column, Integer> oldColumnToPosition,
+            int currentPosition,
+            List<TableChange> changes) {
+        Column oldColumn = oldColumnToPosition.f0;
+        int oldPosition = oldColumnToPosition.f1;
+        if (oldPosition != currentPosition) {
+            TableChange.ColumnPosition position =
+                    currentPosition == 0
+                            ? TableChange.ColumnPosition.first()
+                            : TableChange.ColumnPosition.after(
+                                    newColumns.get(currentPosition - 
1).getName());
+            changes.add(TableChange.modifyColumnPosition(oldColumn, position));
+        }
+    }
+
+    private static List<TableChange> calculateSchemaChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        ResolvedSchema oldSchema = oldTable.getResolvedSchema();
+        ResolvedSchema newSchema = newTable.getResolvedSchema();
+        return MaterializedTableChangeUtils.buildSchemaTableChanges(oldSchema, 
newSchema);
+    }
+
+    private static List<TableChange> calculateDistributionChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        Optional<TableDistribution> oldTableDistribution = 
oldTable.getDistribution();
+        Optional<TableDistribution> newTableDistribution = 
newTable.getDistribution();
+        if (oldTableDistribution.isEmpty() && newTableDistribution.isEmpty()) {
+            return List.of();
+        }
+
+        if (oldTableDistribution.isPresent()
+                && newTableDistribution.isPresent()
+                && 
oldTableDistribution.get().equals(newTableDistribution.get())) {
+            return List.of();
+        }
+
+        if (oldTableDistribution.isPresent() && 
newTableDistribution.isEmpty()) {
+            return List.of(TableChange.dropDistribution());
+        }
+
+        if (oldTableDistribution.isEmpty()) {
+            return List.of(TableChange.add(newTableDistribution.get()));
+        }
+
+        return List.of(TableChange.modify(newTableDistribution.get()));
+    }
+
+    private static List<TableChange> calculateQueryChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        String oldQuery = oldTable.getOriginalQuery();
+        String newQuery = newTable.getOriginalQuery();
+        if (!oldQuery.equals(newQuery)) {
+            return List.of(
+                    TableChange.modifyDefinitionQuery(newQuery, 
newTable.getExpandedQuery()));
+        }
+        return List.of();
+    }
+
+    private static List<TableChange> calculateOptionsChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        Map<String, String> oldOptions = new HashMap<>(oldTable.getOptions());
+        Map<String, String> newOptions = new HashMap<>(newTable.getOptions());
+        if (oldOptions.equals(newOptions)) {
+            return List.of();
+        }
+
+        List<TableChange> changes = new ArrayList<>();
+        for (Map.Entry<String, String> option : oldOptions.entrySet()) {
+            if (newOptions.containsKey(option.getKey())) {

Review Comment:
   You can replace both map accesses with a single `get` and a `!= null` check.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableSchemaConverter.java:
##########
@@ -69,10 +71,15 @@ protected Operation convertToOperation(
 
         validateChanges(oldTable.getResolvedSchema(), schema, context);
 
+        final List<TableChange> tableChanges = converter.getChangesCollector();
+        final MaterializedTableChangeHandler.MaterializedTableChangeResult 
result =
+                
MaterializedTableChangeHandler.buildNewMaterializedTable(oldTable, 
tableChanges);

Review Comment:
   As mentioned above, we could do these calculations lazily. That would allow
   various places to hook into the changes / `newTable` before the expensive
   calculations are performed.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/MaterializedTableChangeUtils.java:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableDistribution;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Utils for materialized table change calculation. */
+@Internal
+public class MaterializedTableChangeUtils {
+
+    public static List<TableChange> buildTableChanges(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        List<TableChange> tableChanges = new 
ArrayList<>(calculateSchemaChange(oldTable, newTable));
+
+        // Distribution
+        tableChanges.addAll(calculateDistributionChange(oldTable, newTable));
+
+        // Options
+        tableChanges.addAll(calculateOptionsChange(oldTable, newTable));
+
+        // Query
+        tableChanges.addAll(calculateQueryChange(oldTable, newTable));
+        return tableChanges;
+    }
+
+    public static List<TableChange> buildSchemaTableChanges(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        final List<Column> oldColumns = oldSchema.getColumns();
+        // Schema retrieved from query doesn't count existing non persisted 
columns
+        final List<Column> newColumns =
+                enrichWithOldNonPersistedColumns(oldSchema, 
newSchema).getColumns();
+
+        final List<Column> oldPersistedColumns = new ArrayList<>();
+        final Map<String, Tuple2<Column, Integer>> oldColumnSet = new 
HashMap<>();
+        for (int i = 0; i < oldColumns.size(); i++) {
+            Column column = oldColumns.get(i);
+            if (column.isPersisted()) {
+                oldPersistedColumns.add(oldColumns.get(i));
+            }
+            oldColumnSet.put(column.getName(), Tuple2.of(oldColumns.get(i), 
i));
+        }
+
+        if (oldPersistedColumns.equals(newSchema.getColumns())) {
+            // No changes for persisted columns
+            return new ArrayList<>();
+        }
+
+        List<TableChange> changes = new ArrayList<>();
+        for (int i = 0; i < newColumns.size(); i++) {
+            Column newColumn = newColumns.get(i);
+            Tuple2<Column, Integer> oldColumnToPosition = 
oldColumnSet.get(newColumn.getName());
+
+            if (oldColumnToPosition == null) {
+                
changes.add(TableChange.add(newColumn.copy(newColumn.getDataType())));
+                continue;
+            }
+
+            // Check if position changed
+            applyPositionChanges(newColumns, oldColumnToPosition, i, changes);
+
+            Column oldColumn = oldColumnToPosition.f0;
+            // Check if column changed
+            // Note: it could be unchanged while the position is changed
+            if (oldColumn.equals(newColumn)) {
+                // no changes
+                continue;
+            }
+
+            // Check if kind changed
+            if (oldColumn.getClass() != newColumn.getClass()) {
+                changes.add(TableChange.dropColumn(oldColumn.getName()));
+                
changes.add(TableChange.add(newColumn.copy(newColumn.getDataType())));
+                continue;
+            }
+
+            // Check if comment is changed
+            if (!Objects.equals(
+                    oldColumn.getComment().orElse(null), 
newColumn.getComment().orElse(null))) {
+                changes.add(
+                        TableChange.modifyColumnComment(
+                                oldColumn, 
newColumn.getComment().orElse(null)));
+            }
+
+            // Check if physical column type changed
+            if (oldColumn.isPhysical()
+                    && newColumn.isPhysical()
+                    && 
!oldColumn.getDataType().equals(newColumn.getDataType())) {
+                changes.add(
+                        TableChange.modifyPhysicalColumnType(oldColumn, 
newColumn.getDataType()));
+            }
+
+            // Check if metadata fields changed
+            if (oldColumn instanceof Column.MetadataColumn) {
+                applyMetadataColumnChanges(
+                        (Column.MetadataColumn) oldColumn,
+                        (Column.MetadataColumn) newColumn,
+                        changes);
+            }
+
+            // Check if computed expression changed
+            if (oldColumn instanceof Column.ComputedColumn) {
+                applyComputedColumnChanges(
+                        (Column.ComputedColumn) oldColumn,
+                        (Column.ComputedColumn) newColumn,
+                        changes);
+            }
+        }
+
+        for (Column newColumn : newColumns) {
+            oldColumnSet.remove(newColumn.getName());
+        }
+
+        for (Map.Entry<String, Tuple2<Column, Integer>> entry : 
oldColumnSet.entrySet()) {
+            changes.add(TableChange.dropColumn(entry.getKey()));
+        }
+
+        return changes;
+    }
+
+    private static ResolvedSchema enrichWithOldNonPersistedColumns(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        final List<Integer> nonPersistedColumnIndexes = 
getNonPersistedColumnIndexes(oldSchema);
+        if (nonPersistedColumnIndexes.isEmpty()) {
+            return newSchema;
+        }
+
+        final Column[] newColumns =
+                new Column[newSchema.getColumnCount() + 
nonPersistedColumnIndexes.size()];
+        int nextFreePosition = 0;
+
+        // Preserve initial positions of non persisted columns
+        // It will allow to not generate extra column position changes
+        for (int index : nonPersistedColumnIndexes) {
+            newColumns[index] = oldSchema.getColumn(index).get();
+            if (index == nextFreePosition) {
+                nextFreePosition++;
+            }
+        }
+
+        for (int i = 0; i < newSchema.getColumnCount(); i++) {
+            newColumns[nextFreePosition] = newSchema.getColumn(i).get();
+            while (nextFreePosition < newColumns.length && 
newColumns[nextFreePosition] != null) {
+                nextFreePosition++;
+            }
+        }
+
+        return ResolvedSchema.of(newColumns);
+    }
+
+    private static List<Integer> getNonPersistedColumnIndexes(ResolvedSchema 
schema) {
+        final List<Integer> nonPersistedColumnIndexes = new ArrayList<>();
+        for (int i = 0; i < schema.getColumnCount(); i++) {
+            if (!schema.getColumn(i).get().isPersisted()) {
+                nonPersistedColumnIndexes.add(i);
+            }
+        }
+        return nonPersistedColumnIndexes;
+    }
+
+    private static void applyComputedColumnChanges(
+            Column.ComputedColumn oldColumn,
+            Column.ComputedColumn newColumn,
+            List<TableChange> changes) {
+        if (!oldColumn
+                        .getExpression()
+                        .asSerializableString()
+                        
.equals(newColumn.getExpression().asSerializableString())
+                && !Objects.equals(
+                        oldColumn.explainExtras().orElse(null),
+                        newColumn.explainExtras().orElse(null))) {
+            // for now there is no dedicated table change
+            changes.add(TableChange.dropColumn(oldColumn.getName()));
+            changes.add(TableChange.add(newColumn));
+        }
+    }
+
+    private static void applyMetadataColumnChanges(
+            Column.MetadataColumn oldColumn,
+            Column.MetadataColumn newColumn,
+            List<TableChange> changes) {
+        if (oldColumn.isVirtual() != newColumn.isVirtual()
+                || !Objects.equals(
+                        oldColumn.getMetadataKey().orElse(null),
+                        newColumn.getMetadataKey().orElse(null))) {
+            // for now there is no dedicated table change
+            changes.add(TableChange.dropColumn(oldColumn.getName()));
+            changes.add(TableChange.add(newColumn));
+        }
+    }
+
+    private static void applyPositionChanges(
+            List<Column> newColumns,
+            Tuple2<Column, Integer> oldColumnToPosition,
+            int currentPosition,
+            List<TableChange> changes) {
+        Column oldColumn = oldColumnToPosition.f0;
+        int oldPosition = oldColumnToPosition.f1;
+        if (oldPosition != currentPosition) {
+            TableChange.ColumnPosition position =
+                    currentPosition == 0
+                            ? TableChange.ColumnPosition.first()
+                            : TableChange.ColumnPosition.after(
+                                    newColumns.get(currentPosition - 
1).getName());
+            changes.add(TableChange.modifyColumnPosition(oldColumn, position));
+        }
+    }
+
+    private static List<TableChange> calculateSchemaChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        ResolvedSchema oldSchema = oldTable.getResolvedSchema();
+        ResolvedSchema newSchema = newTable.getResolvedSchema();
+        return MaterializedTableChangeUtils.buildSchemaTableChanges(oldSchema, 
newSchema);
+    }
+
+    private static List<TableChange> calculateDistributionChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        Optional<TableDistribution> oldTableDistribution = 
oldTable.getDistribution();
+        Optional<TableDistribution> newTableDistribution = 
newTable.getDistribution();
+        if (oldTableDistribution.isEmpty() && newTableDistribution.isEmpty()) {
+            return List.of();
+        }
+
+        if (oldTableDistribution.isPresent()
+                && newTableDistribution.isPresent()
+                && 
oldTableDistribution.get().equals(newTableDistribution.get())) {
+            return List.of();
+        }
+
+        if (oldTableDistribution.isPresent() && 
newTableDistribution.isEmpty()) {
+            return List.of(TableChange.dropDistribution());
+        }
+
+        if (oldTableDistribution.isEmpty()) {
+            return List.of(TableChange.add(newTableDistribution.get()));
+        }
+
+        return List.of(TableChange.modify(newTableDistribution.get()));
+    }
+
+    private static List<TableChange> calculateQueryChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        String oldQuery = oldTable.getOriginalQuery();
+        String newQuery = newTable.getOriginalQuery();
+        if (!oldQuery.equals(newQuery)) {
+            return List.of(
+                    TableChange.modifyDefinitionQuery(newQuery, 
newTable.getExpandedQuery()));
+        }
+        return List.of();
+    }
+
+    private static List<TableChange> calculateOptionsChange(
+            ResolvedCatalogMaterializedTable oldTable, 
ResolvedCatalogMaterializedTable newTable) {
+        Map<String, String> oldOptions = new HashMap<>(oldTable.getOptions());

Review Comment:
   You never modify `oldOptions`, so the `HashMap` copy is not really needed.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/materializedtable/SqlAlterMaterializedTableAsQueryConverter.java:
##########
@@ -55,9 +61,38 @@ protected Operation convertToOperation(
         ResolvedSchema newSchema = queryOperation.getResolvedSchema();
 
         List<TableChange> tableChanges =
-                MaterializedTableUtils.buildSchemaTableChanges(oldSchema, 
newSchema);
+                
MaterializedTableChangeUtils.buildSchemaTableChanges(oldSchema, newSchema);
         tableChanges.add(TableChange.modifyDefinitionQuery(originalQuery, 
definitionQuery));
 
-        return new AlterMaterializedTableAsQueryOperation(identifier, 
tableChanges, oldTable);
+        MaterializedTableChangeHandler.MaterializedTableChangeResult result =
+                
MaterializedTableChangeHandler.buildNewMaterializedTable(oldTable, 
tableChanges);
+        return new AlterMaterializedTableAsQueryOperation(
+                identifier,
+                tableChanges,
+                oldTable,
+                result.getNewMaterializedTable(),
+                result.getValidationErrors());
+    }
+
+    private ResolvedSchema ensureTypesNullable(ResolvedSchema schema) {

Review Comment:
   I understood that we don't want to do this (`ensureTypesNullable`) as of now. I'll double-check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to