Copilot commented on code in PR #2572:
URL: https://github.com/apache/fluss/pull/2572#discussion_r2763647286


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java:
##########
@@ -307,33 +301,23 @@ public boolean isBounded() {
             flussRowType = flussRowType.project(projectedFields);
         }
         OffsetsInitializer offsetsInitializer;
-        boolean enableLakeSource = lakeSource != null;
+        boolean enableLakeSource = false;
         switch (startupOptions.startupMode) {
             case EARLIEST:
                 offsetsInitializer = OffsetsInitializer.earliest();
                 break;
             case LATEST:
                 offsetsInitializer = OffsetsInitializer.latest();
-                // since it's scan from latest, don't consider lake data
-                enableLakeSource = false;
                 break;
             case FULL:
                 offsetsInitializer = OffsetsInitializer.full();
+                // when it's full mode and lake source is not null,
+                // enable lake source as the historical data
+                enableLakeSource = lakeSource != null;
                 break;
             case TIMESTAMP:
                 offsetsInitializer =
                         OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
-                if (hasPrimaryKey()) {
-                    // Currently, for primary key tables, we do not consider lake data
-                    // when reading from a given timestamp. This is because we will need
-                    // to read the change log of primary key table.
-                    // TODO: consider support it using paimon change log data?
-                    enableLakeSource = false;
-                } else {
-                    if (enableLakeSource) {
-                        enableLakeSource = pushTimeStampFilterToLakeSource(lakeSource);
-                    }
-                }
                 break;

Review Comment:
   This change removes lake source support for TIMESTAMP startup mode, which
   matches the documented behavior that TIMESTAMP mode "starts reading logs from
   user-supplied timestamp" (not lake data). However, the deleted test file
   FlinkUnionReadFromTimestampITCase.java tested union reads across both lake and
   log sources with timestamp filtering, and nothing replaces it. No test now
   verifies that TIMESTAMP mode reads from the log starting at the specified
   timestamp when a lake source is present and must be ignored. Consider adding
   a test for that case, so that the presence of a lake source cannot interfere
   with timestamp-based log reading.
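
   As a starting point, the decision the new switch encodes could be pinned down
   with a small table-style check. The sketch below is a simplified,
   self-contained model and not the actual FlinkTableSource code: the class
   name, the `enableLakeSource` helper, and the local `StartupMode` enum are
   hypothetical stand-ins that mirror only the branches visible in the diff. A
   real test would presumably need an integration test against a lake-enabled
   table.

```java
public class LakeSourceDecisionSketch {

    /** Local stand-in for the startup modes that appear in the diff. */
    enum StartupMode { EARLIEST, LATEST, FULL, TIMESTAMP }

    /**
     * Hypothetical helper that reproduces only the enableLakeSource decision
     * from the new switch: the lake source is used solely in FULL mode, and
     * only when one is configured.
     */
    static boolean enableLakeSource(StartupMode mode, boolean lakeSourcePresent) {
        switch (mode) {
            case FULL:
                // Lake data serves as the historical part of a full read.
                return lakeSourcePresent;
            case EARLIEST:
            case LATEST:
            case TIMESTAMP:
            default:
                // These modes read from the log only, even if a lake exists.
                return false;
        }
    }

    public static void main(String[] args) {
        for (StartupMode mode : StartupMode.values()) {
            boolean expectedWithLake = (mode == StartupMode.FULL);
            if (enableLakeSource(mode, true) != expectedWithLake) {
                throw new AssertionError("unexpected lake decision for " + mode);
            }
            // Without a lake source, no mode may enable it.
            if (enableLakeSource(mode, false)) {
                throw new AssertionError("lake enabled without a lake source for " + mode);
            }
        }
        System.out.println("lake source decision matches the new switch");
    }
}
```

   The case that matters for this comment is `TIMESTAMP` with a lake source
   present, which must yield `false` so that reading starts from the log at the
   requested timestamp.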


