hackergin commented on code in PR #24844: URL: https://github.com/apache/flink/pull/24844#discussion_r1616305580
########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -909,47 +694,172 @@ void testDropMaterializedTable(@InjectClusterClient RestClusterClient<?> restClu .asSerializableString())); } - private SessionHandle initializeSession() { - SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); - String catalogDDL = - String.format( - "CREATE CATALOG %s\n" - + "WITH (\n" - + " 'type' = 'test-filesystem',\n" - + " 'path' = '%s',\n" - + " 'default-database' = '%s'\n" - + " )", - fileSystemCatalogName, fileSystemCatalogPath, TEST_DEFAULT_DATABASE); - service.configureSession(sessionHandle, catalogDDL, -1); - service.configureSession( - sessionHandle, String.format("USE CATALOG %s", fileSystemCatalogName), -1); - - // create source table - String dataGenSource = - "CREATE TABLE datagenSource (\n" - + " order_id BIGINT,\n" - + " order_number VARCHAR(20),\n" - + " user_id BIGINT,\n" - + " shop_id BIGINT,\n" - + " product_id BIGINT,\n" - + " status BIGINT,\n" - + " order_type BIGINT,\n" - + " order_created_at TIMESTAMP,\n" - + " payment_amount_cents BIGINT\n" - + ")\n" - + "WITH (\n" - + " 'connector' = 'datagen',\n" - + " 'rows-per-second' = '10'\n" - + ")"; - service.configureSession(sessionHandle, dataGenSource, -1); - return sessionHandle; + @Test + void testRefreshMaterializedTable() throws Exception { + long timeout = Duration.ofSeconds(20).toMillis(); + long pause = Duration.ofSeconds(2).toMillis(); + + List<Row> data = new ArrayList<>(); + data.add(Row.of(1L, 1L, 1L, "2024-01-01")); + data.add(Row.of(2L, 2L, 2L, "2024-01-01")); + data.add(Row.of(3L, 3L, 3L, "2024-01-02")); + data.add(Row.of(4L, 4L, 4L, "2024-01-02")); + data.add(Row.of(5L, 5L, 5L, "2024-01-03")); + data.add(Row.of(6L, 6L, 6L, "2024-01-03")); + String dataId = TestValuesTableFactory.registerData(data); + + createAndVerifyCreateMaterializedTableWithData("my_materialized_table", dataId, data); + 
+ // remove element of partition '2024-01-02' + removePartitionValue(data, "2024-01-02"); + + // refresh the materialized table with static partition + long startTime = System.currentTimeMillis(); + Map<String, String> staticPartitions = new HashMap<>(); + staticPartitions.put("ds", "2024-01-02"); + ObjectIdentifier objectIdentifier = + ObjectIdentifier.of( + fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table"); + OperationHandle refreshTableHandle = + service.refreshMaterializedTable( + sessionHandle, + objectIdentifier.asSerializableString(), + false, + null, + Collections.emptyMap(), + staticPartitions, + Collections.emptyMap()); + + awaitOperationTermination(service, sessionHandle, refreshTableHandle); + List<RowData> result = fetchAllResults(service, sessionHandle, refreshTableHandle); + assertThat(result.size()).isEqualTo(1); + String jobId = result.get(0).getString(0).toString(); + + // 1. verify fresh job created + verifyRefreshJobCreated(restClusterClient, jobId, startTime); + + // 2. verify the new job overwrite the data + CommonTestUtils.waitUtil( + () -> + fetchTableData(sessionHandle, "SELECT * FROM my_materialized_table").size() Review Comment: The query here reads the entire table, not just the refreshed partition. To avoid misunderstanding, this test has been removed. In theory, we only need to check the partitions that have been refreshed and the partitions that have not been refreshed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org