lsyldliu commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1593542865


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -65,6 +72,11 @@ public static ResultFetcher callMaterializedTableOperation(
             return callCreateMaterializedTableOperation(
                     operationExecutor, handle, 
(CreateMaterializedTableOperation) op);
         }
+        if (op instanceof AlterMaterializedTableRefreshOperation) {

Review Comment:
   ```suggestion
           else if (op instanceof AlterMaterializedTableRefreshOperation) {
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+            throw new TableException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        Set<String> allPartitionKeys =
+                new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+        Set<String> unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+        unknownPartitionKeys.removeAll(allPartitionKeys);
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new TableException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: %s.",
+                            unknownPartitionKeys));
+        }
+
+        // Set job name, runtime mode, checkpoint interval
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT INTO %s SELECT * FROM (%s)",

Review Comment:
   This should be:
   ```INSERT OVERWRITE %s SELECT * FROM (%s)```
   
   Refresh manually should be a table or partition granularity overwrite.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+            throw new TableException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        Set<String> allPartitionKeys =
+                new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+        Set<String> unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+        unknownPartitionKeys.removeAll(allPartitionKeys);
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new TableException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: %s.",
+                            unknownPartitionKeys));
+        }
+
+        // Set job name, runtime mode, checkpoint interval
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT INTO %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", 
entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .orElseThrow(() -> new TableException("Could not 
happen")));
+        }
+
+        try {
+            // return jobId for one time refresh, user should get the refresh 
job info via desc
+            // job.
+            return operationExecutor.executeStatement(handle, 
insertStatement.toString());

Review Comment:
   ```suggestion
               return operationExecutor.executeStatement(handle, customConfig, 
insertStatement.toString());
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+            throw new TableException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        Set<String> allPartitionKeys =
+                new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+        Set<String> unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+        unknownPartitionKeys.removeAll(allPartitionKeys);
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new TableException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: %s.",
+                            unknownPartitionKeys));
+        }
+
+        // Set job name, runtime mode, checkpoint interval
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT INTO %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", 
entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .orElseThrow(() -> new TableException("Could not 
happen")));
+        }
+
+        try {
+            // return jobId for one time refresh, user should get the refresh 
job info via desc
+            // job.
+            return operationExecutor.executeStatement(handle, 
insertStatement.toString());
+        } catch (Exception e) {
+            // log and throw exception
+            LOG.error(
+                    "Refresh job manually for materialized table {} occur 
exception.",
+                    materializedTableIdentifier,
+                    e);
+            throw new TableException(
+                    String.format(
+                            "Refresh job manually for materialized table %s 
occur exception.",

Review Comment:
   ditto



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableRefresh() throws Exception {
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // check unknown partition keys
+        String alterStatementWithUnknownPartitionKey =
+                "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds2 = 
'2023-01-01')";
+        OperationHandle alterStatementWithUnknownPartitionKeyHandle =
+                service.executeStatement(
+                        sessionHandle,
+                        alterStatementWithUnknownPartitionKey,
+                        -1,
+                        new Configuration());
+
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service,
+                                        sessionHandle,
+                                        
alterStatementWithUnknownPartitionKeyHandle))
+                .isInstanceOf(SqlExecutionException.class)
+                .rootCause()
+                .isInstanceOf(TableException.class)
+                .hasMessage("The partition spec contains unknown partition 
keys: [ds2].");
+
+        // check valid statement
+        long currentTime = System.currentTimeMillis();
+        String alterStatement =
+                "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds = 
'2023-01-01')";
+        OperationHandle alterHandle =
+                service.executeStatement(sessionHandle, alterStatement, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, alterHandle);
+        List<RowData> result = fetchAllResults(sessionHandle, alterHandle);
+        assertThat(result.size()).isEqualTo(1);
+        String jobId = result.get(0).getString(0).toString();

Review Comment:
   Can you check whether this job is a Flink batch job? Can you also check the 
result counts?



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableRefresh() throws Exception {
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"

Review Comment:
   I think we should control the ds value generated by datagen, then, so we can 
the result for the refresh statement.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+            throw new TableException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        Set<String> allPartitionKeys =
+                new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+        Set<String> unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+        unknownPartitionKeys.removeAll(allPartitionKeys);
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new TableException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: %s.",
+                            unknownPartitionKeys));
+        }
+
+        // Set job name, runtime mode, checkpoint interval
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT INTO %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", 
entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .orElseThrow(() -> new TableException("Could not 
happen")));
+        }
+
+        try {
+            // return jobId for one time refresh, user should get the refresh 
job info via desc
+            // job.
+            return operationExecutor.executeStatement(handle, 
insertStatement.toString());
+        } catch (Exception e) {
+            // log and throw exception
+            LOG.error(
+                    "Refresh job manually for materialized table {} occur 
exception.",

Review Comment:
   ```suggestion
                       "Manually refreshing the materialization table {} occur 
exception.",
   ```



##########
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java:
##########
@@ -58,4 +58,9 @@ public void unparse(SqlWriter writer, int leftPrec, int 
rightPrec) {
                     writer, getOperator().getLeftPrec(), 
getOperator().getRightPrec());
         }
     }
+
+    @Nullable

Review Comment:
   `partitionSpec` must not be null, so this annotation is not necessary.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableRefreshOperation.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Map;
+
+/**
+ * Operation to describe clause like: ALTER MATERIALIZED TABLE 
[catalog_name.][db_name.]table_name
+ * REFRESH [PARTITION (key1=val1, key2=val2, ...)].
+ */
+@Internal
+public class AlterMaterializedTableRefreshOperation extends 
AlterMaterializedTableOperation {

Review Comment:
   please add some test in `SqlMaterializedTableNodeToOperationConverterTest`.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+            throw new TableException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        Set<String> allPartitionKeys =
+                new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+        Set<String> unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+        unknownPartitionKeys.removeAll(allPartitionKeys);
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new TableException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: %s.",
+                            unknownPartitionKeys));
+        }
+
+        // Set job name, runtime mode, checkpoint interval

