hackergin commented on code in PR #24844: URL: https://github.com/apache/flink/pull/24844#discussion_r1616897054
########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -909,47 +697,240 @@ void testDropMaterializedTable(@InjectClusterClient RestClusterClient<?> restClu .asSerializableString())); } - private SessionHandle initializeSession() { - SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); - String catalogDDL = - String.format( - "CREATE CATALOG %s\n" - + "WITH (\n" - + " 'type' = 'test-filesystem',\n" - + " 'path' = '%s',\n" - + " 'default-database' = '%s'\n" - + " )", - fileSystemCatalogName, fileSystemCatalogPath, TEST_DEFAULT_DATABASE); - service.configureSession(sessionHandle, catalogDDL, -1); - service.configureSession( - sessionHandle, String.format("USE CATALOG %s", fileSystemCatalogName), -1); - - // create source table - String dataGenSource = - "CREATE TABLE datagenSource (\n" - + " order_id BIGINT,\n" - + " order_number VARCHAR(20),\n" - + " user_id BIGINT,\n" - + " shop_id BIGINT,\n" - + " product_id BIGINT,\n" - + " status BIGINT,\n" - + " order_type BIGINT,\n" - + " order_created_at TIMESTAMP,\n" - + " payment_amount_cents BIGINT\n" - + ")\n" - + "WITH (\n" - + " 'connector' = 'datagen',\n" - + " 'rows-per-second' = '10'\n" - + ")"; - service.configureSession(sessionHandle, dataGenSource, -1); - return sessionHandle; + @Test + void testRefreshMaterializedTableWithStaticPartition() throws Exception { + List<Row> data = new ArrayList<>(); + data.add(Row.of(1L, 1L, 1L, "2024-01-01")); + data.add(Row.of(2L, 2L, 2L, "2024-01-02")); + String dataId = TestValuesTableFactory.registerData(data); + + createAndVerifyCreateMaterializedTableWithData( + "my_materialized_table", + dataId, + data, + Collections.singletonMap("ds", "yyyy-MM-dd")); + + ObjectIdentifier objectIdentifier = + ObjectIdentifier.of( + fileSystemCatalogName, TEST_DEFAULT_DATABASE, "my_materialized_table"); + + // add more data to all data list + data.add(Row.of(3L, 3L, 3L, "2024-01-01")); + data.add(Row.of(4L, 4L, 4L, "2024-01-02")); + + // refresh the materialized table with static partition + long startTime = System.currentTimeMillis(); + Map<String, String> staticPartitions = new HashMap<>(); + staticPartitions.put("ds", "2024-01-02"); + + OperationHandle refreshTableHandle = + service.refreshMaterializedTable( + sessionHandle, + objectIdentifier.asSerializableString(), + false, + null, + Collections.emptyMap(), + staticPartitions, + Collections.emptyMap()); + awaitOperationTermination(service, sessionHandle, refreshTableHandle); + List<RowData> result = fetchAllResults(service, sessionHandle, refreshTableHandle); + assertThat(result.size()).isEqualTo(1); + String jobId = result.get(0).getString(0).toString(); + + // 1. verify fresh job created + verifyRefreshJobCreated(restClusterClient, jobId, startTime); + + // 2. verify the new job overwrite the data + assertThat( + fetchTableData( + sessionHandle, + "SELECT * FROM my_materialized_table where ds = '2024-01-02'") + .size()) + .isEqualTo(getPartitionSize(data, "2024-01-02")); Review Comment: The current materialized table is the result of aggregating the original table, so here only the size of result is compared. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org