This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 35d92c559 [flink] Allow pass lake catalog property info when create 
flink catalog (#1937)
35d92c559 is described below

commit 35d92c5594364d418059fb89daf3540f430dedc9
Author: xx789 <[email protected]>
AuthorDate: Wed Dec 17 11:32:26 2025 +0800

    [flink] Allow pass lake catalog property info when create flink catalog 
(#1937)
---
 .../apache/fluss/flink/catalog/Flink21Catalog.java | 14 +++++++--
 .../fluss/flink/catalog/Flink21CatalogFactory.java |  3 +-
 .../fluss/flink/catalog/Flink21CatalogITCase.java  |  3 +-
 .../fluss/flink/catalog/FlinkCatalog21Test.java    |  1 +
 .../apache/fluss/flink/catalog/FlinkCatalog.java   | 34 +++++++++++++++++++---
 .../fluss/flink/catalog/FlinkCatalogFactory.java   | 24 +++++++++++++--
 .../apache/fluss/flink/lake/LakeFlinkCatalog.java  | 25 ++++++++++------
 .../apache/fluss/flink/lake/LakeTableFactory.java  |  4 ++-
 .../flink/catalog/FlinkCatalogFactoryTest.java     |  9 +++++-
 .../fluss/flink/catalog/FlinkCatalogITCase.java    | 23 +++++++++++++--
 .../fluss/flink/catalog/FlinkCatalogTest.java      | 13 ++++++---
 .../lake/iceberg/flink/FlinkCatalogLakeTest.java   |  3 +-
 website/docs/engine-flink/ddl.md                   | 15 +++++-----
 13 files changed, 135 insertions(+), 36 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
index 7a6f737e0..d14e3af6f 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
+++ 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Supplier;
 
 /** A {@link FlinkCatalog} used for Flink 2.1. */
 public class Flink21Catalog extends FlinkCatalog {
@@ -42,8 +43,15 @@ public class Flink21Catalog extends FlinkCatalog {
             String defaultDatabase,
             String bootstrapServers,
             ClassLoader classLoader,
-            Map<String, String> securityConfigs) {
-        super(name, defaultDatabase, bootstrapServers, classLoader, 
securityConfigs);
+            Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
+        super(
+                name,
+                defaultDatabase,
+                bootstrapServers,
+                classLoader,
+                securityConfigs,
+                lakeCatalogPropertiesSupplier);
     }
 
     @VisibleForTesting
@@ -53,6 +61,7 @@ public class Flink21Catalog extends FlinkCatalog {
             String bootstrapServers,
             ClassLoader classLoader,
             Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
             LakeFlinkCatalog lakeFlinkCatalog) {
         super(
                 name,
@@ -60,6 +69,7 @@ public class Flink21Catalog extends FlinkCatalog {
                 bootstrapServers,
                 classLoader,
                 securityConfigs,
+                lakeCatalogPropertiesSupplier,
                 lakeFlinkCatalog);
     }
 
diff --git 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
index 8557a552f..cff44ab86 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
+++ 
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
@@ -29,6 +29,7 @@ public class Flink21CatalogFactory extends 
FlinkCatalogFactory {
                 catalog.defaultDatabase,
                 catalog.bootstrapServers,
                 catalog.classLoader,
-                catalog.securityConfigs);
+                catalog.securityConfigs,
+                catalog.lakeCatalogPropertiesSupplier);
     }
 }
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
index c0b9b9196..62bf5b9aa 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
@@ -42,7 +42,8 @@ public class Flink21CatalogITCase extends FlinkCatalogITCase {
                         catalog.defaultDatabase,
                         catalog.bootstrapServers,
                         catalog.classLoader,
-                        catalog.securityConfigs);
+                        catalog.securityConfigs,
+                        catalog.lakeCatalogPropertiesSupplier);
         catalog.open();
     }
 
