This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ceec4189 [Subtask]: In master-slave mode, database-based bucket 
allocation is supported.  (#4123)
7ceec4189 is described below

commit 7ceec41890836f162cec5d073a6bddcd77d6b2e9
Author: can <[email protected]>
AuthorDate: Thu Mar 19 13:22:40 2026 +0800

    [Subtask]: In master-slave mode, database-based bucket allocation is 
supported.  (#4123)
    
    * [Subtask]: In master-slave mode, database-based bucket allocation is 
supported. #4121
    
    * [Subtask]: Optimize based on CR's feedback.#4121
    
    ---------
    
    Co-authored-by: wardli <[email protected]>
---
 .../org/apache/amoro/server/AmsAssignService.java  |  30 ++-
 .../amoro/server/BucketAssignStoreFactory.java     |   5 +-
 .../apache/amoro/server/DBBucketAssignStore.java   | 211 +++++++++++++++++++++
 .../server/persistence/BucketAssignmentMeta.java   |  86 +++++++++
 .../persistence/SqlSessionFactoryProvider.java     |   2 +
 .../persistence/mapper/BucketAssignMapper.java     |  83 ++++++++
 .../src/main/resources/derby/ams-derby-init.sql    |  11 +-
 .../src/main/resources/mysql/ams-mysql-init.sql    |   9 +
 .../main/resources/postgres/ams-postgres-init.sql  |  11 ++
 .../java/org/apache/amoro/utils/JacksonUtil.java   |  13 ++
 10 files changed, 452 insertions(+), 9 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
index a09445b54..dfee1ce8d 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
@@ -39,7 +39,9 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * Service for assigning bucket IDs to AMS nodes in master-slave mode. 
Periodically detects node
- * changes and redistributes bucket IDs evenly.
+ * changes and redistributes bucket IDs evenly. Works with any {@link 
BucketAssignStore}
+ * implementation (e.g. ZK or database); the store is chosen by {@link 
BucketAssignStoreFactory}
+ * based on {@code ha.type}.
  */
 public class AmsAssignService {
 
@@ -151,6 +153,8 @@ public class AmsAssignService {
       rebalance(aliveNodes, change.newNodes, bucketsToRedistribute, 
allBuckets, newAssignments);
       persistAssignments(newAssignments);
     } catch (Exception e) {
+      // Catch broadly to keep the scheduler running; store (ZK/DB) errors are 
logged and retried
+      // next interval
       LOG.error("Error during bucket assignment", e);
     }
   }
@@ -297,7 +301,7 @@ public class AmsAssignService {
 
   /**
    * Get node key for matching nodes. Uses host:thriftBindPort format, 
consistent with
-   * ZkBucketAssignStore.getNodeKey().
+   * BucketAssignStore implementations (ZkBucketAssignStore and 
DBBucketAssignStore).
    */
   private String getNodeKey(AmsServerInfo nodeInfo) {
     return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
@@ -408,12 +412,18 @@ public class AmsAssignService {
     return new NodeChangeResult(newNodes, offlineNodes);
   }
 
+  /**
+   * Removes offline nodes from the store and collects their buckets for 
redistribution. Only adds
+   * buckets to the result after a successful remove, so that store failures 
do not lead to the same
+   * bucket being assigned to both the offline node and an alive node.
+   */
   private List<String> handleOfflineNodes(
       Set<AmsServerInfo> offlineNodes, Map<AmsServerInfo, List<String>> 
currentAssignments) {
     List<String> bucketsToRedistribute = new ArrayList<>();
     for (AmsServerInfo offlineNode : offlineNodes) {
+      List<String> offlineBuckets = currentAssignments.get(offlineNode);
       try {
-        List<String> offlineBuckets = currentAssignments.get(offlineNode);
+        assignStore.removeAssignments(offlineNode);
         if (offlineBuckets != null && !offlineBuckets.isEmpty()) {
           bucketsToRedistribute.addAll(offlineBuckets);
           LOG.info(
@@ -421,9 +431,11 @@ public class AmsAssignService {
               offlineBuckets.size(),
               offlineNode);
         }
-        assignStore.removeAssignments(offlineNode);
       } catch (BucketAssignStoreException e) {
-        LOG.warn("Failed to remove assignments for offline node {}", 
offlineNode, e);
+        LOG.warn(
+            "Failed to remove assignments for offline node {}, skip 
redistributing its buckets",
+            offlineNode,
+            e);
       }
     }
     return bucketsToRedistribute;
@@ -488,6 +500,10 @@ public class AmsAssignService {
     }
   }
 
+  /**
+   * Persists the new assignment map to the store. On per-node failure we log 
and continue so that
+   * other nodes are still updated; the next run will retry.
+   */
   private void persistAssignments(Map<AmsServerInfo, List<String>> 
newAssignments) {
     for (Map.Entry<AmsServerInfo, List<String>> entry : 
newAssignments.entrySet()) {
       try {
@@ -503,6 +519,10 @@ public class AmsAssignService {
     }
   }
 
+  /**
+   * Refreshes last update time for all alive nodes when no reassignment is 
needed. Per-node
+   * failures are logged and skipped; the next run will retry.
+   */
   private void refreshLastUpdateTime(List<AmsServerInfo> aliveNodes) {
     for (AmsServerInfo node : aliveNodes) {
       try {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
index ec865db37..12a494450 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
@@ -63,9 +63,8 @@ public final class BucketAssignStoreFactory {
             "Cannot create ZkBucketAssignStore: ZK client not available or 
invalid container type");
 
       case AmoroManagementConf.HA_TYPE_DATABASE:
-        LOG.info("Creating DataBaseBucketAssignStore for cluster: {}", 
clusterName);
-        // TODO: Implement DataBaseBucketAssignStore when ready
-        throw new UnsupportedOperationException("DataBaseBucketAssignStore is 
not yet implemented");
+        LOG.info("Creating DBBucketAssignStore for cluster: {}", clusterName);
+        return new DBBucketAssignStore(clusterName);
 
       default:
         throw new IllegalArgumentException(
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
new file mode 100644
index 000000000..b044f7b6c
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.exception.BucketAssignStoreException;
+import org.apache.amoro.server.persistence.BucketAssignmentMeta;
+import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.mapper.BucketAssignMapper;
+import 
org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.amoro.utils.JacksonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Database-backed implementation of BucketAssignStore for HA_TYPE_DATABASE. 
Stores bucket
+ * assignments in the {@code bucket_assignments} table, keyed by cluster name 
and node key
+ * (host:thriftBindPort).
+ */
+public class DBBucketAssignStore extends PersistentBase implements 
BucketAssignStore {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DBBucketAssignStore.class);
+  private static final TypeReference<List<String>> LIST_STRING_TYPE =
+      new TypeReference<List<String>>() {};
+
+  private final String clusterName;
+
+  public DBBucketAssignStore(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
+  @Override
+  public void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds)
+      throws BucketAssignStoreException {
+    String nodeKey = getNodeKey(nodeInfo);
+    String serverInfoJson = JacksonUtil.toJSONString(nodeInfo);
+    String assignmentsJson =
+        JacksonUtil.toJSONString(bucketIds != null ? bucketIds : new 
ArrayList<>());
+    long now = System.currentTimeMillis();
+    try {
+      // Use atomic operation: try insert first, if failed then update
+      try {
+        doAs(
+            BucketAssignMapper.class,
+            mapper ->
+                mapper.insert(
+                    new BucketAssignmentMeta(
+                        clusterName, nodeKey, serverInfoJson, assignmentsJson, 
now)));
+      } catch (Exception insertException) {
+        // If insert failed (record already exists), then perform update
+        doAsExisted(
+            BucketAssignMapper.class,
+            mapper -> mapper.update(clusterName, nodeKey, serverInfoJson, 
assignmentsJson, now),
+            () ->
+                new BucketAssignStoreException(
+                    "Failed to save bucket assignments for node " + nodeKey, 
insertException));
+      }
+      LOG.debug("Saved bucket assignments for node {}: {}", nodeKey, 
bucketIds);
+    } catch (Exception e) {
+      LOG.error("Failed to save bucket assignments for node {}", nodeKey, e);
+      throw new BucketAssignStoreException(
+          "Failed to save bucket assignments for node " + nodeKey, e);
+    }
+  }
+
+  @Override
+  public List<String> getAssignments(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException {
+    String nodeKey = getNodeKey(nodeInfo);
+    try {
+      BucketAssignmentMeta meta =
+          getAs(BucketAssignMapper.class, mapper -> 
mapper.selectByNode(clusterName, nodeKey));
+      if (meta == null
+          || meta.getAssignmentsJson() == null
+          || meta.getAssignmentsJson().isEmpty()) {
+        return new ArrayList<>();
+      }
+      return JacksonUtil.parseObject(meta.getAssignmentsJson(), 
LIST_STRING_TYPE);
+    } catch (Exception e) {
+      LOG.error("Failed to get bucket assignments for node {}", nodeKey, e);
+      throw new BucketAssignStoreException(
+          "Failed to get bucket assignments for node " + nodeKey, e);
+    }
+  }
+
+  @Override
+  public void removeAssignments(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException {
+    String nodeKey = getNodeKey(nodeInfo);
+    try {
+      doAs(BucketAssignMapper.class, mapper -> 
mapper.deleteByNode(clusterName, nodeKey));
+      LOG.debug("Removed bucket assignments for node {}", nodeKey);
+    } catch (Exception e) {
+      LOG.error("Failed to remove bucket assignments for node {}", nodeKey, e);
+      throw new BucketAssignStoreException(
+          "Failed to remove bucket assignments for node " + nodeKey, e);
+    }
+  }
+
+  @Override
+  public Map<AmsServerInfo, List<String>> getAllAssignments() throws 
BucketAssignStoreException {
+    try {
+      List<BucketAssignmentMeta> rows =
+          getAs(BucketAssignMapper.class, mapper -> 
mapper.selectAllByCluster(clusterName));
+      Map<AmsServerInfo, List<String>> result = new HashMap<>();
+      for (BucketAssignmentMeta meta : rows) {
+        if (meta.getAssignmentsJson() == null || 
meta.getAssignmentsJson().isEmpty()) {
+          continue;
+        }
+        List<String> bucketIds =
+            JacksonUtil.parseObject(meta.getAssignmentsJson(), 
LIST_STRING_TYPE);
+        if (bucketIds == null || bucketIds.isEmpty()) {
+          continue;
+        }
+        AmsServerInfo nodeInfo = parseNodeInfo(meta);
+        result.put(nodeInfo, bucketIds);
+      }
+      return result;
+    } catch (Exception e) {
+      LOG.error("Failed to get all bucket assignments", e);
+      throw new BucketAssignStoreException("Failed to get all bucket 
assignments", e);
+    }
+  }
+
+  @Override
+  public long getLastUpdateTime(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException {
+    String nodeKey = getNodeKey(nodeInfo);
+    try {
+      BucketAssignmentMeta meta =
+          getAs(BucketAssignMapper.class, mapper -> 
mapper.selectByNode(clusterName, nodeKey));
+      if (meta == null || meta.getLastUpdateTime() == null) {
+        return 0;
+      }
+      return meta.getLastUpdateTime();
+    } catch (Exception e) {
+      LOG.error("Failed to get last update time for node {}", nodeKey, e);
+      throw new BucketAssignStoreException("Failed to get last update time for 
node " + nodeKey, e);
+    }
+  }
+
+  @Override
+  public void updateLastUpdateTime(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException {
+    String nodeKey = getNodeKey(nodeInfo);
+    long now = System.currentTimeMillis();
+    try {
+      Long updated =
+          updateAs(
+              BucketAssignMapper.class,
+              mapper -> mapper.updateLastUpdateTime(clusterName, nodeKey, 
now));
+      if (updated != null && updated == 0) {
+        // Row may not exist; insert a minimal row so last_update_time is 
stored
+        doAs(
+            BucketAssignMapper.class,
+            mapper ->
+                mapper.insert(new BucketAssignmentMeta(clusterName, nodeKey, 
null, null, now)));
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to update last update time for node {}", nodeKey, e);
+      throw new BucketAssignStoreException(
+          "Failed to update last update time for node " + nodeKey, e);
+    }
+  }
+
+  private static String getNodeKey(AmsServerInfo nodeInfo) {
+    return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
+  }
+
+  private static AmsServerInfo parseNodeInfo(BucketAssignmentMeta meta) {
+    if (meta.getServerInfoJson() != null && 
!meta.getServerInfoJson().isEmpty()) {
+      try {
+        return JacksonUtil.parseObject(meta.getServerInfoJson(), 
AmsServerInfo.class);
+      } catch (Exception e) {
+        LOG.warn(
+            "Failed to parse server_info_json for node {}, fallback to 
node_key",
+            meta.getNodeKey(),
+            e);
+      }
+    }
+    return parseNodeKey(meta.getNodeKey());
+  }
+
+  private static AmsServerInfo parseNodeKey(String nodeKey) {
+    String[] parts = nodeKey.split(":");
+    if (parts.length != 2) {
+      throw new IllegalArgumentException("Invalid node key format: " + 
nodeKey);
+    }
+    AmsServerInfo nodeInfo = new AmsServerInfo();
+    nodeInfo.setHost(parts[0]);
+    nodeInfo.setThriftBindPort(Integer.parseInt(parts[1]));
+    return nodeInfo;
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
new file mode 100644
index 000000000..4f6ab7fb4
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence;
+
+/**
+ * Entity for a single bucket assignment row: one node (node_key) in a cluster 
with its assigned
+ * bucket IDs and last update time.
+ */
+public class BucketAssignmentMeta {
+  private String clusterName;
+  private String nodeKey;
+  private String serverInfoJson;
+  private String assignmentsJson;
+  private Long lastUpdateTime;
+
+  public BucketAssignmentMeta() {}
+
+  public BucketAssignmentMeta(
+      String clusterName,
+      String nodeKey,
+      String serverInfoJson,
+      String assignmentsJson,
+      Long lastUpdateTime) {
+    this.clusterName = clusterName;
+    this.nodeKey = nodeKey;
+    this.serverInfoJson = serverInfoJson;
+    this.assignmentsJson = assignmentsJson;
+    this.lastUpdateTime = lastUpdateTime;
+  }
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
+  public String getNodeKey() {
+    return nodeKey;
+  }
+
+  public void setNodeKey(String nodeKey) {
+    this.nodeKey = nodeKey;
+  }
+
+  public String getServerInfoJson() {
+    return serverInfoJson;
+  }
+
+  public void setServerInfoJson(String serverInfoJson) {
+    this.serverInfoJson = serverInfoJson;
+  }
+
+  public String getAssignmentsJson() {
+    return assignmentsJson;
+  }
+
+  public void setAssignmentsJson(String assignmentsJson) {
+    this.assignmentsJson = assignmentsJson;
+  }
+
+  public Long getLastUpdateTime() {
+    return lastUpdateTime;
+  }
+
+  public void setLastUpdateTime(Long lastUpdateTime) {
+    this.lastUpdateTime = lastUpdateTime;
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
index 3086bac5e..c6cb1bd53 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
@@ -25,6 +25,7 @@ import com.github.pagehelper.dialect.helper.MySqlDialect;
 import com.github.pagehelper.dialect.helper.PostgreSqlDialect;
 import com.github.pagehelper.dialect.helper.SqlServer2012Dialect;
 import org.apache.amoro.server.persistence.mapper.ApiTokensMapper;
+import org.apache.amoro.server.persistence.mapper.BucketAssignMapper;
 import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
 import org.apache.amoro.server.persistence.mapper.HaLeaseMapper;
 import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
@@ -76,6 +77,7 @@ public class SqlSessionFactoryProvider {
     configuration.addMapper(TableProcessMapper.class);
     configuration.addMapper(TableRuntimeMapper.class);
     configuration.addMapper(HaLeaseMapper.class);
+    configuration.addMapper(BucketAssignMapper.class);
 
     PageInterceptor interceptor = new PageInterceptor();
     Properties interceptorProperties = new Properties();
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
new file mode 100644
index 000000000..55fab0175
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence.mapper;
+
+import org.apache.amoro.server.persistence.BucketAssignmentMeta;
+import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Insert;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Result;
+import org.apache.ibatis.annotations.ResultMap;
+import org.apache.ibatis.annotations.Results;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
+
+import java.util.List;
+
+/** MyBatis mapper for bucket_assignments table. */
+public interface BucketAssignMapper {
+
+  @Insert(
+      "INSERT INTO bucket_assignments (cluster_name, node_key, 
server_info_json, assignments_json, last_update_time) "
+          + "VALUES (#{meta.clusterName}, #{meta.nodeKey}, 
#{meta.serverInfoJson}, #{meta.assignmentsJson}, #{meta.lastUpdateTime})")
+  int insert(@Param("meta") BucketAssignmentMeta meta);
+
+  @Update(
+      "UPDATE bucket_assignments SET server_info_json = #{serverInfoJson}, 
assignments_json = #{assignmentsJson}, last_update_time = #{lastUpdateTime} "
+          + "WHERE cluster_name = #{clusterName} AND node_key = #{nodeKey}")
+  int update(
+      @Param("clusterName") String clusterName,
+      @Param("nodeKey") String nodeKey,
+      @Param("serverInfoJson") String serverInfoJson,
+      @Param("assignmentsJson") String assignmentsJson,
+      @Param("lastUpdateTime") Long lastUpdateTime);
+
+  @Update(
+      "UPDATE bucket_assignments SET last_update_time = #{lastUpdateTime} "
+          + "WHERE cluster_name = #{clusterName} AND node_key = #{nodeKey}")
+  int updateLastUpdateTime(
+      @Param("clusterName") String clusterName,
+      @Param("nodeKey") String nodeKey,
+      @Param("lastUpdateTime") Long lastUpdateTime);
+
+  @Select(
+      "SELECT cluster_name, node_key, server_info_json, assignments_json, 
last_update_time "
+          + "FROM bucket_assignments WHERE cluster_name = #{clusterName} AND 
node_key = #{nodeKey}")
+  @Results(
+      id = "BucketAssignmentMetaMap",
+      value = {
+        @Result(column = "cluster_name", property = "clusterName"),
+        @Result(column = "node_key", property = "nodeKey"),
+        @Result(column = "server_info_json", property = "serverInfoJson"),
+        @Result(column = "assignments_json", property = "assignmentsJson"),
+        @Result(column = "last_update_time", property = "lastUpdateTime")
+      })
+  BucketAssignmentMeta selectByNode(
+      @Param("clusterName") String clusterName, @Param("nodeKey") String 
nodeKey);
+
+  @Select(
+      "SELECT cluster_name, node_key, server_info_json, assignments_json, 
last_update_time "
+          + "FROM bucket_assignments WHERE cluster_name = #{clusterName}")
+  @ResultMap("BucketAssignmentMetaMap")
+  List<BucketAssignmentMeta> selectAllByCluster(@Param("clusterName") String 
clusterName);
+
+  @Delete(
+      "DELETE FROM bucket_assignments WHERE cluster_name = #{clusterName} AND 
node_key = #{nodeKey}")
+  int deleteByNode(@Param("clusterName") String clusterName, @Param("nodeKey") 
String nodeKey);
+}
diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql 
b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
index 1e7a7e340..6f4be2ae5 100644
--- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
+++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
@@ -264,4 +264,13 @@ CREATE TABLE ha_lease (
 );
 
 CREATE INDEX idx_ha_lease_expire ON ha_lease (lease_expire_ts);
-CREATE INDEX idx_ha_lease_node   ON ha_lease (node_id);
\ No newline at end of file
+CREATE INDEX idx_ha_lease_node   ON ha_lease (node_id);
+
+CREATE TABLE bucket_assignments (
+  cluster_name       VARCHAR(64)    NOT NULL,
+  node_key           VARCHAR(256)  NOT NULL,
+  server_info_json   VARCHAR(32672),
+  assignments_json   VARCHAR(32672),
+  last_update_time   BIGINT        NOT NULL DEFAULT 0,
+  PRIMARY KEY (cluster_name, node_key)
+);
\ No newline at end of file
diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql 
b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
index f1f91e3a3..e9fd1a244 100644
--- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
@@ -281,3 +281,12 @@ CREATE TABLE IF NOT EXISTS ha_lease (
   KEY `idx_ha_lease_expire` (lease_expire_ts) COMMENT 'Index for querying 
expired leases',
   KEY `idx_ha_lease_node` (node_id) COMMENT 'Index for querying leases by node 
ID'
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='HA lease table for leader 
election and heartbeat renewal';
+
+CREATE TABLE IF NOT EXISTS bucket_assignments (
+  cluster_name       VARCHAR(64)   NOT NULL COMMENT 'AMS cluster name',
+  node_key           VARCHAR(256) NOT NULL COMMENT 'Node key 
(host:thriftBindPort)',
+  server_info_json   TEXT         NULL COMMENT 'JSON encoded AmsServerInfo',
+  assignments_json   TEXT         NULL COMMENT 'JSON array of bucket IDs',
+  last_update_time   BIGINT       NOT NULL DEFAULT 0 COMMENT 'Last update 
timestamp (ms since epoch)',
+  PRIMARY KEY (cluster_name, node_key)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Bucket ID assignments per AMS 
node for master-slave mode';
diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql 
b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
index 8f13d6862..eeb1348bb 100644
--- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
+++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
@@ -456,6 +456,17 @@ CREATE TABLE IF NOT EXISTS ha_lease (
 CREATE INDEX IF NOT EXISTS idx_ha_lease_expire ON ha_lease (lease_expire_ts);
 CREATE INDEX IF NOT EXISTS idx_ha_lease_node   ON ha_lease (node_id);
 
+CREATE TABLE IF NOT EXISTS bucket_assignments (
+  cluster_name       VARCHAR(64)  NOT NULL,
+  node_key           VARCHAR(256) NOT NULL,
+  server_info_json   TEXT         NULL,
+  assignments_json   TEXT         NULL,
+  last_update_time   BIGINT       NOT NULL DEFAULT 0,
+  PRIMARY KEY (cluster_name, node_key)
+);
+
+CREATE INDEX IF NOT EXISTS idx_bucket_assignments_cluster ON 
bucket_assignments (cluster_name);
+
 COMMENT ON COLUMN service_name IS 'Service name 
(AMS/TABLE_SERVICE/OPTIMIZING_SERVICE)';
 COMMENT ON COLUMN node_id IS 'Unique node identifier (host:port:uuid)';
 COMMENT ON COLUMN node_ip IS 'Node IP address';
diff --git a/amoro-common/src/main/java/org/apache/amoro/utils/JacksonUtil.java 
b/amoro-common/src/main/java/org/apache/amoro/utils/JacksonUtil.java
index 89f9188a8..9146292ab 100644
--- a/amoro-common/src/main/java/org/apache/amoro/utils/JacksonUtil.java
+++ b/amoro-common/src/main/java/org/apache/amoro/utils/JacksonUtil.java
@@ -75,6 +75,19 @@ public class JacksonUtil {
     }
   }
 
+  /**
+   * Deserialize a json string to the type specified by the given 
TypeReference (e.g. list or map
+   * with generics). Uses the same ObjectMapper as toJSONString and 
parseObject(Class) for
+   * consistent config.
+   */
+  public static <T> T parseObject(String jsonString, TypeReference<T> 
typeReference) {
+    try {
+      return OBJECT_MAPPER.readValue(jsonString, typeReference);
+    } catch (IOException e) {
+      throw new IllegalStateException(DESERIALIZE_ERROR, e);
+    }
+  }
+
   /** Convert an java object(usually java bean) to JsonNode. */
   public static <T> JsonNode fromObjects(T object) {
     return OBJECT_MAPPER.valueToTree(object);

Reply via email to