This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 14f0f1283 [server] Support alter database comment and custom properties (#1172)
14f0f1283 is described below

commit 14f0f1283ec8f45aa1ac532dc98db322c5e9a269
Author: Liebing <[email protected]>
AuthorDate: Thu Mar 19 11:07:27 2026 +0800

    [server] Support alter database comment and custom properties (#1172)
---
 .../java/org/apache/fluss/client/admin/Admin.java  |  19 +++
 .../org/apache/fluss/client/admin/FlussAdmin.java  |  12 ++
 .../fluss/client/utils/ClientRpcMessageUtils.java  |  43 +++++
 .../fluss/client/admin/FlussAdminITCase.java       |  87 ++++++++++
 .../org/apache/fluss/metadata/DatabaseChange.java  | 178 +++++++++++++++++++++
 .../apache/fluss/flink/catalog/FlinkCatalog.java   |  51 +++++-
 .../fluss/flink/catalog/FlinkCatalogITCase.java    |  32 ++++
 .../fluss/flink/catalog/FlinkCatalogTest.java      |  76 ++++++++-
 .../flink/sink/testutils/TestAdminAdapter.java     |   7 +
 .../org/apache/fluss/rpc/gateway/AdminGateway.java |  10 ++
 .../org/apache/fluss/rpc/protocol/ApiKeys.java     |   3 +-
 fluss-rpc/src/main/proto/FlussApi.proto            |  11 ++
 .../server/coordinator/CoordinatorService.java     |  47 ++++++
 .../fluss/server/coordinator/MetadataManager.java  |  94 ++++++++++-
 .../server/entity/DatabasePropertyChanges.java     |  72 +++++++++
 .../fluss/server/utils/ServerRpcMessageUtils.java  |  34 +++-
 .../apache/fluss/server/zk/ZooKeeperClient.java    |   7 +
 .../fluss/server/zk/data/DatabaseRegistration.java |   8 +
 .../server/coordinator/TestCoordinatorGateway.java |   7 +
 19 files changed, 786 insertions(+), 12 deletions(-)

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
index d8fe57660..ea28a246b 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
@@ -53,6 +53,7 @@ import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.exception.TableNotPartitionedException;
 import org.apache.fluss.exception.TooManyBucketsException;
 import org.apache.fluss.exception.TooManyPartitionsException;
+import org.apache.fluss.metadata.DatabaseChange;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
 import org.apache.fluss.metadata.DatabaseSummary;
@@ -137,6 +138,24 @@ public interface Admin extends AutoCloseable {
     CompletableFuture<Void> createDatabase(
             String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfExists);
 
+    /**
+     * Alter a database with the given {@code databaseChanges}.
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
+     *
+     * <ul>
+     *   <li>{@link DatabaseNotExistException} when the database does not exist and {@code
+     *       ignoreIfNotExists} is false.
+     * </ul>
+     *
+     * @param databaseName The name of the database.
+     * @param databaseChanges The database changes.
+     * @param ignoreIfNotExists if it is true, do nothing if database does not exist. If false,
+     *     throw a {@link DatabaseNotExistException}.
+     */
+    CompletableFuture<Void> alterDatabase(
+            String databaseName, List<DatabaseChange> databaseChanges, boolean ignoreIfNotExists);
+
     /**
      * Get the database with the given database name asynchronously.
      *
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 8f39a6e42..e72b96ef8 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -32,6 +32,7 @@ import org.apache.fluss.config.cluster.AlterConfig;
 import org.apache.fluss.config.cluster.ConfigEntry;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.LeaderNotAvailableException;
+import org.apache.fluss.metadata.DatabaseChange;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
 import org.apache.fluss.metadata.DatabaseSummary;
@@ -53,6 +54,7 @@ import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.AddServerTagRequest;
 import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
 import org.apache.fluss.rpc.messages.AlterTableRequest;
 import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
 import org.apache.fluss.rpc.messages.CreateAclsRequest;
@@ -109,6 +111,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterDatabaseRequest;
 import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
 import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
 import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
@@ -283,6 +286,15 @@ public class FlussAdmin implements Admin {
         return gateway.alterTable(request).thenApply(r -> null);
     }
 
+    @Override
+    public CompletableFuture<Void> alterDatabase(
+            String databaseName, List<DatabaseChange> databaseChanges, boolean ignoreIfNotExists) {
+        TablePath.validateDatabaseName(databaseName);
+        AlterDatabaseRequest request =
+                makeAlterDatabaseRequest(databaseName, databaseChanges, ignoreIfNotExists);
+        return gateway.alterDatabase(request).thenApply(r -> null);
+    }
+
     @Override
     public CompletableFuture<TableInfo> getTableInfo(TablePath tablePath) {
         GetTableInfoRequest request = new GetTableInfoRequest();
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
index a7dbec47c..4c2e236c5 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
@@ -37,6 +37,7 @@ import org.apache.fluss.config.cluster.ConfigEntry;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.fs.FsPathAndFileName;
 import org.apache.fluss.fs.token.ObtainedSecurityToken;
+import org.apache.fluss.metadata.DatabaseChange;
 import org.apache.fluss.metadata.DatabaseSummary;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.PartitionSpec;
@@ -46,6 +47,7 @@ import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
 import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseResponse;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
 import org.apache.fluss.rpc.messages.AlterTableRequest;
 import org.apache.fluss.rpc.messages.CreatePartitionRequest;
 import org.apache.fluss.rpc.messages.DropPartitionRequest;
@@ -420,6 +422,47 @@ public class ClientRpcMessageUtils {
         return request;
     }
 
+    public static AlterDatabaseRequest makeAlterDatabaseRequest(
+            String databaseName, List<DatabaseChange> databaseChanges, boolean ignoreIfNotExists) {
+        AlterDatabaseRequest request = new AlterDatabaseRequest();
+
+        List<PbAlterConfig> pbDatabaseChanges = new ArrayList<>(databaseChanges.size());
+        String comment = null;
+        for (DatabaseChange databaseChange : databaseChanges) {
+            PbAlterConfig info = new PbAlterConfig();
+            if (databaseChange instanceof DatabaseChange.SetOption) {
+                DatabaseChange.SetOption setOption = (DatabaseChange.SetOption) databaseChange;
+                info.setConfigKey(setOption.getKey());
+                info.setConfigValue(setOption.getValue());
+                info.setOpType(AlterConfigOpType.SET.value());
+                pbDatabaseChanges.add(info);
+            } else if (databaseChange instanceof DatabaseChange.ResetOption) {
+                DatabaseChange.ResetOption resetOption =
+                        (DatabaseChange.ResetOption) databaseChange;
+                info.setConfigKey(resetOption.getKey());
+                info.setOpType(AlterConfigOpType.DELETE.value());
+                pbDatabaseChanges.add(info);
+            } else if (databaseChange instanceof DatabaseChange.UpdateComment) {
+                DatabaseChange.UpdateComment updateComment =
+                        (DatabaseChange.UpdateComment) databaseChange;
+                comment = updateComment.getComment();
+            } else {
+                throw new IllegalArgumentException(
+                        "Unsupported database change: " + databaseChange.getClass());
+            }
+        }
+
+        request.addAllConfigChanges(pbDatabaseChanges)
+                .setDatabaseName(databaseName)
+                .setIgnoreIfNotExists(ignoreIfNotExists);
+
+        if (comment != null) {
+            request.setComment(comment);
+        }
+
+        return request;
+    }
+
     public static AcquireKvSnapshotLeaseRequest makeAcquireKvSnapshotLeaseRequest(
             String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration) {
         AcquireKvSnapshotLeaseRequest request = new AcquireKvSnapshotLeaseRequest();
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index f8ae1dbf6..ad3bfd775 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -56,6 +56,7 @@ import org.apache.fluss.exception.TooManyPartitionsException;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.fs.FsPathAndFileName;
 import org.apache.fluss.metadata.AggFunctions;
+import org.apache.fluss.metadata.DatabaseChange;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
 import org.apache.fluss.metadata.DatabaseSummary;
@@ -181,6 +182,92 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                 .isBetween(timestampBeforeCreate, timestampAfterCreate);
     }
 
+    @Test
+    void testAlterDatabase() throws Exception {
+        // create database
+        String dbName = "test_alter_db";
+        admin.createDatabase(
+                        dbName,
+                        DatabaseDescriptor.builder()
+                                .comment("original comment")
+                                .customProperty("key1", "value1")
+                                .customProperty("key2", "value2")
+                                .build(),
+                        false)
+                .get();
+
+        DatabaseInfo databaseInfo = admin.getDatabaseInfo(dbName).get();
+        DatabaseDescriptor existingDescriptor = databaseInfo.getDatabaseDescriptor();
+
+        // Verify initial state
+        assertThat(existingDescriptor.getComment().get()).isEqualTo("original comment");
+        assertThat(existingDescriptor.getCustomProperties()).containsEntry("key1", "value1");
+        assertThat(existingDescriptor.getCustomProperties()).containsEntry("key2", "value2");
+
+        // Alter database: add and modify custom properties
+        List<DatabaseChange> databaseChanges = new ArrayList<>();
+        databaseChanges.add(DatabaseChange.set("key3", "value3"));
+        databaseChanges.add(DatabaseChange.set("key1", "updated_value1"));
+        databaseChanges.add(DatabaseChange.updateComment("updated comment"));
+        admin.alterDatabase(dbName, databaseChanges, false).get();
+
+        // Verify alterations
+        DatabaseInfo alteredDatabaseInfo = admin.getDatabaseInfo(dbName).get();
+        DatabaseDescriptor alteredDescriptor = alteredDatabaseInfo.getDatabaseDescriptor();
+        assertThat(alteredDescriptor.getComment().get()).isEqualTo("updated comment");
+        assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key1", "updated_value1");
+        assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key2", "value2");
+        assertThat(alteredDescriptor.getCustomProperties()).containsEntry("key3", "value3");
+        assertThat(alteredDescriptor.getCustomProperties()).hasSize(3);
+
+        // Alter database: reset a property
+        databaseChanges = new ArrayList<>();
+        databaseChanges.add(DatabaseChange.reset("key2"));
+        admin.alterDatabase(dbName, databaseChanges, false).get();
+
+        // Verify reset
+        DatabaseInfo resetDatabaseInfo = admin.getDatabaseInfo(dbName).get();
+        DatabaseDescriptor resetDescriptor = resetDatabaseInfo.getDatabaseDescriptor();
+        assertThat(resetDescriptor.getComment().get()).isEqualTo("updated comment");
+        assertThat(resetDescriptor.getCustomProperties()).containsEntry("key1", "updated_value1");
+        assertThat(resetDescriptor.getCustomProperties()).containsEntry("key3", "value3");
+        assertThat(resetDescriptor.getCustomProperties()).doesNotContainKey("key2");
+        assertThat(resetDescriptor.getCustomProperties()).hasSize(2);
+
+        // Alter database: reset comment
+        databaseChanges = new ArrayList<>();
+        // Empty string means reset comment
+        databaseChanges.add(DatabaseChange.updateComment(""));
+        admin.alterDatabase(dbName, databaseChanges, false).get();
+
+        // Verify reset
+        DatabaseInfo resetCommentDatabaseInfo = admin.getDatabaseInfo(dbName).get();
+        DatabaseDescriptor resetCommentDescriptor =
+                resetCommentDatabaseInfo.getDatabaseDescriptor();
+        assertThat(resetCommentDescriptor.getComment()).isEmpty();
+        assertThat(resetCommentDescriptor.getCustomProperties())
+                .containsEntry("key1", "updated_value1");
+        assertThat(resetCommentDescriptor.getCustomProperties()).containsEntry("key3", "value3");
+        assertThat(resetCommentDescriptor.getCustomProperties()).doesNotContainKey("key2");
+        assertThat(resetCommentDescriptor.getCustomProperties()).hasSize(2);
+
+        // throw exception if database not exist
+        List<DatabaseChange> finalDatabaseChanges = databaseChanges;
+        assertThatThrownBy(
+                        () ->
+                                admin.alterDatabase(
+                                                "test_alter_db_not_exist",
+                                                finalDatabaseChanges,
+                                                false)
+                                        .get())
+                .cause()
+                .isInstanceOf(DatabaseNotExistException.class)
+                .hasMessage(String.format("Database %s not exists.", "test_alter_db_not_exist"));
+
+        // should success if ignore not exist
+        admin.alterDatabase("test_alter_db_not_exist", databaseChanges, true).get();
+    }
+
     @Test
     void testGetTableInfoAndSchema() throws Exception {
         SchemaInfo schemaInfo = admin.getTableSchema(DEFAULT_TABLE_PATH).get();
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseChange.java b/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseChange.java
new file mode 100644
index 000000000..a228e613b
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/DatabaseChange.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import java.util.Objects;
+
+/** {@link DatabaseChange} represents the modification of the Fluss Database. */
+public interface DatabaseChange {
+
+    static SetOption set(String key, String value) {
+        return new SetOption(key, value);
+    }
+
+    static ResetOption reset(String key) {
+        return new ResetOption(key);
+    }
+
+    static UpdateComment updateComment(String comment) {
+        return new UpdateComment(comment);
+    }
+
+    /**
+     * A database change to set the database option.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER DATABASE &lt;database_name&gt; SET '&lt;key&gt;' = '&lt;value&gt;';
+     * </pre>
+     */
+    class SetOption implements DatabaseChange {
+
+        private final String key;
+        private final String value;
+
+        private SetOption(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        /** Returns the Option key to set. */
+        public String getKey() {
+            return key;
+        }
+
+        /** Returns the Option value to set. */
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof SetOption)) {
+                return false;
+            }
+            SetOption setOption = (SetOption) o;
+            return Objects.equals(key, setOption.key) && Objects.equals(value, setOption.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value);
+        }
+
+        @Override
+        public String toString() {
+            return "SetOption{" + "key='" + key + '\'' + ", value='" + value + '\'' + '}';
+        }
+    }
+
+    /**
+     * A database change to reset the database option.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER DATABASE &lt;database_name&gt; RESET '&lt;key&gt;'
+     * </pre>
+     */
+    class ResetOption implements DatabaseChange {
+
+        private final String key;
+
+        public ResetOption(String key) {
+            this.key = key;
+        }
+
+        /** Returns the Option key to reset. */
+        public String getKey() {
+            return key;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ResetOption)) {
+                return false;
+            }
+            ResetOption that = (ResetOption) o;
+            return Objects.equals(key, that.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key);
+        }
+
+        @Override
+        public String toString() {
+            return "ResetOption{" + "key='" + key + '\'' + '}';
+        }
+    }
+
+    /**
+     * A database change to set the database comment.
+     *
+     * <p>It is equal to the following statement:
+     *
+     * <pre>
+     *    ALTER DATABASE &lt;database_name&gt; SET COMMENT '&lt;comment&gt;';
+     * </pre>
+     */
+    class UpdateComment implements DatabaseChange {
+
+        private final String comment;
+
+        private UpdateComment(String comment) {
+            this.comment = comment;
+        }
+
+        /** Returns the comment to set. */
+        public String getComment() {
+            return comment;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof UpdateComment)) {
+                return false;
+            }
+            UpdateComment that = (UpdateComment) o;
+            return Objects.equals(comment, that.comment);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(comment);
+        }
+
+        @Override
+        public String toString() {
+            return "UpdateComment{" + "comment='" + comment + '\'' + '}';
+        }
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 5d81c5e5e..3cb46aa78 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -29,6 +29,7 @@ import org.apache.fluss.flink.lake.LakeFlinkCatalog;
 import org.apache.fluss.flink.procedure.ProcedureManager;
 import org.apache.fluss.flink.utils.CatalogExceptionUtils;
 import org.apache.fluss.flink.utils.FlinkConversions;
