This is an automated email from the ASF dual-hosted git repository.
czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 7ceec4189 [Subtask]: In master-slave mode, database-based bucket
allocation is supported. (#4123)
7ceec4189 is described below
commit 7ceec41890836f162cec5d073a6bddcd77d6b2e9
Author: can <[email protected]>
AuthorDate: Thu Mar 19 13:22:40 2026 +0800
[Subtask]: In master-slave mode, database-based bucket allocation is
supported. (#4123)
* [Subtask]: In master-slave mode, database-based bucket allocation is
supported. #4121
* [Subtask]: Optimize based on CR's feedback. #4121
---------
Co-authored-by: wardli <[email protected]>
---
.../org/apache/amoro/server/AmsAssignService.java | 30 ++-
.../amoro/server/BucketAssignStoreFactory.java | 5 +-
.../apache/amoro/server/DBBucketAssignStore.java | 211 +++++++++++++++++++++
.../server/persistence/BucketAssignmentMeta.java | 86 +++++++++
.../persistence/SqlSessionFactoryProvider.java | 2 +
.../persistence/mapper/BucketAssignMapper.java | 83 ++++++++
.../src/main/resources/derby/ams-derby-init.sql | 11 +-
.../src/main/resources/mysql/ams-mysql-init.sql | 9 +
.../main/resources/postgres/ams-postgres-init.sql | 11 ++
.../java/org/apache/amoro/utils/JacksonUtil.java | 13 ++
10 files changed, 452 insertions(+), 9 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
index a09445b54..dfee1ce8d 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
@@ -39,7 +39,9 @@ import java.util.concurrent.TimeUnit;
/**
* Service for assigning bucket IDs to AMS nodes in master-slave mode.
Periodically detects node
- * changes and redistributes bucket IDs evenly.
+ * changes and redistributes bucket IDs evenly. Works with any {@link
BucketAssignStore}
+ * implementation (e.g. ZK or database); the store is chosen by {@link
BucketAssignStoreFactory}
+ * based on {@code ha.type}.
*/
public class AmsAssignService {
@@ -151,6 +153,8 @@ public class AmsAssignService {
rebalance(aliveNodes, change.newNodes, bucketsToRedistribute,
allBuckets, newAssignments);
persistAssignments(newAssignments);
} catch (Exception e) {
+ // Catch broadly to keep the scheduler running; store (ZK/DB) errors are
logged and retried
+ // next interval
LOG.error("Error during bucket assignment", e);
}
}
@@ -297,7 +301,7 @@ public class AmsAssignService {
/**
* Get node key for matching nodes. Uses host:thriftBindPort format,
consistent with
- * ZkBucketAssignStore.getNodeKey().
+ * BucketAssignStore implementations (ZkBucketAssignStore and
DBBucketAssignStore).
*/
private String getNodeKey(AmsServerInfo nodeInfo) {
return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
@@ -408,12 +412,18 @@ public class AmsAssignService {
return new NodeChangeResult(newNodes, offlineNodes);
}
+ /**
+ * Removes offline nodes from the store and collects their buckets for
redistribution. Only adds
+ * buckets to the result after a successful remove, so that store failures
do not lead to the same
+ * bucket being assigned to both the offline node and an alive node.
+ */
private List<String> handleOfflineNodes(
Set<AmsServerInfo> offlineNodes, Map<AmsServerInfo, List<String>>
currentAssignments) {
List<String> bucketsToRedistribute = new ArrayList<>();
for (AmsServerInfo offlineNode : offlineNodes) {
+ List<String> offlineBuckets = currentAssignments.get(offlineNode);
try {
- List<String> offlineBuckets = currentAssignments.get(offlineNode);
+ assignStore.removeAssignments(offlineNode);
if (offlineBuckets != null && !offlineBuckets.isEmpty()) {
bucketsToRedistribute.addAll(offlineBuckets);
LOG.info(
@@ -421,9 +431,11 @@ public class AmsAssignService {
offlineBuckets.size(),
offlineNode);
}
- assignStore.removeAssignments(offlineNode);
} catch (BucketAssignStoreException e) {
- LOG.warn("Failed to remove assignments for offline node {}",
offlineNode, e);
+ LOG.warn(
+ "Failed to remove assignments for offline node {}, skip
redistributing its buckets",
+ offlineNode,
+ e);
}
}
return bucketsToRedistribute;
@@ -488,6 +500,10 @@ public class AmsAssignService {
}
}
+ /**
+ * Persists the new assignment map to the store. On per-node failure we log
and continue so that
+ * other nodes are still updated; the next run will retry.
+ */
private void persistAssignments(Map<AmsServerInfo, List<String>>
newAssignments) {
for (Map.Entry<AmsServerInfo, List<String>> entry :
newAssignments.entrySet()) {
try {
@@ -503,6 +519,10 @@ public class AmsAssignService {
}
}
+ /**
+ * Refreshes last update time for all alive nodes when no reassignment is
needed. Per-node
+ * failures are logged and skipped; the next run will retry.
+ */
private void refreshLastUpdateTime(List<AmsServerInfo> aliveNodes) {
for (AmsServerInfo node : aliveNodes) {
try {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
index ec865db37..12a494450 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
@@ -63,9 +63,8 @@ public final class BucketAssignStoreFactory {
"Cannot create ZkBucketAssignStore: ZK client not available or
invalid container type");
case AmoroManagementConf.HA_TYPE_DATABASE:
- LOG.info("Creating DataBaseBucketAssignStore for cluster: {}",
clusterName);
- // TODO: Implement DataBaseBucketAssignStore when ready
- throw new UnsupportedOperationException("DataBaseBucketAssignStore is
not yet implemented");
+ LOG.info("Creating DBBucketAssignStore for cluster: {}", clusterName);
+ return new DBBucketAssignStore(clusterName);
default:
throw new IllegalArgumentException(
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
new file mode 100644
index 000000000..b044f7b6c
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.exception.BucketAssignStoreException;
+import org.apache.amoro.server.persistence.BucketAssignmentMeta;
+import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.mapper.BucketAssignMapper;
+import
org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.amoro.utils.JacksonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Database-backed implementation of BucketAssignStore for HA_TYPE_DATABASE.
Stores bucket
+ * assignments in the {@code bucket_assignments} table, keyed by cluster name
and node key
+ * (host:thriftBindPort).
+ */
+public class DBBucketAssignStore extends PersistentBase implements
BucketAssignStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DBBucketAssignStore.class);
+ private static final TypeReference<List<String>> LIST_STRING_TYPE =
+ new TypeReference<List<String>>() {};
+
+ private final String clusterName;
+
+ public DBBucketAssignStore(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ @Override
+ public void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds)
+ throws BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ String serverInfoJson = JacksonUtil.toJSONString(nodeInfo);
+ String assignmentsJson =
+ JacksonUtil.toJSONString(bucketIds != null ? bucketIds : new
ArrayList<>());
+ long now = System.currentTimeMillis();
+ try {
+ // Use atomic operation: try insert first, if failed then update
+ try {
+ doAs(
+ BucketAssignMapper.class,
+ mapper ->
+ mapper.insert(
+ new BucketAssignmentMeta(
+ clusterName, nodeKey, serverInfoJson, assignmentsJson,
now)));
+ } catch (Exception insertException) {
+ // If insert failed (record already exists), then perform update
+ doAsExisted(
+ BucketAssignMapper.class,
+ mapper -> mapper.update(clusterName, nodeKey, serverInfoJson,
assignmentsJson, now),
+ () ->
+ new BucketAssignStoreException(
+ "Failed to save bucket assignments for node " + nodeKey,
insertException));
+ }
+ LOG.debug("Saved bucket assignments for node {}: {}", nodeKey,
bucketIds);
+ } catch (Exception e) {
+ LOG.error("Failed to save bucket assignments for node {}", nodeKey, e);
+ throw new BucketAssignStoreException(
+ "Failed to save bucket assignments for node " + nodeKey, e);
+ }
+ }
+
+ @Override
+ public List<String> getAssignments(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ try {
+ BucketAssignmentMeta meta =
+ getAs(BucketAssignMapper.class, mapper ->
mapper.selectByNode(clusterName, nodeKey));
+ if (meta == null
+ || meta.getAssignmentsJson() == null
+ || meta.getAssignmentsJson().isEmpty()) {
+ return new ArrayList<>();
+ }
+ return JacksonUtil.parseObject(meta.getAssignmentsJson(),
LIST_STRING_TYPE);
+ } catch (Exception e) {
+ LOG.error("Failed to get bucket assignments for node {}", nodeKey, e);
+ throw new BucketAssignStoreException(
+ "Failed to get bucket assignments for node " + nodeKey, e);
+ }
+ }
+
+ @Override
+ public void removeAssignments(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ try {
+ doAs(BucketAssignMapper.class, mapper ->
mapper.deleteByNode(clusterName, nodeKey));
+ LOG.debug("Removed bucket assignments for node {}", nodeKey);
+ } catch (Exception e) {
+ LOG.error("Failed to remove bucket assignments for node {}", nodeKey, e);
+ throw new BucketAssignStoreException(
+ "Failed to remove bucket assignments for node " + nodeKey, e);
+ }
+ }
+
+ @Override
+ public Map<AmsServerInfo, List<String>> getAllAssignments() throws
BucketAssignStoreException {
+ try {
+ List<BucketAssignmentMeta> rows =
+ getAs(BucketAssignMapper.class, mapper ->
mapper.selectAllByCluster(clusterName));
+ Map<AmsServerInfo, List<String>> result = new HashMap<>();
+ for (BucketAssignmentMeta meta : rows) {
+ if (meta.getAssignmentsJson() == null ||
meta.getAssignmentsJson().isEmpty()) {
+ continue;
+ }
+ List<String> bucketIds =
+ JacksonUtil.parseObject(meta.getAssignmentsJson(),
LIST_STRING_TYPE);
+ if (bucketIds == null || bucketIds.isEmpty()) {
+ continue;
+ }
+ AmsServerInfo nodeInfo = parseNodeInfo(meta);
+ result.put(nodeInfo, bucketIds);
+ }
+ return result;
+ } catch (Exception e) {
+ LOG.error("Failed to get all bucket assignments", e);
+ throw new BucketAssignStoreException("Failed to get all bucket
assignments", e);
+ }
+ }
+
+ @Override
+ public long getLastUpdateTime(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ try {
+ BucketAssignmentMeta meta =
+ getAs(BucketAssignMapper.class, mapper ->
mapper.selectByNode(clusterName, nodeKey));
+ if (meta == null || meta.getLastUpdateTime() == null) {
+ return 0;
+ }
+ return meta.getLastUpdateTime();
+ } catch (Exception e) {
+ LOG.error("Failed to get last update time for node {}", nodeKey, e);
+ throw new BucketAssignStoreException("Failed to get last update time for
node " + nodeKey, e);
+ }
+ }
+
+ @Override
+ public void updateLastUpdateTime(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ long now = System.currentTimeMillis();
+ try {
+ Long updated =
+ updateAs(
+ BucketAssignMapper.class,
+ mapper -> mapper.updateLastUpdateTime(clusterName, nodeKey,
now));
+ if (updated != null && updated == 0) {
+ // Row may not exist; insert a minimal row so last_update_time is
stored
+ doAs(
+ BucketAssignMapper.class,
+ mapper ->
+ mapper.insert(new BucketAssignmentMeta(clusterName, nodeKey,
null, null, now)));
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to update last update time for node {}", nodeKey, e);
+ throw new BucketAssignStoreException(
+ "Failed to update last update time for node " + nodeKey, e);
+ }
+ }
+
+ private static String getNodeKey(AmsServerInfo nodeInfo) {
+ return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
+ }
+
+ private static AmsServerInfo parseNodeInfo(BucketAssignmentMeta meta) {
+ if (meta.getServerInfoJson() != null &&
!meta.getServerInfoJson().isEmpty()) {
+ try {
+ return JacksonUtil.parseObject(meta.getServerInfoJson(),
AmsServerInfo.class);
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to parse server_info_json for node {}, fallback to
node_key",
+ meta.getNodeKey(),
+ e);
+ }
+ }
+ return parseNodeKey(meta.getNodeKey());
+ }
+
+ private static AmsServerInfo parseNodeKey(String nodeKey) {
+ String[] parts = nodeKey.split(":");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Invalid node key format: " +
nodeKey);
+ }
+ AmsServerInfo nodeInfo = new AmsServerInfo();
+ nodeInfo.setHost(parts[0]);
+ nodeInfo.setThriftBindPort(Integer.parseInt(parts[1]));
+ return nodeInfo;
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
new file mode 100644
index 000000000..4f6ab7fb4
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence;
+
+/**
+ * Entity for a single bucket assignment row: one node (node_key) in a cluster with its assigned
+ * bucket IDs and last update time. Mutable JavaBean with a no-arg constructor as required for
+ * MyBatis result mapping.
+ */
+public class BucketAssignmentMeta {
+  // AMS cluster this row belongs to; first half of the primary key.
+  private String clusterName;
+  // Node identifier in host:thriftBindPort form; second half of the primary key.
+  private String nodeKey;
+  // JSON-encoded AmsServerInfo; may be null for minimal heartbeat-only rows.
+  private String serverInfoJson;
+  // JSON array of bucket IDs assigned to the node; may be null or empty.
+  private String assignmentsJson;
+  // Last update timestamp in epoch milliseconds.
+  private Long lastUpdateTime;
+
+  public BucketAssignmentMeta() {}
+
+  public BucketAssignmentMeta(
+      String clusterName,
+      String nodeKey,
+      String serverInfoJson,
+      String assignmentsJson,
+      Long lastUpdateTime) {
+    this.clusterName = clusterName;
+    this.nodeKey = nodeKey;
+    this.serverInfoJson = serverInfoJson;
+    this.assignmentsJson = assignmentsJson;
+    this.lastUpdateTime = lastUpdateTime;
+  }
+
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(String clusterName) {
+    this.clusterName = clusterName;
+  }
+
+  public String getNodeKey() {
+    return nodeKey;
+  }
+
+  public void setNodeKey(String nodeKey) {
+    this.nodeKey = nodeKey;
+  }
+
+  public String getServerInfoJson() {
+    return serverInfoJson;
+  }
+
+  public void setServerInfoJson(String serverInfoJson) {
+    this.serverInfoJson = serverInfoJson;
+  }
+
+  public String getAssignmentsJson() {
+    return assignmentsJson;
+  }
+
+  public void setAssignmentsJson(String assignmentsJson) {
+    this.assignmentsJson = assignmentsJson;
+  }
+
+  public Long getLastUpdateTime() {
+    return lastUpdateTime;
+  }
+
+  public void setLastUpdateTime(Long lastUpdateTime) {
+    this.lastUpdateTime = lastUpdateTime;
+  }
+
+  /**
+   * Human-readable form for logs and debugging. Fields hold only cluster/node identity, bucket
+   * IDs and a timestamp — no secrets — so all of them are safe to print.
+   */
+  @Override
+  public String toString() {
+    return "BucketAssignmentMeta{clusterName='"
+        + clusterName
+        + "', nodeKey='"
+        + nodeKey
+        + "', serverInfoJson='"
+        + serverInfoJson
+        + "', assignmentsJson='"
+        + assignmentsJson
+        + "', lastUpdateTime="
+        + lastUpdateTime
+        + '}';
+  }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
index 3086bac5e..c6cb1bd53 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
@@ -25,6 +25,7 @@ import com.github.pagehelper.dialect.helper.MySqlDialect;
import com.github.pagehelper.dialect.helper.PostgreSqlDialect;
import com.github.pagehelper.dialect.helper.SqlServer2012Dialect;
import org.apache.amoro.server.persistence.mapper.ApiTokensMapper;
+import org.apache.amoro.server.persistence.mapper.BucketAssignMapper;
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
import org.apache.amoro.server.persistence.mapper.HaLeaseMapper;
import org.apache.amoro.server.persistence.mapper.OptimizerMapper;
@@ -76,6 +77,7 @@ public class SqlSessionFactoryProvider {
configuration.addMapper(TableProcessMapper.class);
configuration.addMapper(TableRuntimeMapper.class);
configuration.addMapper(HaLeaseMapper.class);
+ configuration.addMapper(BucketAssignMapper.class);
PageInterceptor interceptor = new PageInterceptor();
Properties interceptorProperties = new Properties();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
new file mode 100644
index 000000000..55fab0175
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence.mapper;
+
+import org.apache.amoro.server.persistence.BucketAssignmentMeta;
+import org.apache.ibatis.annotations.Delete;
+import org.apache.ibatis.annotations.Insert;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.Result;
+import org.apache.ibatis.annotations.ResultMap;
+import org.apache.ibatis.annotations.Results;
+import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
+
+import java.util.List;
+
+/** MyBatis mapper for bucket_assignments table. */
+public interface BucketAssignMapper {
+
+ @Insert(
+ "INSERT INTO bucket_assignments (cluster_name, node_key,
server_info_json, assignments_json, last_update_time) "
+ + "VALUES (#{meta.clusterName}, #{meta.nodeKey},
#{meta.serverInfoJson}, #{meta.assignmentsJson}, #{meta.lastUpdateTime})")
+ int insert(@Param("meta") BucketAssignmentMeta meta);
+
+ @Update(
+ "UPDATE bucket_assignments SET server_info_json = #{serverInfoJson},
assignments_json = #{assignmentsJson}, last_update_time = #{lastUpdateTime} "
+ + "WHERE cluster_name = #{clusterName} AND node_key = #{nodeKey}")
+ int update(
+ @Param("clusterName") String clusterName,
+ @Param("nodeKey") String nodeKey,
+ @Param("serverInfoJson") String serverInfoJson,
+ @Param("assignmentsJson") String assignmentsJson,
+ @Param("lastUpdateTime") Long lastUpdateTime);
+
+ @Update(
+ "UPDATE bucket_assignments SET last_update_time = #{lastUpdateTime} "
+ + "WHERE cluster_name = #{clusterName} AND node_key = #{nodeKey}")
+ int updateLastUpdateTime(
+ @Param("clusterName") String clusterName,
+ @Param("nodeKey") String nodeKey,
+ @Param("lastUpdateTime") Long lastUpdateTime);
+
+ @Select(
+ "SELECT cluster_name, node_key, server_info_json, assignments_json,
last_update_time "
+ + "FROM bucket_assignments WHERE cluster_name = #{clusterName} AND
node_key = #{nodeKey}")
+ @Results(
+ id = "BucketAssignmentMetaMap",
+ value = {
+ @Result(column = "cluster_name", property = "clusterName"),
+ @Result(column = "node_key", property = "nodeKey"),
+ @Result(column = "server_info_json", property = "serverInfoJson"),
+ @Result(column = "assignments_json", property = "assignmentsJson"),
+ @Result(column = "last_update_time", property = "lastUpdateTime")
+ })
+ BucketAssignmentMeta selectByNode(
+ @Param("clusterName") String clusterName, @Param("nodeKey") String
nodeKey);
+
+ @Select(
+ "SELECT cluster_name, node_key, server_info_json, assignments_json,
last_update_time "
+ + "FROM bucket_assignments WHERE cluster_name = #{clusterName}")
+ @ResultMap("BucketAssignmentMetaMap")
+ List<BucketAssignmentMeta> selectAllByCluster(@Param("clusterName") String
clusterName);
+
+ @Delete(
+ "DELETE FROM bucket_assignments WHERE cluster_name = #{clusterName} AND
node_key = #{nodeKey}")
+ int deleteByNode(@Param("clusterName") String clusterName, @Param("nodeKey")
String nodeKey);
+}
diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
index 1e7a7e340..6f4be2ae5 100644
--- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
+++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
@@ -264,4 +264,13 @@ CREATE TABLE ha_lease (
);
CREATE INDEX idx_ha_lease_expire ON ha_lease (lease_expire_ts);
-CREATE INDEX idx_ha_lease_node ON ha_lease (node_id);
\ No newline at end of file
+CREATE INDEX idx_ha_lease_node ON ha_lease (node_id);
+
+-- Bucket ID assignments per AMS node for master-slave mode; written by DBBucketAssignStore as an
+-- insert-then-update upsert keyed on the (cluster_name, node_key) primary key.
+CREATE TABLE bucket_assignments (
+ cluster_name VARCHAR(64) NOT NULL,
+ node_key VARCHAR(256) NOT NULL,
+ server_info_json VARCHAR(32672),
+ assignments_json VARCHAR(32672),
+-- NOTE(review): confirm this Derby version accepts "NOT NULL DEFAULT"; some Derby releases
+-- require the DEFAULT clause before NOT NULL.
+ last_update_time BIGINT NOT NULL DEFAULT 0,
+ PRIMARY KEY (cluster_name, node_key)
+);
\ No newline at end of file
diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
index f1f91e3a3..e9fd1a244 100644
--- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
@@ -281,3 +281,12 @@ CREATE TABLE IF NOT EXISTS ha_lease (
KEY `idx_ha_lease_expire` (lease_expire_ts) COMMENT 'Index for querying
expired leases',
KEY `idx_ha_lease_node` (node_id) COMMENT 'Index for querying leases by node
ID'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='HA lease table for leader
election and heartbeat renewal';
+
+-- Written by DBBucketAssignStore as an insert-then-update upsert keyed on the primary key.
+CREATE TABLE IF NOT EXISTS bucket_assignments (
+ cluster_name VARCHAR(64) NOT NULL COMMENT 'AMS cluster name',
+ node_key VARCHAR(256) NOT NULL COMMENT 'Node key
(host:thriftBindPort)',
+ server_info_json TEXT NULL COMMENT 'JSON encoded AmsServerInfo',
+ assignments_json TEXT NULL COMMENT 'JSON array of bucket IDs',
+ last_update_time BIGINT NOT NULL DEFAULT 0 COMMENT 'Last update
timestamp (ms since epoch)',
+ PRIMARY KEY (cluster_name, node_key)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Bucket ID assignments per AMS
node for master-slave mode';
diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
index 8f13d6862..eeb1348bb 100644
--- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
+++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
@@ -456,6 +456,17 @@ CREATE TABLE IF NOT EXISTS ha_lease (
CREATE INDEX IF NOT EXISTS idx_ha_lease_expire ON ha_lease (lease_expire_ts);
CREATE INDEX IF NOT EXISTS idx_ha_lease_node ON ha_lease (node_id);
+-- Bucket ID assignments per AMS node for master-slave mode; written by DBBucketAssignStore as an
+-- insert-then-update upsert keyed on the primary key.
+CREATE TABLE IF NOT EXISTS bucket_assignments (
+    cluster_name VARCHAR(64) NOT NULL,
+    node_key VARCHAR(256) NOT NULL,
+    server_info_json TEXT NULL,
+    assignments_json TEXT NULL,
+    last_update_time BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (cluster_name, node_key)
+);
+
+-- No separate index on cluster_name: the (cluster_name, node_key) primary-key index already
+-- serves lookups filtered on cluster_name alone (leading-column prefix of a multicolumn index),
+-- so a dedicated idx_bucket_assignments_cluster would be redundant write overhead.
+
COMMENT ON COLUMN service_name IS 'Service name
(AMS/TABLE_SERVICE/OPTIMIZING_SERVICE)';
COMMENT ON COLUMN node_id IS 'Unique node identifier (host:port:uuid)';
COMMENT ON COLUMN node_ip IS 'Node IP address';
diff --git a/amoro-common/src/main/java/org/apache/amoro/utils/JacksonUtil.java
b/amoro-common/src/main/java/org/apache/amoro/utils/JacksonUtil.java
index 89f9188a8..9146292ab 100644
--- a/amoro-common/src/main/java/org/apache/amoro/utils/JacksonUtil.java
+++ b/amoro-common/src/main/java/org/apache/amoro/utils/JacksonUtil.java
@@ -75,6 +75,19 @@ public class JacksonUtil {
}
}
+ /**
+ * Deserialize a json string to the type specified by the given
TypeReference (e.g. list or map
+ * with generics). Uses the same ObjectMapper as toJSONString and
parseObject(Class) for
+ * consistent config.
+ */
+ public static <T> T parseObject(String jsonString, TypeReference<T>
typeReference) {
+   // NOTE(review): a null jsonString is rejected by Jackson itself (not wrapped as
+   // DESERIALIZE_ERROR) — presumably callers always pass non-null; confirm this matches
+   // the existing parseObject(Class) contract.
+ try {
+ return OBJECT_MAPPER.readValue(jsonString, typeReference);
+ } catch (IOException e) {
+ throw new IllegalStateException(DESERIALIZE_ERROR, e);
+ }
+ }
+
/** Convert an java object(usually java bean) to JsonNode. */
public static <T> JsonNode fromObjects(T object) {
return OBJECT_MAPPER.valueToTree(object);