This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 593d27ead [hotfix] Flink catalog should pass to lake properties to
lake table (#2205)
593d27ead is described below
commit 593d27ead54fb1f74c10654f9d35b3dd32d2b630
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Dec 19 12:53:01 2025 +0800
[hotfix] Flink catalog should pass to lake properties to lake table (#2205)
---
.../java/org/apache/fluss/flink/catalog/FlinkCatalog.java | 8 ++++++++
.../apache/fluss/flink/catalog/FlinkCatalogITCase.java | 15 ++++++++++++++-
2 files changed, 22 insertions(+), 1 deletion(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 724983f9a..fb7a41157 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -335,6 +335,14 @@ public class FlinkCatalog extends AbstractCatalog {
Map<String, String> newOptions = new
HashMap<>(catalogBaseTable.getOptions());
newOptions.put(BOOTSTRAP_SERVERS.key(), bootstrapServers);
newOptions.putAll(securityConfigs);
+ // add lake properties
+ if (tableInfo.getTableConfig().isDataLakeEnabled()) {
+ for (Map.Entry<String, String> lakePropertyEntry :
+ getLakeCatalogProperties().entrySet()) {
+ String key = "table.datalake." +
lakePropertyEntry.getKey();
+ newOptions.put(key, lakePropertyEntry.getValue());
+ }
+ }
if (CatalogBaseTable.TableKind.TABLE ==
catalogBaseTable.getTableKind()) {
return ((CatalogTable) catalogBaseTable).copy(newOptions);
} else if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 6e640959f..bc04543a4 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -819,7 +820,7 @@ abstract class FlinkCatalogITCase {
}
@Test
- void testCreateCatalogWithLakeProperties() {
+ void testCreateCatalogWithLakeProperties() throws Exception {
Map<String, String> properties = new HashMap<>();
properties.put("paimon.jdbc.password", "pass");
tEnv.executeSql(
@@ -830,6 +831,18 @@ abstract class FlinkCatalogITCase {
(FlinkCatalog)
tEnv.getCatalog("test_catalog_with_lake_properties").get();
assertOptionsEqual(catalog.getLakeCatalogProperties(), properties);
+
+ String ddl =
+ "create table test_get_lake_table ("
+ + "a string, "
+ + "b int) "
+ + "with ('bucket.num' = '5', 'table.datalake.enabled'
= 'true')";
+ tEnv.executeSql(ddl);
+
+ CatalogBaseTable catalogTable =
+ catalog.getTable(new ObjectPath(DEFAULT_DB,
"test_get_lake_table"));
+ assertThat(catalogTable.getOptions())
+ .containsEntry("table.datalake.paimon.jdbc.password", "pass");
}
/**