This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 35d92c559 [flink] Allow pass lake catalog property info when create
flink catalog (#1937)
35d92c559 is described below
commit 35d92c5594364d418059fb89daf3540f430dedc9
Author: xx789 <[email protected]>
AuthorDate: Wed Dec 17 11:32:26 2025 +0800
[flink] Allow pass lake catalog property info when create flink catalog
(#1937)
---
.../apache/fluss/flink/catalog/Flink21Catalog.java | 14 +++++++--
.../fluss/flink/catalog/Flink21CatalogFactory.java | 3 +-
.../fluss/flink/catalog/Flink21CatalogITCase.java | 3 +-
.../fluss/flink/catalog/FlinkCatalog21Test.java | 1 +
.../apache/fluss/flink/catalog/FlinkCatalog.java | 34 +++++++++++++++++++---
.../fluss/flink/catalog/FlinkCatalogFactory.java | 24 +++++++++++++--
.../apache/fluss/flink/lake/LakeFlinkCatalog.java | 25 ++++++++++------
.../apache/fluss/flink/lake/LakeTableFactory.java | 4 ++-
.../flink/catalog/FlinkCatalogFactoryTest.java | 9 +++++-
.../fluss/flink/catalog/FlinkCatalogITCase.java | 23 +++++++++++++--
.../fluss/flink/catalog/FlinkCatalogTest.java | 13 ++++++---
.../lake/iceberg/flink/FlinkCatalogLakeTest.java | 3 +-
website/docs/engine-flink/ddl.md | 15 +++++-----
13 files changed, 135 insertions(+), 36 deletions(-)
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
index 7a6f737e0..d14e3af6f 100644
---
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
+++
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Supplier;
/** A {@link FlinkCatalog} used for Flink 2.1. */
public class Flink21Catalog extends FlinkCatalog {
@@ -42,8 +43,15 @@ public class Flink21Catalog extends FlinkCatalog {
String defaultDatabase,
String bootstrapServers,
ClassLoader classLoader,
- Map<String, String> securityConfigs) {
- super(name, defaultDatabase, bootstrapServers, classLoader,
securityConfigs);
+ Map<String, String> securityConfigs,
+ Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
+ super(
+ name,
+ defaultDatabase,
+ bootstrapServers,
+ classLoader,
+ securityConfigs,
+ lakeCatalogPropertiesSupplier);
}
@VisibleForTesting
@@ -53,6 +61,7 @@ public class Flink21Catalog extends FlinkCatalog {
String bootstrapServers,
ClassLoader classLoader,
Map<String, String> securityConfigs,
+ Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
LakeFlinkCatalog lakeFlinkCatalog) {
super(
name,
@@ -60,6 +69,7 @@ public class Flink21Catalog extends FlinkCatalog {
bootstrapServers,
classLoader,
securityConfigs,
+ lakeCatalogPropertiesSupplier,
lakeFlinkCatalog);
}
diff --git
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
index 8557a552f..cff44ab86 100644
---
a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
+++
b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21CatalogFactory.java
@@ -29,6 +29,7 @@ public class Flink21CatalogFactory extends
FlinkCatalogFactory {
catalog.defaultDatabase,
catalog.bootstrapServers,
catalog.classLoader,
- catalog.securityConfigs);
+ catalog.securityConfigs,
+ catalog.lakeCatalogPropertiesSupplier);
}
}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
index c0b9b9196..62bf5b9aa 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/Flink21CatalogITCase.java
@@ -42,7 +42,8 @@ public class Flink21CatalogITCase extends FlinkCatalogITCase {
catalog.defaultDatabase,
catalog.bootstrapServers,
catalog.classLoader,
- catalog.securityConfigs);
+ catalog.securityConfigs,
+ catalog.lakeCatalogPropertiesSupplier);
catalog.open();
}
diff --git
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
index e66a625e5..d6aa6ef56 100644
---
a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
+++
b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
@@ -43,6 +43,7 @@ public class FlinkCatalog21Test extends FlinkCatalogTest {
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
Collections.emptyMap(),
+ Collections::emptyMap,
lakeFlinkCatalog);
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 16b05089a..724983f9a 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -80,6 +80,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -115,6 +116,8 @@ public class FlinkCatalog extends AbstractCatalog {
protected final String bootstrapServers;
protected final Map<String, String> securityConfigs;
protected final LakeFlinkCatalog lakeFlinkCatalog;
+ protected volatile Map<String, String> lakeCatalogProperties;
+ protected final Supplier<Map<String, String>>
lakeCatalogPropertiesSupplier;
protected Connection connection;
protected Admin admin;
@@ -123,13 +126,15 @@ public class FlinkCatalog extends AbstractCatalog {
String defaultDatabase,
String bootstrapServers,
ClassLoader classLoader,
- Map<String, String> securityConfigs) {
+ Map<String, String> securityConfigs,
+ Supplier<Map<String, String>> lakeCatalogPropertiesSupplier) {
this(
name,
defaultDatabase,
bootstrapServers,
classLoader,
securityConfigs,
+ lakeCatalogPropertiesSupplier,
new LakeFlinkCatalog(name, classLoader));
}
@@ -140,6 +145,7 @@ public class FlinkCatalog extends AbstractCatalog {
String bootstrapServers,
ClassLoader classLoader,
Map<String, String> securityConfigs,
+ Supplier<Map<String, String>> lakeCatalogPropertiesSupplier,
LakeFlinkCatalog lakeFlinkCatalog) {
super(name, defaultDatabase);
this.catalogName = name;
@@ -147,6 +153,7 @@ public class FlinkCatalog extends AbstractCatalog {
this.bootstrapServers = bootstrapServers;
this.classLoader = classLoader;
this.securityConfigs = securityConfigs;
+ this.lakeCatalogPropertiesSupplier = lakeCatalogPropertiesSupplier;
this.lakeFlinkCatalog = lakeFlinkCatalog;
}
@@ -312,8 +319,12 @@ public class FlinkCatalog extends AbstractCatalog {
objectPath.getDatabaseName(),
tableName.split("\\" +
LAKE_TABLE_SPLITTER)[0])));
}
+
return getLakeTable(
- objectPath.getDatabaseName(), tableName,
tableInfo.getProperties());
+ objectPath.getDatabaseName(),
+ tableName,
+ tableInfo.getProperties(),
+ getLakeCatalogProperties());
} else {
tableInfo = admin.getTableInfo(tablePath).get();
}
@@ -347,7 +358,10 @@ public class FlinkCatalog extends AbstractCatalog {
}
protected CatalogBaseTable getLakeTable(
- String databaseName, String tableName, Configuration properties)
+ String databaseName,
+ String tableName,
+ Configuration properties,
+ Map<String, String> lakeCatalogProperties)
throws TableNotExistException, CatalogException {
String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER);
if (tableComponents.length == 1) {
@@ -359,7 +373,7 @@ public class FlinkCatalog extends AbstractCatalog {
tableName = String.join("", tableComponents);
}
return lakeFlinkCatalog
- .getLakeCatalog(properties)
+ .getLakeCatalog(properties, lakeCatalogProperties)
.getTable(new ObjectPath(databaseName, tableName));
}
@@ -772,4 +786,16 @@ public class FlinkCatalog extends AbstractCatalog {
public Map<String, String> getSecurityConfigs() {
return securityConfigs;
}
+
+ @VisibleForTesting
+ public Map<String, String> getLakeCatalogProperties() {
+ if (lakeCatalogProperties == null) {
+ synchronized (this) {
+ if (lakeCatalogProperties == null) {
+ lakeCatalogProperties =
lakeCatalogPropertiesSupplier.get();
+ }
+ }
+ }
+ return lakeCatalogProperties;
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java
index 83bd3cd21..d1c9bfc48 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalogFactory.java
@@ -18,12 +18,16 @@
package org.apache.fluss.flink.catalog;
import org.apache.fluss.flink.FlinkConnectorOptions;
+import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,6 +39,15 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String IDENTIFIER = "fluss";
+ public static final List<String> PREFIXES_TO_SKIP_VALIDATE = new
ArrayList<>();
+
+ static {
+ PREFIXES_TO_SKIP_VALIDATE.add(CLIENT_SECURITY_PREFIX);
+ for (DataLakeFormat value : DataLakeFormat.values()) {
+ PREFIXES_TO_SKIP_VALIDATE.add(value.toString());
+ }
+ }
+
@Override
public String factoryIdentifier() {
return IDENTIFIER;
@@ -54,7 +67,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
public FlinkCatalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtil.createCatalogFactoryHelper(this, context);
- helper.validateExcept(CLIENT_SECURITY_PREFIX);
+ helper.validateExcept(PREFIXES_TO_SKIP_VALIDATE.toArray(new
String[0]));
Map<String, String> options = context.getOptions();
Map<String, String> securityConfigs = extractPrefix(options,
CLIENT_SECURITY_PREFIX);
@@ -63,6 +76,13 @@ public class FlinkCatalogFactory implements CatalogFactory {
helper.getOptions().get(FlinkCatalogOptions.DEFAULT_DATABASE),
helper.getOptions().get(FlinkConnectorOptions.BOOTSTRAP_SERVERS),
context.getClassLoader(),
- securityConfigs);
+ securityConfigs,
+ () -> {
+ Map<String, String> lakeCatalogProperties = new
HashMap<>();
+ for (DataLakeFormat lakeFormat : DataLakeFormat.values()) {
+ lakeCatalogProperties.putAll(extractPrefix(options,
lakeFormat.toString()));
+ }
+ return lakeCatalogProperties;
+ });
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
index b51776419..6e8b2a5e2 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeFlinkCatalog.java
@@ -21,6 +21,7 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.utils.DataLakeUtils;
import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.utils.PropertiesUtils;
import org.apache.flink.table.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
@@ -49,7 +50,8 @@ public class LakeFlinkCatalog implements AutoCloseable {
this.classLoader = classLoader;
}
- public Catalog getLakeCatalog(Configuration tableOptions) {
+ public Catalog getLakeCatalog(
+ Configuration tableOptions, Map<String, String>
lakeCatalogProperties) {
// TODO: Currently, a Fluss cluster only supports a single DataLake
storage.
// However, in the
// future, it may support multiple DataLakes. The following code
assumes
@@ -69,12 +71,19 @@ public class LakeFlinkCatalog implements AutoCloseable {
+
ConfigOptions.TABLE_DATALAKE_FORMAT.key()
+ "' is set.");
}
+ Map<String, String> catalogProperties =
+ PropertiesUtils.extractAndRemovePrefix(
+ lakeCatalogProperties, lakeFormat + ".");
+
+ catalogProperties.putAll(
+
DataLakeUtils.extractLakeCatalogProperties(tableOptions));
if (lakeFormat == PAIMON) {
catalog =
- PaimonCatalogFactory.create(catalogName,
tableOptions, classLoader);
+ PaimonCatalogFactory.create(
+ catalogName, catalogProperties,
classLoader);
this.lakeFormat = PAIMON;
} else if (lakeFormat == ICEBERG) {
- catalog = IcebergCatalogFactory.create(catalogName,
tableOptions);
+ catalog = IcebergCatalogFactory.create(catalogName,
catalogProperties);
this.lakeFormat = ICEBERG;
} else {
throw new UnsupportedOperationException(
@@ -111,9 +120,9 @@ public class LakeFlinkCatalog implements AutoCloseable {
private PaimonCatalogFactory() {}
public static Catalog create(
- String catalogName, Configuration tableOptions, ClassLoader
classLoader) {
- Map<String, String> catalogProperties =
- DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+ String catalogName,
+ Map<String, String> catalogProperties,
+ ClassLoader classLoader) {
return FlinkCatalogFactory.createCatalog(
catalogName,
CatalogContext.create(
@@ -131,9 +140,7 @@ public class LakeFlinkCatalog implements AutoCloseable {
// requires Iceberg 1.5.0+.
// Using reflection to maintain Java 8 compatibility.
// Once Fluss drops Java 8, we can remove the reflection code
- public static Catalog create(String catalogName, Configuration
tableOptions) {
- Map<String, String> catalogProperties =
- DataLakeUtils.extractLakeCatalogProperties(tableOptions);
+ public static Catalog create(String catalogName, Map<String, String>
catalogProperties) {
// Map "type" to "catalog-type" (equivalent)
// Required: either "catalog-type" (standard type) or
"catalog-impl"
// (fully-qualified custom class, mandatory if "catalog-type" is
missing)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
index 3f0ff88c6..ad9918f38 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
@@ -25,6 +25,8 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
+import java.util.Collections;
+
/** A factory to create {@link DynamicTableSource} for lake table. */
public class LakeTableFactory {
private final LakeFlinkCatalog lakeFlinkCatalog;
@@ -83,7 +85,7 @@ public class LakeTableFactory {
lakeFlinkCatalog.getLakeCatalog(
// we can pass empty configuration to get catalog
// since the catalog should already be initialized
- new Configuration());
+ new Configuration(), Collections.emptyMap());
// Create FlinkDynamicTableFactory with the catalog
Class<?> icebergFactoryClass =
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java
index 4a17cc3c8..f47d9c56b 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogFactoryTest.java
@@ -64,7 +64,8 @@ abstract class FlinkCatalogFactoryTest {
DB_NAME,
BOOTSTRAP_SERVERS_NAME,
Thread.currentThread().getContextClassLoader(),
- Collections.emptyMap());
+ Collections.emptyMap(),
+ Collections::emptyMap);
checkEquals(flinkCatalog, actualCatalog);
@@ -75,7 +76,12 @@ abstract class FlinkCatalogFactoryTest {
securityMap.put("client.security.sasl.username", "root");
securityMap.put("client.security.sasl.password", "password");
+ Map<String, String> lakeCatalogMap = new HashMap<>();
+ lakeCatalogMap.put("paimon.jdbc.user", "admin");
+ lakeCatalogMap.put("paimon.jdbc.password", "pass");
+
options.putAll(securityMap);
+ options.putAll(lakeCatalogMap);
FlinkCatalog actualCatalog2 =
(FlinkCatalog)
FactoryUtil.createCatalog(
@@ -85,6 +91,7 @@ abstract class FlinkCatalogFactoryTest {
Thread.currentThread().getContextClassLoader());
assertThat(actualCatalog2.getSecurityConfigs()).isEqualTo(securityMap);
+
assertThat(actualCatalog2.getLakeCatalogProperties()).isEqualTo(lakeCatalogMap);
}
@Test
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 570d7121a..6e640959f 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -115,7 +115,8 @@ abstract class FlinkCatalogITCase {
DEFAULT_DB,
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
- Collections.emptyMap());
+ Collections.emptyMap(),
+ Collections::emptyMap);
catalog.open();
}
@@ -769,7 +770,8 @@ abstract class FlinkCatalogITCase {
DEFAULT_DB,
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
- Collections.emptyMap());
+ Collections.emptyMap(),
+ Collections::emptyMap);
Catalog finalAuthenticateCatalog = authenticateCatalog;
assertThatThrownBy(finalAuthenticateCatalog::open)
.cause()
@@ -787,7 +789,8 @@ abstract class FlinkCatalogITCase {
DEFAULT_DB,
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
- clientConfig);
+ clientConfig,
+ Collections::emptyMap);
authenticateCatalog.open();
assertThat(authenticateCatalog.listDatabases())
.containsExactlyInAnyOrderElementsOf(Collections.singletonList(DEFAULT_DB));
@@ -815,6 +818,20 @@ abstract class FlinkCatalogITCase {
"The configured default-database 'non-exist' does not
exist in the Fluss cluster.");
}
+ @Test
+ void testCreateCatalogWithLakeProperties() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("paimon.jdbc.password", "pass");
+ tEnv.executeSql(
+ String.format(
+ "create catalog test_catalog_with_lake_properties with
('type' = 'fluss', '%s' = '%s', 'paimon.jdbc.password' = 'pass')",
+ BOOTSTRAP_SERVERS.key(),
FLUSS_CLUSTER_EXTENSION.getBootstrapServers()));
+ FlinkCatalog catalog =
+ (FlinkCatalog)
tEnv.getCatalog("test_catalog_with_lake_properties").get();
+
+ assertOptionsEqual(catalog.getLakeCatalogProperties(), properties);
+ }
+
/**
* Before Flink 2.1, the {@link Schema} did not include an index field.
Starting from Flink 2.1,
* Flink introduced the concept of an index, and in Fluss, the primary key
is considered as an
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index c32492c4b..9b915a74d 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -172,6 +172,7 @@ class FlinkCatalogTest {
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
Collections.emptyMap(),
+ Collections::emptyMap,
lakeFlinkCatalog);
}
@@ -645,7 +646,8 @@ class FlinkCatalogTest {
",",
flussConf.get(ConfigOptions.BOOTSTRAP_SERVERS)),
Thread.currentThread().getContextClassLoader(),
- Collections.emptyMap()))
+ Collections.emptyMap(),
+ Collections::emptyMap))
.hasMessageContaining("defaultDatabase cannot be null or
empty");
}
@@ -833,7 +835,8 @@ class FlinkCatalogTest {
"default",
"invalid-bootstrap-server:9092",
Thread.currentThread().getContextClassLoader(),
- Collections.emptyMap());
+ Collections.emptyMap(),
+ Collections::emptyMap);
// Test open() throws proper exception
assertThatThrownBy(() -> badCatalog.open())
@@ -963,7 +966,8 @@ class FlinkCatalogTest {
DEFAULT_DB,
String.join(",", flussConf.get(BOOTSTRAP_SERVERS)),
Thread.currentThread().getContextClassLoader(),
- securityConfigs);
+ securityConfigs,
+ Collections::emptyMap);
securedCatalog.open();
try {
@@ -1006,7 +1010,8 @@ class FlinkCatalogTest {
}
@Override
- public Catalog getLakeCatalog(Configuration tableOptions) {
+ public Catalog getLakeCatalog(
+ Configuration tableOptions, Map<String, String>
lakeCatalogProperties) {
return catalog;
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
index f1305b99d..a6380cc44 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkCatalogLakeTest.java
@@ -102,7 +102,8 @@ class FlinkCatalogLakeTest extends
FlinkIcebergTieringTestBase {
DEFAULT_DB,
bootstrapServers,
Thread.currentThread().getContextClassLoader(),
- Collections.emptyMap());
+ Collections.emptyMap(),
+ Collections::emptyMap);
catalog.open();
}
}
diff --git a/website/docs/engine-flink/ddl.md b/website/docs/engine-flink/ddl.md
index a542db143..cc5bfcd97 100644
--- a/website/docs/engine-flink/ddl.md
+++ b/website/docs/engine-flink/ddl.md
@@ -21,13 +21,14 @@ USE CATALOG fluss_catalog;
The following properties can be set if using the Fluss catalog:
-| Option | Required | Default | Description |
-|--------------------------------|----------|-----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| type | required | (none) | Catalog type, must be 'fluss' here. |
-| bootstrap.servers | required | (none) | Comma separated list of Fluss servers. |
-| default-database | optional | fluss | The default database to use when switching to this catalog. |
-| client.security.protocol | optional | PLAINTEXT | The security protocol used to communicate with brokers. Currently, only `PLAINTEXT` and `SASL` are supported, the configuration value is case insensitive. |
-| `client.security.{protocol}.*` | optional | (none) | Client-side configuration properties for a specific authentication protocol. E.g., client.security.sasl.jaas.config. More Details in [authentication](../security/authentication.md) |
+| Option | Required | Default | Description |
+|--------------------------------|----------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| type | required | (none) | Catalog type, must be 'fluss' here. |
+| bootstrap.servers | required | (none) | Comma separated list of Fluss servers. |
+| default-database | optional | fluss | The default database to use when switching to this catalog. |
+| client.security.protocol | optional | PLAINTEXT | The security protocol used to communicate with brokers. Currently, only `PLAINTEXT` and `SASL` are supported, the configuration value is case insensitive. |
+| `client.security.{protocol}.*` | optional | (none) | Client-side configuration properties for a specific authentication protocol. E.g., client.security.sasl.jaas.config. More Details in [authentication](../security/authentication.md) |
+| `{lake-format}.*` | optional | (none) | Extra properties to be passed to the lake catalog. This is useful for configuring sensitive settings, such as the username and password required for lake catalog authentication. E.g., `paimon.jdbc.password = pass`. |
The following statements assume that the current catalog has been switched to
the Fluss catalog using the `USE CATALOG <catalog_name>` statement.