Copilot commented on code in PR #1598:
URL: https://github.com/apache/fluss/pull/1598#discussion_r2303016552
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java:
##########
@@ -321,6 +340,34 @@ public boolean isBounded() {
}
}
+ private boolean pushTimeStampFilterToLakeSource(
+ LakeSource<?> lakeSource, RowType flussRowType) {
+ // will push timestamp to lake
+ Predicate timestampFilter =
+ new LeafPredicate(
+ GreaterOrEqual.INSTANCE,
+ DataTypes.TIMESTAMP_LTZ(),
+ // the timestamp index
+ flussRowType.getFieldCount() + 2,
Review Comment:
The comment '// the timestamp index' is unclear. The magic number
calculation 'flussRowType.getFieldCount() + 2' should be explained more clearly
or extracted to a named constant.
```suggestion
// The timestamp field is located after all fields in the Fluss row type,
// plus two additional metadata fields. We extract the index calculation
// to a named constant for clarity.
final int TIMESTAMP_FIELD_INDEX = flussRowType.getFieldCount() + 2;
Predicate timestampFilter =
new LeafPredicate(
GreaterOrEqual.INSTANCE,
DataTypes.TIMESTAMP_LTZ(),
TIMESTAMP_FIELD_INDEX,
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java:
##########
@@ -47,6 +59,13 @@ public static LakeSource<LakeSplit> createLakeSource(
LakeStoragePlugin lakeStoragePlugin =
LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
LakeStorage lakeStorage =
checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
- return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
+ try {
+ return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
+ } catch (UnsupportedOperationException e) {
+ LOG.info(
+ "method createLakeSource throw
UnsupportedOperationException for datalake format {}, return null as lakeSource
to disable reading from lake source.",
Review Comment:
[nitpick] The log message is quite verbose and could be simplified. Consider
using a more concise message or changing to DEBUG level since this is expected
behavior in some cases.
```suggestion
LOG.debug(
"Lake source not supported for datalake format {}. Returning null.",
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java:
##########
@@ -258,19 +264,32 @@ public boolean isBounded() {
flussRowType = flussRowType.project(projectedFields);
}
OffsetsInitializer offsetsInitializer;
+ boolean enableLakeSource = lakeSource != null;
switch (startupOptions.startupMode) {
case EARLIEST:
offsetsInitializer = OffsetsInitializer.earliest();
break;
case LATEST:
offsetsInitializer = OffsetsInitializer.latest();
+ // since it's scan from latest, don't consider lake data
+ enableLakeSource = false;
break;
case FULL:
offsetsInitializer = OffsetsInitializer.full();
break;
case TIMESTAMP:
offsetsInitializer =
OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
+ if (hasPrimaryKey()) {
+ // currently, for primary key table, don't consider lake data
+ // when read from a given timestamp, todo: consider support it?
Review Comment:
The comment mentions 'todo: consider support it?' but lacks context about
why primary key tables don't support lake data for timestamp reads and what
would be needed to implement this feature.
```suggestion
// Currently, for primary key tables, we do not consider lake data
// when reading from a given timestamp. This is because reconstructing
// the state of a primary key table as of a specific timestamp would require
// merging the changelog (incremental updates) with the lake (snapshot) data,
// ensuring consistency and correctness. Implementing this would involve
// developing logic to merge and resolve potential conflicts between the
// changelog and lake data as of the target timestamp, which is non-trivial.
// TODO: Consider supporting this by implementing timestamp-based state
// reconstruction for primary key tables, ensuring correctness and performance.
```
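To make the gating behavior under discussion concrete, here is a hypothetical, self-contained sketch of the startup-mode decision the diff implements (the `LakeSourceGate` class, method name, and boolean parameters are illustrative and do not appear in the PR): lake data is skipped for LATEST, and for TIMESTAMP on primary key tables.

```java
// Illustrative sketch only; mirrors the switch in FlinkTableSource that
// sets enableLakeSource per startup mode, per the review threads above.
enum StartupMode { EARLIEST, LATEST, FULL, TIMESTAMP }

class LakeSourceGate {
    // Returns whether lake data should be read for the given startup mode.
    static boolean enableLakeSource(
            StartupMode mode, boolean hasPrimaryKey, boolean lakeSourcePresent) {
        if (!lakeSourcePresent) {
            // No lake source was created (e.g. the format threw
            // UnsupportedOperationException), so lake reading is disabled.
            return false;
        }
        switch (mode) {
            case LATEST:
                // Scanning from latest: historical lake data is irrelevant.
                return false;
            case TIMESTAMP:
                // Primary key tables skip lake data for timestamp reads,
                // pending timestamp-based state reconstruction support.
                return !hasPrimaryKey;
            default:
                // EARLIEST and FULL read historical data, so use the lake.
                return true;
        }
    }
}
```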
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]