xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); - throw new TableException( + throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } + private static ResultFetcher callAlterMaterializedTableRefreshOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { + ObjectIdentifier materializedTableIdentifier = + alterMaterializedTableRefreshOperation.getTableIdentifier(); + ResolvedCatalogBaseTable<?> table = operationExecutor.getTable(materializedTableIdentifier); + if (MATERIALIZED_TABLE != table.getTableKind()) { + throw new ValidationException( + String.format( + "The table '%s' is not a materialized table.", + materializedTableIdentifier)); + } + + ResolvedCatalogMaterializedTable materializedTable = + (ResolvedCatalogMaterializedTable) table; + + Map<String, String> partitionSpec = + alterMaterializedTableRefreshOperation.getPartitionSpec(); + + validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + + // Set job name, runtime mode + Configuration customConfig = new Configuration(); + String jobName = + String.format( + "Materialized_table_%s_one_time_refresh_job", + materializedTableIdentifier.asSerializableString()); + customConfig.set(NAME, jobName); + customConfig.set(RUNTIME_MODE, BATCH); + + StringBuilder insertStatement = + new StringBuilder( + String.format( + "INSERT OVERWRITE %s SELECT * FROM (%s)", + materializedTableIdentifier, + materializedTable.getDefinitionQuery())); + + if (!partitionSpec.isEmpty()) { + insertStatement.append(" 
WHERE "); + insertStatement.append( + partitionSpec.entrySet().stream() + .map( + entry -> + String.format( + "%s = '%s'", entry.getKey(), entry.getValue())) + .reduce((s1, s2) -> s1 + " AND " + s2) + .get()); + } + + try { + LOG.debug( + "Begin to manually refreshing the materialization table {}, statement: {}", + materializedTableIdentifier, + insertStatement); + return operationExecutor.executeStatement( + handle, customConfig, insertStatement.toString()); + } catch (Exception e) { + // log and throw exception + LOG.error( + "Manually refreshing the materialization table {} occur exception.", + materializedTableIdentifier, + e); + throw new SqlExecutionException( + String.format( + "Manually refreshing the materialization table %s occur exception.", + materializedTableIdentifier), + e); + } + } + + private static void validatePartitionSpec( + Map<String, String> partitionSpec, ResolvedCatalogMaterializedTable table) { + ResolvedSchema schema = table.getResolvedSchema(); + Set<String> allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + + Set<String> unknownPartitionKeys = new HashSet<>(); + Set<String> nonStringPartitionKeys = new HashSet<>(); + + for (String partitionKey : partitionSpec.keySet()) { + if (!schema.getColumn(partitionKey).isPresent()) { + unknownPartitionKeys.add(partitionKey); + continue; + } + + if (!schema.getColumn(partitionKey) + .get() + .getDataType() + .getLogicalType() + .getTypeRoot() + .getFamilies() + .contains(LogicalTypeFamily.CHARACTER_STRING)) { + nonStringPartitionKeys.add(partitionKey); + } + } + + if (!unknownPartitionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "The partition spec contains unknown partition keys: [%s]. 
All known partition keys are: [%s].", + unknownPartitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'")), + allPartitionKeys.stream() + .collect(Collectors.joining("', '", "'", "'")))); + } + + if (!nonStringPartitionKeys.isEmpty()) { + throw new ValidationException( + String.format( + "Currently, specifying non-char or non-string type partition fields" Review Comment: What about `"Currently, manually refreshing materialized table only supports specifying char and string type partition keys. All specific partition keys with unsupported types are:\n\n%s."`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org