This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 593d27ead [hotfix] Flink catalog should pass lake properties to lake table (#2205)
593d27ead is described below

commit 593d27ead54fb1f74c10654f9d35b3dd32d2b630
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Dec 19 12:53:01 2025 +0800

    [hotfix] Flink catalog should pass lake properties to lake table (#2205)
---
 .../java/org/apache/fluss/flink/catalog/FlinkCatalog.java |  8 ++++++++
 .../apache/fluss/flink/catalog/FlinkCatalogITCase.java    | 15 ++++++++++++++-
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 724983f9a..fb7a41157 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -335,6 +335,14 @@ public class FlinkCatalog extends AbstractCatalog {
             Map<String, String> newOptions = new HashMap<>(catalogBaseTable.getOptions());
             newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
             newOptions.putAll(securityConfigs);
+            // add lake properties
+            if (tableInfo.getTableConfig().isDataLakeEnabled()) {
+                for (Map.Entry<String, String> lakePropertyEntry :
+                        getLakeCatalogProperties().entrySet()) {
+                    String key = "table.datalake." + lakePropertyEntry.getKey();
+                    newOptions.put(key, lakePropertyEntry.getValue());
+                }
+            }
             if (CatalogBaseTable.TableKind.TABLE == catalogBaseTable.getTableKind()) {
                 return ((CatalogTable) catalogBaseTable).copy(newOptions);
             } else if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 6e640959f..bc04543a4 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -819,7 +820,7 @@ abstract class FlinkCatalogITCase {
     }
 
     @Test
-    void testCreateCatalogWithLakeProperties() {
+    void testCreateCatalogWithLakeProperties() throws Exception {
         Map<String, String> properties = new HashMap<>();
         properties.put("paimon.jdbc.password", "pass");
         tEnv.executeSql(
@@ -830,6 +831,18 @@ abstract class FlinkCatalogITCase {
                 (FlinkCatalog) tEnv.getCatalog("test_catalog_with_lake_properties").get();
 
         assertOptionsEqual(catalog.getLakeCatalogProperties(), properties);
+
+        String ddl =
+                "create table test_get_lake_table ("
+                        + "a string, "
+                        + "b int) "
+                        + "with ('bucket.num' = '5', 'table.datalake.enabled' = 'true')";
+        tEnv.executeSql(ddl);
+
+        CatalogBaseTable catalogTable =
+                catalog.getTable(new ObjectPath(DEFAULT_DB, "test_get_lake_table"));
+        assertThat(catalogTable.getOptions())
+                .containsEntry("table.datalake.paimon.jdbc.password", "pass");
     }
 
     /**

Reply via email to