hackergin commented on code in PR #24844:
URL: https://github.com/apache/flink/pull/24844#discussion_r1616305580


##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -909,47 +694,172 @@ void testDropMaterializedTable(@InjectClusterClient 
RestClusterClient<?> restClu
                                         .asSerializableString()));
     }
 
-    private SessionHandle initializeSession() {
-        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
-        String catalogDDL =
-                String.format(
-                        "CREATE CATALOG %s\n"
-                                + "WITH (\n"
-                                + "  'type' = 'test-filesystem',\n"
-                                + "  'path' = '%s',\n"
-                                + "  'default-database' = '%s'\n"
-                                + "  )",
-                        fileSystemCatalogName, fileSystemCatalogPath, 
TEST_DEFAULT_DATABASE);
-        service.configureSession(sessionHandle, catalogDDL, -1);
-        service.configureSession(
-                sessionHandle, String.format("USE CATALOG %s", 
fileSystemCatalogName), -1);
-
-        // create source table
-        String dataGenSource =
-                "CREATE TABLE datagenSource (\n"
-                        + "  order_id BIGINT,\n"
-                        + "  order_number VARCHAR(20),\n"
-                        + "  user_id BIGINT,\n"
-                        + "  shop_id BIGINT,\n"
-                        + "  product_id BIGINT,\n"
-                        + "  status BIGINT,\n"
-                        + "  order_type BIGINT,\n"
-                        + "  order_created_at TIMESTAMP,\n"
-                        + "  payment_amount_cents BIGINT\n"
-                        + ")\n"
-                        + "WITH (\n"
-                        + "  'connector' = 'datagen',\n"
-                        + "  'rows-per-second' = '10'\n"
-                        + ")";
-        service.configureSession(sessionHandle, dataGenSource, -1);
-        return sessionHandle;
+    @Test
+    void testRefreshMaterializedTable() throws Exception {
+        long timeout = Duration.ofSeconds(20).toMillis();
+        long pause = Duration.ofSeconds(2).toMillis();
+
+        List<Row> data = new ArrayList<>();
+        data.add(Row.of(1L, 1L, 1L, "2024-01-01"));
+        data.add(Row.of(2L, 2L, 2L, "2024-01-01"));
+        data.add(Row.of(3L, 3L, 3L, "2024-01-02"));
+        data.add(Row.of(4L, 4L, 4L, "2024-01-02"));
+        data.add(Row.of(5L, 5L, 5L, "2024-01-03"));
+        data.add(Row.of(6L, 6L, 6L, "2024-01-03"));
+        String dataId = TestValuesTableFactory.registerData(data);
+
+        
createAndVerifyCreateMaterializedTableWithData("my_materialized_table", dataId, 
data);
+
+        // remove element of partition '2024-01-02'
+        removePartitionValue(data, "2024-01-02");
+
+        // refresh the materialized table with static partition
+        long startTime = System.currentTimeMillis();
+        Map<String, String> staticPartitions = new HashMap<>();
+        staticPartitions.put("ds", "2024-01-02");
+        ObjectIdentifier objectIdentifier =
+                ObjectIdentifier.of(
+                        fileSystemCatalogName, TEST_DEFAULT_DATABASE, 
"my_materialized_table");
+        OperationHandle refreshTableHandle =
+                service.refreshMaterializedTable(
+                        sessionHandle,
+                        objectIdentifier.asSerializableString(),
+                        false,
+                        null,
+                        Collections.emptyMap(),
+                        staticPartitions,
+                        Collections.emptyMap());
+
+        awaitOperationTermination(service, sessionHandle, refreshTableHandle);
+        List<RowData> result = fetchAllResults(service, sessionHandle, 
refreshTableHandle);
+        assertThat(result.size()).isEqualTo(1);
+        String jobId = result.get(0).getString(0).toString();
+
+        // 1. verify fresh job created
+        verifyRefreshJobCreated(restClusterClient, jobId, startTime);
+
+        // 2. verify the new job overwrite the data
+        CommonTestUtils.waitUtil(
+                () ->
+                        fetchTableData(sessionHandle, "SELECT * FROM 
my_materialized_table").size()

Review Comment:
   The query here reads the entire table, not only the refreshed partition. To 
avoid misunderstanding, this test has been removed. In theory, we only need to 
check the partitions that have been refreshed and the partitions that have not 
been refreshed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to