+import org.apache.fluss.metadata.DatabaseChange;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.PartitionSpec;
@@ -278,9 +279,55 @@ public class FlinkCatalog extends AbstractCatalog {
     }
 
     @Override
-    public void alterDatabase(String databaseName, CatalogDatabase catalogDatabase, boolean b)
+    public void alterDatabase(
+            String databaseName, CatalogDatabase catalogDatabase, boolean ignoreIfNotExists)
             throws DatabaseNotExistException, CatalogException {
-        throw new UnsupportedOperationException();
+        try {
+            // Get current database info
+            DatabaseDescriptor currentDescriptor =
+                    admin.getDatabaseInfo(databaseName).get().getDatabaseDescriptor();
+
+            List<DatabaseChange> databaseChanges = new ArrayList<>();
+
+            // Check comment changes
+            String oldComment = currentDescriptor.getComment().orElse(null);
+            String newComment = catalogDatabase.getComment();
+            if (!Objects.equals(oldComment, newComment)) {
+                databaseChanges.add(DatabaseChange.updateComment(newComment));
+            }
+
+            // Check custom properties changes
+            Map<String, String> oldProps = currentDescriptor.getCustomProperties();
+            Map<String, String> newProps = catalogDatabase.getProperties();
+
+            newProps.forEach(
+                    (k, v) -> {
+                        if (!oldProps.containsKey(k) || !oldProps.get(k).equals(v)) {
+                            databaseChanges.add(DatabaseChange.set(k, v));
+                        }
+                    });
+
+            oldProps.keySet()
+                    .forEach(
+                            (k) -> {
+                                if (!newProps.containsKey(k)) {
+                                    databaseChanges.add(DatabaseChange.reset(k));
+                                }
+                            });
+
+            admin.alterDatabase(databaseName, databaseChanges, ignoreIfNotExists).get();
+        } catch (Exception e) {
+            Throwable t = ExceptionUtils.stripExecutionException(e);
+            if (CatalogExceptionUtils.isDatabaseNotExist(t)) {
+                if (!ignoreIfNotExists) {
+                    throw new DatabaseNotExistException(getName(), databaseName);
+                }
+            } else {
+                throw new CatalogException(
+                        String.format("Failed to alter database %s in %s", databaseName, getName()),
+                        t);
+            }
+        }
     }
 
     @Override
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index af1e2eaaa..8f3ed9d3c 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -691,6 +692,37 @@ abstract class FlinkCatalogITCase {
         assertThat(databases.toString()).isEqualTo(String.format("[+I[%s]]", DEFAULT_DB));
     }
 
