This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new dbf6f61d7 [server] Tablet server should register
lakeCatalogDynamicLoader in dynamicConfigManager (#1879)
dbf6f61d7 is described below
commit dbf6f61d77ff38a069785a00da16c9c907d8f641
Author: Hongshun Wang <[email protected]>
AuthorDate: Thu Oct 30 14:14:27 2025 +0800
[server] Tablet server should register lakeCatalogDynamicLoader in
dynamicConfigManager (#1879)
---
.../fluss/client/admin/FlussAdminITCase.java | 44 ++++++++++++++++------
.../apache/fluss/server/tablet/TabletServer.java | 1 +
2 files changed, 33 insertions(+), 12 deletions(-)
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 860d63f36..8e2b6a329 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -86,9 +86,12 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -1070,7 +1073,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
}
@Test
- void testDynamicConfigs() throws ExecutionException, InterruptedException {
+ void testDynamicConfigs() throws Exception {
assertThat(
FLUSS_CLUSTER_EXTENSION
.getCoordinatorServer()
@@ -1092,20 +1095,37 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
assertConfigEntry(
DATALAKE_FORMAT.key(), null, ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG);
- // Delete dynamic configs to use the initial value(from server.yaml)
admin.alterClusterConfigs(
- Collections.singletonList(
+ Arrays.asList(
+ new AlterConfig(
+ DATALAKE_FORMAT.key(), "paimon", AlterConfigOpType.SET),
new AlterConfig(
- DATALAKE_FORMAT.key(), null, AlterConfigOpType.DELETE)))
+ "datalake.paimon.warehouse",
+ "test-warehouse",
+ AlterConfigOpType.SET)))
.get();
- assertThat(
- FLUSS_CLUSTER_EXTENSION
- .getCoordinatorServer()
- .getCoordinatorService()
- .getDataLakeFormat())
- .isEqualTo(PAIMON);
- assertConfigEntry(
- DATALAKE_FORMAT.key(), "paimon", ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG);
+ TablePath tablePath = TablePath.of("test_db", "test_table");
+ createTable(
+ tablePath,
+ TableDescriptor.builder()
+ .schema(DEFAULT_SCHEMA)
+ .property(TABLE_DATALAKE_ENABLED, true)
+ .build(),
+ true);
+
+ waitUntil(
+ () -> {
+ TableInfo tableInfo = admin.getTableInfo(tablePath).get();
+ Map<String, String> tableProperties = tableInfo.getProperties().toMap();
+ return tableProperties.containsKey(TABLE_DATALAKE_FORMAT.key())
+ && tableProperties.containsKey("table.datalake.paimon.warehouse")
+ && PAIMON.toString()
+ .equals(tableProperties.get(TABLE_DATALAKE_FORMAT.key()))
+ && "test-warehouse"
+ .equals(tableProperties.get("table.datalake.paimon.warehouse"));
+ },
+ Duration.ofMinutes(1),
+ "Get lakehouse info");
}
private void assertConfigEntry(
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 7c02f993c..eef925b50 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -203,6 +203,7 @@ public class TabletServer extends ServerBase {
MetadataManager metadataManager =
new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader);
this.dynamicConfigManager = new DynamicConfigManager(zkClient, conf, false);
+ dynamicConfigManager.register(lakeCatalogDynamicLoader);
dynamicConfigManager.startup();
this.metadataCache = new TabletServerMetadataCache(metadataManager);