diff --git 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
index e66a625e5..d6aa6ef56 100644
--- 
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
+++ 
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
@@ -43,6 +43,7 @@ public class FlinkCatalog21Test extends FlinkCatalogTest {
                 bootstrapServers,
                 Thread.currentThread().getContextClassLoader(),
                 Collections.emptyMap(),
+                Collections::emptyMap,
                 lakeFlinkCatalog);
     }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 16b05089a..724983f9a 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -80,6 +80,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -115,6 +116,8 @@ public class FlinkCatalog extends AbstractCatalog {
     protected final String bootstrapServers;
     protected final Map<String, String> securityConfigs;
     protected final LakeFlinkCatalog lakeFlinkCatalog;
+    protected volatile Map<String, String> lakeCatalogProperties;
+    protected final Supplier<Map<String, String>> 
lakeCatalogPropertiesSupplier;
     protected Connection connection;
     protected Admin admin;
 
@@ -123,13 +126,15 @@ public class FlinkCatalog extends AbstractCatalog {
             String defaultDatabase,
             String bootstrapServers,
             ClassLoader classLoader,
-            Map<String, String> securityConfigs) {
+            Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
         this(
                 name,
                 defaultDatabase,
                 bootstrapServers,
                 classLoader,
                 securityConfigs,
+                lakeCatalogPropertiesSupplier,
                 new LakeFlinkCatalog(name, classLoader));
     }
 
@@ -140,6 +145,7 @@ public class FlinkCatalog extends AbstractCatalog {
             String bootstrapServers,
             ClassLoader classLoader,
             Map<String, String> securityConfigs,
+            Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
             LakeFlinkCatalog lakeFlinkCatalog) {
         super(name, defaultDatabase);
         this.catalogName = name;
@@ -147,6 +153,7 @@ public class FlinkCatalog extends AbstractCatalog {
         this.bootstrapServers = bootstrapServers;
         this.classLoader = classLoader;
         this.securityConfigs = securityConfigs;
+        this.lakeCatalogPropertiesSupplier = lakeCatalogPropertiesSupplier;
         this.lakeFlinkCatalog = lakeFlinkCatalog;
     }
 
@@ -312,8 +319,12 @@ public class FlinkCatalog extends AbstractCatalog {
                                             objectPath.getDatabaseName(),
                                             tableName.split("\\" + 
LAKE_TABLE_SPLITTER)[0])));
                 }
+
                 return getLakeTable(
-                        objectPath.getDatabaseName(), tableName, 
tableInfo.getProperties());
+                        objectPath.getDatabaseName(),
+                        tableName,
+                        tableInfo.getProperties(),
+                        getLakeCatalogProperties());
             } else {
                 tableInfo = admin.getTableInfo(tablePath).get();
             }
