This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new cb11779c7 [flink] Disable count push down for lake enabled table and
catch exception when create lake source (#1679)
cb11779c7 is described below
commit cb11779c7e439efe05664e97ad7bb2be3090a751
Author: CaoZhen <[email protected]>
AuthorDate: Fri Sep 12 09:57:02 2025 +0800
[flink] Disable count push down for lake enabled table and catch exception
when create lake source (#1679)
---
.../org/apache/fluss/flink/source/FlinkTableSource.java | 11 +++++++++--
.../org/apache/fluss/flink/utils/LakeSourceUtils.java | 17 +++++++++--------
2 files changed, 18 insertions(+), 10 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 5739381d3..0f6c4c7e5 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -185,7 +185,10 @@ public class FlinkTableSource
this.mergeEngineType = mergeEngineType;
this.tableOptions = tableOptions;
if (isDataLakeEnabled) {
- this.lakeSource = createLakeSource(tablePath, tableOptions);
+ this.lakeSource =
+ checkNotNull(
+ createLakeSource(tablePath, tableOptions),
+ "LakeSource must not be null if enable datalake");
}
}
@@ -567,7 +570,11 @@ public class FlinkTableSource
|| aggregateExpressions.size() != 1
|| hasPrimaryKey()
|| groupingSets.size() > 1
- || (groupingSets.size() == 1 && groupingSets.get(0).length > 0)) {
+ || (groupingSets.size() == 1 && groupingSets.get(0).length > 0)
+ // The count pushdown feature is not supported when the data lake is enabled.
+ // Otherwise, it'll cause miss count data in lake. But In the future, we can push
+ // down count into lake.
+ || isDataLakeEnabled) {
return false;
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
index 43bcf5fce..fcc8db29c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
@@ -60,19 +60,20 @@ public class LakeSourceUtils {
try {
lakeStoragePlugin =
LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
} catch (UnsupportedOperationException e) {
- LOG.info(
- "No LakeStoragePlugin can be found for datalake format: {}, return null to disable reading from lake source.",
- dataLake);
- return null;
+ throw new UnsupportedOperationException(
+ String.format(
+ "No LakeStoragePlugin available for data lake format: %s. "
+ + "To resolve this, ensure fluss-lake-%s.jar is in the classpath.",
+ dataLake, dataLake.toLowerCase()));
}
LakeStorage lakeStorage =
checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
try {
return (LakeSource<LakeSplit>)
lakeStorage.createLakeSource(tablePath);
} catch (UnsupportedOperationException e) {
- LOG.info(
- "method createLakeSource throw UnsupportedOperationException for datalake format {}, return null as lakeSource to disable reading from lake source.",
- dataLake);
- return null;
+ throw new UnsupportedOperationException(
+ String.format(
+ "Table using '%s' data lake format cannot be used as historical data in Fluss.",
+ dataLake));
}
}
}