wuchong commented on code in PR #2477:
URL: https://github.com/apache/fluss/pull/2477#discussion_r2778783386


##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java:
##########
@@ -377,6 +377,25 @@ public void testChangelogWithScanStartupMode() throws Exception {
                 .containsExactlyInAnyOrder(
                         "+I[+I, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
                         "+I[+I, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
+
+        // 3. Test scan.startup.mode='latest' - should only read new records after subscription
+        String optionsLatest = " /*+ OPTIONS('scan.startup.mode' = 'latest') */";
+        String queryLatest = "SELECT id FROM startup_mode_test$changelog" + optionsLatest;
+        CloseableIterator<Row> rowIterLatest = tEnv.executeSql(queryLatest).collect();
+        List<String> latestResults = new ArrayList<>();
+        for (int attempt = 0; attempt < 10; attempt++) {
+            // Write a new record (with id larger than 5)
+            int rowId = 6 + attempt;
+            writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false);
+
+            // Try to fetch one record with a 5-second timeout
+            latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(5));

Review Comment:
   I’ve thought about this again, and a more elegant approach might be to leverage **Flink’s savepoint mechanism** to explicitly force the initialization of the start offset. Here’s the refined idea:
   
   1. **Pre-populate** the source table with some existing records.  
   2. Launch a job:  
      ```sql
      INSERT INTO sink SELECT * FROM source /*+ OPTIONS('scan.startup.mode' = 'latest') */;
      ```  
   3. **Stop the job with a savepoint** (e.g., using `FlinkOperator#stopWithSavepoint`, as shown in [this example](https://github.com/apache/fluss/blob/main/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java#L243)).  
      → At this point, the “latest” offset has been **determined, initialized, and checkpointed** into the state.  
   4. **Restart the job from the savepoint** (see the example in `FlinkTableSourceFailOverITCase#initTableEnvironment`).  
   5. **Write new records** into the source table.  
   6. **Read from the sink table** — the output should contain **only the records written in step 5**, confirming that the job correctly started reading from the offset captured at savepoint time.
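   
   Sketched as Java-flavored pseudocode, the flow above might look like the following. Helper names such as `stopWithSavepoint`, `initTableEnvironment`, `readSink`, and the row variables are placeholders for this sketch; the real signatures live in `FlinkTableSourceFailOverITCase`:
   
   ```java
   // 1. Pre-populate the source table; these rows must NOT reach the sink.
   writeRows(conn, tablePath, initialRows, false);
   
   // 2. Launch the job with 'latest' startup mode.
   TableResult job = tEnv.executeSql(
           "INSERT INTO sink SELECT * FROM source /*+ OPTIONS('scan.startup.mode' = 'latest') */");
   
   // 3. Stop with a savepoint: the 'latest' offset is now resolved and checkpointed.
   String savepointPath = stopWithSavepoint(job);
   
   // 4. Restore a fresh environment from the savepoint and resubmit the job
   //    (cf. FlinkTableSourceFailOverITCase#initTableEnvironment).
   StreamTableEnvironment restoredEnv = initTableEnvironment(savepointPath);
   restoredEnv.executeSql("INSERT INTO sink SELECT * FROM source");
   
   // 5. Write new records after the restore.
   writeRows(conn, tablePath, newRows, false);
   
   // 6. The sink must contain only the rows written in step 5.
   assertThat(readSink()).containsExactlyInAnyOrderElementsOf(expectedNewRows);
   ```
   
   Compared with the timeout-and-retry loop in the diff, this makes the test deterministic: the savepoint pins down exactly when the “latest” offset was captured, so no polling timeout is needed.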
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to