@@ -347,7 +358,10 @@ public class FlinkCatalog extends AbstractCatalog {
     }
 
     protected CatalogBaseTable getLakeTable(
-            String databaseName, String tableName, Configuration properties)
+            String databaseName,
+            String tableName,
+            Configuration properties,
+            Map<String, String> lakeCatalogProperties)
             throws TableNotExistException, CatalogException {
         String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
         if (tableComponents.length == 1) {
@@ -359,7 +373,7 @@ public class FlinkCatalog extends AbstractCatalog {
             tableName = String.join("", tableComponents);
         }
         return lakeFlinkCatalog
-                .getLakeCatalog(properties)
+                .getLakeCatalog(properties, lakeCatalogProperties)
                 .getTable(new ObjectPath(databaseName, tableName));
     }
 
@@ -772,4 +786,16 @@ public class FlinkCatalog extends AbstractCatalog {
     public Map<String, String> getSecurityConfigs() {
         return securityConfigs;
     }
+
+    @VisibleForTesting
+    public Map<String, String> getLakeCatalogProperties() {
+        if (lakeCatalogProperties == null) {
+            synchronized (this) {
+                if (lakeCatalogProperties == null) {
+                    lakeCatalogProperties = 
lakeCatalogPropertiesSupplier.get();
+                }
+            }
+        }
+        return lakeCatalogProperties;
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java
index 83bd3cd21..d1c9bfc48 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java
@@ -18,12 +18,16 @@
 package org.apache.fluss.flink.catalog;
 
 import org.apache.fluss.flink.FlinkConnectorOptions;
+import org.apache.fluss.metadata.DataLakeFormat;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -35,6 +39,15 @@ public class FlinkCatalogFactory implements CatalogFactory {
 
     public static final String IDENTIFIER = "fluss";
 
+    public static final List<String> PREFIXES_TO_SKIP_VALIDATE = new 
ArrayList<>();
+
+    static {
+        PREFIXES_TO_SKIP_VALIDATE.add(CLIENT_SECURITY_PREFIX);
+        for (DataLakeFormat value : DataLakeFormat.values()) {
+            PREFIXES_TO_SKIP_VALIDATE.add(value.toString());
+        }
+    }
+
     @Override
     public String factoryIdentifier() {
         return IDENTIFIER;
@@ -54,7 +67,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
     public FlinkCatalog createCatalog(Context context) {
         final FactoryUtil.CatalogFactoryHelper helper =
                 FactoryUtil.createCatalogFactoryHelper(this, context);
-        helper.validateExcept(CLIENT_SECURITY_PREFIX);
+        helper.validateExcept(PREFIXES_TO_SKIP_VALIDATE.toArray(new 
String[0]));
         Map<String, String> options = context.getOptions();
         Map<String, String> securityConfigs = extractPrefix(options, 
CLIENT_SECURITY_PREFIX);
 
@@ -63,6 +76,13 @@ public class FlinkCatalogFactory implements CatalogFactory {
                 helper.getOptions().get(FlinkCatalogOptions.DEFAULT_DATABASE),
                 
helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS),
                 context.getClassLoader(),
-                securityConfigs);
+                securityConfigs,
+                () -> {
+                    Map<String, String> lakeCatalogProperties = new 
HashMap<>();
+                    for (DataLakeFormat lakeFormat : DataLakeFormat.values()) {
+                        lakeCatalogProperties.putAll(extractPrefix(options, 
lakeFormat.toString()));
+                    }
+                    return lakeCatalogProperties;
+                });
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
index b51776419..6e8b2a5e2 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
@@ -21,6 +21,7 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.utils.DataLakeUtils;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.PropertiesUtils;
 
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
@@ -49,7 +50,8 @@ public class LakeFlinkCatalog implements AutoCloseable {
         this.classLoader = classLoader;
     }
 
-    public Catalog getLakeCatalog(Configuration tableOptions) {
+    public Catalog getLakeCatalog(
+            Configuration tableOptions, Map<String, String> 
lakeCatalogProperties) {
         // TODO: Currently, a Fluss cluster only supports a single DataLake 
storage.
         // However, in the
         //  future, it may support multiple DataLakes. The following code 
assumes
@@ -69,12 +71,19 @@ public class LakeFlinkCatalog implements AutoCloseable {
                                         + 
ConfigOptions.TABLE_DATALAKE_FORMAT.key()
                                         + "' is set.");
                     }
+                    Map<String, String> catalogProperties =
+                            PropertiesUtils.extractAndRemovePrefix(
+                                    lakeCatalogProperties, lakeFormat + ".");
+
+                    catalogProperties.putAll(
+                            
DataLakeUtils.extractLakeCatalogProperties(tableOptions));
                     if (lakeFormat == PAIMON) {
                         catalog =
-                                PaimonCatalogFactory.create(catalogName, 
tableOptions, classLoader);
+                                PaimonCatalogFactory.create(
+                                        catalogName, catalogProperties, 
classLoader);
                         this.lakeFormat = PAIMON;
                     } else if (lakeFormat == ICEBERG) {
-                        catalog = IcebergCatalogFactory.create(catalogName, 
tableOptions);
+                        catalog = IcebergCatalogFactory.create(catalogName, 
catalogProperties);
                         this.lakeFormat = ICEBERG;
                     } else {
                         throw new UnsupportedOperationException(
@@ -111,9 +120,9 @@ public class LakeFlinkCatalog implements AutoCloseable {
         private PaimonCatalogFactory() {}
 
         public static Catalog create(
-                String catalogName, Configuration tableOptions, ClassLoader 
classLoader) {
-            Map<String, String> catalogProperties =
-                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+                String catalogName,
+                Map<String, String> catalogProperties,
+                ClassLoader classLoader) {
             return FlinkCatalogFactory.createCatalog(
                     catalogName,
                     CatalogContext.create(
@@ -131,9 +140,7 @@ public class LakeFlinkCatalog implements AutoCloseable {
         // requires Iceberg 1.5.0+.
         // Using reflection to maintain Java 8 compatibility.
         // Once Fluss drops Java 8, we can remove the reflection code
-        public static Catalog create(String catalogName, Configuration 
tableOptions) {
-            Map<String, String> catalogProperties =
-                    DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+        public static Catalog create(String catalogName, Map<String, String> 
catalogProperties) {
             // Map "type" to "catalog-type" (equivalent)
             // Required: either "catalog-type" (standard type) or 
"catalog-impl"
             // (fully-qualified custom class, mandatory if "catalog-type" is 
missing)
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
index 3f0ff88c6..ad9918f38 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
@@ -25,6 +25,8 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 
+import java.util.Collections;
+
 /** A factory to create {@link DynamicTableSource} for lake table. */
 public class LakeTableFactory {
     private final LakeFlinkCatalog lakeFlinkCatalog;
@@ -83,7 +85,7 @@ public class LakeTableFactory {
                     lakeFlinkCatalog.getLakeCatalog(
                             // we can pass empty configuration to get catalog
                             // since the catalog should already be initialized
-                            new Configuration());
+                            new Configuration(), Collections.emptyMap());
 
             // Create FlinkDynamicTableFactory with the catalog
             Class<?> icebergFactoryClass =
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java
index 4a17cc3c8..f47d9c56b 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java
@@ -64,7 +64,8 @@ abstract class FlinkCatalogFactoryTest {
                         DB_NAME,
                         BOOTSTRAP_SERVERS_NAME,
                         Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections::emptyMap);
 
         checkEquals(flinkCatalog, actualCatalog);
 
@@ -75,7 +76,12 @@ abstract class FlinkCatalogFactoryTest {
         securityMap.put("client.security.sasl.username", "root");
         securityMap.put("client.security.sasl.password", "password");
 
+        Map<String, String> lakeCatalogMap = new HashMap<>();
+        lakeCatalogMap.put("paimon.jdbc.user", "admin");
+        lakeCatalogMap.put("paimon.jdbc.password", "pass");
+
         options.putAll(securityMap);
+        options.putAll(lakeCatalogMap);
         FlinkCatalog actualCatalog2 =
                 (FlinkCatalog)
                         FactoryUtil.createCatalog(
@@ -85,6 +91,7 @@ abstract class FlinkCatalogFactoryTest {
                                 
Thread.currentThread().getContextClassLoader());
 
         assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap);
+        
assertThat(actualCatalog2.getLakeCatalogProperties()).isEqualTo(lakeCatalogMap);
     }
 
     @Test
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 570d7121a..6e640959f 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -115,7 +115,8 @@ abstract class FlinkCatalogITCase {
                         DEFAULT_DB,
                         bootstrapServers,
                         Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections::emptyMap);
         catalog.open();
     }
 
@@ -769,7 +770,8 @@ abstract class FlinkCatalogITCase {
                             DEFAULT_DB,
                             bootstrapServers,
                             Thread.currentThread().getContextClassLoader(),
-                            Collections.emptyMap());
+                            Collections.emptyMap(),
+                            Collections::emptyMap);
             Catalog finalAuthenticateCatalog = authenticateCatalog;
             assertThatThrownBy(finalAuthenticateCatalog::open)
                     .cause()
@@ -787,7 +789,8 @@ abstract class FlinkCatalogITCase {
                             DEFAULT_DB,
                             bootstrapServers,
                             Thread.currentThread().getContextClassLoader(),
-                            clientConfig);
+                            clientConfig,
+                            Collections::emptyMap);
             authenticateCatalog.open();
             assertThat(authenticateCatalog.listDatabases())
                     
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(DEFAULT_DB));
@@ -815,6 +818,20 @@ abstract class FlinkCatalogITCase {
                         "The configured default-database 'non-exist' does not 
exist in the Fluss cluster.");
     }
 
+    @Test
+    void testCreateCatalogWithLakeProperties() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("paimon.jdbc.password", "pass");
+        tEnv.executeSql(
+                String.format(
+                        "create catalog test_catalog_with_lake_properties with 
('type' = 'fluss', '%s' = '%s', 'paimon.jdbc.password' = 'pass')",
+                        BOOTSTRAP_SERVERS.key(), 
FLUSS_CLUSTER_EXTENSION.getBootstrapServers()));
+        FlinkCatalog catalog =
+                (FlinkCatalog) 
tEnv.getCatalog("test_catalog_with_lake_properties").get();
+
+        assertOptionsEqual(catalog.getLakeCatalogProperties(), properties);
+    }
+
     /**
      * Before Flink 2.1, the {@link Schema} did not include an index field. 
Starting from Flink 2.1,
      * Flink introduced the concept of an index, and in Fluss, the primary key 
is considered as an
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index c32492c4b..9b915a74d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -172,6 +172,7 @@ class FlinkCatalogTest {
                 bootstrapServers,
                 Thread.currentThread().getContextClassLoader(),
                 Collections.emptyMap(),
+                Collections::emptyMap,
                 lakeFlinkCatalog);
     }
 
@@ -645,7 +646,8 @@ class FlinkCatalogTest {
                                                 ",",
                                                 
flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)),
                                         
Thread.currentThread().getContextClassLoader(),
-                                        Collections.emptyMap()))
+                                        Collections.emptyMap(),
+                                        Collections::emptyMap))
                 .hasMessageContaining("defaultDatabase cannot be null or 
empty");
     }
 
@@ -833,7 +835,8 @@ class FlinkCatalogTest {
                         "default",
                         "invalid-bootstrap-server:9092",
                         Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections::emptyMap);
 
         // Test open() throws proper exception
         assertThatThrownBy(() -> badCatalog.open())
@@ -963,7 +966,8 @@ class FlinkCatalogTest {
                         DEFAULT_DB,
                         String.join(",", flussConf.get(BOOTSTRAP_SERVERS)),
                         Thread.currentThread().getContextClassLoader(),
-                        securityConfigs);
+                        securityConfigs,
+                        Collections::emptyMap);
         securedCatalog.open();
 
         try {
@@ -1006,7 +1010,8 @@ class FlinkCatalogTest {
         }
 
         @Override
-        public Catalog getLakeCatalog(Configuration tableOptions) {
+        public Catalog getLakeCatalog(
+                Configuration tableOptions, Map<String, String> 
lakeCatalogProperties) {
             return catalog;
         }
 
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
index f1305b99d..a6380cc44 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
@@ -102,7 +102,8 @@ class FlinkCatalogLakeTest extends 
FlinkIcebergTieringTestBase {
                         DEFAULT_DB,
                         bootstrapServers,
                         Thread.currentThread().getContextClassLoader(),
-                        Collections.emptyMap());
+                        Collections.emptyMap(),
+                        Collections::emptyMap);
         catalog.open();
     }
 }
diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md
index a542db143..cc5bfcd97 100644
--- a/website/docs/engine-flink/ddl.md
+++ b/website/docs/engine-flink/ddl.md
@@ -21,13 +21,14 @@ USE CATALOG fluss_catalog;
 
 The following properties can be set if using the Fluss catalog:
 
-| Option                         | Required | Default   | Description          
                                                                                
                                                                                
|
-|--------------------------------|----------|-----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| type                           | required | (none)    | Catalog type, must 
be 'fluss' here.                                                                
                                                                               |
-| bootstrap.servers              | required | (none)    | Comma separated list 
of Fluss servers.                                                               
                                                                                
|
-| default-database               | optional | fluss     | The default database 
to use when switching to this catalog.                                          
                                                                                
|
-| client.security.protocol       | optional | PLAINTEXT | The security 
protocol used to communicate with brokers. Currently, only `PLAINTEXT` and 
`SASL` are supported, the configuration value is case insensitive.              
             |
-| `client.security.{protocol}.*` | optional | (none)    | Client-side 
configuration properties for a specific authentication protocol. E.g., 
client.security.sasl.jaas.config. More Details in 
[authentication](../security/authentication.md) |
+| Option                         | Required | Default   | Description          
                                                                                
                                                                                
                                  |
+|--------------------------------|----------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| type                           | required | (none)    | Catalog type, must 
be 'fluss' here.                                                                
                                                                                
                                    |
+| bootstrap.servers              | required | (none)    | Comma separated list 
of Fluss servers.                                                               
                                                                                
                                  |
+| default-database               | optional | fluss     | The default database 
to use when switching to this catalog.                                          
                                                                                
                                  |
+| client.security.protocol       | optional | PLAINTEXT | The security 
protocol used to communicate with brokers. Currently, only `PLAINTEXT` and 
`SASL` are supported, the configuration value is case insensitive.              
                                               |
+| `client.security.{protocol}.*` | optional | (none)    | Client-side 
configuration properties for a specific authentication protocol. E.g., 
client.security.sasl.jaas.config. More Details in 
[authentication](../security/authentication.md)                                 
  |
+| `{lake-format}.*`              | optional | (none)    | Extra properties to 
be passed to the lake catalog. This is useful for configuring sensitive 
settings, such as the username and password required for lake catalog 
authentication. E.g., `paimon.jdbc.password = pass`. |
 
 The following statements assume that the current catalog has been switched to 
the Fluss catalog using the `USE CATALOG <catalog_name>` statement.
 

Reply via email to