lsyldliu commented on code in PR #24760:
URL: https://github.com/apache/flink/pull/24760#discussion_r1593542865
##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -65,6 +72,11 @@ public static ResultFetcher callMaterializedTableOperation(
return callCreateMaterializedTableOperation(
operationExecutor, handle,
(CreateMaterializedTableOperation) op);
}
+if (op instanceof AlterMaterializedTableRefreshOperation) {
Review Comment:
```suggestion
else if (op instanceof AlterMaterializedTableRefreshOperation) {
```
##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
}
}
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation
alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table =
operationExecutor.getTable(materializedTableIdentifier);
+if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+throw new TableException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+Set allPartitionKeys =
+new HashSet<>(((ResolvedCatalogMaterializedTable)
table).getPartitionKeys());
+Set unknownPartitionKeys = new
HashSet<>(partitionSpec.keySet());
+unknownPartitionKeys.removeAll(allPartitionKeys);
+if (!unknownPartitionKeys.isEmpty()) {
+throw new TableException(
+String.format(
+"The partition spec contains unknown partition
keys: %s.",
+unknownPartitionKeys));
+}
+
+// Set job name, runtime mode, checkpoint interval
+Configuration customConfig = new Configuration();
+String jobName =
+String.format(
+"Materialized_table_%s_one_time_refresh_job",
+materializedTableIdentifier.asSerializableString());
+customConfig.set(NAME, jobName);
+customConfig.set(RUNTIME_MODE, BATCH);
+
+StringBuilder insertStatement =
+new StringBuilder(
+String.format(
+"INSERT INTO %s SELECT * FROM (%s)",
Review Comment:
This should be:
```INSERT OVERWRITE %s SELECT * FROM (%s)```
Refresh manually should be a table or partition granularity overwrite.
##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode(
}
}
+private static ResultFetcher callAlterMaterializedTableRefreshOperation(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+AlterMaterializedTableRefreshOperation
alterMaterializedTableRefreshOperation) {
+ObjectIdentifier materializedTableIdentifier =
+alterMaterializedTableRefreshOperation.getTableIdentifier();
+ResolvedCatalogBaseTable table =
operationExecutor.getTable(materializedTableIdentifier);
+if (!(table instanceof ResolvedCatalogMaterializedTable)) {
+throw new TableException(
+String.format(
+"The table '%s' is not a materialized table.",
+materializedTableIdentifier));
+}
+
+ResolvedCatalogMaterializedTable materializedTable =
+(ResolvedCatalogMaterializedTable) table;
+
+Map partitionSpec =
+alterMaterializedTableRefreshOperation.getPartitionSpec();
+
+Set allPartitionKeys =
+new HashSet<>(((ResolvedCatalogMaterializedTable)
table).getPartitionKeys());
+Set unknownPartitionKeys = new
HashSet<>(partitionSpec.keySet());
+unknownPartitionKeys.removeAll(allPartitionKeys);
+if (!unknownPartitionKeys.isEmpty()) {
+throw new TableException(
+String.format(
+"The partition spec contains unknown part