Copilot commented on code in PR #1598:
URL: https://github.com/apache/fluss/pull/1598#discussion_r2303016552


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java:
##########
@@ -321,6 +340,34 @@ public boolean isBounded() {
         }
     }
 
+    private boolean pushTimeStampFilterToLakeSource(
+            LakeSource<?> lakeSource, RowType flussRowType) {
+        // will push timestamp to lake
+        Predicate timestampFilter =
+                new LeafPredicate(
+                        GreaterOrEqual.INSTANCE,
+                        DataTypes.TIMESTAMP_LTZ(),
+                        // the timestamp index
+                        flussRowType.getFieldCount() + 2,

Review Comment:
   The comment '// the timestamp index' is unclear. The magic-number calculation 'flussRowType.getFieldCount() + 2' should be explained more clearly or extracted to a named constant.
   ```suggestion
           // The timestamp field is located after all user fields in the Fluss
           // row type, plus two additional metadata fields.
           final int TIMESTAMP_FIELD_INDEX = flussRowType.getFieldCount() + 2;
           Predicate timestampFilter =
                   new LeafPredicate(
                           GreaterOrEqual.INSTANCE,
                           DataTypes.TIMESTAMP_LTZ(),
                           TIMESTAMP_FIELD_INDEX,
   ```
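   A minimal sketch of the extraction the suggestion describes, assuming the `+ 2` offset reflects system columns (e.g. bucket, offset, timestamp) that the lake row appends after the user schema; the constant name and its class-level placement are illustrative, not part of the PR:
   ```java
   // Hypothetical class-level constant in FlinkTableSource; the name and the
   // system-column layout it encodes are assumptions for illustration.
   private static final int LAKE_TIMESTAMP_COLUMN_OFFSET = 2;

   // Inside pushTimeStampFilterToLakeSource(...): the timestamp system column
   // sits after all user fields plus the preceding system columns.
   int timestampFieldIndex = flussRowType.getFieldCount() + LAKE_TIMESTAMP_COLUMN_OFFSET;
   ```
   A named constant also keeps the offset in one place if other system-column indices are computed the same way elsewhere in the class.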



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java:
##########
@@ -47,6 +59,13 @@ public static LakeSource<LakeSplit> createLakeSource(
         LakeStoragePlugin lakeStoragePlugin =
                 LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
         LakeStorage lakeStorage = checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
-        return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
+        try {
+            return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
+        } catch (UnsupportedOperationException e) {
+            LOG.info(
+                    "method createLakeSource throw UnsupportedOperationException for datalake format {}, return null as lakeSource to disable reading from lake source.",

Review Comment:
   [nitpick] The log message is quite verbose and could be simplified. Consider using a more concise message or changing to DEBUG level, since this is expected behavior in some cases.
   ```suggestion
               LOG.debug(
                       "Lake source not supported for datalake format {}. Returning null.",
   ```
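   For context, the whole catch block might read as follows with the suggestion applied. This is a sketch assuming `LOG` is an SLF4J logger, that `dataLake` (visible in the hunk above) is the log argument, and that the method returns null as the original message states:
   ```java
   try {
       return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
   } catch (UnsupportedOperationException e) {
       // Expected for datalake formats without a lake source implementation,
       // so DEBUG rather than INFO keeps normal operation quiet.
       LOG.debug("Lake source not supported for datalake format {}. Returning null.", dataLake);
       return null;
   }
   ```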



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java:
##########
@@ -258,19 +264,32 @@ public boolean isBounded() {
             flussRowType = flussRowType.project(projectedFields);
         }
         OffsetsInitializer offsetsInitializer;
+        boolean enableLakeSource = lakeSource != null;
         switch (startupOptions.startupMode) {
             case EARLIEST:
                 offsetsInitializer = OffsetsInitializer.earliest();
                 break;
             case LATEST:
                 offsetsInitializer = OffsetsInitializer.latest();
+                // since it's scan from latest, don't consider lake data
+                enableLakeSource = false;
                 break;
             case FULL:
                 offsetsInitializer = OffsetsInitializer.full();
                 break;
             case TIMESTAMP:
                 offsetsInitializer =
                         OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
+                if (hasPrimaryKey()) {
+                    // currently, for primary key table, don't consider lake data
+                    // when read from a given timestamp, todo: consider support it?

Review Comment:
   The comment mentions 'todo: consider support it?' but lacks context about why primary key tables don't support lake data for timestamp reads and what would be needed to implement this feature.
   ```suggestion
                       // Currently, for primary key tables, we do not consider lake data
                       // when reading from a given timestamp. Reconstructing the state of
                       // a primary key table as of a specific timestamp would require
                       // merging the lake (snapshot) data with the changelog (incremental
                       // updates) up to that timestamp while ensuring consistency and
                       // correctness, which is non-trivial.
                       // TODO: Consider supporting this via timestamp-based state
                       // reconstruction for primary key tables.
   ```
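   For illustration, the TIMESTAMP branch with the expanded comment applied might look like this; the `enableLakeSource = false` body is an assumption inferred from the LATEST branch in the same hunk, since the diff is truncated before it:
   ```java
   case TIMESTAMP:
       offsetsInitializer = OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
       if (hasPrimaryKey()) {
           // Rebuilding a primary key table as of a timestamp would require merging
           // the lake snapshot with the changelog up to that point; until such
           // reconstruction exists, fall back to reading the log only.
           enableLakeSource = false;
       }
       break;
   ```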