Review Comment:
   ```suggestion
           // Set job name, runtime mode
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableRefreshConverter.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.SqlProperty;
+import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** A converter for {@link SqlAlterMaterializedTableRefresh}. */
+public class SqlAlterMaterializedTableRefreshConverter
+        implements SqlNodeConverter<SqlAlterMaterializedTableRefresh> {
+
+    @Override
+    public Operation convertSqlNode(SqlAlterMaterializedTableRefresh node, 
ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier = 
UnresolvedIdentifier.of(node.fullTableName());
+        ObjectIdentifier identifier =
+                
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+
+        Map<String, String> partitionSpec = new LinkedHashMap<>();
+        if (node.getPartitionSpec() != null) {

Review Comment:
   Please use `SqlPartitionUtils#getPartitionKVs`



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+            throw new TableException(

Review Comment:
   I think we should unify the exception type to `SqlExecutionException` in 
SqlGateway.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (!(table instanceof ResolvedCatalogMaterializedTable)) {

Review Comment:
   ```suggestion
           if (MATERIALIZED_TABLE != table.getTableKind()) {
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+            throw new TableException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        Set<String> allPartitionKeys =
+                new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+        Set<String> unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+        unknownPartitionKeys.removeAll(allPartitionKeys);
+        if (!unknownPartitionKeys.isEmpty()) {

Review Comment:
   Only this check is not enough. Here I need to add some more context. When 
creating a Materialized Table, the specified partition key field may contain 
both String and non-String types. Suppose `p1` is string and `p2` is int. The 
user writes the following refresh statement:
   ```
   ALTER MATERIALIZED TABLE t1 REFRESH PARTITION(p1 = '20240507', p2 = 2)
   ```
   Although p2 is not a string type, we uniformly converted it to string type 
to handle it at the time of Parser.
   
   Based on the above background, there are two strategies for handling WHERE 
conditions when splicing INSERT statements:
   1. the first strategy is to check the type of the partition key field, if it 
is not string type, directly thrown an error, that is, does not support the 
Refresh partition specified in the non-String type of the partition key. in 
fact, this demand should also be rare
   2. when splicing WHERE conditions, for String type fields, you need to 
splice into “%s = ‘%s’”, non-String type splice into “%s = %s”, of course, I'm 
not sure if this can completely cover all non-String types.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableRefresh() throws Exception {
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // check unknown partition keys
+        String alterStatementWithUnknownPartitionKey =
+                "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds2 = 
'2023-01-01')";
+        OperationHandle alterStatementWithUnknownPartitionKeyHandle =
+                service.executeStatement(
+                        sessionHandle,
+                        alterStatementWithUnknownPartitionKey,
+                        -1,
+                        new Configuration());
+
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service,
+                                        sessionHandle,
+                                        
alterStatementWithUnknownPartitionKeyHandle))
+                .isInstanceOf(SqlExecutionException.class)
+                .rootCause()
+                .isInstanceOf(TableException.class)
+                .hasMessage("The partition spec contains unknown partition 
keys: [ds2].");
+
+        // check valid statement
+        long currentTime = System.currentTimeMillis();
+        String alterStatement =
+                "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds = 
'2023-01-01')";
+        OperationHandle alterHandle =
+                service.executeStatement(sessionHandle, alterStatement, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, alterHandle);
+        List<RowData> result = fetchAllResults(sessionHandle, alterHandle);
+        assertThat(result.size()).isEqualTo(1);
+        String jobId = result.get(0).getString(0).toString();
+
+        OperationHandle describeJobOperationHandle =
+                service.executeStatement(
+                        sessionHandle,
+                        String.format("DESCRIBE JOB '%s'", jobId),
+                        -1,
+                        new Configuration());
+
+        result = fetchAllResults(sessionHandle, describeJobOperationHandle);
+        assertThat(result.size()).isEqualTo(1);
+        RowData jobRow = result.get(0);
+        assertThat(jobRow.getTimestamp(3, 
3).getMillisecond()).isGreaterThan(currentTime);

Review Comment:
   I think the job status should be finished, it also needs to be checked.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -271,4 +350,17 @@ private SessionHandle initializeSession() {
         service.configureSession(sessionHandle, dataGenSource, -1);
         return sessionHandle;
     }
+
+    private List<RowData> fetchAllResults(

Review Comment:
   This should be extracted as a util method, SqlGatewayServiceITCase also has 
this method.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableRefresh() throws Exception {
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // check unknown partition keys
+        String alterStatementWithUnknownPartitionKey =
+                "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds2 = 
'2023-01-01')";
+        OperationHandle alterStatementWithUnknownPartitionKeyHandle =
+                service.executeStatement(
+                        sessionHandle,
+                        alterStatementWithUnknownPartitionKey,
+                        -1,
+                        new Configuration());
+
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service,
+                                        sessionHandle,
+                                        
alterStatementWithUnknownPartitionKeyHandle))
+                .isInstanceOf(SqlExecutionException.class)
+                .rootCause()
+                .isInstanceOf(TableException.class)
+                .hasMessage("The partition spec contains unknown partition 
keys: [ds2].");
+
+        // check valid statement
+        long currentTime = System.currentTimeMillis();
+        String alterStatement =
+                "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds = 
'2023-01-01')";

Review Comment:
   We should also check the non-string partition key.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+            throw new TableException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        Set<String> allPartitionKeys =
+                new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+        Set<String> unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+        unknownPartitionKeys.removeAll(allPartitionKeys);
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new TableException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: %s.",
+                            unknownPartitionKeys));
+        }
+
+        // Set job name, runtime mode, checkpoint interval
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT INTO %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", 
entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .orElseThrow(() -> new TableException("Could not 
happen")));
+        }
+
+        try {
+            // return jobId for one time refresh, user should get the refresh 
job info via desc
+            // job.
+            return operationExecutor.executeStatement(handle, 
insertStatement.toString());

Review Comment:
   Please also log the insert overwrite statement for troubleshooting.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+            throw new TableException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        Set<String> allPartitionKeys =
+                new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+        Set<String> unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+        unknownPartitionKeys.removeAll(allPartitionKeys);
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new TableException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: %s.",
+                            unknownPartitionKeys));
+        }
+
+        // Set job name, runtime mode, checkpoint interval
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT INTO %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", 
entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .orElseThrow(() -> new TableException("Could not 
happen")));

Review Comment:
   This error msg looks a bit strange.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableRefreshOperation 
alterMaterializedTableRefreshOperation) {
+        ObjectIdentifier materializedTableIdentifier =
+                alterMaterializedTableRefreshOperation.getTableIdentifier();
+        ResolvedCatalogBaseTable<?> table = 
operationExecutor.getTable(materializedTableIdentifier);
+        if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+            throw new TableException(
+                    String.format(
+                            "The table '%s' is not a materialized table.",
+                            materializedTableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable materializedTable =
+                (ResolvedCatalogMaterializedTable) table;
+
+        Map<String, String> partitionSpec =
+                alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+        Set<String> allPartitionKeys =
+                new HashSet<>(((ResolvedCatalogMaterializedTable) 
table).getPartitionKeys());
+        Set<String> unknownPartitionKeys = new 
HashSet<>(partitionSpec.keySet());
+        unknownPartitionKeys.removeAll(allPartitionKeys);
+        if (!unknownPartitionKeys.isEmpty()) {
+            throw new TableException(
+                    String.format(
+                            "The partition spec contains unknown partition 
keys: %s.",
+                            unknownPartitionKeys));
+        }
+
+        // Set job name, runtime mode, checkpoint interval
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_one_time_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, BATCH);
+
+        StringBuilder insertStatement =
+                new StringBuilder(
+                        String.format(
+                                "INSERT INTO %s SELECT * FROM (%s)",
+                                materializedTableIdentifier,
+                                materializedTable.getDefinitionQuery()));
+
+        if (!partitionSpec.isEmpty()) {
+            insertStatement.append(" WHERE ");
+            insertStatement.append(
+                    partitionSpec.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "%s = '%s'", 
entry.getKey(), entry.getValue()))
+                            .reduce((s1, s2) -> s1 + " AND " + s2)
+                            .orElseThrow(() -> new TableException("Could not 
happen")));
+        }
+
+        try {
+            // return jobId for one time refresh, user should get the refresh 
job info via desc

Review Comment:
   This comment is redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to