Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
lsyldliu merged PR #24760: URL: https://github.com/apache/flink/pull/24760 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597382234 ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ## @@ -236,6 +252,213 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } +@Test +void testAlterMaterializedTableRefresh() throws Exception { +long timeout = Duration.ofSeconds(20).toMillis(); +long pause = Duration.ofSeconds(2).toMillis(); +// initialize session handle, create test-filesystem catalog and register it to catalog +// store +SessionHandle sessionHandle = initializeSession(); + +List data = new ArrayList<>(); +data.add(Row.of(1L, 1L, 1L, "2024-01-01")); +data.add(Row.of(2L, 2L, 2L, "2024-01-02")); +data.add(Row.of(3L, 3L, 3L, "2024-01-02")); +String dataId = TestValuesTableFactory.registerData(data); + +String sourceDdl = +String.format( +"CREATE TABLE my_source (\n" ++ " order_id BIGINT,\n" ++ " user_id BIGINT,\n" ++ " shop_id BIGINT,\n" ++ " order_created_at STRING\n" ++ ")\n" ++ "WITH (\n" ++ " 'connector' = 'values',\n" ++ " 'bounded' = 'true',\n" ++ " 'data-id' = '%s'\n" ++ ")", +dataId); +service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration()); + +String materializedTableDDL = +"CREATE MATERIALIZED TABLE my_materialized_table" ++ " PARTITIONED BY (ds)\n" ++ " WITH(\n" ++ " 'format' = 'debezium-json'\n" ++ " )\n" ++ " FRESHNESS = INTERVAL '2' SECOND\n" ++ " AS SELECT \n" ++ " user_id,\n" ++ " shop_id,\n" ++ " ds,\n" ++ " COUNT(order_id) AS order_cnt\n" ++ " FROM (\n" ++ "SELECT user_id, shop_id, order_created_at AS ds, order_id FROM my_source" ++ " ) AS tmp\n" ++ " GROUP BY (user_id, shop_id, ds)"; + +OperationHandle materializedTableHandle = +service.executeStatement( +sessionHandle, materializedTableDDL, -1, new Configuration()); +awaitOperationTermination(service, sessionHandle, materializedTableHandle); + +// verify data exists in materialized table +CommonTestUtils.waitUtil( +() -> +fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table").size() +== data.size(), +Duration.ofMillis(timeout), +Duration.ofMillis(pause), +"Failed to verify the data in materialized table."); +assertThat( +fetchTableData( +sessionHandle, +"SELECT * FROM my_materialized_table where ds = '2024-01-02'") +.size()) +.isEqualTo(2); + +// remove the last element +data.remove(2); + +long currentTime = System.currentTimeMillis(); +String alterStatement = +"ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds = '2024-01-02')"; +OperationHandle alterHandle = +service.executeStatement(sessionHandle, alterStatement, -1, new Configuration()); +awaitOperationTermination(service, sessionHandle, alterHandle); +List result = fetchAllResults(service, sessionHandle, alterHandle); +assertThat(result.size()).isEqualTo(1); +String jobId = result.get(0).getString(0).toString(); + +MiniCluster miniCluster = MINI_CLUSTER.getMiniCluster(); Review Comment: Got it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
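The quoted test is truncated right after obtaining the MiniCluster, so the assertions that follow are not shown here. Purely as an illustrative sketch (not the merged test code), a follow-up step could wait for the returned batch job to reach a terminal state before re-checking the refreshed partition; `awaitRefreshJobFinished` below is a hypothetical helper built only on public MiniCluster APIs:

```java
import java.time.Duration;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.minicluster.MiniCluster;

class RefreshJobAssertions {

    // Polls the status of the one-time refresh batch job (identified by the job id
    // returned from ALTER MATERIALIZED TABLE ... REFRESH) until it reaches a terminal state.
    static void awaitRefreshJobFinished(MiniCluster miniCluster, String jobId) throws Exception {
        JobID batchJobId = JobID.fromHexString(jobId);
        long deadline = System.currentTimeMillis() + Duration.ofSeconds(20).toMillis();
        while (!miniCluster.getJobStatus(batchJobId).get().isTerminalState()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Refresh job did not reach a terminal state in time.");
            }
            Thread.sleep(Duration.ofSeconds(2).toMillis());
        }
    }
}
```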
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597376170 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableString()); +customConfig.set(NAME, jobName); +customConfig.set(RUNTIME_MODE, BATCH); + +StringBuilder insertStatement = +new StringBuilder( +String.format( +"INSERT OVERWRITE %s SELECT * FROM (%s)", +materializedTableIdentifier, +materializedTable.getDefinitionQuery())); + +if (!partitionSpec.isEmpty()) { +insertStatement.append(" WHERE "); +insertStatement.append( +partitionSpec.entrySet().stream() +.map( +entry -> +String.format( +"%s = '%s'", entry.getKey(), entry.getValue())) +.reduce((s1, s2) -> s1 + " AND " + s2) +.get()); +} + +try { +LOG.debug( +"Begin to manually refreshing the materialization table {}, statement: {}", +materializedTableIdentifier, +insertStatement); +return operationExecutor.executeStatement( +handle, customConfig, insertStatement.toString()); +} catch (Exception e) { +// log and throw exception +LOG.error( +"Manually refreshing the materialization table {} occur exception.", +materializedTableIdentifier, +e); +throw new SqlExecutionException( +String.format( +"Manually refreshing the materialization table %s occur exception.", +materializedTableIdentifier), +e); +} +} + +private static void validatePartitionSpec( +Map partitionSpec, ResolvedCatalogMaterializedTable table) { +ResolvedSchema schema = table.getResolvedSchema(); +Set allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + +Set unknownPartitionKeys = new HashSet<>(); +Set nonStringPartitionKeys = new HashSet<>(); + +for (String partitionKey : partitionSpec.keySet()) { +if (!schema.getColumn(partitionKey).isPresent()) { +unknownPartitionKeys.add(partitionKey); +continue; +} + +if (!schema.getColumn(partitionKey) +.get() +.getDataType() +.getLogicalType() +.getTypeRoot() +.getFamilies() +.contains(LogicalTypeFamil
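The hunk above is cut off in this archive, but its core is a plain statement rewrite: the materialized table's definition query is wrapped in an INSERT OVERWRITE, and the partition spec (if present) becomes an equality filter on the partition columns. Below is a minimal, self-contained sketch of that rewrite, assuming string-typed partition values; the class and method names are illustrative and not part of the Flink API:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class RefreshStatementSketch {

    // Wraps the definition query in an INSERT OVERWRITE and appends the partition spec
    // as an equality filter. Collectors.joining(" AND ") is equivalent to the
    // reduce((s1, s2) -> s1 + " AND " + s2) call in the quoted diff.
    static String buildRefreshStatement(
            String tableIdentifier, String definitionQuery, Map<String, String> partitionSpec) {
        StringBuilder statement =
                new StringBuilder(
                        String.format(
                                "INSERT OVERWRITE %s SELECT * FROM (%s)",
                                tableIdentifier, definitionQuery));
        if (!partitionSpec.isEmpty()) {
            String filter =
                    partitionSpec.entrySet().stream()
                            .map(e -> String.format("%s = '%s'", e.getKey(), e.getValue()))
                            .collect(Collectors.joining(" AND "));
            statement.append(" WHERE ").append(filter);
        }
        return statement.toString();
    }

    public static void main(String[] args) {
        Map<String, String> partitionSpec = new LinkedHashMap<>();
        partitionSpec.put("ds", "2024-01-02");
        // Prints:
        // INSERT OVERWRITE my_materialized_table SELECT * FROM (<definition query>) WHERE ds = '2024-01-02'
        System.out.println(
                buildRefreshStatement("my_materialized_table", "<definition query>", partitionSpec));
    }
}
```

For the partition spec `(ds = '2024-01-02')` this produces exactly the statement shape the IT case in this thread expects the gateway to submit as a batch job.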
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597372269 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableString()); +customConfig.set(NAME, jobName); +customConfig.set(RUNTIME_MODE, BATCH); + +StringBuilder insertStatement = +new StringBuilder( +String.format( +"INSERT OVERWRITE %s SELECT * FROM (%s)", +materializedTableIdentifier, +materializedTable.getDefinitionQuery())); + +if (!partitionSpec.isEmpty()) { +insertStatement.append(" WHERE "); +insertStatement.append( +partitionSpec.entrySet().stream() +.map( +entry -> +String.format( +"%s = '%s'", entry.getKey(), entry.getValue())) +.reduce((s1, s2) -> s1 + " AND " + s2) +.get()); +} + +try { +LOG.debug( +"Begin to manually refreshing the materialization table {}, statement: {}", +materializedTableIdentifier, +insertStatement); +return operationExecutor.executeStatement( +handle, customConfig, insertStatement.toString()); +} catch (Exception e) { +// log and throw exception +LOG.error( +"Manually refreshing the materialization table {} occur exception.", +materializedTableIdentifier, +e); +throw new SqlExecutionException( +String.format( +"Manually refreshing the materialization table %s occur exception.", +materializedTableIdentifier), +e); +} +} + +private static void validatePartitionSpec( +Map partitionSpec, ResolvedCatalogMaterializedTable table) { +ResolvedSchema schema = table.getResolvedSchema(); +Set allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + +Set unknownPartitionKeys = new HashSet<>(); +Set nonStringPartitionKeys = new HashSet<>(); + +for (String partitionKey : partitionSpec.keySet()) { +if (!schema.getColumn(partitionKey).isPresent()) { +unknownPartitionKeys.add(partitionKey); +continue; +} + +if (!schema.getColumn(partitionKey) +.get() +.getDataType() +.getLogicalType() +.getTypeRoot() +.getFamilies() +.contains(LogicalTypeFamil
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
lsyldliu commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597329094 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableString()); +customConfig.set(NAME, jobName); +customConfig.set(RUNTIME_MODE, BATCH); + +StringBuilder insertStatement = +new StringBuilder( +String.format( +"INSERT OVERWRITE %s SELECT * FROM (%s)", +materializedTableIdentifier, +materializedTable.getDefinitionQuery())); + +if (!partitionSpec.isEmpty()) { +insertStatement.append(" WHERE "); +insertStatement.append( +partitionSpec.entrySet().stream() +.map( +entry -> +String.format( +"%s = '%s'", entry.getKey(), entry.getValue())) +.reduce((s1, s2) -> s1 + " AND " + s2) +.get()); +} + +try { +LOG.debug( +"Begin to manually refreshing the materialization table {}, statement: {}", +materializedTableIdentifier, +insertStatement); +return operationExecutor.executeStatement( +handle, customConfig, insertStatement.toString()); +} catch (Exception e) { +// log and throw exception +LOG.error( +"Manually refreshing the materialization table {} occur exception.", +materializedTableIdentifier, +e); +throw new SqlExecutionException( +String.format( +"Manually refreshing the materialization table %s occur exception.", +materializedTableIdentifier), +e); +} +} + +private static void validatePartitionSpec( +Map partitionSpec, ResolvedCatalogMaterializedTable table) { +ResolvedSchema schema = table.getResolvedSchema(); +Set allPartitionKeys = new HashSet<>(table.getPartitionKeys()); + +Set unknownPartitionKeys = new HashSet<>(); +Set nonStringPartitionKeys = new HashSet<>(); + +for (String partitionKey : partitionSpec.keySet()) { +if (!schema.getColumn(partitionKey).isPresent()) { +unknownPartitionKeys.add(partitionKey); +continue; +} + +if (!schema.getColumn(partitionKey) +.get() +.getDataType() +.getLogicalType() +.getTypeRoot() +.getFamilies() +.contains(LogicalTypeFamily.C
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
lsyldliu commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1597326389 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableString()); +customConfig.set(NAME, jobName); +customConfig.set(RUNTIME_MODE, BATCH); + +StringBuilder insertStatement = +new StringBuilder( +String.format( +"INSERT OVERWRITE %s SELECT * FROM (%s)", +materializedTableIdentifier, +materializedTable.getDefinitionQuery())); + +if (!partitionSpec.isEmpty()) { +insertStatement.append(" WHERE "); Review Comment: I think the WHERE clause could be better formatted on a new line ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -161,14 +176,131 @@ private static void createMaterializedInContinuousMode( "Submit continuous refresh job for materialized table {} occur exception.", materializedTableIdentifier, e); -throw new TableException( +throw new SqlExecutionException( String.format( "Submit continuous refresh job for materialized table %s occur exception.", materializedTableIdentifier), e); } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (MATERIALIZED_TABLE != table.getTableKind()) { +throw new ValidationException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +validatePartitionSpec(partitionSpec, (ResolvedCatalogMaterializedTable) table); + +// Set job name, runtime mode 
+Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableSt
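To make the formatting suggestion concrete: moving the filter onto its own line only changes how the statement string is assembled, not its semantics. A small illustrative variant (again a sketch, not the merged implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class WhereOnNewLineSketch {

    // Same filter assembly as before, but the WHERE clause goes on its own line so the
    // statement logged at DEBUG level is easier to read. Names are illustrative only.
    static String appendPartitionFilter(String insertStatement, Map<String, String> partitionSpec) {
        if (partitionSpec.isEmpty()) {
            return insertStatement;
        }
        String filter =
                partitionSpec.entrySet().stream()
                        .map(e -> String.format("%s = '%s'", e.getKey(), e.getValue()))
                        .collect(Collectors.joining(" AND "));
        return insertStatement + System.lineSeparator() + "WHERE " + filter;
    }

    public static void main(String[] args) {
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("ds", "2024-01-02");
        // Prints the statement with the filter on a second line:
        // INSERT OVERWRITE my_materialized_table SELECT * FROM (<definition query>)
        // WHERE ds = '2024-01-02'
        System.out.println(
                appendPartitionFilter(
                        "INSERT OVERWRITE my_materialized_table SELECT * FROM (<definition query>)",
                        spec));
    }
}
```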
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on PR #24760: URL: https://github.com/apache/flink/pull/24760#issuecomment-2104649932 Hi @lsyldliu, all comments have been addressed. Could you take another look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1596779798 ## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ## @@ -236,6 +241,80 @@ void testCreateMaterializedTableInFullMode() { "Only support create materialized table in continuous refresh mode currently."); } +@Test +void testAlterMaterializedTableRefresh() throws Exception { +// initialize session handle, create test-filesystem catalog and register it to catalog +// store +SessionHandle sessionHandle = initializeSession(); + +String materializedTableDDL = +"CREATE MATERIALIZED TABLE users_shops" ++ " PARTITIONED BY (ds)\n" ++ " WITH(\n" ++ " 'format' = 'debezium-json'\n" ++ " )\n" ++ " FRESHNESS = INTERVAL '30' SECOND\n" ++ " AS SELECT \n" ++ " user_id,\n" ++ " shop_id,\n" ++ " ds,\n" Review Comment: use values source to check data overwrite -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
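The `values` connector referred to here is backed by `TestValuesTableFactory`: rows are registered in memory, exposed through a bounded source, and can be mutated between refreshes so the test can assert that `INSERT OVERWRITE` replaced the partition rather than appending to it. A rough sketch of that setup, assuming the flink-table-planner test utilities are on the classpath (the updated IT case earlier in this thread follows the same pattern):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;

class ValuesSourceSetupSketch {

    // Registers in-memory rows and returns DDL for a bounded values source backed by them.
    // Removing a row from the registered data before triggering a partition refresh lets the
    // test assert that the refreshed partition was overwritten rather than appended to.
    static String boundedValuesSourceDdl(List<Row> data) {
        String dataId = TestValuesTableFactory.registerData(data);
        return String.format(
                "CREATE TABLE my_source (\n"
                        + "  order_id BIGINT,\n"
                        + "  user_id BIGINT,\n"
                        + "  shop_id BIGINT,\n"
                        + "  order_created_at STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'values',\n"
                        + "  'bounded' = 'true',\n"
                        + "  'data-id' = '%s'\n"
                        + ")",
                dataId);
    }

    public static void main(String[] args) {
        List<Row> data = new ArrayList<>();
        data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
        data.add(Row.of(2L, 2L, 2L, "2024-01-02"));
        System.out.println(boundedValuesSourceDdl(data));
    }
}
```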
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1594070565 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (!(table instanceof ResolvedCatalogMaterializedTable)) { +throw new TableException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +Set allPartitionKeys = +new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); +Set unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); +unknownPartitionKeys.removeAll(allPartitionKeys); +if (!unknownPartitionKeys.isEmpty()) { +throw new TableException( +String.format( +"The partition spec contains unknown partition keys: %s.", +unknownPartitionKeys)); +} + +// Set job name, runtime mode, checkpoint interval +Configuration customConfig = new Configuration(); +String jobName = +String.format( +"Materialized_table_%s_one_time_refresh_job", +materializedTableIdentifier.asSerializableString()); +customConfig.set(NAME, jobName); +customConfig.set(RUNTIME_MODE, BATCH); + +StringBuilder insertStatement = +new StringBuilder( +String.format( +"INSERT INTO %s SELECT * FROM (%s)", +materializedTableIdentifier, +materializedTable.getDefinitionQuery())); + +if (!partitionSpec.isEmpty()) { +insertStatement.append(" WHERE "); +insertStatement.append( +partitionSpec.entrySet().stream() +.map( +entry -> +String.format( +"%s = '%s'", entry.getKey(), entry.getValue())) +.reduce((s1, s2) -> s1 + " AND " + s2) +.orElseThrow(() -> new TableException("Could not happen"))); +} + +try { +// return jobId for one time refresh, user should get the refresh job info via desc Review Comment: I want to note the developer the information about the syntax "alter materialized table ... refresh ..." returns before. I'll delete it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1595409695 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (!(table instanceof ResolvedCatalogMaterializedTable)) { +throw new TableException( +String.format( +"The table '%s' is not a materialized table.", +materializedTableIdentifier)); +} + +ResolvedCatalogMaterializedTable materializedTable = +(ResolvedCatalogMaterializedTable) table; + +Map partitionSpec = +alterMaterializedTableRefreshOperation.getPartitionSpec(); + +Set allPartitionKeys = +new HashSet<>(((ResolvedCatalogMaterializedTable) table).getPartitionKeys()); +Set unknownPartitionKeys = new HashSet<>(partitionSpec.keySet()); +unknownPartitionKeys.removeAll(allPartitionKeys); +if (!unknownPartitionKeys.isEmpty()) { Review Comment: Choose 1. We can only support string type at first. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
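Option 1 here means restricting manual partition refresh to string-typed partition keys, which is what the truncated `validatePartitionSpec` above checks via `LogicalTypeFamily`. A self-contained sketch of that check follows; the real method throws Flink's `ValidationException` and reports all offending keys in a single message, and the names below are illustrative:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.logical.LogicalTypeFamily;

class PartitionSpecValidationSketch {

    // Collects partition keys from the spec that either are not columns of the schema
    // or are not string-typed; only string-typed partition keys are supported initially.
    static void validate(Map<String, String> partitionSpec, ResolvedSchema schema) {
        Set<String> unknownKeys = new HashSet<>();
        Set<String> nonStringKeys = new HashSet<>();
        for (String key : partitionSpec.keySet()) {
            Optional<Column> column = schema.getColumn(key);
            if (!column.isPresent()) {
                unknownKeys.add(key);
                continue;
            }
            boolean isString =
                    column.get()
                            .getDataType()
                            .getLogicalType()
                            .getTypeRoot()
                            .getFamilies()
                            .contains(LogicalTypeFamily.CHARACTER_STRING);
            if (!isString) {
                nonStringKeys.add(key);
            }
        }
        if (!unknownKeys.isEmpty() || !nonStringKeys.isEmpty()) {
            // The real implementation throws Flink's ValidationException here.
            throw new IllegalArgumentException(
                    String.format(
                            "Unknown partition keys %s, non-string-typed partition keys %s.",
                            unknownKeys, nonStringKeys));
        }
    }
}
```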
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1594064792 ## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ## @@ -169,6 +181,83 @@ private static void createMaterializedInContinuousMode( } } +private static ResultFetcher callAlterMaterializedTableRefreshOperation( +OperationExecutor operationExecutor, +OperationHandle handle, +AlterMaterializedTableRefreshOperation alterMaterializedTableRefreshOperation) { +ObjectIdentifier materializedTableIdentifier = +alterMaterializedTableRefreshOperation.getTableIdentifier(); +ResolvedCatalogBaseTable table = operationExecutor.getTable(materializedTableIdentifier); +if (!(table instanceof ResolvedCatalogMaterializedTable)) { +throw new TableException( Review Comment: Got it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35198][table] Support the execution of refresh materialized table [flink]
xuyangzhong commented on code in PR #24760: URL: https://github.com/apache/flink/pull/24760#discussion_r1594018523 ## flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterMaterializedTableRefresh.java: ## @@ -58,4 +58,9 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer, getOperator().getLeftPrec(), getOperator().getRightPrec()); } } + +@Nullable Review Comment: I just noticed that in the constructor, `partitionSpec` is tagged `@Nullable`. I'll update it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org