+    @Test
+    void testAlterDatabase() throws Exception {
+        String dbName = "test_alter_db";
+        // Create database with initial properties
+        tEnv.executeSql(
+                String.format(
+                        "create database %s comment 'initial comment' with ('key1' = 'value1', 'key2' = 'value2')",
+                        dbName));
+
+        // Verify initial state
+        CatalogDatabase currentDb = catalog.getDatabase(dbName);
+        assertThat(currentDb.getProperties()).containsEntry("key1", "value1");
+        assertThat(currentDb.getProperties()).containsEntry("key2", "value2");
+        assertThat(currentDb.getComment()).isEqualTo("initial comment");
+
+        // Alter database: add new property and update existing property
+        String alterSql1 =
+                "alter database " + dbName + " set ('key3' = 'value3', 'key1' = 'updated_value1')";
+        tEnv.executeSql(alterSql1);
+
+        // Verify first alteration
+        CatalogDatabase alteredDb1 = catalog.getDatabase(dbName);
+        assertThat(alteredDb1.getProperties()).containsEntry("key1", "updated_value1");
+        assertThat(alteredDb1.getProperties()).containsEntry("key2", "value2");
+        assertThat(alteredDb1.getProperties()).containsEntry("key3", "value3");
+        assertThat(alteredDb1.getComment()).isEqualTo("initial comment");
+
+        // Drop database for cleanup
+        tEnv.executeSql("drop database " + dbName);
+    }
+
     @Test
     void testFactoryCannotFindForCreateTemporaryTable() {
         // create fluss temporary table is not supported
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 775182b5d..0c8d91918 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -636,9 +636,83 @@ class FlinkCatalogTest {
                 .isInstanceOf(DatabaseNotExistException.class)
                 .hasMessage("Database %s does not exist in Catalog %s.", "unknown", CATALOG_NAME);
         assertThatThrownBy(() -> catalog.alterDatabase("db2", null, false))
-                .isInstanceOf(UnsupportedOperationException.class);
+                .isInstanceOf(DatabaseNotExistException.class);
         assertThat(catalog.getDefaultDatabase()).isEqualTo(DEFAULT_DB);
+    }
+
+    @Test
+    void testAlterDatabase() throws Exception {
+        // Create database with initial properties
+        String dbName = "test_alter_db";
+        Map<String, String> initialProps = new HashMap<>();
+        initialProps.put("key1", "value1");
+        initialProps.put("key2", "value2");
+
+        catalog.createDatabase(dbName, new CatalogDatabaseImpl(initialProps, null), false);
+
+        // Verify initial state
+        CatalogDatabase currentDb = catalog.getDatabase(dbName);
+        assertThat(currentDb.getProperties()).containsEntry("key1", "value1");
+        assertThat(currentDb.getProperties()).containsEntry("key2", "value2");
+        assertThat(currentDb.getComment()).isNull();
+
+        // Alter database: add new property and update existing property
+        Map<String, String> newProps1 = new HashMap<>(currentDb.getProperties());
+        newProps1.put("key3", "value3");
+        newProps1.put("key1", "updated_value1");
+
+        CatalogDatabase newDb1 = new CatalogDatabaseImpl(newProps1, null);
+        catalog.alterDatabase(dbName, newDb1, false);
+
+        // Verify first alteration
+        CatalogDatabase alteredDb1 = catalog.getDatabase(dbName);
+        assertThat(alteredDb1.getProperties()).containsEntry("key1", "updated_value1");
+        assertThat(alteredDb1.getProperties()).containsEntry("key2", "value2");
+        assertThat(alteredDb1.getProperties()).containsEntry("key3", "value3");
+        assertThat(alteredDb1.getComment()).isNull();
+
+        // Alter database: add comment
+        Map<String, String> newProps2 = new HashMap<>(alteredDb1.getProperties());
+        CatalogDatabase newDb2 = new CatalogDatabaseImpl(newProps2, "test comment");
+        catalog.alterDatabase(dbName, newDb2, false);
+
+        // Verify comment change
+        CatalogDatabase alteredDb2 = catalog.getDatabase(dbName);
+        assertThat(alteredDb2.getProperties()).containsEntry("key1", "updated_value1");
+        assertThat(alteredDb2.getProperties()).containsEntry("key2", "value2");
+        assertThat(alteredDb2.getProperties()).containsEntry("key3", "value3");
+        assertThat(alteredDb2.getComment()).isEqualTo("test comment");
+
+        // Alter database: reset a property (remove key2)
+        Map<String, String> newProps3 = new HashMap<>();
+        newProps3.put("key1", "updated_value1");
+        newProps3.put("key3", "value3");
+
+        CatalogDatabase newDb3 = new CatalogDatabaseImpl(newProps3, "test comment");
+        catalog.alterDatabase(dbName, newDb3, false);
+
+        // Verify reset
+        CatalogDatabase alteredDb3 = catalog.getDatabase(dbName);
+        assertThat(alteredDb3.getProperties()).containsEntry("key1", "updated_value1");
+        assertThat(alteredDb3.getProperties()).containsEntry("key3", "value3");
+        assertThat(alteredDb3.getProperties()).doesNotContainKey("key2");
+        assertThat(alteredDb3.getComment()).isEqualTo("test comment");
+
+        // Test database not exist
+        assertThatThrownBy(() -> catalog.alterDatabase("non_exist_db", newDb3, false))
+                .isInstanceOf(DatabaseNotExistException.class)
+                .hasMessage(
+                        "Database %s does not exist in Catalog %s.", "non_exist_db", CATALOG_NAME);
 
+        // Test with ignoreIfNotExists = true
+        catalog.alterDatabase("non_exist_db", newDb3, true);
+
+        // Clean up
+        catalog.dropDatabase(dbName, false, true);
+    }
+
+    @Test
+    void testCatalogWithNullDefaultDatabase() throws Exception {
         // Test catalog with null default database
         Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
         assertThatThrownBy(
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java
index c8dbc0dc0..708226545 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java
@@ -34,6 +34,7 @@ import org.apache.fluss.cluster.rebalance.RebalanceProgress;
 import org.apache.fluss.cluster.rebalance.ServerTag;
 import org.apache.fluss.config.cluster.AlterConfig;
 import org.apache.fluss.config.cluster.ConfigEntry;
+import org.apache.fluss.metadata.DatabaseChange;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
 import org.apache.fluss.metadata.DatabaseSummary;
@@ -100,6 +101,12 @@ public class TestAdminAdapter implements Admin {
         throw new UnsupportedOperationException("Not implemented in 
TestAdminAdapter");
     }
 
+    /** Altering a database is not supported by this test adapter; the call always throws. */
+    @Override
+    public CompletableFuture<Void> alterDatabase(
+            String databaseName, List<DatabaseChange> databaseChanges, boolean ignoreIfNotExists) {
+        final String reason = "Not implemented in TestAdminAdapter";
+        throw new UnsupportedOperationException(reason);
+    }
+
     @Override
     public CompletableFuture<DatabaseInfo> getDatabaseInfo(String 
databaseName) {
         throw new UnsupportedOperationException("Not implemented in 
TestAdminAdapter");
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
index d8ef3fde6..21d286aa4 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java
@@ -23,6 +23,8 @@ import org.apache.fluss.rpc.messages.AddServerTagRequest;
 import org.apache.fluss.rpc.messages.AddServerTagResponse;
 import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
 import org.apache.fluss.rpc.messages.AlterClusterConfigsResponse;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
+import org.apache.fluss.rpc.messages.AlterDatabaseResponse;
 import org.apache.fluss.rpc.messages.AlterTableRequest;
 import org.apache.fluss.rpc.messages.AlterTableResponse;
 import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
@@ -74,6 +76,14 @@ public interface AdminGateway extends AdminReadOnlyGateway {
     @RPC(api = ApiKeys.CREATE_DATABASE)
     CompletableFuture<CreateDatabaseResponse> 
createDatabase(CreateDatabaseRequest request);
 
+    /**
+     * Alters an existing database as described by the given request.
+     *
+     * @param request the alter-database request.
+     * @return a future that holds the response of the alter operation.
+     */
+    @RPC(api = ApiKeys.ALTER_DATABASE)
+    CompletableFuture<AlterDatabaseResponse> alterDatabase(AlterDatabaseRequest request);
+
     /**
      * Drop a database.
      *
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
index f3fadec64..605967f67 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
@@ -101,7 +101,8 @@ public enum ApiKeys {
     ACQUIRE_KV_SNAPSHOT_LEASE(1056, 0, 0, PUBLIC),
     RELEASE_KV_SNAPSHOT_LEASE(1057, 0, 0, PUBLIC),
     DROP_KV_SNAPSHOT_LEASE(1058, 0, 0, PUBLIC),
-    GET_TABLE_STATS(1059, 0, 0, PUBLIC);
+    GET_TABLE_STATS(1059, 0, 0, PUBLIC),
+    ALTER_DATABASE(1060, 0, 0, PUBLIC);
 
     private static final Map<Integer, ApiKeys> ID_TO_TYPE =
             Arrays.stream(ApiKeys.values())
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index 20b699fb1..db78f65fb 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -64,6 +64,17 @@ message CreateDatabaseRequest {
 message CreateDatabaseResponse {
 }
 
+// alter database request and response
+message AlterDatabaseRequest {
+  // name of the database to alter
+  required string database_name = 1;
+  // when true, altering a database that does not exist is silently skipped
+  required bool ignore_if_not_exists = 2;
+  // custom property changes; the server accepts only the SET and DELETE op types
+  repeated PbAlterConfig config_changes = 3;
+  // new database comment; when absent the comment is left unchanged, and an
+  // empty string clears the existing comment
+  optional string comment = 4;
+}
+
+message AlterDatabaseResponse {
+}
+
 // get table request and response
 message GetDatabaseInfoRequest {
   required string database_name = 1;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 7c6040d8c..83016cb52 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -44,6 +44,7 @@ import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.lake.committer.TieringStats;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.DatabaseChange;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DeleteBehavior;
 import org.apache.fluss.metadata.MergeEngineType;
@@ -63,6 +64,8 @@ import org.apache.fluss.rpc.messages.AdjustIsrRequest;
 import org.apache.fluss.rpc.messages.AdjustIsrResponse;
 import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
 import org.apache.fluss.rpc.messages.AlterClusterConfigsResponse;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
+import org.apache.fluss.rpc.messages.AlterDatabaseResponse;
 import org.apache.fluss.rpc.messages.AlterTableRequest;
 import org.apache.fluss.rpc.messages.AlterTableResponse;
 import org.apache.fluss.rpc.messages.CancelRebalanceRequest;
@@ -152,6 +155,7 @@ import 
org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager;
 import org.apache.fluss.server.coordinator.producer.ProducerOffsetsManager;
 import org.apache.fluss.server.coordinator.rebalance.goal.Goal;
 import org.apache.fluss.server.entity.CommitKvSnapshotData;
+import org.apache.fluss.server.entity.DatabasePropertyChanges;
 import org.apache.fluss.server.entity.LakeTieringTableInfo;
 import org.apache.fluss.server.entity.TablePropertyChanges;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
@@ -209,6 +213,7 @@ import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeCreateAcls
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeDropAclsResponse;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toAlterTableConfigChanges;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toAlterTableSchemaChanges;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toDatabaseChanges;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucketOffsets;
 import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
 import static 
org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment;
@@ -365,6 +370,48 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(response);
     }
 
+    /**
+     * Alters the custom properties and/or the comment of a database.
+     *
+     * <p>The caller is authorized for {@code ALTER} on the database through the {@code
+     * authorizeDatabase} helper, the same helper {@code dropDatabase} uses, instead of repeating
+     * the authorizer null-check inline. NOTE(review): assumes {@code authorizeDatabase} performs
+     * the same null-safe {@code authorizer.authorize(currentSession(), ...)} call - confirm.
+     *
+     * @param request carries the database name, the property changes, the optional new comment
+     *     and the ignore-if-not-exists flag.
+     * @return an already completed future holding an empty response.
+     */
+    @Override
+    public CompletableFuture<AlterDatabaseResponse> alterDatabase(AlterDatabaseRequest request) {
+        String databaseName = request.getDatabaseName();
+        authorizeDatabase(OperationType.ALTER, databaseName);
+
+        // Authorization happens before the request is decoded, so an unauthorized caller never
+        // sees decoding errors.
+        DatabasePropertyChanges databasePropertyChanges =
+                toDatabasePropertyChanges(toDatabaseChanges(request));
+
+        metadataManager.alterDatabaseProperties(
+                databaseName, databasePropertyChanges, request.isIgnoreIfNotExists());
+
+        return CompletableFuture.completedFuture(new AlterDatabaseResponse());
+    }
+
+    /**
+     * Folds a list of {@link DatabaseChange}s into a single {@link DatabasePropertyChanges}.
+     *
+     * @param databaseChanges the changes to fold, may be empty.
+     * @return the accumulated changes; nothing is set when the list is empty.
+     * @throws IllegalArgumentException if a change is of a type that is not handled here.
+     */
+    private DatabasePropertyChanges toDatabasePropertyChanges(
+            List<DatabaseChange> databaseChanges) {
+        DatabasePropertyChanges.Builder builder = DatabasePropertyChanges.builder();
+        for (DatabaseChange databaseChange : databaseChanges) {
+            if (databaseChange instanceof DatabaseChange.SetOption) {
+                DatabaseChange.SetOption setOption = (DatabaseChange.SetOption) databaseChange;
+                builder.setCustomProperty(setOption.getKey(), setOption.getValue());
+            } else if (databaseChange instanceof DatabaseChange.ResetOption) {
+                DatabaseChange.ResetOption resetOption =
+                        (DatabaseChange.ResetOption) databaseChange;
+                builder.resetCustomProperty(resetOption.getKey());
+            } else if (databaseChange instanceof DatabaseChange.UpdateComment) {
+                DatabaseChange.UpdateComment updateComment =
+                        (DatabaseChange.UpdateComment) databaseChange;
+                builder.setComment(updateComment.getComment());
+            } else {
+                // Fail loudly rather than report success for a change that was never applied.
+                throw new IllegalArgumentException(
+                        "Unsupported database change: " + databaseChange);
+            }
+        }
+        return builder.build();
+    }
+
     @Override
     public CompletableFuture<DropDatabaseResponse> 
dropDatabase(DropDatabaseRequest request) {
         authorizeDatabase(OperationType.DROP, request.getDatabaseName());
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index cc3808d87..43029c3fc 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -47,6 +47,7 @@ import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePartition;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.security.acl.FlussPrincipal;
+import org.apache.fluss.server.entity.DatabasePropertyChanges;
 import org.apache.fluss.server.entity.TablePropertyChanges;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.DatabaseRegistration;
@@ -136,8 +137,93 @@ public class MetadataManager {
         }
     }
 
+    /**
+     * Applies the given property/comment changes to a database and writes the result to
+     * ZooKeeper. Nothing is written when the changes leave the database descriptor untouched.
+     *
+     * @param databaseName the database to alter.
+     * @param databasePropertyChanges the changes to apply.
+     * @param ignoreIfNotExists when true, a missing database is silently skipped instead of
+     *     raising {@link DatabaseNotExistException}.
+     */
+    public void alterDatabaseProperties(
+            String databaseName,
+            DatabasePropertyChanges databasePropertyChanges,
+            boolean ignoreIfNotExists) {
+        try {
+            if (!databaseExists(databaseName)) {
+                if (ignoreIfNotExists) {
+                    return;
+                }
+                throw new DatabaseNotExistException("Database " + databaseName + " not exists.");
+            }
+
+            DatabaseRegistration existing = getDatabaseRegistration(databaseName);
+            DatabaseDescriptor updated =
+                    getUpdatedDatabaseDescriptor(
+                            existing.toDatabaseDescriptor(), databasePropertyChanges);
+            if (updated == null) {
+                LOG.info(
+                        "No properties changed when alter database {}, skip update.", databaseName);
+                return;
+            }
+
+            zookeeperClient.updateDatabase(databaseName, existing.newProperties(updated));
+            LOG.info("Successfully altered database properties for database: {}", databaseName);
+        } catch (DatabaseNotExistException e) {
+            // Also covers a database that disappears between the existence check and the read.
+            if (!ignoreIfNotExists) {
+                throw e;
+            }
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new FlussRuntimeException("Failed to alter database: " + databaseName, e);
+        }
+    }
+
+    /**
+     * Computes the descriptor that results from applying {@code changes} to {@code
+     * currentDescriptor}.
+     *
+     * <p>Properties to set are applied before properties to reset, so a key present in both ends
+     * up removed. A {@code null} comment in the changes keeps the current comment, an empty
+     * comment clears it, and any other value replaces it.
+     *
+     * @param currentDescriptor the descriptor currently stored for the database.
+     * @param changes the property and comment changes to apply.
+     * @return the updated descriptor, or {@code null} when neither the custom properties nor the
+     *     comment would differ from the current descriptor.
+     */
+    @Nullable
+    private DatabaseDescriptor getUpdatedDatabaseDescriptor(
+            DatabaseDescriptor currentDescriptor, DatabasePropertyChanges changes) {
+        Map<String, String> newCustomProperties =
+                new HashMap<>(currentDescriptor.getCustomProperties());
+        // set properties
+        newCustomProperties.putAll(changes.customPropertiesToSet);
+        // reset properties
+        newCustomProperties.keySet().removeAll(changes.customPropertiesToReset);
+
+        Optional<String> newComment;
+        if (changes.commentToSet == null) {
+            newComment = currentDescriptor.getComment();
+        } else if (changes.commentToSet.isEmpty()) {
+            // If comment is set to empty string, it means to reset the comment
+            newComment = Optional.empty();
+        } else {
+            newComment = Optional.of(changes.commentToSet);
+        }
+
+        // Compare the resulting comment as well: re-setting the comment to its current value (or
+        // clearing a comment that is already absent) must not count as a change.
+        if (newCustomProperties.equals(currentDescriptor.getCustomProperties())
+                && newComment.equals(currentDescriptor.getComment())) {
+            return null;
+        }
+
+        return DatabaseDescriptor.builder()
+                .customProperties(newCustomProperties)
+                .comment(newComment.orElse(null))
+                .build();
+    }
+
     public DatabaseInfo getDatabase(String databaseName) throws 
DatabaseNotExistException {
+        DatabaseRegistration databaseReg = 
getDatabaseRegistration(databaseName);
+        return new DatabaseInfo(
+                databaseName,
+                databaseReg.toDatabaseDescriptor(),
+                databaseReg.createdTime,
+                databaseReg.modifiedTime);
+    }
 
+    public DatabaseRegistration getDatabaseRegistration(String databaseName) {
         Optional<DatabaseRegistration> optionalDB;
         try {
             optionalDB = zookeeperClient.getDatabase(databaseName);
@@ -149,13 +235,7 @@ public class MetadataManager {
         if (!optionalDB.isPresent()) {
             throw new DatabaseNotExistException("Database '" + databaseName + 
"' does not exist.");
         }
-
-        DatabaseRegistration databaseReg = optionalDB.get();
-        return new DatabaseInfo(
-                databaseName,
-                databaseReg.toDatabaseDescriptor(),
-                databaseReg.createdTime,
-                databaseReg.modifiedTime);
+        return optionalDB.get();
     }
 
     public boolean databaseExists(String databaseName) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/entity/DatabasePropertyChanges.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/entity/DatabasePropertyChanges.java
new file mode 100644
index 000000000..01fe96ec7
--- /dev/null
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/entity/DatabasePropertyChanges.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.entity;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * To describe the changes of the properties of a database.
+ *
+ * <p>An instance holds the custom properties to set, the custom property keys to reset and an
+ * optional new comment. Instances are created through {@link #builder()}.
+ */
+public class DatabasePropertyChanges {
+
+    /** Custom properties to add or overwrite, keyed by property name. */
+    public final Map<String, String> customPropertiesToSet;
+
+    /** Keys of the custom properties to remove. */
+    public final Set<String> customPropertiesToReset;
+
+    /** The comment to set, or {@code null} when no comment change was requested. */
+    @Nullable public final String commentToSet;
+
+    protected DatabasePropertyChanges(
+            Map<String, String> customPropertiesToSet,
+            Set<String> customPropertiesToReset,
+            @Nullable String commentToSet) {
+        this.customPropertiesToSet = customPropertiesToSet;
+        this.customPropertiesToReset = customPropertiesToReset;
+        this.commentToSet = commentToSet;
+    }
+
+    /** Creates an empty builder. */
+    public static DatabasePropertyChanges.Builder builder() {
+        return new DatabasePropertyChanges.Builder();
+    }
+
+    /** The builder for {@link DatabasePropertyChanges}. */
+    public static class Builder {
+        private final Map<String, String> customPropertiesToSet = new HashMap<>();
+        private final Set<String> customPropertiesToReset = new HashSet<>();
+
+        @Nullable private String commentToSet = null;
+
+        /** Records a custom property to set; a later call with the same key wins. */
+        public void setCustomProperty(String key, String value) {
+            customPropertiesToSet.put(key, value);
+        }
+
+        /** Records a custom property key to remove. */
+        public void resetCustomProperty(String key) {
+            customPropertiesToReset.add(key);
+        }
+
+        /** Records the comment to set; {@code null} means the comment is left unchanged. */
+        public void setComment(@Nullable String comment) {
+            this.commentToSet = comment;
+        }
+
+        /**
+         * Builds the changes from a snapshot of the builder state, so that using the builder
+         * afterwards does not alter an instance that was already built.
+         */
+        public DatabasePropertyChanges build() {
+            return new DatabasePropertyChanges(
+                    new HashMap<>(customPropertiesToSet),
+                    new HashSet<>(customPropertiesToReset),
+                    commentToSet);
+        }
+    }
+}
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index 6f5672ff4..bb48bbec1 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -30,6 +30,7 @@ import org.apache.fluss.config.cluster.ConfigEntry;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.fs.token.ObtainedSecurityToken;
 import org.apache.fluss.lake.committer.LakeCommitResult;
+import org.apache.fluss.metadata.DatabaseChange;
 import org.apache.fluss.metadata.DatabaseSummary;
 import org.apache.fluss.metadata.PartitionSpec;
 import org.apache.fluss.metadata.PhysicalTablePath;
@@ -62,6 +63,7 @@ import 
org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
 import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseResponse;
 import org.apache.fluss.rpc.messages.AdjustIsrRequest;
 import org.apache.fluss.rpc.messages.AdjustIsrResponse;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
 import org.apache.fluss.rpc.messages.AlterTableRequest;
 import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest;
 import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
@@ -292,7 +294,7 @@ public class ServerRpcMessageUtils {
             case SUBTRACT:
             default:
                 throw new IllegalArgumentException(
-                        "Unsupported alter configs op type " + 
pbAlterConfig.getOpType());
+                        "Unsupported alter table configs op type " + 
pbAlterConfig.getOpType());
         }
     }
 
@@ -303,6 +305,36 @@ public class ServerRpcMessageUtils {
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Translates one {@link PbAlterConfig} entry into a {@link DatabaseChange}.
+     *
+     * <p>SET becomes a set change for the config key and value, DELETE becomes a reset change for
+     * the config key. Every other op type is rejected.
+     *
+     * @throws IllegalArgumentException if the op type is neither SET nor DELETE.
+     */
+    private static DatabaseChange toDatabaseChange(PbAlterConfig pbAlterConfig) {
+        final AlterConfigOpType op = AlterConfigOpType.from(pbAlterConfig.getOpType());
+        final String key = pbAlterConfig.getConfigKey();
+        final DatabaseChange change;
+        switch (op) {
+            case SET:
+                change = DatabaseChange.set(key, pbAlterConfig.getConfigValue());
+                break;
+            case DELETE:
+                change = DatabaseChange.reset(key);
+                break;
+            default:
+                // APPEND, SUBTRACT and any other op type
+                throw new IllegalArgumentException(
+                        "Unsupported alter database configs op type " + pbAlterConfig.getOpType());
+        }
+        return change;
+    }
+
+    /**
+     * Extracts all database changes carried by an {@link AlterDatabaseRequest}.
+     *
+     * <p>The config changes come first, in request order; when the request has a comment, an
+     * update-comment change is appended last.
+     *
+     * @param request the alter-database request.
+     * @return a new mutable list of the requested changes.
+     * @throws IllegalArgumentException if a config change uses an unsupported op type.
+     */
+    public static List<DatabaseChange> toDatabaseChanges(AlterDatabaseRequest request) {
+        // Collect into an explicit ArrayList: the list is appended to below, and
+        // Collectors.toList() gives no guarantee that its result is mutable.
+        List<DatabaseChange> databaseChanges =
+                request.getConfigChangesList().stream()
+                        .filter(Objects::nonNull)
+                        .map(ServerRpcMessageUtils::toDatabaseChange)
+                        .collect(Collectors.toCollection(ArrayList::new));
+
+        if (request.hasComment()) {
+            databaseChanges.add(DatabaseChange.updateComment(request.getComment()));
+        }
+
+        return databaseChanges;
+    }
+
     public static List<TableChange> 
toAlterTableSchemaChanges(AlterTableRequest request) {
         List<TableChange> alterTableSchemaChanges = new ArrayList<>();
         
alterTableSchemaChanges.addAll(toAddColumns(request.getAddColumnsList()));
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index e6efb053e..037e06479 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -472,6 +472,13 @@ public class ZooKeeperClient implements AutoCloseable {
         LOG.info("Registered database {}", database);
     }
 
+    /** Writes the encoded registration as the data of the given database's znode in ZK. */
+    public void updateDatabase(String database, DatabaseRegistration databaseRegistration)
+            throws Exception {
+        final String znodePath = DatabaseZNode.path(database);
+        final byte[] payload = DatabaseZNode.encode(databaseRegistration);
+        zkClient.setData().forPath(znodePath, payload);
+        LOG.info("Updated database {}", database);
+    }
+
     /** Get the database in ZK. */
     public Optional<DatabaseRegistration> getDatabase(String database) throws 
Exception {
         String path = DatabaseZNode.path(database);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/DatabaseRegistration.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/DatabaseRegistration.java
index 89d2c69d2..9986c7eb5 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/DatabaseRegistration.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/DatabaseRegistration.java
@@ -54,6 +54,14 @@ public class DatabaseRegistration {
         return builder.build();
     }
 
+    /**
+     * Creates a new registration that takes its comment and custom properties from the given
+     * descriptor, keeps this registration's created time and passes the current time as the last
+     * constructor argument (presumably the modified time - verify against the constructor).
+     */
+    public DatabaseRegistration newProperties(DatabaseDescriptor databaseDescriptor) {
+        final String updatedComment = databaseDescriptor.getComment().orElse(null);
+        return new DatabaseRegistration(
+                updatedComment,
+                databaseDescriptor.getCustomProperties(),
+                createdTime,
+                System.currentTimeMillis());
+    }
+
     public static DatabaseRegistration of(DatabaseDescriptor 
databaseDescriptor) {
         final long currentMillis = System.currentTimeMillis();
         return new DatabaseRegistration(
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index 6aeee7137..6fc51fc44 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -30,6 +30,8 @@ import org.apache.fluss.rpc.messages.AdjustIsrRequest;
 import org.apache.fluss.rpc.messages.AdjustIsrResponse;
 import org.apache.fluss.rpc.messages.AlterClusterConfigsRequest;
 import org.apache.fluss.rpc.messages.AlterClusterConfigsResponse;
+import org.apache.fluss.rpc.messages.AlterDatabaseRequest;
+import org.apache.fluss.rpc.messages.AlterDatabaseResponse;
 import org.apache.fluss.rpc.messages.AlterTableRequest;
 import org.apache.fluss.rpc.messages.AlterTableResponse;
 import org.apache.fluss.rpc.messages.ApiVersionsRequest;
@@ -162,6 +164,11 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
         throw new UnsupportedOperationException();
     }
 
+    /** Altering a database is not supported by this test gateway; the call always throws. */
+    @Override
+    public CompletableFuture<AlterDatabaseResponse> alterDatabase(AlterDatabaseRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public CompletableFuture<DropDatabaseResponse> 
dropDatabase(DropDatabaseRequest request) {
         throw new UnsupportedOperationException();

Reply via email to