lsyldliu commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1593542865
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -65,6 +72,11 @@ public static ResultFetcher callMaterializedTableOperation( return callCreateMaterializedTableOperation( operationExecutor, handle, (CreateMaterializedTableOperation) op); } + if (op instanceof AlterMaterializedTableRefreshOperation) { Review Comment: ```suggestion else if (op instanceof AlterMaterializedTableRefreshOperation) { ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { + throw new TableException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + Set<String> allPartitionKeys = + new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); + Set<String> unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); + unknownPartitionKeys.removeAll(allPartitionKeys); + if (!unknownPartitionKeys.isEmpty()) { + throw new TableException( + String.format( + "The partition spec contains unknown partition keys: %s.", + unknownPartitionKeys)); + } + + // Set 
job name, runtime mode, checkpoint interval + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT INTO %s SELECT * FROM (%s)", Review Comment: This should be: ```INSERT OVERWRITE %s SELECT * FROM (%s)``` Refresh manually should be a table or partition granularity overwrite. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { + throw new TableException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + Set<String> allPartitionKeys = + new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); + Set<String> unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); + unknownPartitionKeys.removeAll(allPartitionKeys); + if (!unknownPartitionKeys.isEmpty()) { + throw new TableException( + String.format( + "The partition spec contains unknown 
partition keys: %s.", + unknownPartitionKeys)); + } + + // Set job name, runtime mode, checkpoint interval + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT INTO %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if (!partitionSpec.isEmpty()) { + insertStatement.append(" WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .orElseThrow(() -> new TableException("Could not happen"))); + } + + try { + // return jobId for one time refresh, user should get the refresh job info via desc + // job. + return operationExecutor.executeStatement(handle, insertStatement.toString()); Review Comment: ```suggestion return operationExecutor.executeStatement(handle, customConfig, insertStatement.toString()); ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { + throw new TableException( + String.format( + "The table '%s' is 
not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + Set<String> allPartitionKeys = + new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); + Set<String> unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); + unknownPartitionKeys.removeAll(allPartitionKeys); + if (!unknownPartitionKeys.isEmpty()) { + throw new TableException( + String.format( + "The partition spec contains unknown partition keys: %s.", + unknownPartitionKeys)); + } + + // Set job name, runtime mode, checkpoint interval + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT INTO %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if (!partitionSpec.isEmpty()) { + insertStatement.append(" WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .orElseThrow(() -> new TableException("Could not happen"))); + } + + try { + // return jobId for one time refresh, user should get the refresh job info via desc + // job. 
+ return operationExecutor.executeStatement(handle, insertStatement.toString()); + } catch (Exception e) { + // log and throw exception + LOG.error( + "Refresh job manually for materialized table {} occur exception.", + materializedTableIdentifier, + e); + throw new TableException( + String.format( + "Refresh job manually for materialized table %s occur exception.", Review Comment: ditto ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } + @Test + void testAlterMaterializedTableRefresh() throws Exception { + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + SessionHandle sessionHandle = initializeSession(); + + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + + // check unknown partition keys + String alterStatementWithUnknownPartitionKey = + "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds2 = '2023-01-01')"; + OperationHandle alterStatementWithUnknownPartitionKeyHandle = + service.executeStatement( + sessionHandle, + alterStatementWithUnknownPartitionKey, 
+ -1, + new Configuration()); + + assertThatThrownBy( + () -> + awaitOperationTermination( + service, + sessionHandle, + alterStatementWithUnknownPartitionKeyHandle)) + .isInstanceOf(SqlExecutionException.class) + .rootCause() + .isInstanceOf(TableException.class) + .hasMessage("The partition spec contains unknown partition keys: [ds2]."); + + // check valid statement + long currentTime = System.currentTimeMillis(); + String alterStatement = + "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds = '2023-01-01')"; + OperationHandle alterHandle = + service.executeStatement(sessionHandle, alterStatement, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterHandle); + List<RowData> result = fetchAllResults(sessionHandle, alterHandle); + assertThat(result.size()).isEqualTo(1); + String jobId = result.get(0).getString(0).toString(); Review Comment: Can you check whether this job is a Flink batch job? Can you also check the result counts? ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } + @Test + void testAlterMaterializedTableRefresh() throws Exception { + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + SessionHandle sessionHandle = initializeSession(); + + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" Review Comment: I think we should control the ds value generated by datagen, so that we can check the result of the refresh statement. 
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { + throw new TableException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + Set<String> allPartitionKeys = + new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); + Set<String> unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); + unknownPartitionKeys.removeAll(allPartitionKeys); + if (!unknownPartitionKeys.isEmpty()) { + throw new TableException( + String.format( + "The partition spec contains unknown partition keys: %s.", + unknownPartitionKeys)); + } + + // Set job name, runtime mode, checkpoint interval + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT INTO %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if 
(!partitionSpec.isEmpty()) { + insertStatement.append(" WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .orElseThrow(() -> new TableException("Could not happen"))); + } + + try { + // return jobId for one time refresh, user should get the refresh job info via desc + // job. + return operationExecutor.executeStatement(handle, insertStatement.toString()); + } catch (Exception e) { + // log and throw exception + LOG.error( + "Refresh job manually for materialized table {} occur exception.", Review Comment: ```suggestion "Manually refreshing the materialization table {} occur exception.", ``` ########## flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java: ########## @@ -58,4 +58,9 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer, getOperator().getLeftPrec(), getOperator().getRightPrec()); } } + + @Nullable Review Comment: `partitionSpec` must not be null, so this annotation is not necessary. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableRefreshOperation.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Map; + +/** + * Operation to describe clause like: ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name + * REFRESH [PARTITION (key1=val1, key2=val2, ...)]. + */ +@Internal +public class AlterMaterializedTableRefreshOperation extends AlterMaterializedTableOperation { Review Comment: please add some test in `SqlMaterializedTableNodeToOperationConverterTest`. 
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { + throw new TableException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + Set<String> allPartitionKeys = + new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); + Set<String> unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); + unknownPartitionKeys.removeAll(allPartitionKeys); + if (!unknownPartitionKeys.isEmpty()) { + throw new TableException( + String.format( + "The partition spec contains unknown partition keys: %s.", + unknownPartitionKeys)); + } + + // Set job name, runtime mode, checkpoint interval Review Comment: ```suggestion // Set job name, runtime mode ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableRefreshConverter.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.SqlProperty; +import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableRefresh; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; +import org.apache.flink.util.Preconditions; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** A converter for {@link SqlAlterMaterializedTableRefresh}. 
*/ +public class SqlAlterMaterializedTableRefreshConverter + implements SqlNodeConverter<SqlAlterMaterializedTableRefresh> { + + @Override + public Operation convertSqlNode(SqlAlterMaterializedTableRefresh node, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(node.fullTableName()); + ObjectIdentifier identifier = + context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + + Map<String, String> partitionSpec = new LinkedHashMap<>(); + if (node.getPartitionSpec() != null) { Review Comment: Please use `SqlPartitionUtils#getPartitionKVs` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { + throw new TableException( Review Comment: I think we should unify the exception type to `SqlExecutionException` in SqlGateway. 
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { Review Comment: ```suggestion if (MATERIALIZED_TABLE != table.getTableKind()) { ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { + throw new TableException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + Set<String> allPartitionKeys = + new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); + Set<String> 
unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); + unknownPartitionKeys.removeAll(allPartitionKeys); + if (!unknownPartitionKeys.isEmpty()) { Review Comment: This check alone is not enough. Let me add some more context. When creating a Materialized Table, the specified partition key fields may include both String and non-String types. Suppose `p1` is a string and `p2` is an int. The user writes the following refresh statement: ``` ALTER MATERIALIZED TABLE t1 REFRESH PARTITION(p1 = '20240507', p2 = 2) ``` Although `p2` is not a string type, the parser uniformly converts every partition value to a string. Given this background, there are two strategies for handling the WHERE conditions when assembling the INSERT statement: 1. Check the type of each partition key field and, if it is not a string type, throw an error directly; that is, do not support refreshing a partition specified by a non-String partition key. In fact, this requirement should be rare. 2. When assembling the WHERE conditions, render String-type fields as `%s = '%s'` and non-String-type fields as `%s = %s`; however, I'm not sure this completely covers all non-String types. 
########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } + @Test + void testAlterMaterializedTableRefresh() throws Exception { + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + SessionHandle sessionHandle = initializeSession(); + + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + + // check unknown partition keys + String alterStatementWithUnknownPartitionKey = + "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds2 = '2023-01-01')"; + OperationHandle alterStatementWithUnknownPartitionKeyHandle = + service.executeStatement( + sessionHandle, + alterStatementWithUnknownPartitionKey, + -1, + new Configuration()); + + assertThatThrownBy( + () -> + awaitOperationTermination( + service, + sessionHandle, + alterStatementWithUnknownPartitionKeyHandle)) + .isInstanceOf(SqlExecutionException.class) + .rootCause() + .isInstanceOf(TableException.class) + .hasMessage("The partition spec contains unknown partition keys: [ds2]."); + + // check valid statement + long currentTime = 
System.currentTimeMillis(); + String alterStatement = + "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds = '2023-01-01')"; + OperationHandle alterHandle = + service.executeStatement(sessionHandle, alterStatement, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterHandle); + List<RowData> result = fetchAllResults(sessionHandle, alterHandle); + assertThat(result.size()).isEqualTo(1); + String jobId = result.get(0).getString(0).toString(); + + OperationHandle describeJobOperationHandle = + service.executeStatement( + sessionHandle, + String.format("DESCRIBE JOB '%s'", jobId), + -1, + new Configuration()); + + result = fetchAllResults(sessionHandle, describeJobOperationHandle); + assertThat(result.size()).isEqualTo(1); + RowData jobRow = result.get(0); + assertThat(jobRow.getTimestamp(3, 3).getMillisecond()).isGreaterThan(currentTime); Review Comment: I think the job status should be finished, it also needs to be checked. ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -271,4 +350,17 @@ private SessionHandle initializeSession() { service.configureSession(sessionHandle, dataGenSource, -1); return sessionHandle; } + + private List<RowData> fetchAllResults( Review Comment: This should be extracted as a util method, SqlGatewayServiceITCase also has this method. 
########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } + @Test + void testAlterMaterializedTableRefresh() throws Exception { + // initialize session handle, create test-filesystem catalog and register it to catalog + // store + SessionHandle sessionHandle = initializeSession(); + + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + + // check unknown partition keys + String alterStatementWithUnknownPartitionKey = + "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds2 = '2023-01-01')"; + OperationHandle alterStatementWithUnknownPartitionKeyHandle = + service.executeStatement( + sessionHandle, + alterStatementWithUnknownPartitionKey, + -1, + new Configuration()); + + assertThatThrownBy( + () -> + awaitOperationTermination( + service, + sessionHandle, + alterStatementWithUnknownPartitionKeyHandle)) + .isInstanceOf(SqlExecutionException.class) + .rootCause() + .isInstanceOf(TableException.class) + .hasMessage("The partition spec contains unknown partition keys: [ds2]."); + + // check valid statement + long currentTime = 
System.currentTimeMillis(); + String alterStatement = + "ALTER MATERIALIZED TABLE users_shops REFRESH PARTITION (ds = '2023-01-01')"; Review Comment: We should also check the non-string partition key. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { + throw new TableException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + Set<String> allPartitionKeys = + new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); + Set<String> unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); + unknownPartitionKeys.removeAll(allPartitionKeys); + if (!unknownPartitionKeys.isEmpty()) { + throw new TableException( + String.format( + "The partition spec contains unknown partition keys: %s.", + unknownPartitionKeys)); + } + + // Set job name, runtime mode, checkpoint interval + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, 
BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT INTO %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if (!partitionSpec.isEmpty()) { + insertStatement.append(" WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .orElseThrow(() -> new TableException("Could not happen"))); + } + + try { + // return jobId for one time refresh, user should get the refresh job info via desc + // job. + return operationExecutor.executeStatement(handle, insertStatement.toString()); Review Comment: Please also log the insert overwrite statement for troubleshooting. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { + throw new TableException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + Set<String> allPartitionKeys = + new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); + Set<String> 
unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); + unknownPartitionKeys.removeAll(allPartitionKeys); + if (!unknownPartitionKeys.isEmpty()) { + throw new TableException( + String.format( + "The partition spec contains unknown partition keys: %s.", + unknownPartitionKeys)); + } + + // Set job name, runtime mode, checkpoint interval + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT INTO %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if (!partitionSpec.isEmpty()) { + insertStatement.append(" WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .orElseThrow(() -> new TableException("Could not happen"))); Review Comment: This error message ("Could not happen") looks a bit strange. 
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (!(table instanceof ResolvedCatalogMaterializedTable)) { + throw new TableException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + Set<String> allPartitionKeys = + new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); + Set<String> unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); + unknownPartitionKeys.removeAll(allPartitionKeys); + if (!unknownPartitionKeys.isEmpty()) { + throw new TableException( + String.format( + "The partition spec contains unknown partition keys: %s.", + unknownPartitionKeys)); + } + + // Set job name, runtime mode, checkpoint interval + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT INTO %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if 
(!partitionSpec.isEmpty()) { + insertStatement.append(" WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .orElseThrow(() -> new TableException("Could not happen"))); + } + + try { + // return jobId for one time refresh, user should get the refresh job info via desc Review Comment: This comment is redundant. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org