This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new cb11779c7 [flink] Disable count push down for lake enabled table and catch exception when create lake source (#1679)
cb11779c7 is described below

commit cb11779c7e439efe05664e97ad7bb2be3090a751
Author: CaoZhen <[email protected]>
AuthorDate: Fri Sep 12 09:57:02 2025 +0800

    [flink] Disable count push down for lake enabled table and catch exception when create lake source (#1679)
---
 .../org/apache/fluss/flink/source/FlinkTableSource.java | 11 +++++++++--
 .../org/apache/fluss/flink/utils/LakeSourceUtils.java   | 17 +++++++++--------
 2 files changed, 18 insertions(+), 10 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 5739381d3..0f6c4c7e5 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -185,7 +185,10 @@ public class FlinkTableSource
         this.mergeEngineType = mergeEngineType;
         this.tableOptions = tableOptions;
         if (isDataLakeEnabled) {
-            this.lakeSource = createLakeSource(tablePath, tableOptions);
+            this.lakeSource =
+                    checkNotNull(
+                            createLakeSource(tablePath, tableOptions),
+                            "LakeSource must not be null if enable datalake");
         }
     }
 
@@ -567,7 +570,11 @@ public class FlinkTableSource
                 || aggregateExpressions.size() != 1
                 || hasPrimaryKey()
                 || groupingSets.size() > 1
-                || (groupingSets.size() == 1 && groupingSets.get(0).length > 0)) {
+                || (groupingSets.size() == 1 && groupingSets.get(0).length > 0)
+                // The count pushdown feature is not supported when the data lake is enabled.
+                // Otherwise, it'll cause miss count data in lake. But In the future, we can push
+                // down count into lake.
+                || isDataLakeEnabled) {
             return false;
         }
 
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
index 43bcf5fce..fcc8db29c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
@@ -60,19 +60,20 @@ public class LakeSourceUtils {
         try {
             lakeStoragePlugin = LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
         } catch (UnsupportedOperationException e) {
-            LOG.info(
-                    "No LakeStoragePlugin can be found for datalake format: {}, return null to disable reading from lake source.",
-                    dataLake);
-            return null;
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "No LakeStoragePlugin available for data lake format: %s. "
+                                    + "To resolve this, ensure fluss-lake-%s.jar is in the classpath.",
+                            dataLake, dataLake.toLowerCase()));
         }
         LakeStorage lakeStorage = checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
         try {
             return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
         } catch (UnsupportedOperationException e) {
-            LOG.info(
-                    "method createLakeSource throw UnsupportedOperationException for datalake format {}, return null as lakeSource to disable reading from lake source.",
-                    dataLake);
-            return null;
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Table using '%s' data lake format cannot be used as historical data in Fluss.",
+                            dataLake));
         }
     }
 }

Reply via email to