This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 25f19da9a [AMORO-3921] Add AmsAssignService and ZkBucketAssignStore to 
implement balanced bucket allocation in master-slave mode (#3922)
25f19da9a is described below

commit 25f19da9a6cf6aa4acea787a2834c864ea57edb0
Author: can <[email protected]>
AuthorDate: Mon Mar 16 14:19:47 2026 +0800

    [AMORO-3921] Add AmsAssignService and ZkBucketAssignStore to implement 
balanced bucket allocation in master-slave mode (#3922)
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: add AmsAssignService to implement balanced bucket allocation 
in master-slave mode. #3921
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: Add a registration function for table allocation in 
master-slave mode. #3919
    
    * [Subtask]: Replace zk with mocking. #3919
    
    * [Subtask]: Replace zk with mocking. #3919
    
    * [Subtask]: add AmsAssignService to implement balanced bucket allocation 
in master-slave mode. #3921
    
    * [Subtask]: add AmsAssignService to implement balanced bucket allocation 
in master-slave mode. #3921
    
    * [Subtask]: add AmsAssignService to implement balanced bucket allocation 
in master-slave mode. #3921
    
    * [Subtask]: add AmsAssignService to implement balanced bucket allocation 
in master-slave mode. #3921
    
    * [Subtask]: Revised based on CR's comments. #3921
    
    * [Subtask]: Revised based on CR's comments. #3921
    
    * [Subtask]: Revised based on CR's comments. #3921
    
    ---------
    
    Co-authored-by: wardli <[email protected]>
---
 .../apache/amoro/server/AmoroManagementConf.java   |  21 +
 .../apache/amoro/server/AmoroServiceContainer.java |  20 +
 .../org/apache/amoro/server/AmsAssignService.java  | 515 ++++++++++++
 .../org/apache/amoro/server/BucketAssignStore.java |  84 ++
 .../amoro/server/BucketAssignStoreFactory.java     |  75 ++
 .../apache/amoro/server/ZkBucketAssignStore.java   | 245 ++++++
 .../server/ha/ZkHighAvailabilityContainer.java     |  28 +-
 .../apache/amoro/server/TestAmsAssignService.java  | 894 +++++++++++++++++++++
 .../server/TestHighAvailabilityContainer.java      | 563 +++++++++++++
 .../amoro/server/TestZkBucketAssignStore.java      | 480 +++++++++++
 .../src/main/java/org/apache/amoro/ErrorCodes.java |   2 +
 .../amoro/exception/AmoroRuntimeException.java     |   1 +
 .../exception/BucketAssignStoreException.java      |  38 +
 .../apache/amoro/properties/AmsHAProperties.java   |   5 +
 docs/configuration/ams-config.md                   |   3 +
 15 files changed, 2964 insertions(+), 10 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index e50e18a90..a3c5599be 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -286,6 +286,27 @@ public class AmoroManagementConf {
           .defaultValue(java.time.Duration.ofSeconds(30))
           .withDescription("TTL of HA lease.");
 
+  public static final ConfigOption<Integer> HA_BUCKET_ID_TOTAL_COUNT =
+      ConfigOptions.key("ha.bucket-id.total-count")
+          .intType()
+          .defaultValue(100)
+          .withDescription(
+              "Total count of bucket IDs for assignment. Bucket IDs range from 
1 to this value.");
+
+  public static final ConfigOption<Duration> HA_NODE_OFFLINE_TIMEOUT =
+      ConfigOptions.key("ha.node-offline.timeout")
+          .durationType()
+          .defaultValue(Duration.ofMinutes(5))
+          .withDescription(
+              "Timeout duration to determine if a node is offline. After this 
duration, the node's bucket IDs will be reassigned.");
+
+  public static final ConfigOption<Duration> HA_ASSIGN_INTERVAL =
+      ConfigOptions.key("ha.bucket-assign.interval")
+          .durationType()
+          .defaultValue(Duration.ofSeconds(60))
+          .withDescription(
+              "Interval for bucket assignment service to detect node changes 
and redistribute bucket IDs.");
+
   public static final ConfigOption<Integer> TABLE_SERVICE_THRIFT_BIND_PORT =
       ConfigOptions.key("thrift-server.table-service.bind-port")
           .intType()
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 25de6a7af..0b268f7bf 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -126,6 +126,7 @@ public class AmoroServiceContainer {
   private Javalin httpServer;
   private AmsServiceMetrics amsServiceMetrics;
   private HAState haState = HAState.INITIALIZING;
+  private AmsAssignService amsAssignService;
 
   public AmoroServiceContainer() throws Exception {
     initConfig();
@@ -244,6 +245,20 @@ public class AmoroServiceContainer {
 
     DefaultTableRuntimeFactory defaultRuntimeFactory = new 
DefaultTableRuntimeFactory();
     defaultRuntimeFactory.initialize(processFactories);
+    // In master-slave mode, create AmsAssignService for bucket assignment
+    if (IS_MASTER_SLAVE_MODE && haContainer != null) {
+      try {
+        // Create and start AmsAssignService for bucket assignment
+        // The factory will handle different HA types (ZK, database, etc.)
+        amsAssignService = new AmsAssignService(haContainer, serviceConfig);
+        amsAssignService.start();
+        LOG.info("AmsAssignService started for master-slave mode");
+      } catch (UnsupportedOperationException e) {
+        LOG.info("Skip AmsAssignService: {}", e.getMessage());
+      } catch (Exception e) {
+        LOG.error("Failed to start AmsAssignService", e);
+      }
+    }
 
     List<ActionCoordinator> actionCoordinators = 
defaultRuntimeFactory.supportedCoordinators();
     ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
@@ -293,6 +308,11 @@ public class AmoroServiceContainer {
       LOG.info("Stopping optimizing server[serving:{}] ...", 
optimizingServiceServer.isServing());
       optimizingServiceServer.stop();
     }
+    if (amsAssignService != null) {
+      LOG.info("Stopping AmsAssignService...");
+      amsAssignService.stop();
+      amsAssignService = null;
+    }
     if (tableService != null) {
       LOG.info("Stopping table service...");
       tableService.dispose();
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
new file mode 100644
index 000000000..a09445b54
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.exception.BucketAssignStoreException;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import 
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service for assigning bucket IDs to AMS nodes in master-slave mode. 
Periodically detects node
+ * changes and redistributes bucket IDs evenly.
+ */
+public class AmsAssignService {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AmsAssignService.class);
+
+  private final ScheduledExecutorService assignScheduler =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat("ams-assign-scheduler-%d")
+              .setDaemon(true)
+              .build());
+
+  private final HighAvailabilityContainer haContainer;
+  private final BucketAssignStore assignStore;
+  private final Configurations serviceConfig;
+  private final int bucketIdTotalCount;
+  private final long nodeOfflineTimeoutMs;
+  private final long assignIntervalSeconds;
+  private volatile boolean running = false;
+
+  /** Returns the running flag, which {@link #start()} sets and {@link #stop()} clears. */
+  boolean isRunning() {
+    return running;
+  }
+
+  /**
+   * Creates the service and reads its settings from the configuration: the total number of
+   * bucket IDs, the node offline timeout (kept in milliseconds) and the assignment interval
+   * (kept in seconds). The backing store is obtained from {@link BucketAssignStoreFactory}.
+   *
+   * @param haContainer HA container used for leadership checks and alive-node discovery
+   * @param serviceConfig AMS service configuration
+   */
+  public AmsAssignService(HighAvailabilityContainer haContainer, Configurations serviceConfig) {
+    this.haContainer = haContainer;
+    this.serviceConfig = serviceConfig;
+
+    // Settings are read once here; the fields holding them are final.
+    int totalBuckets = serviceConfig.getInteger(AmoroManagementConf.HA_BUCKET_ID_TOTAL_COUNT);
+    long offlineTimeoutMs =
+        serviceConfig.get(AmoroManagementConf.HA_NODE_OFFLINE_TIMEOUT).toMillis();
+    long intervalSeconds =
+        serviceConfig.get(AmoroManagementConf.HA_ASSIGN_INTERVAL).getSeconds();
+
+    this.bucketIdTotalCount = totalBuckets;
+    this.nodeOfflineTimeoutMs = offlineTimeoutMs;
+    this.assignIntervalSeconds = intervalSeconds;
+    this.assignStore = BucketAssignStoreFactory.create(haContainer, serviceConfig);
+  }
+
+  /**
+   * Starts the periodic bucket assignment task.
+   *
+   * <p>The task is only scheduled when master-slave mode is enabled. Leadership is not checked
+   * here: every run of {@link #doAssign()} checks it and returns early on a non-leader node.
+   *
+   * <p>An interval shorter than one second is raised to one second, because the configured
+   * duration is stored in whole seconds and {@code scheduleWithFixedDelay} rejects a non-positive
+   * delay. The scheduler is a final field that {@link #stop()} shuts down, so calling this method
+   * after {@link #stop()} fails with the executor's rejection exception.
+   */
+  public void start() {
+    if (!serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE)) {
+      LOG.info("Master-slave mode is not enabled, skip starting bucket assignment service");
+      return;
+    }
+    if (running) {
+      LOG.warn("Bucket assignment service is already running");
+      return;
+    }
+    // A sub-second interval is truncated to 0 seconds when read; never pass that to the scheduler.
+    long intervalSeconds = Math.max(1L, assignIntervalSeconds);
+    // First run happens after a fixed 10 second delay.
+    assignScheduler.scheduleWithFixedDelay(
+        this::doAssign, 10, intervalSeconds, TimeUnit.SECONDS);
+    // Flip the flag only after scheduling succeeded, so a failure does not leave it stale.
+    running = true;
+    LOG.info("Bucket assignment service started with interval: {} seconds", intervalSeconds);
+  }
+
+  /**
+   * Stops the assignment service. Does nothing unless the service is running; otherwise shuts
+   * the scheduler down, waits up to 5 seconds for it to terminate and then forces termination.
+   */
+  public void stop() {
+    if (running) {
+      running = false;
+      assignScheduler.shutdown();
+      try {
+        boolean terminatedInTime = assignScheduler.awaitTermination(5, TimeUnit.SECONDS);
+        if (!terminatedInTime) {
+          assignScheduler.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        assignScheduler.shutdownNow();
+        // Keep the interrupt visible to the caller.
+        Thread.currentThread().interrupt();
+      }
+      LOG.info("Bucket assignment service stopped");
+    }
+  }
+
+  /**
+   * Runs a single assignment round.
+   *
+   * <p>The round is skipped when this node does not hold leadership or when no alive node is
+   * reported. Otherwise the stored assignments are loaded and compared with the alive nodes. If
+   * nothing changed, only the last-update time of every alive node is refreshed. If nodes were
+   * added or went offline, the buckets of offline nodes are collected (their stored assignments
+   * are removed), the assignments are rebalanced and the result is persisted.
+   *
+   * <p>Any exception is logged and swallowed, so a failed round does not propagate to the caller.
+   */
+  @VisibleForTesting
+  public void doAssign() {
+    try {
+      if (!haContainer.hasLeadership()) {
+        LOG.debug("Current node is not leader, skip bucket assignment");
+        return;
+      }
+
+      List<AmsServerInfo> liveNodes = haContainer.getAliveNodes();
+      if (liveNodes.isEmpty()) {
+        LOG.debug("No alive nodes found, skip bucket assignment");
+        return;
+      }
+
+      // Snapshot of what the store currently holds, plus the same data re-keyed by alive node.
+      Map<AmsServerInfo, List<String>> storedAssignments = assignStore.getAllAssignments();
+      Map<String, AmsServerInfo> liveNodesByKey = buildAliveNodeMap(liveNodes);
+      NormalizedAssignments normalizedView =
+          normalizeCurrentAssignments(storedAssignments, liveNodesByKey);
+      NodeChangeResult changes =
+          detectNodeChanges(
+              liveNodes, storedAssignments, liveNodesByKey, normalizedView.assignedNodes);
+
+      if (!changes.needReassign()) {
+        refreshLastUpdateTime(liveNodes);
+        return;
+      }
+
+      LOG.info(
+          "Detected node changes - New nodes: {}, Offline nodes: {}, Performing incremental reassignment...",
+          changes.newNodes.size(),
+          changes.offlineNodes.size());
+
+      // Order matters: offline nodes are dropped from the store before the new layout is built.
+      List<String> orphanedBuckets = handleOfflineNodes(changes.offlineNodes, storedAssignments);
+      List<String> everyBucket = generateBucketIds();
+      Map<AmsServerInfo, List<String>> updatedAssignments =
+          buildNewAssignments(liveNodes, changes.offlineNodes, normalizedView.assignments);
+      rebalance(liveNodes, changes.newNodes, orphanedBuckets, everyBucket, updatedAssignments);
+      persistAssignments(updatedAssignments);
+    } catch (Exception e) {
+      LOG.error("Error during bucket assignment", e);
+    }
+  }
+
+  /**
+   * Hands the given buckets out to alive nodes, always giving the next bucket to the node that
+   * currently owns the fewest buckets.
+   *
+   * <p>When all nodes own the same number of buckets this is a plain round-robin over {@code
+   * aliveNodes}. Unlike a round-robin that always restarts at the first node, it takes existing
+   * load into account, so repeated redistributions do not pile extra buckets onto the first node.
+   * Buckets that some node in {@code currentAssignments} already owns are skipped, so a bucket is
+   * never assigned twice.
+   *
+   * @param aliveNodes List of alive nodes
+   * @param bucketsToRedistribute Buckets to redistribute (e.g. from offline nodes)
+   * @param currentAssignments Current assignments map (will be modified); a missing entry for an
+   *     alive node is created on demand
+   */
+  private void redistributeBucketsIncrementally(
+      List<AmsServerInfo> aliveNodes,
+      List<String> bucketsToRedistribute,
+      Map<AmsServerInfo, List<String>> currentAssignments) {
+    if (aliveNodes.isEmpty() || bucketsToRedistribute.isEmpty()) {
+      return;
+    }
+
+    // Everything that already has an owner; also catches duplicates inside the input list.
+    Set<String> ownedBuckets = new HashSet<>();
+    for (List<String> owned : currentAssignments.values()) {
+      if (owned != null) {
+        ownedBuckets.addAll(owned);
+      }
+    }
+
+    for (String bucketId : bucketsToRedistribute) {
+      if (!ownedBuckets.add(bucketId)) {
+        LOG.debug("Bucket {} is already assigned, skip redistributing it", bucketId);
+        continue;
+      }
+      List<String> leastLoaded = null;
+      for (AmsServerInfo node : aliveNodes) {
+        List<String> candidate =
+            currentAssignments.computeIfAbsent(node, ignored -> new ArrayList<>());
+        // Strict '<' keeps the earliest node on ties, which yields round-robin for equal loads.
+        if (leastLoaded == null || candidate.size() < leastLoaded.size()) {
+          leastLoaded = candidate;
+        }
+      }
+      leastLoaded.add(bucketId);
+    }
+  }
+
+  /**
+   * Fills newly added nodes up to their target bucket count by moving buckets over from the
+   * existing nodes. Nodes that already hold at least their target are left untouched.
+   *
+   * @param aliveNodes All alive nodes
+   * @param newNodes Newly added nodes
+   * @param currentAssignments Current assignments map (will be modified)
+   * @param targetBucketsPerNode Target number of buckets per node
+   * @param remainder Remainder when dividing total buckets by node count; the first {@code
+   *     remainder} new nodes (in iteration order) get one extra bucket
+   */
+  private void balanceBucketsForNewNodes(
+      List<AmsServerInfo> aliveNodes,
+      Set<AmsServerInfo> newNodes,
+      Map<AmsServerInfo, List<String>> currentAssignments,
+      int targetBucketsPerNode,
+      int remainder) {
+    if (newNodes.isEmpty()) {
+      return;
+    }
+
+    int position = 0;
+    for (AmsServerInfo newNode : newNodes) {
+      int extra = position < remainder ? 1 : 0;
+      position++;
+
+      int targetForNewNode = targetBucketsPerNode + extra;
+      int missing = targetForNewNode - currentAssignments.get(newNode).size();
+      if (missing <= 0) {
+        continue;
+      }
+
+      // Pull the missing buckets out of the nodes that were already assigned.
+      List<String> movedBuckets =
+          collectBucketsFromExistingNodes(aliveNodes, newNodes, currentAssignments, missing);
+      currentAssignments.get(newNode).addAll(movedBuckets);
+      LOG.info(
+          "Moved {} buckets to new node {} (target: {})",
+          movedBuckets.size(),
+          newNode,
+          targetForNewNode);
+    }
+  }
+
+  /**
+   * Collects buckets from existing (non-new) alive nodes so they can be moved to a new node.
+   *
+   * <p>Each bucket is taken from the existing node that currently owns the most buckets (the
+   * earliest such node in {@code aliveNodes} on ties). Taking from the fullest node every time,
+   * instead of cycling over a list sorted once up front, levels out any imbalance between the
+   * existing nodes while collecting.
+   *
+   * @param aliveNodes All alive nodes
+   * @param newNodes New nodes (excluded from source)
+   * @param currentAssignments Current assignments; the collected buckets are removed from the
+   *     lists of their previous owners
+   * @param needed Number of buckets needed
+   * @return List of bucket IDs to move; shorter than {@code needed} when the existing nodes run
+   *     out of buckets, empty when there is no existing node
+   */
+  private List<String> collectBucketsFromExistingNodes(
+      List<AmsServerInfo> aliveNodes,
+      Set<AmsServerInfo> newNodes,
+      Map<AmsServerInfo, List<String>> currentAssignments,
+      int needed) {
+    List<String> bucketsToMove = new ArrayList<>();
+    while (bucketsToMove.size() < needed) {
+      AmsServerInfo sourceNode = null;
+      List<String> sourceBuckets = null;
+      for (AmsServerInfo node : aliveNodes) {
+        if (newNodes.contains(node)) {
+          continue;
+        }
+        List<String> owned = currentAssignments.get(node);
+        if (owned == null || owned.isEmpty()) {
+          continue;
+        }
+        if (sourceBuckets == null || owned.size() > sourceBuckets.size()) {
+          sourceNode = node;
+          sourceBuckets = owned;
+        }
+      }
+      if (sourceBuckets == null) {
+        // No existing node has a bucket left to give.
+        break;
+      }
+      String bucketToMove = sourceBuckets.remove(0);
+      bucketsToMove.add(bucketToMove);
+      LOG.debug("Moving bucket {} from node {} to new node", bucketToMove, sourceNode);
+    }
+    return bucketsToMove;
+  }
+
+  /** Builds the full list of bucket IDs: the decimal strings from 1 up to the total count. */
+  private List<String> generateBucketIds() {
+    List<String> ids = new ArrayList<>();
+    int next = 1;
+    while (next <= bucketIdTotalCount) {
+      ids.add(Integer.toString(next));
+      next++;
+    }
+    return ids;
+  }
+
+  /**
+   * Builds the key used to match a stored node with an alive node: {@code host:thriftBindPort}.
+   * The format is meant to stay consistent with ZkBucketAssignStore.getNodeKey().
+   */
+  private String getNodeKey(AmsServerInfo nodeInfo) {
+    StringBuilder key = new StringBuilder();
+    key.append(nodeInfo.getHost());
+    key.append(":");
+    key.append(nodeInfo.getThriftBindPort());
+    return key.toString();
+  }
+
+  /** Indexes the alive nodes by node key; for equal keys the later node in the list wins. */
+  private Map<String, AmsServerInfo> buildAliveNodeMap(List<AmsServerInfo> aliveNodes) {
+    Map<String, AmsServerInfo> nodesByKey = new HashMap<>();
+    aliveNodes.forEach(node -> nodesByKey.put(getNodeKey(node), node));
+    return nodesByKey;
+  }
+
+  /**
+   * Result holder of {@code normalizeCurrentAssignments}: the stored assignments re-keyed so
+   * that an entry whose node key matches an alive node is keyed by that alive node instance.
+   */
+  private static class NormalizedAssignments {
+    // Bucket lists keyed by the alive node instance where one matches, else by the stored node.
+    final Map<AmsServerInfo, List<String>> assignments;
+    // The nodes used as keys of the normalized assignments.
+    final Set<AmsServerInfo> assignedNodes;
+
+    NormalizedAssignments(
+        Map<AmsServerInfo, List<String>> assignments, Set<AmsServerInfo> 
assignedNodes) {
+      this.assignments = assignments;
+      this.assignedNodes = assignedNodes;
+    }
+  }
+
+  /**
+   * Re-keys the stored assignments by alive node instance. A stored node whose key matches an
+   * alive node is replaced by that alive node; a stored node without a match is kept as is.
+   *
+   * @param currentAssignments assignments as loaded from the store
+   * @param aliveNodeMap alive nodes indexed by node key
+   * @return the re-keyed assignments together with the set of nodes used as keys
+   */
+  private NormalizedAssignments normalizeCurrentAssignments(
+      Map<AmsServerInfo, List<String>> currentAssignments,
+      Map<String, AmsServerInfo> aliveNodeMap) {
+    Map<AmsServerInfo, List<String>> rekeyed = new HashMap<>();
+    Set<AmsServerInfo> keyNodes = new HashSet<>();
+    currentAssignments.forEach(
+        (storedNode, buckets) -> {
+          AmsServerInfo matchingAliveNode = aliveNodeMap.get(getNodeKey(storedNode));
+          AmsServerInfo effectiveNode = matchingAliveNode != null ? matchingAliveNode : storedNode;
+          rekeyed.put(effectiveNode, buckets);
+          keyNodes.add(effectiveNode);
+        });
+    return new NormalizedAssignments(rekeyed, keyNodes);
+  }
+
+  /** Result holder of {@code detectNodeChanges}: nodes that appeared and nodes that went away. */
+  private static class NodeChangeResult {
+    // Alive nodes that have no assignment entry yet.
+    final Set<AmsServerInfo> newNodes;
+    // Stored nodes that are no longer alive or were judged offline.
+    final Set<AmsServerInfo> offlineNodes;
+
+    NodeChangeResult(Set<AmsServerInfo> newNodes, Set<AmsServerInfo> 
offlineNodes) {
+      this.newNodes = newNodes;
+      this.offlineNodes = offlineNodes;
+    }
+
+    /** Returns true when at least one node was added or went offline. */
+    boolean needReassign() {
+      return !newNodes.isEmpty() || !offlineNodes.isEmpty();
+    }
+  }
+
+  /**
+   * Compares the alive nodes with the stored assignments.
+   *
+   * <p>New nodes are alive nodes that are not in {@code currentAssignedNodes}. Offline nodes are
+   * stored nodes whose key is not alive, plus stored nodes whose key is alive but whose last
+   * update time is older than the offline timeout or cannot be read from the store.
+   *
+   * @param aliveNodes nodes currently reported alive
+   * @param currentAssignments assignments as loaded from the store
+   * @param aliveNodeMap alive nodes indexed by node key
+   * @param currentAssignedNodes nodes that already have an assignment entry
+   * @return the detected new and offline nodes
+   */
+  private NodeChangeResult detectNodeChanges(
+      List<AmsServerInfo> aliveNodes,
+      Map<AmsServerInfo, List<String>> currentAssignments,
+      Map<String, AmsServerInfo> aliveNodeMap,
+      Set<AmsServerInfo> currentAssignedNodes) {
+    Set<AmsServerInfo> addedNodes = new HashSet<>(aliveNodes);
+    addedNodes.removeAll(currentAssignedNodes);
+
+    // Stored nodes that are no longer reported alive.
+    Set<AmsServerInfo> goneNodes = new HashSet<>();
+    for (AmsServerInfo storedNode : currentAssignments.keySet()) {
+      String storedKey = getNodeKey(storedNode);
+      if (!aliveNodeMap.containsKey(storedKey)) {
+        goneNodes.add(storedNode);
+      }
+    }
+
+    // Alive nodes whose stored timestamp is stale or unreadable also count as offline.
+    long now = System.currentTimeMillis();
+    Set<String> aliveKeys = new HashSet<>();
+    aliveNodes.forEach(aliveNode -> aliveKeys.add(getNodeKey(aliveNode)));
+    for (AmsServerInfo node : currentAssignedNodes) {
+      String nodeKey = getNodeKey(node);
+      if (!aliveKeys.contains(nodeKey)) {
+        continue;
+      }
+      try {
+        long lastUpdateTime = assignStore.getLastUpdateTime(node);
+        boolean timedOut = lastUpdateTime > 0 && (now - lastUpdateTime) > nodeOfflineTimeoutMs;
+        if (timedOut) {
+          addFirstStoredNodeWithKey(currentAssignments, nodeKey, goneNodes);
+          LOG.warn(
+              "Node {} is considered offline due to timeout. Last update: {}",
+              node,
+              lastUpdateTime);
+        }
+      } catch (BucketAssignStoreException e) {
+        LOG.warn("Failed to get last update time for node {}, treating as offline", node, e);
+        addFirstStoredNodeWithKey(currentAssignments, nodeKey, goneNodes);
+      }
+    }
+    return new NodeChangeResult(addedNodes, goneNodes);
+  }
+
+  /** Adds to {@code target} the first stored node whose node key equals {@code nodeKey}. */
+  private void addFirstStoredNodeWithKey(
+      Map<AmsServerInfo, List<String>> storedAssignments,
+      String nodeKey,
+      Set<AmsServerInfo> target) {
+    for (AmsServerInfo storedNode : storedAssignments.keySet()) {
+      if (getNodeKey(storedNode).equals(nodeKey)) {
+        target.add(storedNode);
+        return;
+      }
+    }
+  }
+
+  /**
+   * Gathers the buckets owned by offline nodes and removes those nodes' assignments from the
+   * store. A failed removal is logged and does not stop the processing of the remaining nodes.
+   *
+   * @param offlineNodes nodes judged offline
+   * @param currentAssignments assignments as loaded from the store
+   * @return all buckets that were owned by the offline nodes
+   */
+  private List<String> handleOfflineNodes(
+      Set<AmsServerInfo> offlineNodes, Map<AmsServerInfo, List<String>> currentAssignments) {
+    List<String> orphanedBuckets = new ArrayList<>();
+    for (AmsServerInfo offlineNode : offlineNodes) {
+      try {
+        List<String> ownedBuckets = currentAssignments.get(offlineNode);
+        boolean hasBuckets = ownedBuckets != null && !ownedBuckets.isEmpty();
+        if (hasBuckets) {
+          orphanedBuckets.addAll(ownedBuckets);
+          LOG.info(
+              "Collected {} buckets from offline node {} for redistribution",
+              ownedBuckets.size(),
+              offlineNode);
+        }
+        assignStore.removeAssignments(offlineNode);
+      } catch (BucketAssignStoreException e) {
+        LOG.warn("Failed to remove assignments for offline node {}", offlineNode, e);
+      }
+    }
+    return orphanedBuckets;
+  }
+
+  /**
+   * Creates the starting point for the new layout: one mutable bucket list per alive node. An
+   * alive node keeps a copy of its normalized assignment, unless its node key belongs to an
+   * offline node, in which case it starts with an empty list.
+   *
+   * @param aliveNodes nodes currently reported alive
+   * @param offlineNodes nodes judged offline
+   * @param normalizedAssignments stored assignments keyed by alive node instance
+   * @return a new map with an entry for every alive node
+   */
+  private Map<AmsServerInfo, List<String>> buildNewAssignments(
+      List<AmsServerInfo> aliveNodes,
+      Set<AmsServerInfo> offlineNodes,
+      Map<AmsServerInfo, List<String>> normalizedAssignments) {
+    Set<String> offlineKeys = new HashSet<>();
+    offlineNodes.forEach(offlineNode -> offlineKeys.add(getNodeKey(offlineNode)));
+
+    Map<AmsServerInfo, List<String>> result = new HashMap<>();
+    for (AmsServerInfo node : aliveNodes) {
+      List<String> buckets = new ArrayList<>();
+      if (!offlineKeys.contains(getNodeKey(node))) {
+        List<String> existing = normalizedAssignments.get(node);
+        if (existing != null) {
+          buckets.addAll(existing);
+        }
+      }
+      result.put(node, buckets);
+    }
+    return result;
+  }
+
+  /**
+   * Applies the three rebalancing steps to {@code newAssignments}, in order: hand out the
+   * buckets of offline nodes, move buckets to new nodes, then hand out every bucket that is
+   * still not assigned to any node.
+   *
+   * @param aliveNodes nodes currently reported alive; must not be empty
+   * @param newNodes alive nodes without a previous assignment
+   * @param bucketsToRedistribute buckets that were owned by offline nodes
+   * @param allBuckets the complete list of bucket IDs
+   * @param newAssignments assignments under construction (will be modified)
+   */
+  private void rebalance(
+      List<AmsServerInfo> aliveNodes,
+      Set<AmsServerInfo> newNodes,
+      List<String> bucketsToRedistribute,
+      List<String> allBuckets,
+      Map<AmsServerInfo, List<String>> newAssignments) {
+    if (!bucketsToRedistribute.isEmpty()) {
+      redistributeBucketsIncrementally(aliveNodes, bucketsToRedistribute, newAssignments);
+    }
+
+    int bucketCount = allBuckets.size();
+    int nodeCount = aliveNodes.size();
+    int perNode = bucketCount / nodeCount;
+    int leftover = bucketCount % nodeCount;
+    if (!newNodes.isEmpty()) {
+      balanceBucketsForNewNodes(aliveNodes, newNodes, newAssignments, perNode, leftover);
+    }
+
+    // Whatever no node owns at this point is handed out as well, in bucket-list order.
+    Set<String> assigned = new HashSet<>();
+    newAssignments.values().forEach(assigned::addAll);
+    List<String> unassigned = new ArrayList<>(allBuckets);
+    unassigned.removeAll(assigned);
+    if (!unassigned.isEmpty()) {
+      redistributeBucketsIncrementally(aliveNodes, unassigned, newAssignments);
+    }
+  }
+
+  /**
+   * Saves the bucket list of every node to the store. A failure for one node is logged and the
+   * remaining nodes are still saved.
+   */
+  private void persistAssignments(Map<AmsServerInfo, List<String>> newAssignments) {
+    newAssignments.forEach(
+        (node, buckets) -> {
+          try {
+            assignStore.saveAssignments(node, buckets);
+            LOG.info("Assigned {} buckets to node {}: {}", buckets.size(), node, buckets);
+          } catch (BucketAssignStoreException e) {
+            LOG.error("Failed to save assignments for node {}", node, e);
+          }
+        });
+  }
+
+  /**
+   * Updates the last-update time of every alive node in the store. A failure for one node is
+   * logged and the remaining nodes are still processed.
+   */
+  private void refreshLastUpdateTime(List<AmsServerInfo> aliveNodes) {
+    aliveNodes.forEach(
+        node -> {
+          try {
+            assignStore.updateLastUpdateTime(node);
+          } catch (BucketAssignStoreException e) {
+            LOG.warn("Failed to update last update time for node {}", node, e);
+          }
+        });
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
new file mode 100644
index 000000000..8db61265b
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.exception.BucketAssignStoreException;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface for storing and retrieving bucket ID assignments to AMS nodes, together with a
+ * per-node last-update timestamp. Different implementations can use different storage backends
+ * (e.g., ZooKeeper, database).
+ */
+public interface BucketAssignStore {
+
+  /**
+   * Save bucket ID assignments for a node.
+   *
+   * @param nodeInfo The node information
+   * @param bucketIds List of bucket IDs assigned to this node
+   * @throws BucketAssignStoreException If save operation fails
+   */
+  void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds)
+      throws BucketAssignStoreException;
+
+  /**
+   * Get bucket ID assignments for a node.
+   *
+   * @param nodeInfo The node information
+   * @return List of bucket IDs assigned to this node, empty list if not found
+   * @throws BucketAssignStoreException If retrieval operation fails
+   */
+  List<String> getAssignments(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException;
+
+  /**
+   * Remove bucket ID assignments for a node.
+   *
+   * @param nodeInfo The node information
+   * @throws BucketAssignStoreException If removal operation fails
+   */
+  void removeAssignments(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException;
+
+  /**
+   * Get all bucket ID assignments for all nodes.
+   *
+   * @return Map of node info to list of bucket IDs
+   * @throws BucketAssignStoreException If retrieval operation fails
+   */
+  Map<AmsServerInfo, List<String>> getAllAssignments() throws 
BucketAssignStoreException;
+
+  /**
+   * Get the last update time for a node's assignments.
+   *
+   * @param nodeInfo The node information
+   * @return Last update timestamp in milliseconds, 0 if not found; callers can therefore treat 0
+   *     as "no timestamp recorded"
+   * @throws BucketAssignStoreException If retrieval operation fails
+   */
+  long getLastUpdateTime(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException;
+
+  /**
+   * Update the last update time for a node's assignments.
+   *
+   * @param nodeInfo The node information
+   * @throws BucketAssignStoreException If update operation fails
+   */
+  void updateLastUpdateTime(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException;
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
new file mode 100644
index 000000000..ec865db37
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import org.apache.amoro.server.ha.ZkHighAvailabilityContainer;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory for creating BucketAssignStore implementations based on HA configuration.
+ *
+ * <p>The backend is chosen from the configured HA type. Only the ZooKeeper backend can currently
+ * be created; the database backend is recognized but not implemented yet.
+ */
+public final class BucketAssignStoreFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(BucketAssignStoreFactory.class);
+
+  private BucketAssignStoreFactory() {}
+
+  /**
+   * Creates a BucketAssignStore based on the given HA configuration and container.
+   *
+   * @param haContainer the HA container; for the ZooKeeper HA type it must be a {@link
+   *     ZkHighAvailabilityContainer} that exposes a non-null ZK client
+   * @param conf service configuration providing the HA type and the HA cluster name
+   * @return a BucketAssignStore implementation according to HA type
+   * @throws IllegalArgumentException if HA type is unsupported
+   * @throws UnsupportedOperationException if HA type is database, whose store is not implemented
+   * @throws RuntimeException if the ZK store cannot be created
+   */
+  public static BucketAssignStore create(
+      HighAvailabilityContainer haContainer, Configurations conf) {
+    // Locale.ROOT keeps the match against the HA_TYPE_* constants independent of the JVM locale.
+    String haType =
+        conf.getString(AmoroManagementConf.HA_TYPE).toLowerCase(java.util.Locale.ROOT);
+    String clusterName = conf.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+
+    switch (haType) {
+      case AmoroManagementConf.HA_TYPE_ZK:
+        if (haContainer instanceof ZkHighAvailabilityContainer) {
+          ZkHighAvailabilityContainer zkHaContainer = (ZkHighAvailabilityContainer) haContainer;
+          CuratorFramework zkClient = zkHaContainer.getZkClient();
+          if (zkClient != null) {
+            LOG.info("Creating ZkBucketAssignStore for cluster: {}", clusterName);
+            return new ZkBucketAssignStore(zkClient, clusterName);
+          }
+        }
+        throw new RuntimeException(
+            "Cannot create ZkBucketAssignStore: ZK client not available or invalid container type");
+
+      case AmoroManagementConf.HA_TYPE_DATABASE:
+        // No "Creating ..." log here: nothing is created, the call always fails.
+        // TODO: Implement DataBaseBucketAssignStore when ready
+        throw new UnsupportedOperationException("DataBaseBucketAssignStore is not yet implemented");
+
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported ha.type: " + haType + ", only 'zk' or 'database' are allowed");
+    }
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
new file mode 100644
index 000000000..9ece14aa4
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.exception.BucketAssignStoreException;
+import org.apache.amoro.properties.AmsHAProperties;
+import 
org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import 
org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.amoro.utils.JacksonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ZooKeeper-based implementation of BucketAssignStore. Stores bucket ID assignments in ZooKeeper
+ * with the following structure:
+ *
+ * <pre>
+ * /{namespace}/amoro/ams/bucket-assignments/{nodeKey}/assignments      - bucket IDs (JSON)
+ * /{namespace}/amoro/ams/bucket-assignments/{nodeKey}/last-update-time - timestamp (millis)
+ * </pre>
+ *
+ * <p>{@code nodeKey} is {@code host:thriftBindPort} of the node.
+ */
+public class ZkBucketAssignStore implements BucketAssignStore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZkBucketAssignStore.class);
+  private static final String ASSIGNMENTS_SUFFIX = "/assignments";
+  private static final String LAST_UPDATE_TIME_SUFFIX = "/last-update-time";
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final TypeReference<List<String>> LIST_STRING_TYPE =
+      new TypeReference<List<String>>() {};
+
+  private final CuratorFramework zkClient;
+  private final String assignmentsBasePath;
+
+  /**
+   * Creates the store and makes sure the base assignments path of the cluster exists.
+   *
+   * @param zkClient the ZooKeeper client used for all reads and writes
+   * @param clusterName the HA cluster name the base path is derived from
+   * @throws RuntimeException if the base path cannot be created
+   */
+  public ZkBucketAssignStore(CuratorFramework zkClient, String clusterName) {
+    this.zkClient = zkClient;
+    this.assignmentsBasePath = AmsHAProperties.getBucketAssignmentsPath(clusterName);
+    try {
+      createPathIfNeeded(assignmentsBasePath);
+    } catch (Exception e) {
+      LOG.error("Failed to create bucket assignments path", e);
+      throw new RuntimeException("Failed to initialize ZkBucketAssignStore", e);
+    }
+  }
+
+  /**
+   * Writes the bucket IDs of a node as JSON and then refreshes the node's last update time.
+   *
+   * @param nodeInfo the node the buckets belong to
+   * @param bucketIds the bucket IDs to store
+   * @throws BucketAssignStoreException if either write fails
+   */
+  @Override
+  public void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds)
+      throws BucketAssignStoreException {
+    String nodeKey = getNodeKey(nodeInfo);
+    String assignmentsPath = assignmentsBasePath + "/" + nodeKey + ASSIGNMENTS_SUFFIX;
+    String assignmentsJson = JacksonUtil.toJSONString(bucketIds);
+    try {
+      writeData(assignmentsPath, assignmentsJson.getBytes(StandardCharsets.UTF_8));
+      updateLastUpdateTime(nodeInfo);
+      LOG.debug("Saved bucket assignments for node {}: {}", nodeKey, bucketIds);
+    } catch (BucketAssignStoreException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Failed to save bucket assignments for node {}", nodeKey, e);
+      throw new BucketAssignStoreException(
+          "Failed to save bucket assignments for node " + nodeKey, e);
+    }
+  }
+
+  /**
+   * Reads the bucket IDs stored for a node.
+   *
+   * @param nodeInfo the node to look up
+   * @return the stored bucket IDs; an empty list if the node has no entry or the entry is empty
+   * @throws BucketAssignStoreException if reading or parsing fails
+   */
+  @Override
+  public List<String> getAssignments(AmsServerInfo nodeInfo) throws BucketAssignStoreException {
+    String nodeKey = getNodeKey(nodeInfo);
+    String assignmentsPath = assignmentsBasePath + "/" + nodeKey + ASSIGNMENTS_SUFFIX;
+    try {
+      if (zkClient.checkExists().forPath(assignmentsPath) == null) {
+        return new ArrayList<>();
+      }
+      byte[] data = zkClient.getData().forPath(assignmentsPath);
+      if (data == null || data.length == 0) {
+        return new ArrayList<>();
+      }
+      String assignmentsJson = new String(data, StandardCharsets.UTF_8);
+      List<String> bucketIds = OBJECT_MAPPER.readValue(assignmentsJson, LIST_STRING_TYPE);
+      // A stored JSON literal "null" parses to a null list; callers are promised an empty list.
+      return bucketIds != null ? bucketIds : new ArrayList<>();
+    } catch (KeeperException.NoNodeException e) {
+      // Node vanished between the existence check and the read
+      return new ArrayList<>();
+    } catch (BucketAssignStoreException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Failed to get bucket assignments for node {}", nodeKey, e);
+      throw new BucketAssignStoreException(
+          "Failed to get bucket assignments for node " + nodeKey, e);
+    }
+  }
+
+  /**
+   * Deletes everything stored for a node (bucket IDs and last update time).
+   *
+   * @param nodeInfo the node to remove
+   * @throws BucketAssignStoreException if the deletion fails
+   */
+  @Override
+  public void removeAssignments(AmsServerInfo nodeInfo) throws BucketAssignStoreException {
+    String nodeKey = getNodeKey(nodeInfo);
+    String nodePath = assignmentsBasePath + "/" + nodeKey;
+    try {
+      if (zkClient.checkExists().forPath(nodePath) != null) {
+        zkClient.delete().deletingChildrenIfNeeded().forPath(nodePath);
+        LOG.debug("Removed bucket assignments for node {}", nodeKey);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // Already deleted, ignore
+    } catch (BucketAssignStoreException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Failed to remove bucket assignments for node {}", nodeKey, e);
+      throw new BucketAssignStoreException(
+          "Failed to remove bucket assignments for node " + nodeKey, e);
+    }
+  }
+
+  /**
+   * Reads the assignments of every node found under the base path.
+   *
+   * <p>Nodes with an empty bucket list are omitted. A child whose key cannot be parsed is logged
+   * and skipped instead of failing the whole call.
+   *
+   * @return map from node (host and thrift bind port only) to its bucket IDs
+   * @throws BucketAssignStoreException if listing the nodes or reading a node fails
+   */
+  @Override
+  public Map<AmsServerInfo, List<String>> getAllAssignments() throws BucketAssignStoreException {
+    Map<AmsServerInfo, List<String>> allAssignments = new HashMap<>();
+    try {
+      if (zkClient.checkExists().forPath(assignmentsBasePath) == null) {
+        return allAssignments;
+      }
+      List<String> nodeKeys = zkClient.getChildren().forPath(assignmentsBasePath);
+      for (String nodeKey : nodeKeys) {
+        try {
+          AmsServerInfo nodeInfo = parseNodeKey(nodeKey);
+          List<String> bucketIds = getAssignments(nodeInfo);
+          if (!bucketIds.isEmpty()) {
+            allAssignments.put(nodeInfo, bucketIds);
+          }
+        } catch (BucketAssignStoreException e) {
+          throw e;
+        } catch (Exception e) {
+          LOG.warn("Failed to parse node key or get assignments: {}", nodeKey, e);
+        }
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // Path doesn't exist, return empty map
+    } catch (BucketAssignStoreException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Failed to get all bucket assignments", e);
+      throw new BucketAssignStoreException("Failed to get all bucket assignments", e);
+    }
+    return allAssignments;
+  }
+
+  /**
+   * Reads the timestamp stored for a node.
+   *
+   * @param nodeInfo the node to look up
+   * @return the stored timestamp in milliseconds; 0 if the node has no entry or the entry is empty
+   * @throws BucketAssignStoreException if reading or parsing fails
+   */
+  @Override
+  public long getLastUpdateTime(AmsServerInfo nodeInfo) throws BucketAssignStoreException {
+    String nodeKey = getNodeKey(nodeInfo);
+    String timePath = assignmentsBasePath + "/" + nodeKey + LAST_UPDATE_TIME_SUFFIX;
+    try {
+      if (zkClient.checkExists().forPath(timePath) == null) {
+        return 0;
+      }
+      byte[] data = zkClient.getData().forPath(timePath);
+      if (data == null || data.length == 0) {
+        return 0;
+      }
+      return Long.parseLong(new String(data, StandardCharsets.UTF_8));
+    } catch (KeeperException.NoNodeException e) {
+      return 0;
+    } catch (BucketAssignStoreException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Failed to get last update time for node {}", nodeKey, e);
+      throw new BucketAssignStoreException("Failed to get last update time for node " + nodeKey, e);
+    }
+  }
+
+  /**
+   * Stores the current wall-clock time ({@code System.currentTimeMillis()}) for a node.
+   *
+   * @param nodeInfo the node whose timestamp is refreshed
+   * @throws BucketAssignStoreException if the write fails
+   */
+  @Override
+  public void updateLastUpdateTime(AmsServerInfo nodeInfo) throws BucketAssignStoreException {
+    String nodeKey = getNodeKey(nodeInfo);
+    String timePath = assignmentsBasePath + "/" + nodeKey + LAST_UPDATE_TIME_SUFFIX;
+    long currentTime = System.currentTimeMillis();
+    String timeStr = String.valueOf(currentTime);
+    try {
+      writeData(timePath, timeStr.getBytes(StandardCharsets.UTF_8));
+    } catch (BucketAssignStoreException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Failed to update last update time for node {}", nodeKey, e);
+      throw new BucketAssignStoreException(
+          "Failed to update last update time for node " + nodeKey, e);
+    }
+  }
+
+  /** Overwrites the data at {@code path}, creating the node (and its parents) if it is missing. */
+  private void writeData(String path, byte[] data) throws Exception {
+    if (zkClient.checkExists().forPath(path) != null) {
+      zkClient.setData().forPath(path, data);
+    } else {
+      zkClient
+          .create()
+          .creatingParentsIfNeeded()
+          .withMode(CreateMode.PERSISTENT)
+          .forPath(path, data);
+    }
+  }
+
+  /** Builds the ZK child name of a node: {@code host:thriftBindPort}. */
+  private String getNodeKey(AmsServerInfo nodeInfo) {
+    return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
+  }
+
+  /**
+   * Inverse of {@link #getNodeKey(AmsServerInfo)}; only host and thrift bind port are populated.
+   *
+   * <p>The key is split at its last colon so that hosts which themselves contain colons (for
+   * example IPv6 literals) survive the round trip.
+   *
+   * @throws IllegalArgumentException if the key has no colon, no port, or a non-numeric port
+   */
+  private AmsServerInfo parseNodeKey(String nodeKey) {
+    int separator = nodeKey.lastIndexOf(':');
+    if (separator < 0 || separator == nodeKey.length() - 1) {
+      throw new IllegalArgumentException("Invalid node key format: " + nodeKey);
+    }
+    AmsServerInfo nodeInfo = new AmsServerInfo();
+    nodeInfo.setHost(nodeKey.substring(0, separator));
+    nodeInfo.setThriftBindPort(Integer.parseInt(nodeKey.substring(separator + 1)));
+    return nodeInfo;
+  }
+
+  /** Creates {@code path} as a persistent node with parents; an existing node is not an error. */
+  private void createPathIfNeeded(String path) throws BucketAssignStoreException {
+    try {
+      zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
+    } catch (KeeperException.NodeExistsException e) {
+      // ignore
+    } catch (Exception e) {
+      throw new BucketAssignStoreException("Failed to create path: " + path, e);
+    }
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
index 7c070f3b6..0aa7b6e35 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
@@ -57,16 +57,6 @@ public class ZkHighAvailabilityContainer implements 
HighAvailabilityContainer, L
 
   private final LeaderLatch leaderLatch;
   private final CuratorFramework zkClient;
-
-  // Package-private accessors for testing
-  CuratorFramework getZkClient() {
-    return zkClient;
-  }
-
-  LeaderLatch getLeaderLatch() {
-    return leaderLatch;
-  }
-
   private final String tableServiceMasterPath;
   private final String optimizingServiceMasterPath;
   private final String nodesPath;
@@ -295,6 +285,24 @@ public class ZkHighAvailabilityContainer implements 
HighAvailabilityContainer, L
     return leaderLatch.hasLeadership();
   }
 
+  /**
+   * Get the current node's table service server info.
+   *
+   * <p>The container's own {@code tableServiceServerInfo} reference is returned as-is, without
+   * copying.
+   *
+   * @return The current node's server info, null if HA is not enabled
+   */
+  public AmsServerInfo getTableServiceServerInfo() {
+    return tableServiceServerInfo;
+  }
+
+  /**
+   * Get the ZooKeeper client. This is used for creating BucketAssignStore.
+   *
+   * <p>{@code zkClient} is a final field, so every call returns the same instance. {@code
+   * BucketAssignStoreFactory} treats a {@code null} result as "ZK client not available" and fails.
+   *
+   * @return The ZooKeeper client, null if HA is not enabled
+   */
+  public CuratorFramework getZkClient() {
+    return zkClient;
+  }
+
   private void createPathIfNeeded(String path) throws Exception {
     try {
       
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
new file mode 100644
index 000000000..57f678db8
--- /dev/null
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.exception.BucketAssignStoreException;
+import org.apache.amoro.properties.AmsHAProperties;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import org.apache.amoro.server.ha.ZkHighAvailabilityContainer;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.data.Stat;
+import org.apache.amoro.utils.JacksonUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test for AmsAssignService using mocked ZK to avoid connection issues. */
+public class TestAmsAssignService {
+
+  private Configurations serviceConfig;
+  private HighAvailabilityContainer haContainer;
+  private AmsAssignService assignService;
+  private AmsServerInfo node1;
+  private AmsServerInfo node2;
+  private AmsServerInfo node3;
+  private MockZkState mockZkState;
+  private CuratorFramework mockZkClient;
+  private LeaderLatch mockLeaderLatch;
+  private MockBucketAssignStore mockAssignStore;
+
+  /**
+   * Builds the mocked ZK state, client and leader latch, the local node's configuration, the HA
+   * container and the assign service under test, plus three node descriptors.
+   */
+  @Before
+  public void setUp() throws Exception {
+    mockZkState = new MockZkState();
+    mockZkClient = createMockZkClient();
+    mockLeaderLatch = createMockLeaderLatch(true); // Is leader by default
+    mockAssignStore = new MockBucketAssignStore();
+
+    // Same HA settings as every other node in these tests; only host and ports differ
+    // (table thrift port 1260, optimizing thrift port 1261, HTTP port 1630).
+    serviceConfig = createNodeConfig("127.0.0.1", 1260, 1630);
+
+    haContainer = createContainerWithMockZk();
+
+    // Create AmsAssignService with mock assign store
+    assignService = createAssignServiceWithMockStore();
+
+    node1 = new AmsServerInfo();
+    node1.setHost("127.0.0.1");
+    node1.setThriftBindPort(1260);
+    node1.setRestBindPort(1630);
+
+    node2 = new AmsServerInfo();
+    node2.setHost("127.0.0.2");
+    node2.setThriftBindPort(1262);
+    node2.setRestBindPort(1632);
+
+    node3 = new AmsServerInfo();
+    node3.setHost("127.0.0.3");
+    node3.setThriftBindPort(1263);
+    node3.setRestBindPort(1633);
+  }
+
+  /**
+   * Stops the assign service, closes the HA container and clears the mock ZK state.
+   *
+   * <p>Each step runs even if the previous one throws, so a failing stop or close cannot leave the
+   * remaining resources behind.
+   */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (assignService != null) {
+        assignService.stop();
+      }
+    } finally {
+      try {
+        if (haContainer != null) {
+          haContainer.close();
+        }
+      } finally {
+        mockZkState.clear();
+      }
+    }
+  }
+
+  /** Two registered nodes: all 100 buckets are assigned, none empty, counts differ by at most 1. */
+  @Test
+  public void testInitialAssignment() throws Exception {
+    // Register the local node, then a second node on the same mock ZK
+    haContainer.registerAndElect();
+    Configurations secondNodeConfig = createNodeConfig("127.0.0.2", 1262, 1632);
+    HighAvailabilityContainer secondContainer = createContainerWithMockZk(secondNodeConfig);
+    secondContainer.registerAndElect();
+
+    try {
+      // Wait a bit for registration
+      Thread.sleep(100);
+
+      // Trigger assignment manually
+      assignService.doAssign();
+
+      Map<AmsServerInfo, List<String>> assignments = mockAssignStore.getAllAssignments();
+      Assert.assertEquals("Should have assignments for 2 nodes", 2, assignments.size());
+
+      // Every node owns at least one bucket and together they own all of them
+      int totalAssigned = 0;
+      for (List<String> nodeBuckets : assignments.values()) {
+        totalAssigned += nodeBuckets.size();
+        Assert.assertFalse("Each node should have buckets", nodeBuckets.isEmpty());
+      }
+      Assert.assertEquals("All buckets should be assigned", 100, totalAssigned);
+
+      // Verify balance (difference should be at most 1)
+      int largest = assignments.values().stream().mapToInt(List::size).max().orElse(0);
+      int smallest = assignments.values().stream().mapToInt(List::size).min().orElse(0);
+      Assert.assertTrue("Difference should be at most 1", largest - smallest <= 1);
+    } finally {
+      secondContainer.close();
+    }
+  }
+
+  /** When one of two nodes disappears, the remaining node ends up owning all 100 buckets. */
+  @Test
+  public void testNodeOfflineReassignment() throws Exception {
+    // Setup: 2 nodes with initial assignment
+    haContainer.registerAndElect();
+    Configurations secondNodeConfig = createNodeConfig("127.0.0.2", 1262, 1632);
+    HighAvailabilityContainer secondContainer = createContainerWithMockZk(secondNodeConfig);
+    secondContainer.registerAndElect();
+
+    try {
+      Thread.sleep(100);
+
+      // Initial assignment
+      assignService.doAssign();
+      Map<AmsServerInfo, List<String>> before = mockAssignStore.getAllAssignments();
+      Assert.assertEquals("Should have 2 nodes", 2, before.size());
+
+      // Verify initial assignment is balanced
+      int largest = before.values().stream().mapToInt(List::size).max().orElse(0);
+      int smallest = before.values().stream().mapToInt(List::size).min().orElse(0);
+      Assert.assertTrue("Initial assignment should be balanced", largest - smallest <= 1);
+
+      // Simulate node2 going offline by removing it from mock state
+      mockZkState.deleteNodeByHost("127.0.0.2");
+      Thread.sleep(100);
+
+      // Trigger reassignment
+      assignService.doAssign();
+
+      // Check that node2's buckets are redistributed
+      Map<AmsServerInfo, List<String>> after = mockAssignStore.getAllAssignments();
+      Assert.assertEquals("Should have 1 node after offline", 1, after.size());
+
+      // The only remaining node (node1) should have all buckets. ZK stores
+      // optimizingServiceServerInfo (thrift port 1261), not table port (1260), so we
+      // take the single entry instead of matching by node1's thriftBindPort.
+      List<String> survivorBuckets = after.values().iterator().next();
+      Assert.assertNotNull("Node1 should have assignments", survivorBuckets);
+      Assert.assertEquals("Node1 should have all buckets", 100, survivorBuckets.size());
+    } finally {
+      try {
+        secondContainer.close();
+      } catch (Exception e) {
+        // ignore
+      }
+    }
+  }
+
+  /** A node joining a single-node cluster gets a balanced share while node1 keeps some buckets. */
+  @Test
+  public void testNewNodeIncrementalAssignment() throws Exception {
+    // Setup: 1 node initially
+    haContainer.registerAndElect();
+    Thread.sleep(100);
+
+    // Initial assignment - all buckets to node1
+    assignService.doAssign();
+    Map<AmsServerInfo, List<String>> before = mockAssignStore.getAllAssignments();
+    // ZK stores optimizing port (1261), not table port (1260); match by host only (single node).
+    List<String> node1Before = findBucketsByHost(before, node1.getHost());
+    Assert.assertNotNull("Node1 should have assignments", node1Before);
+    Assert.assertEquals("Node1 should have all buckets initially", 100, node1Before.size());
+
+    // Add new node
+    Configurations secondNodeConfig = createNodeConfig("127.0.0.2", 1262, 1632);
+    HighAvailabilityContainer secondContainer = createContainerWithMockZk(secondNodeConfig);
+    secondContainer.registerAndElect();
+
+    try {
+      Thread.sleep(100);
+
+      // Trigger reassignment
+      assignService.doAssign();
+
+      // Check assignments
+      Map<AmsServerInfo, List<String>> after = mockAssignStore.getAllAssignments();
+      Assert.assertEquals("Should have 2 nodes", 2, after.size());
+
+      // Verify incremental assignment - node1 should keep most of its buckets.
+      // ZK stores optimizing port, not table port; match by host.
+      List<String> node1After = findBucketsByHost(after, node1.getHost());
+      Assert.assertNotNull("Node1 should still have assignments", node1After);
+
+      // Node1 should have kept most buckets (incremental assignment)
+      Assert.assertFalse("Node1 should keep some buckets", node1After.isEmpty());
+
+      // Verify balance
+      int largest = after.values().stream().mapToInt(List::size).max().orElse(0);
+      int smallest = after.values().stream().mapToInt(List::size).min().orElse(0);
+      Assert.assertTrue("Difference should be at most 1", largest - smallest <= 1);
+
+      // Verify total
+      int total = after.values().stream().mapToInt(List::size).sum();
+      Assert.assertEquals("Total buckets should be 100", 100, total);
+    } finally {
+      secondContainer.close();
+    }
+  }
+
+  /** Three registered nodes: all 100 buckets are assigned and counts differ by at most 1. */
+  @Test
+  public void testBalanceAfterNodeChanges() throws Exception {
+    // Setup: 3 nodes
+    haContainer.registerAndElect();
+    Configurations config2 = createNodeConfig("127.0.0.2", 1262, 1632);
+    HighAvailabilityContainer haContainer2 = createContainerWithMockZk(config2);
+    haContainer2.registerAndElect();
+    Configurations config3 = createNodeConfig("127.0.0.3", 1263, 1633);
+    HighAvailabilityContainer haContainer3 = createContainerWithMockZk(config3);
+    haContainer3.registerAndElect();
+
+    try {
+      Thread.sleep(200);
+
+      // Initial assignment
+      assignService.doAssign();
+
+      // Verify balance
+      Map<AmsServerInfo, List<String>> assignments = mockAssignStore.getAllAssignments();
+      Assert.assertEquals("Should have 3 nodes", 3, assignments.size());
+
+      List<Integer> bucketCounts = new ArrayList<>();
+      for (List<String> buckets : assignments.values()) {
+        bucketCounts.add(buckets.size());
+      }
+      int max = bucketCounts.stream().mapToInt(Integer::intValue).max().orElse(0);
+      int min = bucketCounts.stream().mapToInt(Integer::intValue).min().orElse(0);
+      Assert.assertTrue("Difference should be at most 1", max - min <= 1);
+
+      // Verify all buckets are assigned
+      int total = bucketCounts.stream().mapToInt(Integer::intValue).sum();
+      Assert.assertEquals("All buckets should be assigned", 100, total);
+    } finally {
+      // Close the third container even if closing the second one throws
+      try {
+        haContainer2.close();
+      } finally {
+        haContainer3.close();
+      }
+    }
+  }
+
+  /**
+   * Adding a third node to a two-node cluster must leave node1 and node2 with more than half of
+   * the buckets they owned before, while node3 receives at least one bucket.
+   */
+  @Test
+  public void testIncrementalAssignmentMinimizesMigration() throws Exception {
+    // Setup: 2 nodes initially
+    haContainer.registerAndElect();
+    Configurations config2 = createNodeConfig("127.0.0.2", 1262, 1632);
+    HighAvailabilityContainer haContainer2 = createContainerWithMockZk(config2);
+    haContainer2.registerAndElect();
+    HighAvailabilityContainer haContainer3 = null;
+
+    try {
+      Thread.sleep(100);
+
+      // Initial assignment
+      assignService.doAssign();
+      Map<AmsServerInfo, List<String>> initialAssignments = mockAssignStore.getAllAssignments();
+
+      // Record initial assignments
+      Set<String> node1InitialBuckets = new HashSet<>();
+      Set<String> node2InitialBuckets = new HashSet<>();
+      for (Map.Entry<AmsServerInfo, List<String>> entry : initialAssignments.entrySet()) {
+        if (entry.getKey().getHost().equals("127.0.0.1")) {
+          node1InitialBuckets.addAll(entry.getValue());
+        } else {
+          node2InitialBuckets.addAll(entry.getValue());
+        }
+      }
+
+      // Add new node
+      Configurations config3 = createNodeConfig("127.0.0.3", 1263, 1633);
+      haContainer3 = createContainerWithMockZk(config3);
+      haContainer3.registerAndElect();
+
+      Thread.sleep(100);
+
+      // Trigger reassignment
+      assignService.doAssign();
+
+      // Check new assignments
+      Map<AmsServerInfo, List<String>> newAssignments = mockAssignStore.getAllAssignments();
+
+      // Calculate migration: buckets that moved from node1 or node2
+      Set<String> node1NewBuckets = new HashSet<>();
+      Set<String> node2NewBuckets = new HashSet<>();
+      Set<String> node3Buckets = new HashSet<>();
+      for (Map.Entry<AmsServerInfo, List<String>> entry : newAssignments.entrySet()) {
+        if (entry.getKey().getHost().equals("127.0.0.1")) {
+          node1NewBuckets.addAll(entry.getValue());
+        } else if (entry.getKey().getHost().equals("127.0.0.2")) {
+          node2NewBuckets.addAll(entry.getValue());
+        } else {
+          node3Buckets.addAll(entry.getValue());
+        }
+      }
+
+      // Node1 and Node2 should keep most of their buckets
+      Set<String> node1Kept = new HashSet<>(node1InitialBuckets);
+      node1Kept.retainAll(node1NewBuckets);
+      Set<String> node2Kept = new HashSet<>(node2InitialBuckets);
+      node2Kept.retainAll(node2NewBuckets);
+
+      // Verify incremental assignment: nodes should keep most buckets
+      Assert.assertTrue(
+          "Node1 should keep most buckets (incremental)",
+          node1Kept.size() > node1InitialBuckets.size() / 2);
+      Assert.assertTrue(
+          "Node2 should keep most buckets (incremental)",
+          node2Kept.size() > node2InitialBuckets.size() / 2);
+
+      // Node3 should get buckets from both
+      Assert.assertTrue("Node3 should have buckets", node3Buckets.size() > 0);
+    } finally {
+      // Close the third container even if closing the second one throws
+      try {
+        haContainer2.close();
+      } finally {
+        if (haContainer3 != null) {
+          try {
+            haContainer3.close();
+          } catch (Exception e) {
+            // ignore
+          }
+        }
+      }
+    }
+  }
+
+  /** The service reports running after start() and not running after stop(). */
+  @Test
+  public void testServiceStartStop() {
+    assignService.start();
+    boolean runningAfterStart = assignService.isRunning();
+    Assert.assertTrue("Service should be running", runningAfterStart);
+
+    assignService.stop();
+    boolean runningAfterStop = assignService.isRunning();
+    Assert.assertFalse("Service should be stopped", runningAfterStop);
+  }
+
+  /** doAssign() on a non-leader node must neither throw nor write any assignment. */
+  @Test
+  public void testServiceSkipsWhenNotLeader() throws Exception {
+    // Create a non-leader container
+    mockLeaderLatch = createMockLeaderLatch(false); // Not leader
+    Configurations nonLeaderConfig = createNodeConfig("127.0.0.2", 1262, 1632);
+    HighAvailabilityContainer nonLeaderContainer = createContainerWithMockZk(nonLeaderConfig);
+    nonLeaderContainer.registerAndElect();
+
+    try {
+      // Wait a bit
+      Thread.sleep(100);
+
+      AmsAssignService nonLeaderService = createAssignServiceWithMockStore(nonLeaderContainer);
+
+      // Should not throw exception even if not leader
+      nonLeaderService.doAssign();
+
+      // Verify that non-leader doesn't create assignments
+      Map<AmsServerInfo, List<String>> assignments = mockAssignStore.getAllAssignments();
+      Assert.assertTrue("Non-leader should not create assignments", assignments.isEmpty());
+    } finally {
+      nonLeaderContainer.close();
+    }
+  }
+
+  /**
+   * Builds the configuration of a single AMS node: HA and master-slave mode enabled, cluster
+   * {@code test-cluster}, 100 buckets and a five-minute node offline timeout.
+   *
+   * @param host exposed server host
+   * @param thriftPort table service thrift port; the optimizing service gets the next port
+   * @param httpPort HTTP server port
+   * @return a newly created configuration
+   */
+  private Configurations createNodeConfig(String host, int thriftPort, int httpPort) {
+    Configurations nodeConf = new Configurations();
+
+    // Settings shared by every node of the test cluster.
+    nodeConf.setBoolean(AmoroManagementConf.HA_ENABLE, true);
+    nodeConf.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    nodeConf.setString(AmoroManagementConf.HA_CLUSTER_NAME, "test-cluster");
+    nodeConf.setString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS, "127.0.0.1:2181");
+    nodeConf.setInteger(AmoroManagementConf.HA_BUCKET_ID_TOTAL_COUNT, 100);
+    nodeConf.set(AmoroManagementConf.HA_NODE_OFFLINE_TIMEOUT, java.time.Duration.ofMinutes(5));
+
+    // Node-specific endpoints.
+    int optimizingPort = thriftPort + 1;
+    nodeConf.setString(AmoroManagementConf.SERVER_EXPOSE_HOST, host);
+    nodeConf.setInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT, thriftPort);
+    nodeConf.setInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT, optimizingPort);
+    nodeConf.setInteger(AmoroManagementConf.HTTP_SERVER_PORT, httpPort);
+    return nodeConf;
+  }
+
+  /**
+   * Looks up the bucket list of the first assignment entry whose node host equals {@code host}.
+   * Matching is done by host only (assignments use ZK node info with optimizing port, not table
+   * port).
+   *
+   * @return the bucket list of the matching entry, or {@code null} when no entry matches
+   */
+  private static List<String> findBucketsByHost(
+      Map<AmsServerInfo, List<String>> assignments, String host) {
+    List<String> buckets = null;
+    for (Map.Entry<AmsServerInfo, List<String>> candidate : assignments.entrySet()) {
+      AmsServerInfo node = candidate.getKey();
+      if (!host.equals(node.getHost())) {
+        continue;
+      }
+      buckets = candidate.getValue();
+      break;
+    }
+    return buckets;
+  }
+
+  /**
+   * Creates a container with the mocked ZK client and leader latch injected, configured from the
+   * shared {@code serviceConfig}.
+   */
+  private HighAvailabilityContainer createContainerWithMockZk() throws Exception {
+    Configurations defaultConfig = serviceConfig;
+    return createContainerWithMockZk(defaultConfig);
+  }
+
+  /**
+   * Creates a container for {@code config} and injects the mock ZK client and the current mock
+   * leader latch into its private fields through reflection.
+   */
+  private HighAvailabilityContainer createContainerWithMockZk(Configurations config)
+      throws Exception {
+    // Start from a container that was built without any ZK connection attempt.
+    HighAvailabilityContainer mocked = createContainerWithoutZk(config);
+    Class<ZkHighAvailabilityContainer> containerClass = ZkHighAvailabilityContainer.class;
+
+    java.lang.reflect.Field clientField = containerClass.getDeclaredField("zkClient");
+    clientField.setAccessible(true);
+    clientField.set(mocked, mockZkClient);
+
+    java.lang.reflect.Field latchField = containerClass.getDeclaredField("leaderLatch");
+    latchField.setAccessible(true);
+    latchField.set(mocked, mockLeaderLatch);
+
+    return mocked;
+  }
+
+  /**
+   * Creates a {@link ZkHighAvailabilityContainer} without initializing a ZK connection.
+   *
+   * <p>The container is constructed reflectively from a copy of {@code config} with HA disabled.
+   * The master-slave flag is then copied from {@code config}; when {@code config} has HA enabled,
+   * the ZK paths and both server-info fields are populated through reflection as well.
+   *
+   * @param config node configuration the private fields are derived from; it is only read
+   * @return the container with its fields populated
+   * @throws Exception if construction or any reflective field write fails
+   */
+  private HighAvailabilityContainer createContainerWithoutZk(Configurations config)
+      throws Exception {
+    java.lang.reflect.Constructor<ZkHighAvailabilityContainer> constructor =
+        ZkHighAvailabilityContainer.class.getDeclaredConstructor(Configurations.class);
+
+    // Create a minimal config that disables HA to avoid ZK connection
+    Configurations tempConfig = new Configurations(config);
+    tempConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false);
+
+    HighAvailabilityContainer container = constructor.newInstance(tempConfig);
+
+    // The flag comes from the caller's config, not from the HA-disabled copy.
+    setContainerField(
+        container,
+        "isMasterSlaveMode",
+        config.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE));
+
+    if (!config.getBoolean(AmoroManagementConf.HA_ENABLE)) {
+      return container;
+    }
+
+    String haClusterName = config.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+    setContainerField(
+        container,
+        "tableServiceMasterPath",
+        AmsHAProperties.getTableServiceMasterPath(haClusterName));
+    setContainerField(
+        container,
+        "optimizingServiceMasterPath",
+        AmsHAProperties.getOptimizingServiceMasterPath(haClusterName));
+    setContainerField(container, "nodesPath", AmsHAProperties.getNodesPath(haClusterName));
+
+    // Both server infos share host and HTTP port and differ only in the thrift port.
+    String host = config.getString(AmoroManagementConf.SERVER_EXPOSE_HOST);
+    Integer httpPort = config.getInteger(AmoroManagementConf.HTTP_SERVER_PORT);
+    setContainerField(
+        container,
+        "tableServiceServerInfo",
+        buildServerInfo(
+            host, config.getInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT), httpPort));
+    setContainerField(
+        container,
+        "optimizingServiceServerInfo",
+        buildServerInfo(
+            host,
+            config.getInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT),
+            httpPort));
+
+    return container;
+  }
+
+  /** Writes {@code value} into the named field declared on {@link ZkHighAvailabilityContainer}. */
+  private static void setContainerField(
+      HighAvailabilityContainer container, String fieldName, Object value) throws Exception {
+    java.lang.reflect.Field field =
+        ZkHighAvailabilityContainer.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(container, value);
+  }
+
+  /**
+   * Assembles an {@link AmsServerInfo} from its three components.
+   *
+   * @param host value stored as the host
+   * @param thriftPort value stored as the thrift bind port
+   * @param httpPort value stored as the REST bind port
+   */
+  private AmsServerInfo buildServerInfo(String host, Integer thriftPort, Integer httpPort) {
+    AmsServerInfo info = new AmsServerInfo();
+    info.setRestBindPort(httpPort);
+    info.setThriftBindPort(thriftPort);
+    info.setHost(host);
+    return info;
+  }
+
+  /**
+   * Creates an {@link AmsAssignService} on top of the shared {@code haContainer}, with the mock
+   * assign store injected.
+   */
+  private AmsAssignService createAssignServiceWithMockStore() throws Exception {
+    HighAvailabilityContainer sharedContainer = haContainer;
+    return createAssignServiceWithMockStore(sharedContainer);
+  }
+
+  /**
+   * Creates an {@link AmsAssignService} for {@code container} using the shared {@code
+   * serviceConfig}, then replaces its {@code assignStore} field with the mock store.
+   */
+  private AmsAssignService createAssignServiceWithMockStore(HighAvailabilityContainer container)
+      throws Exception {
+    AmsAssignService assignServiceUnderTest = new AmsAssignService(container, serviceConfig);
+
+    // The store field is private, so the mock is swapped in through reflection.
+    java.lang.reflect.Field storeField = AmsAssignService.class.getDeclaredField("assignStore");
+    storeField.setAccessible(true);
+    storeField.set(assignServiceUnderTest, mockAssignStore);
+    return assignServiceUnderTest;
+  }
+
+  /**
+   * Create a mock CuratorFramework that uses MockZkState for storage.
+   *
+   * <p>Each stubbed builder forwards to the in-memory {@code mockZkState}:
+   *
+   * <ul>
+   *   <li>{@code getChildren().forPath(path)} returns {@code mockZkState.getChildren(path)}
+   *   <li>{@code getData().forPath(path)} returns {@code mockZkState.getData(path)}
+   *   <li>{@code create().creatingParentsIfNeeded()}, optionally followed by {@code withMode},
+   *       then {@code forPath(path, data)} returns {@code mockZkState.createNode(path, data)}
+   *   <li>the same chain ending in {@code forPath(path)} creates an empty node only when the path
+   *       does not exist yet, and returns {@code null}
+   *   <li>{@code setData().forPath(path, data)} calls {@code mockZkState.setData(path, data)}
+   *   <li>{@code delete().forPath(path)} calls {@code mockZkState.deleteNode(path)}
+   *   <li>{@code delete().deletingChildrenIfNeeded().forPath(path)} calls {@code
+   *       mockZkState.deleteNodeRecursive(path)}
+   *   <li>{@code checkExists().forPath(path)} returns {@code mockZkState.exists(path)}
+   *   <li>{@code start()} and {@code close()} are stubbed to do nothing
+   * </ul>
+   *
+   * <p>The answers read the {@code mockZkState} field when they are invoked, not when this method
+   * runs.
+   *
+   * @return the stubbed client
+   * @throws Exception declared because the stubbed Curator methods declare it
+   */
+  @SuppressWarnings("unchecked")
+  private CuratorFramework createMockZkClient() throws Exception {
+    CuratorFramework mockClient = mock(CuratorFramework.class);
+
+    // Mock getChildren()
+    
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetChildrenBuilder
+        getChildrenBuilder =
+            mock(
+                
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                    .GetChildrenBuilder.class);
+    when(mockClient.getChildren()).thenReturn(getChildrenBuilder);
+    when(getChildrenBuilder.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              return mockZkState.getChildren(path);
+            });
+
+    // Mock getData()
+    
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+        getDataBuilder =
+            mock(
+                
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+                    .class);
+    when(mockClient.getData()).thenReturn(getDataBuilder);
+    when(getDataBuilder.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              return mockZkState.getData(path);
+            });
+
+    // Mock create() - manually create the entire fluent API chain
+    
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder
 createBuilder =
+        mock(
+            
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder.class);
+
+    @SuppressWarnings("unchecked")
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                .ProtectACLCreateModeStatPathAndBytesable<
+            String>
+        pathAndBytesable =
+            mock(
+                
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                    .ProtectACLCreateModeStatPathAndBytesable.class);
+
+    when(mockClient.create()).thenReturn(createBuilder);
+    when(createBuilder.creatingParentsIfNeeded()).thenReturn(pathAndBytesable);
+    
when(pathAndBytesable.withMode(any(CreateMode.class))).thenReturn(pathAndBytesable);
+
+    // Mock forPath(path, data) - used by registerAndElect() and 
saveAssignments()
+    when(pathAndBytesable.forPath(anyString(), any(byte[].class)))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              byte[] data = invocation.getArgument(1);
+              return mockZkState.createNode(path, data);
+            });
+
+    // Mock forPath(path) - used by createPathIfNeeded()
+    when(pathAndBytesable.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              if (mockZkState.exists(path) == null) {
+                mockZkState.createNode(path, new byte[0]);
+              }
+              return null;
+            });
+
+    // Mock setData()
+    
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.SetDataBuilder
+        setDataBuilder =
+            mock(
+                
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.SetDataBuilder
+                    .class);
+    when(mockClient.setData()).thenReturn(setDataBuilder);
+    when(setDataBuilder.forPath(anyString(), any(byte[].class)))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              byte[] data = invocation.getArgument(1);
+              mockZkState.setData(path, data);
+              return null;
+            });
+
+    // Mock delete()
+    
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder
 deleteBuilder =
+        mock(
+            
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder.class);
+    when(mockClient.delete()).thenReturn(deleteBuilder);
+    doAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              mockZkState.deleteNode(path);
+              return null;
+            })
+        .when(deleteBuilder)
+        .forPath(anyString());
+
+    // Mock deletingChildrenIfNeeded()
+    
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ChildrenDeletable
+        childrenDeletable =
+            mock(
+                
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ChildrenDeletable
+                    .class);
+    
when(deleteBuilder.deletingChildrenIfNeeded()).thenReturn(childrenDeletable);
+    doAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              mockZkState.deleteNodeRecursive(path);
+              return null;
+            })
+        .when(childrenDeletable)
+        .forPath(anyString());
+
+    // Mock checkExists()
+    @SuppressWarnings("unchecked")
+    
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+        checkExistsBuilder =
+            mock(
+                
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+                    .class);
+    when(mockClient.checkExists()).thenReturn(checkExistsBuilder);
+    when(checkExistsBuilder.forPath(anyString()))
+        .thenAnswer(
+            invocation -> {
+              String path = invocation.getArgument(0);
+              return mockZkState.exists(path);
+            });
+
+    // Mock start() and close()
+    doAnswer(invocation -> null).when(mockClient).start();
+    doAnswer(invocation -> null).when(mockClient).close();
+
+    return mockClient;
+  }
+
+  /**
+   * Builds a mock {@link LeaderLatch} whose {@code hasLeadership()} always returns the given
+   * value. {@code addListener}, {@code start}, {@code close} and {@code await} are stubbed to
+   * return immediately.
+   *
+   * @param hasLeadership fixed answer of {@code hasLeadership()}
+   */
+  private LeaderLatch createMockLeaderLatch(boolean hasLeadership) throws Exception {
+    LeaderLatch latch = mock(LeaderLatch.class);
+
+    // Lifecycle calls do nothing; await() in particular does not actually wait.
+    doAnswer(invocation -> null).when(latch).start();
+    doAnswer(invocation -> null).when(latch).await();
+    doAnswer(invocation -> null).when(latch).close();
+    doAnswer(invocation -> null).when(latch).addListener(any());
+
+    when(latch.hasLeadership()).thenReturn(hasLeadership);
+    return latch;
+  }
+
+  /**
+   * In-memory ZK state simulator: a flat map from full node path to node data, plus a counter
+   * used to number sequential nodes.
+   */
+  private static class MockZkState {
+    private final Map<String, byte[]> nodes = new HashMap<>();
+    private final AtomicInteger sequenceCounter = new AtomicInteger(0);
+
+    /** Returns the sorted names of the nodes stored directly below {@code path}. */
+    public List<String> getChildren(String path) throws KeeperException {
+      String prefix = path.endsWith("/") ? path : path + "/";
+      List<String> names = new ArrayList<>();
+      for (String candidate : nodes.keySet()) {
+        if (candidate.equals(path) || !candidate.startsWith(prefix)) {
+          continue;
+        }
+        String remainder = candidate.substring(prefix.length());
+        // A remaining slash means the node is a deeper descendant, not a direct child.
+        if (remainder.indexOf('/') < 0) {
+          names.add(remainder);
+        }
+      }
+      names.sort(String::compareTo);
+      return names;
+    }
+
+    /** Returns the data stored at {@code path}; throws NoNodeException when there is none. */
+    public byte[] getData(String path) throws KeeperException {
+      byte[] stored = nodes.get(path);
+      if (stored != null) {
+        return stored;
+      }
+      throw new KeeperException.NoNodeException(path);
+    }
+
+    /** Replaces the data of an existing node; throws NoNodeException for an unknown path. */
+    public void setData(String path, byte[] data) throws KeeperException {
+      if (nodes.containsKey(path)) {
+        nodes.put(path, data);
+        return;
+      }
+      throw new KeeperException.NoNodeException(path);
+    }
+
+    /**
+     * Stores {@code data} under {@code path}. A path ending in "-" is treated as sequential and
+     * gets the next counter value appended as ten zero-padded digits.
+     *
+     * @return the path the node was actually stored under
+     */
+    public String createNode(String path, byte[] data) {
+      String storedPath = path;
+      if (path.endsWith("-")) {
+        storedPath = path + String.format("%010d", sequenceCounter.incrementAndGet());
+      }
+      nodes.put(storedPath, data);
+      return storedPath;
+    }
+
+    /** Removes exactly the node at {@code path}; throws NoNodeException when it is absent. */
+    public void deleteNode(String path) throws KeeperException {
+      if (nodes.containsKey(path)) {
+        nodes.remove(path);
+        return;
+      }
+      throw new KeeperException.NoNodeException(path);
+    }
+
+    /** Removes the node at {@code path} and every node stored below it; absent paths are fine. */
+    public void deleteNodeRecursive(String path) throws KeeperException {
+      String prefix = path.endsWith("/") ? path : path + "/";
+      nodes.keySet().removeIf(stored -> stored.equals(path) || stored.startsWith(prefix));
+    }
+
+    /** Removes every registration node whose JSON payload names {@code host}. */
+    public void deleteNodeByHost(String host) {
+      nodes
+          .entrySet()
+          .removeIf(entry -> isRegistrationOfHost(entry.getKey(), entry.getValue(), host));
+    }
+
+    /**
+     * Tells whether a node is a registration node (its path contains "/node-") whose non-empty
+     * payload parses to an {@link AmsServerInfo} with the given host.
+     */
+    private static boolean isRegistrationOfHost(String nodePath, byte[] data, String host) {
+      if (!nodePath.contains("/node-") || data == null || data.length == 0) {
+        return false;
+      }
+      try {
+        String json = new String(data, java.nio.charset.StandardCharsets.UTF_8);
+        AmsServerInfo parsed = JacksonUtil.parseObject(json, AmsServerInfo.class);
+        return parsed != null && host.equals(parsed.getHost());
+      } catch (Exception e) {
+        // Ignore parsing errors
+        return false;
+      }
+    }
+
+    /** Returns a fresh {@link Stat} when a node is stored at {@code path}, otherwise null. */
+    public Stat exists(String path) {
+      if (nodes.containsKey(path)) {
+        return new Stat();
+      }
+      return null;
+    }
+
+    /** Drops all nodes and restarts sequence numbering. */
+    public void clear() {
+      sequenceCounter.set(0);
+      nodes.clear();
+    }
+  }
+
+  /** In-memory implementation of BucketAssignStore for testing. */
+  private static class MockBucketAssignStore implements BucketAssignStore {
+    private final Map<String, List<String>> assignments = new HashMap<>();
+    private final Map<String, Long> lastUpdateTimes = new HashMap<>();
+    // Store full AmsServerInfo for proper matching
+    private final Map<String, AmsServerInfo> nodeInfoMap = new HashMap<>();
+
+    private String getNodeKey(AmsServerInfo nodeInfo) {
+      return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
+    }
+
+    @Override
+    public void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds)
+        throws BucketAssignStoreException {
+      String nodeKey = getNodeKey(nodeInfo);
+      assignments.put(nodeKey, new ArrayList<>(bucketIds));
+      // Store full node info for proper matching
+      nodeInfoMap.put(nodeKey, nodeInfo);
+      updateLastUpdateTime(nodeInfo);
+    }
+
+    @Override
+    public List<String> getAssignments(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException {
+      String nodeKey = getNodeKey(nodeInfo);
+      return new ArrayList<>(assignments.getOrDefault(nodeKey, new 
ArrayList<>()));
+    }
+
+    @Override
+    public void removeAssignments(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException {
+      String nodeKey = getNodeKey(nodeInfo);
+      assignments.remove(nodeKey);
+      lastUpdateTimes.remove(nodeKey);
+      nodeInfoMap.remove(nodeKey);
+    }
+
+    @Override
+    public Map<AmsServerInfo, List<String>> getAllAssignments() throws 
BucketAssignStoreException {
+      Map<AmsServerInfo, List<String>> result = new HashMap<>();
+      for (Map.Entry<String, List<String>> entry : assignments.entrySet()) {
+        String nodeKey = entry.getKey();
+        // Use stored full node info if available, otherwise parse from key
+        AmsServerInfo nodeInfo = nodeInfoMap.getOrDefault(nodeKey, 
parseNodeKey(nodeKey));
+        result.put(nodeInfo, new ArrayList<>(entry.getValue()));
+      }
+      return result;
+    }
+
+    @Override
+    public long getLastUpdateTime(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException {
+      String nodeKey = getNodeKey(nodeInfo);
+      return lastUpdateTimes.getOrDefault(nodeKey, 0L);
+    }
+
+    @Override
+    public void updateLastUpdateTime(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException {
+      String nodeKey = getNodeKey(nodeInfo);
+      lastUpdateTimes.put(nodeKey, System.currentTimeMillis());
+    }
+
+    private AmsServerInfo parseNodeKey(String nodeKey) {
+      String[] parts = nodeKey.split(":");
+      if (parts.length != 2) {
+        throw new IllegalArgumentException("Invalid node key format: " + 
nodeKey);
+      }
+      AmsServerInfo nodeInfo = new AmsServerInfo();
+      nodeInfo.setHost(parts[0]);
+      nodeInfo.setThriftBindPort(Integer.parseInt(parts[1]));
+      return nodeInfo;
+    }
+  }
+}
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestHighAvailabilityContainer.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestHighAvailabilityContainer.java
new file mode 100644
index 000000000..19048896a
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestHighAvailabilityContainer.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.properties.AmsHAProperties;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import org.apache.amoro.server.ha.ZkHighAvailabilityContainer;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.data.Stat;
+import org.apache.amoro.utils.JacksonUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test for HighAvailabilityContainer using mocked ZK to avoid connection 
issues. */
+public class TestHighAvailabilityContainer {
+
+  private Configurations serviceConfig; // base configuration, rebuilt in setUp()
+  private HighAvailabilityContainer haContainer; // container under test; closed in tearDown()
+  private MockZkState mockZkState; // in-memory ZK state; cleared in tearDown()
+  private CuratorFramework mockZkClient; // mock client injected into created containers
+  private LeaderLatch mockLeaderLatch; // mock latch injected into created containers
+
+  @Before
+  public void setUp() throws Exception {
+    mockZkState = new MockZkState();
+    mockZkClient = createMockZkClient();
+    mockLeaderLatch = createMockLeaderLatch();
+
+    // Create test configuration
+    serviceConfig = new Configurations();
+    serviceConfig.setString(AmoroManagementConf.SERVER_EXPOSE_HOST, 
"127.0.0.1");
+    
serviceConfig.setInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT, 
1260);
+    
serviceConfig.setInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT,
 1261);
+    serviceConfig.setInteger(AmoroManagementConf.HTTP_SERVER_PORT, 1630);
+    serviceConfig.setBoolean(AmoroManagementConf.HA_ENABLE, true);
+    serviceConfig.setString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS, 
"127.0.0.1:2181");
+    serviceConfig.setString(AmoroManagementConf.HA_CLUSTER_NAME, 
"test-cluster");
+  }
+
+  /** Closes the container created by the test, if any, and resets the mock ZK state. */
+  @After
+  public void tearDown() throws Exception {
+    try {
+      if (haContainer != null) {
+        haContainer.close();
+      }
+    } finally {
+      // Reset the fixtures even when close() throws, so the failure cannot skip the cleanup.
+      haContainer = null;
+      mockZkState.clear();
+    }
+  }
+
+  /** With master-slave mode disabled, {@code registerAndElect()} must not register a node. */
+  @Test
+  public void testRegistAndElectWithoutMasterSlaveMode() throws Exception {
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, false);
+    haContainer = createContainerWithMockZk();
+
+    // Must complete without throwing.
+    haContainer.registerAndElect();
+
+    // Nothing may show up below the cluster's nodes path.
+    List<String> registered =
+        mockZkState.getChildren(AmsHAProperties.getNodesPath("test-cluster"));
+    Assert.assertEquals(
+        "No nodes should be registered when master-slave mode is disabled", 0, registered.size());
+  }
+
+  /**
+   * With master-slave mode enabled, {@code registerAndElect()} must register exactly one node
+   * whose JSON payload carries host 127.0.0.1 and thrift port 1261.
+   */
+  @Test
+  public void testRegistAndElectWithMasterSlaveMode() throws Exception {
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    haContainer = createContainerWithMockZk();
+    haContainer.registerAndElect();
+
+    // Exactly one child below the nodes path.
+    String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+    List<String> registered = mockZkState.getChildren(nodesPath);
+    Assert.assertEquals("One node should be registered", 1, registered.size());
+
+    // The child carries a non-empty payload.
+    byte[] payload = mockZkState.getData(nodesPath + "/" + registered.get(0));
+    Assert.assertNotNull("Node data should not be null", payload);
+    Assert.assertTrue("Node data should not be empty", payload.length > 0);
+
+    // The payload is the JSON form of the node's server info.
+    AmsServerInfo registeredInfo =
+        JacksonUtil.parseObject(new String(payload, StandardCharsets.UTF_8), AmsServerInfo.class);
+    Assert.assertEquals("Host should match", "127.0.0.1", registeredInfo.getHost());
+    Assert.assertEquals(
+        "Thrift port should match", Integer.valueOf(1261), registeredInfo.getThriftBindPort());
+  }
+
+  /** With master-slave mode disabled, {@code getAliveNodes()} must return an empty list. */
+  @Test
+  public void testGetAliveNodesWithoutMasterSlaveMode() throws Exception {
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, false);
+    haContainer = createContainerWithMockZk();
+
+    List<AmsServerInfo> alive = haContainer.getAliveNodes();
+
+    // Empty, but never null.
+    Assert.assertNotNull("Alive nodes list should not be null", alive);
+    Assert.assertEquals(
+        "Alive nodes list should be empty when master-slave mode is disabled", 0, alive.size());
+  }
+
+  /** A registered node that is not the leader must get an empty list of alive nodes. */
+  @Test
+  public void testGetAliveNodesWhenNotLeader() throws Exception {
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    // The injected latch reports that this node does not hold leadership.
+    mockLeaderLatch = createMockLeaderLatch(false);
+    haContainer = createContainerWithMockZk();
+    haContainer.registerAndElect();
+
+    List<AmsServerInfo> alive = haContainer.getAliveNodes();
+
+    Assert.assertNotNull("Alive nodes list should not be null", alive);
+    Assert.assertEquals("Alive nodes list should be empty when not leader", 0, alive.size());
+  }
+
+  /** The leader must see its own registration, with matching host and ports, as alive. */
+  @Test
+  public void testGetAliveNodesAsLeader() throws Exception {
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    // The injected latch reports that this node holds leadership.
+    mockLeaderLatch = createMockLeaderLatch(true);
+    haContainer = createContainerWithMockZk();
+    haContainer.registerAndElect();
+
+    Assert.assertTrue("Should be leader", haContainer.hasLeadership());
+
+    List<AmsServerInfo> alive = haContainer.getAliveNodes();
+    Assert.assertNotNull("Alive nodes list should not be null", alive);
+    Assert.assertEquals("Should have one alive node", 1, alive.size());
+
+    // The single entry is this node itself.
+    AmsServerInfo self = alive.get(0);
+    Assert.assertEquals("Host should match", "127.0.0.1", self.getHost());
+    Assert.assertEquals(
+        "Thrift port should match", Integer.valueOf(1261), self.getThriftBindPort());
+    Assert.assertEquals("HTTP port should match", Integer.valueOf(1630), self.getRestBindPort());
+  }
+
+  @Test
+  public void testGetAliveNodesWithMultipleNodes() throws Exception {
+    // Test that getAliveNodes returns all registered nodes
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    mockLeaderLatch = createMockLeaderLatch(true); // Is leader
+    haContainer = createContainerWithMockZk();
+
+    // Register first node
+    haContainer.registerAndElect();
+
+    // Verify first node was registered
+    String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+    List<String> childrenAfterFirst = mockZkState.getChildren(nodesPath);
+    Assert.assertEquals("First node should be registered", 1, 
childrenAfterFirst.size());
+
+    // Register second node manually in mock state
+    // Use createNode with sequential path to get the correct sequence number
+    AmsServerInfo nodeInfo2 = new AmsServerInfo();
+    nodeInfo2.setHost("127.0.0.2");
+    nodeInfo2.setThriftBindPort(1262);
+    nodeInfo2.setRestBindPort(1631);
+    String nodeInfo2Json = JacksonUtil.toJSONString(nodeInfo2);
+    // Use sequential path ending with "-" to let createNode generate the 
sequence number
+    // This ensures the second node gets the correct sequence number 
(0000000001)
+    mockZkState.createNode(nodesPath + "/node-", 
nodeInfo2Json.getBytes(StandardCharsets.UTF_8));
+
+    // Get alive nodes
+    List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+    Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+    Assert.assertEquals("Should have two alive nodes", 2, aliveNodes.size());
+  }
+
+  /** Closing the container must remove the node it registered. */
+  @Test
+  public void testCloseUnregistersNode() throws Exception {
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    haContainer = createContainerWithMockZk();
+    haContainer.registerAndElect();
+
+    // Precondition: the registration is visible.
+    String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+    Assert.assertEquals(
+        "One node should be registered", 1, mockZkState.getChildren(nodesPath).size());
+
+    // Close here and clear the field so tearDown() does not close it a second time.
+    haContainer.close();
+    haContainer = null;
+
+    List<String> remaining = mockZkState.getChildren(nodesPath);
+    Assert.assertEquals("No nodes should be registered after close", 0, remaining.size());
+  }
+
+  @Test
+  public void testHasLeadership() throws Exception {
+    // Test hasLeadership() method
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    mockLeaderLatch = createMockLeaderLatch(false); // Not leader initially
+    haContainer = createContainerWithMockZk();
+
+    // Initially should not be leader
+    Assert.assertFalse("Should not be leader initially", 
haContainer.hasLeadership());
+
+    // Change to leader
+    mockLeaderLatch = createMockLeaderLatch(true);
+    haContainer = createContainerWithMockZk();
+
+    // Should be leader now
+    Assert.assertTrue("Should be leader", haContainer.hasLeadership());
+  }
+
+  @Test
+  public void testRegistAndElectWithoutHAEnabled() throws Exception {
+    // Test that registAndElect skips when HA is not enabled
+    serviceConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false);
+    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+    haContainer = new ZkHighAvailabilityContainer(serviceConfig);
+
+    // Should not throw exception
+    haContainer.registerAndElect();
+  }
+
+  /**
+   * Builds a container through createContainerWithoutZk() and then reflectively replaces its
+   * {@code zkClient} and {@code leaderLatch} fields with the mocks held by this test.
+   */
+  private HighAvailabilityContainer createContainerWithMockZk() throws Exception {
+    // Start from a container that was built without a ZK connection.
+    HighAvailabilityContainer target = createContainerWithoutZk();
+    Class<ZkHighAvailabilityContainer> containerClass = ZkHighAvailabilityContainer.class;
+
+    java.lang.reflect.Field clientSlot = containerClass.getDeclaredField("zkClient");
+    java.lang.reflect.Field latchSlot = containerClass.getDeclaredField("leaderLatch");
+    clientSlot.setAccessible(true);
+    latchSlot.setAccessible(true);
+
+    // No ZK paths are pre-created here; path creation requested by the container later on is
+    // served by the mock client.
+    clientSlot.set(target, mockZkClient);
+    latchSlot.set(target, mockLeaderLatch);
+    return target;
+  }
+
+  /**
+   * Creates a ZkHighAvailabilityContainer for tests without going through its HA start-up path.
+   *
+   * <p>The constructor is called with a copy of {@code serviceConfig} in which HA is disabled
+   * (the intent being that no ZooKeeper connection is attempted). The fields used when HA is
+   * enabled are then filled in reflectively from the real {@code serviceConfig}.
+   *
+   * @return the prepared container; its ZK client and leader latch are not set by this method
+   * @throws Exception if construction or reflective field access fails
+   */
+  private HighAvailabilityContainer createContainerWithoutZk() throws Exception {
+    // HA is switched off only in this temporary configuration.
+    Configurations tempConfig = new Configurations(serviceConfig);
+    tempConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false);
+
+    // The constructor is directly accessible from this test class, so it is called normally
+    // instead of through reflection (which would only wrap failures in
+    // InvocationTargetException).
+    HighAvailabilityContainer container = new ZkHighAvailabilityContainer(tempConfig);
+
+    injectContainerField(
+        container,
+        "isMasterSlaveMode",
+        serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE));
+
+    // When the real configuration has HA disabled there is nothing else to populate.
+    if (!serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
+      return container;
+    }
+
+    String haClusterName = serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+    injectContainerField(
+        container,
+        "tableServiceMasterPath",
+        AmsHAProperties.getTableServiceMasterPath(haClusterName));
+    injectContainerField(
+        container,
+        "optimizingServiceMasterPath",
+        AmsHAProperties.getOptimizingServiceMasterPath(haClusterName));
+    injectContainerField(container, "nodesPath", AmsHAProperties.getNodesPath(haClusterName));
+
+    // Both server infos share the expose host and HTTP port; only the thrift port differs.
+    AmsServerInfo tableServiceServerInfo =
+        buildServerInfo(
+            serviceConfig.getString(AmoroManagementConf.SERVER_EXPOSE_HOST),
+            serviceConfig.getInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT),
+            serviceConfig.getInteger(AmoroManagementConf.HTTP_SERVER_PORT));
+    injectContainerField(container, "tableServiceServerInfo", tableServiceServerInfo);
+
+    AmsServerInfo optimizingServiceServerInfo =
+        buildServerInfo(
+            serviceConfig.getString(AmoroManagementConf.SERVER_EXPOSE_HOST),
+            serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT),
+            serviceConfig.getInteger(AmoroManagementConf.HTTP_SERVER_PORT));
+    injectContainerField(container, "optimizingServiceServerInfo", optimizingServiceServerInfo);
+
+    return container;
+  }
+
+  /**
+   * Reflectively assigns {@code value} to the field named {@code fieldName} declared on
+   * ZkHighAvailabilityContainer, making the field accessible first.
+   *
+   * @throws Exception if the field does not exist or cannot be written
+   */
+  private void injectContainerField(
+      HighAvailabilityContainer container, String fieldName, Object value) throws Exception {
+    java.lang.reflect.Field field = ZkHighAvailabilityContainer.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(container, value);
+  }
+
+  /** Assembles an AmsServerInfo from a host, a thrift port and an HTTP (REST) port. */
+  private AmsServerInfo buildServerInfo(String host, Integer thriftPort, Integer httpPort) {
+    AmsServerInfo info = new AmsServerInfo();
+    info.setRestBindPort(httpPort);
+    info.setThriftBindPort(thriftPort);
+    info.setHost(host);
+    return info;
+  }
+
+  /**
+   * Builds a Mockito CuratorFramework whose getChildren/getData/create/delete/checkExists
+   * builders are answered from {@code mockZkState} instead of a ZooKeeper server.
+   */
+  @SuppressWarnings("unchecked")
+  private CuratorFramework createMockZkClient() throws Exception {
+    CuratorFramework client = mock(CuratorFramework.class);
+
+    // getChildren().forPath(path) -> child names held by the in-memory state
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetChildrenBuilder
+        childrenLookup =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                    .GetChildrenBuilder.class);
+    when(client.getChildren()).thenReturn(childrenLookup);
+    doAnswer(call -> mockZkState.getChildren(call.getArgument(0)))
+        .when(childrenLookup)
+        .forPath(anyString());
+
+    // getData().forPath(path) -> bytes stored for that path
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+        dataLookup =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+                    .class);
+    when(client.getData()).thenReturn(dataLookup);
+    doAnswer(call -> mockZkState.getData(call.getArgument(0)))
+        .when(dataLookup)
+        .forPath(anyString());
+
+    // create(): creatingParentsIfNeeded() and withMode(...) both hand back the same builder mock,
+    // so the whole fluent chain ends in the forPath(...) stubs below.
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder
+        createStart =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder
+                    .class);
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                .ProtectACLCreateModeStatPathAndBytesable<
+            String>
+        createChain =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                    .ProtectACLCreateModeStatPathAndBytesable.class);
+    when(client.create()).thenReturn(createStart);
+    when(createStart.creatingParentsIfNeeded()).thenReturn(createChain);
+    when(createChain.withMode(any(CreateMode.class))).thenReturn(createChain);
+
+    // forPath(path, data): store the node and return the path actually created
+    when(createChain.forPath(anyString(), any(byte[].class)))
+        .thenAnswer(call -> mockZkState.createNode(call.getArgument(0), call.getArgument(1)));
+
+    // forPath(path): record the path as an empty node, unless it is already present, so that
+    // later getChildren()/exists() lookups can see it
+    when(createChain.forPath(anyString()))
+        .thenAnswer(
+            call -> {
+              String requested = call.getArgument(0);
+              boolean missing = mockZkState.exists(requested) == null;
+              if (missing) {
+                mockZkState.createNode(requested, new byte[0]);
+              }
+              return null;
+            });
+
+    // delete().forPath(path) -> remove the single node
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder
+        remover =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder
+                    .class);
+    when(client.delete()).thenReturn(remover);
+    doAnswer(
+            call -> {
+              mockZkState.deleteNode(call.getArgument(0));
+              return null;
+            })
+        .when(remover)
+        .forPath(anyString());
+
+    // checkExists().forPath(path) -> a Stat when the node is stored, otherwise null
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+        existenceLookup =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+                    .class);
+    when(client.checkExists()).thenReturn(existenceLookup);
+    doAnswer(call -> mockZkState.exists(call.getArgument(0)))
+        .when(existenceLookup)
+        .forPath(anyString());
+
+    // start() and close() are stubbed to do nothing
+    doAnswer(call -> null).when(client).start();
+    doAnswer(call -> null).when(client).close();
+
+    return client;
+  }
+
+  /** Create a mock LeaderLatch. */
+  private LeaderLatch createMockLeaderLatch() throws Exception {
+    return createMockLeaderLatch(true);
+  }
+
+  /**
+   * Builds a mock LeaderLatch whose hasLeadership() returns the given flag and whose
+   * addListener/start/close/await calls are stubbed to return immediately.
+   */
+  private LeaderLatch createMockLeaderLatch(boolean hasLeadership) throws Exception {
+    LeaderLatch latch = mock(LeaderLatch.class);
+
+    // Lifecycle and listener calls are explicit no-ops.
+    doAnswer(call -> null).when(latch).start();
+    doAnswer(call -> null).when(latch).close();
+    doAnswer(call -> null).when(latch).addListener(any());
+    // await() would normally block until leadership is acquired; the stub just returns.
+    doAnswer(call -> null).when(latch).await();
+
+    when(latch.hasLeadership()).thenReturn(hasLeadership);
+    return latch;
+  }
+
+  /**
+   * Minimal in-memory stand-in for ZooKeeper state: a flat map from full node path to node data,
+   * plus a counter used to suffix paths that end with "-".
+   */
+  private static class MockZkState {
+    private final Map<String, byte[]> nodes = new HashMap<>();
+    private final AtomicInteger sequenceCounter = new AtomicInteger(0);
+
+    /** Returns the sorted names of the nodes stored exactly one level below {@code path}. */
+    public List<String> getChildren(String path) throws KeeperException {
+      String childPrefix = path.endsWith("/") ? path : path + "/";
+      List<String> names = new ArrayList<>();
+      for (String stored : nodes.keySet()) {
+        // Skip the path itself and anything outside its subtree.
+        if (stored.equals(path) || !stored.startsWith(childPrefix)) {
+          continue;
+        }
+        String tail = stored.substring(childPrefix.length());
+        // A remaining "/" means the node is nested deeper than a direct child.
+        if (tail.contains("/")) {
+          continue;
+        }
+        names.add(tail);
+      }
+      // Sorted so callers see a stable order regardless of map iteration order.
+      names.sort((left, right) -> left.compareTo(right));
+      return names;
+    }
+
+    /** Returns the data stored at {@code path}, or throws NoNodeException when none is stored. */
+    public byte[] getData(String path) throws KeeperException {
+      byte[] payload = nodes.get(path);
+      if (payload != null) {
+        return payload;
+      }
+      throw new KeeperException.NoNodeException(path);
+    }
+
+    /**
+     * Stores {@code data} under {@code path} and returns the path actually used. A path ending
+     * with "-" gets the next counter value appended as a zero-padded 10-digit suffix.
+     */
+    public String createNode(String path, byte[] data) {
+      String actualPath = path;
+      if (path.endsWith("-")) {
+        actualPath = path + String.format("%010d", sequenceCounter.incrementAndGet());
+      }
+      nodes.put(actualPath, data);
+      return actualPath;
+    }
+
+    /** Removes the node at {@code path}; throws NoNodeException when it is not stored. */
+    public void deleteNode(String path) throws KeeperException {
+      if (nodes.containsKey(path)) {
+        nodes.remove(path);
+        return;
+      }
+      throw new KeeperException.NoNodeException(path);
+    }
+
+    /** Returns a fresh Stat when {@code path} is stored, otherwise null. */
+    public Stat exists(String path) {
+      if (nodes.containsKey(path)) {
+        return new Stat();
+      }
+      return null;
+    }
+
+    /** Resets the simulator: restarts the sequence counter and drops all nodes. */
+    public void clear() {
+      sequenceCounter.set(0);
+      nodes.clear();
+    }
+  }
+}
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestZkBucketAssignStore.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestZkBucketAssignStore.java
new file mode 100644
index 000000000..a331001ed
--- /dev/null
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestZkBucketAssignStore.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.amoro.client.AmsServerInfo;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import 
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test for ZkBucketAssignStore using mocked ZK to avoid connection issues. */
+public class TestZkBucketAssignStore {
+
+  private CuratorFramework mockZkClient;
+  private ZkBucketAssignStore assignStore;
+  private AmsServerInfo node1;
+  private AmsServerInfo node2;
+  private MockZkState mockZkState;
+
+  /** Prepares two sample nodes, a fresh in-memory ZK state, a mock client and the store. */
+  @Before
+  public void setUp() throws Exception {
+    // Two nodes on different hosts with different thrift and REST ports.
+    node1 = new AmsServerInfo();
+    node1.setHost("127.0.0.1");
+    node1.setThriftBindPort(1260);
+    node1.setRestBindPort(1630);
+
+    node2 = new AmsServerInfo();
+    node2.setHost("127.0.0.2");
+    node2.setThriftBindPort(1261);
+    node2.setRestBindPort(1631);
+
+    // The mock client answers from mockZkState, and the store talks only to the mock client.
+    mockZkState = new MockZkState();
+    mockZkClient = createMockZkClient();
+    assignStore = new ZkBucketAssignStore(mockZkClient, "test-cluster");
+  }
+
+  /**
+   * Best-effort cleanup after each test: removes the assignments of both sample nodes and then
+   * wipes the in-memory ZK state.
+   *
+   * <p>Each node is cleaned up independently, so a failure while removing one node's assignments
+   * does not skip the other node.
+   */
+  @After
+  public void tearDown() throws Exception {
+    if (assignStore != null) {
+      for (AmsServerInfo node : Arrays.asList(node1, node2)) {
+        try {
+          assignStore.removeAssignments(node);
+        } catch (Exception ignored) {
+          // Deliberately ignored: cleanup must not fail the test, and the state is cleared below.
+        }
+      }
+    }
+    mockZkState.clear();
+  }
+
+  /** Buckets saved for a node should be read back unchanged. */
+  @Test
+  public void testSaveAndGetAssignments() throws Exception {
+    List<String> expected = Arrays.asList("1", "2", "3", "4", "5");
+    assignStore.saveAssignments(node1, expected);
+
+    List<String> actual = assignStore.getAssignments(node1);
+    Assert.assertEquals("Bucket IDs should match", expected, actual);
+  }
+
+  @Test
+  public void testGetAssignmentsForNonExistentNode() throws Exception {
+    List<String> retrieved = assignStore.getAssignments(node1);
+    Assert.assertNotNull("Should return empty list", retrieved);
+    Assert.assertTrue("Should return empty list", retrieved.isEmpty());
+  }
+
+  /** Saving a second bucket list for the same node should replace the first one. */
+  @Test
+  public void testUpdateAssignments() throws Exception {
+    // First round of assignments.
+    List<String> firstRound = Arrays.asList("1", "2", "3");
+    assignStore.saveAssignments(node1, firstRound);
+    Assert.assertEquals(firstRound, assignStore.getAssignments(node1));
+
+    // Second round overwrites the first.
+    List<String> secondRound = Arrays.asList("4", "5", "6", "7");
+    assignStore.saveAssignments(node1, secondRound);
+    Assert.assertEquals(secondRound, assignStore.getAssignments(node1));
+  }
+
+  @Test
+  public void testRemoveAssignments() throws Exception {
+    List<String> bucketIds = Arrays.asList("1", "2", "3");
+
+    // Save assignments
+    assignStore.saveAssignments(node1, bucketIds);
+    Assert.assertFalse("Should have assignments", 
assignStore.getAssignments(node1).isEmpty());
+
+    // Remove assignments
+    assignStore.removeAssignments(node1);
+    Assert.assertTrue("Should be empty after removal", 
assignStore.getAssignments(node1).isEmpty());
+  }
+
+  @Test
+  public void testGetAllAssignments() throws Exception {
+    List<String> buckets1 = Arrays.asList("1", "2", "3");
+    List<String> buckets2 = Arrays.asList("4", "5", "6");
+
+    // Save assignments for multiple nodes
+    assignStore.saveAssignments(node1, buckets1);
+    assignStore.saveAssignments(node2, buckets2);
+
+    // Get all assignments
+    Map<AmsServerInfo, List<String>> allAssignments = 
assignStore.getAllAssignments();
+    Assert.assertEquals("Should have 2 nodes", 2, allAssignments.size());
+
+    // Find nodes by host and port since parseNodeKey doesn't set restBindPort
+    List<String> foundBuckets1 = null;
+    List<String> foundBuckets2 = null;
+    for (Map.Entry<AmsServerInfo, List<String>> entry : 
allAssignments.entrySet()) {
+      AmsServerInfo node = entry.getKey();
+      if (node1.getHost().equals(node.getHost())
+          && node1.getThriftBindPort().equals(node.getThriftBindPort())) {
+        foundBuckets1 = entry.getValue();
+      } else if (node2.getHost().equals(node.getHost())
+          && node2.getThriftBindPort().equals(node.getThriftBindPort())) {
+        foundBuckets2 = entry.getValue();
+      }
+    }
+    Assert.assertEquals(buckets1, foundBuckets1);
+    Assert.assertEquals(buckets2, foundBuckets2);
+  }
+
+  @Test
+  public void testGetAllAssignmentsEmpty() throws Exception {
+    Map<AmsServerInfo, List<String>> allAssignments = 
assignStore.getAllAssignments();
+    Assert.assertNotNull("Should return empty map", allAssignments);
+    Assert.assertTrue("Should be empty", allAssignments.isEmpty());
+  }
+
+  /**
+   * The last-update time should be 0 before any save, fall inside the save window afterwards,
+   * and move forward when updateLastUpdateTime() is called later.
+   */
+  @Test
+  public void testLastUpdateTime() throws Exception {
+    // Nothing has been saved for node1 yet.
+    long initialTime = assignStore.getLastUpdateTime(node1);
+    Assert.assertEquals("Should be 0 initially", 0, initialTime);
+
+    // Bracket the save with wall-clock readings.
+    List<String> bucketIds = Arrays.asList("1", "2", "3");
+    long beforeSave = System.currentTimeMillis();
+    assignStore.saveAssignments(node1, bucketIds);
+    long afterSave = System.currentTimeMillis();
+
+    long updateTime = assignStore.getLastUpdateTime(node1);
+    boolean insideSaveWindow = beforeSave <= updateTime && updateTime <= afterSave;
+    Assert.assertTrue("Update time should be between before and after", insideSaveWindow);
+
+    // Wait a little so the refreshed timestamp can be strictly greater.
+    Thread.sleep(10);
+    assignStore.updateLastUpdateTime(node1);
+    long newUpdateTime = assignStore.getLastUpdateTime(node1);
+    boolean movedForward = newUpdateTime > updateTime;
+    Assert.assertTrue("New update time should be later", movedForward);
+  }
+
+  @Test
+  public void testEmptyBucketList() throws Exception {
+    List<String> emptyList = new ArrayList<>();
+    assignStore.saveAssignments(node1, emptyList);
+    List<String> retrieved = assignStore.getAssignments(node1);
+    Assert.assertNotNull("Should return empty list", retrieved);
+    Assert.assertTrue("Should be empty", retrieved.isEmpty());
+  }
+
+  @Test
+  public void testMultipleNodesWithSameHostDifferentPort() throws Exception {
+    AmsServerInfo node3 = new AmsServerInfo();
+    node3.setHost("127.0.0.1");
+    node3.setThriftBindPort(1262);
+    node3.setRestBindPort(1632);
+
+    List<String> buckets1 = Arrays.asList("1", "2");
+    List<String> buckets3 = Arrays.asList("3", "4");
+
+    assignStore.saveAssignments(node1, buckets1);
+    assignStore.saveAssignments(node3, buckets3);
+
+    Assert.assertEquals(buckets1, assignStore.getAssignments(node1));
+    Assert.assertEquals(buckets3, assignStore.getAssignments(node3));
+
+    Map<AmsServerInfo, List<String>> allAssignments = 
assignStore.getAllAssignments();
+    Assert.assertEquals("Should have 2 nodes", 2, allAssignments.size());
+  }
+
+  /**
+   * Builds a Mockito CuratorFramework whose getChildren/getData/create/setData/delete/checkExists
+   * builders are answered from {@code mockZkState} instead of a ZooKeeper server.
+   */
+  @SuppressWarnings("unchecked")
+  private CuratorFramework createMockZkClient() throws Exception {
+    CuratorFramework client = mock(CuratorFramework.class);
+
+    // getChildren().forPath(path) -> child names held by the in-memory state
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetChildrenBuilder
+        childrenLookup =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                    .GetChildrenBuilder.class);
+    when(client.getChildren()).thenReturn(childrenLookup);
+    doAnswer(call -> mockZkState.getChildren(call.getArgument(0)))
+        .when(childrenLookup)
+        .forPath(anyString());
+
+    // getData().forPath(path) -> bytes stored for that path
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+        dataLookup =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+                    .class);
+    when(client.getData()).thenReturn(dataLookup);
+    doAnswer(call -> mockZkState.getData(call.getArgument(0)))
+        .when(dataLookup)
+        .forPath(anyString());
+
+    // create(): creatingParentsIfNeeded() and withMode(...) both hand back the same builder mock,
+    // so the whole fluent chain ends in the forPath(...) stubs below.
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder
+        createStart =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder
+                    .class);
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                .ProtectACLCreateModeStatPathAndBytesable<
+            String>
+        createChain =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                    .ProtectACLCreateModeStatPathAndBytesable.class);
+    when(client.create()).thenReturn(createStart);
+    when(createStart.creatingParentsIfNeeded()).thenReturn(createChain);
+    when(createChain.withMode(any(CreateMode.class))).thenReturn(createChain);
+
+    // forPath(path, data): store the node and return the path actually created
+    when(createChain.forPath(anyString(), any(byte[].class)))
+        .thenAnswer(call -> mockZkState.createNode(call.getArgument(0), call.getArgument(1)));
+
+    // forPath(path): record the path as an empty node unless it is already present
+    when(createChain.forPath(anyString()))
+        .thenAnswer(
+            call -> {
+              String requested = call.getArgument(0);
+              boolean missing = mockZkState.exists(requested) == null;
+              if (missing) {
+                mockZkState.createNode(requested, new byte[0]);
+              }
+              return null;
+            });
+
+    // setData().forPath(path, data) -> overwrite the bytes of an existing node
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.SetDataBuilder
+        dataWriter =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.SetDataBuilder
+                    .class);
+    when(client.setData()).thenReturn(dataWriter);
+    when(dataWriter.forPath(anyString(), any(byte[].class)))
+        .thenAnswer(
+            call -> {
+              mockZkState.setData(call.getArgument(0), call.getArgument(1));
+              return null;
+            });
+
+    // delete().forPath(path) -> remove the single node
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder
+        remover =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder
+                    .class);
+    when(client.delete()).thenReturn(remover);
+    doAnswer(
+            call -> {
+              mockZkState.deleteNode(call.getArgument(0));
+              return null;
+            })
+        .when(remover)
+        .forPath(anyString());
+
+    // delete().deletingChildrenIfNeeded().forPath(path) -> remove the node and its subtree
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ChildrenDeletable
+        subtreeRemover =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+                    .ChildrenDeletable.class);
+    when(remover.deletingChildrenIfNeeded()).thenReturn(subtreeRemover);
+    doAnswer(
+            call -> {
+              mockZkState.deleteNodeRecursive(call.getArgument(0));
+              return null;
+            })
+        .when(subtreeRemover)
+        .forPath(anyString());
+
+    // checkExists().forPath(path) -> a Stat when the node is stored, otherwise null
+    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+        existenceLookup =
+            mock(
+                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+                    .class);
+    when(client.checkExists()).thenReturn(existenceLookup);
+    doAnswer(call -> mockZkState.exists(call.getArgument(0)))
+        .when(existenceLookup)
+        .forPath(anyString());
+
+    // start() and close() are stubbed to do nothing
+    doAnswer(call -> null).when(client).start();
+    doAnswer(call -> null).when(client).close();
+
+    return client;
+  }
+
+  /**
+   * Builds a mock LeaderLatch whose hasLeadership() returns the given flag and whose
+   * addListener/start/close/await calls are stubbed to return immediately.
+   */
+  private LeaderLatch createMockLeaderLatch(boolean hasLeadership) throws Exception {
+    LeaderLatch latch = mock(LeaderLatch.class);
+
+    // Lifecycle and listener calls are explicit no-ops.
+    doAnswer(call -> null).when(latch).start();
+    doAnswer(call -> null).when(latch).close();
+    doAnswer(call -> null).when(latch).addListener(any());
+    // await() would normally block until leadership is acquired; the stub just returns.
+    doAnswer(call -> null).when(latch).await();
+
+    when(latch.hasLeadership()).thenReturn(hasLeadership);
+    return latch;
+  }
+
+  /**
+   * Minimal in-memory stand-in for ZooKeeper state: a flat map from full node path to node data,
+   * plus a counter used to suffix paths that end with "-". Creating a node also creates any
+   * missing ancestors as empty nodes.
+   */
+  private static class MockZkState {
+    private final Map<String, byte[]> nodes = new HashMap<>();
+    private final AtomicInteger sequenceCounter = new AtomicInteger(0);
+
+    /** Returns the sorted names of the nodes stored exactly one level below {@code path}. */
+    public List<String> getChildren(String path) throws KeeperException {
+      String childPrefix = path.endsWith("/") ? path : path + "/";
+      List<String> names = new ArrayList<>();
+      for (String stored : nodes.keySet()) {
+        // Skip the path itself and anything outside its subtree.
+        if (stored.equals(path) || !stored.startsWith(childPrefix)) {
+          continue;
+        }
+        String tail = stored.substring(childPrefix.length());
+        // A remaining "/" means the node is nested deeper than a direct child.
+        if (tail.contains("/")) {
+          continue;
+        }
+        names.add(tail);
+      }
+      names.sort((left, right) -> left.compareTo(right));
+      return names;
+    }
+
+    /** Returns the data stored at {@code path}, or throws NoNodeException when none is stored. */
+    public byte[] getData(String path) throws KeeperException {
+      byte[] payload = nodes.get(path);
+      if (payload != null) {
+        return payload;
+      }
+      throw new KeeperException.NoNodeException(path);
+    }
+
+    /** Replaces the data of an existing node; throws NoNodeException when it is not stored. */
+    public void setData(String path, byte[] data) throws KeeperException {
+      if (nodes.containsKey(path)) {
+        nodes.put(path, data);
+        return;
+      }
+      throw new KeeperException.NoNodeException(path);
+    }
+
+    /**
+     * Stores {@code data} under {@code path}, creating missing ancestors first, and returns the
+     * path actually used. A path ending with "-" gets the next counter value appended as a
+     * zero-padded 10-digit suffix.
+     */
+    public String createNode(String path, byte[] data) {
+      String actualPath = path;
+      if (path.endsWith("-")) {
+        actualPath = path + String.format("%010d", sequenceCounter.incrementAndGet());
+      }
+      // Mirrors creatingParentsIfNeeded(): ancestors become empty nodes.
+      createParentPaths(actualPath);
+      nodes.put(actualPath, data);
+      return actualPath;
+    }
+
+    /**
+     * Registers every ancestor of {@code path} (all segments except the last) as an empty node,
+     * leaving ancestors that are already stored untouched.
+     */
+    private void createParentPaths(String path) {
+      String[] segments = path.split("/");
+      // Absolute paths keep their leading slash; relative ones start empty.
+      StringBuilder ancestor = new StringBuilder(path.startsWith("/") ? "/" : "");
+      int lastIndex = segments.length - 1;
+      for (int idx = 0; idx < lastIndex; idx++) {
+        String segment = segments[idx];
+        if (segment.isEmpty()) {
+          // Empty pieces come from leading or doubled slashes.
+          continue;
+        }
+        int builtLength = ancestor.length();
+        if (builtLength > 0 && ancestor.charAt(builtLength - 1) != '/') {
+          ancestor.append("/");
+        }
+        ancestor.append(segment);
+        String ancestorPath = ancestor.toString();
+        if (nodes.containsKey(ancestorPath)) {
+          continue;
+        }
+        nodes.put(ancestorPath, new byte[0]);
+      }
+    }
+
+    /** Removes the node at {@code path}; throws NoNodeException when it is not stored. */
+    public void deleteNode(String path) throws KeeperException {
+      if (nodes.containsKey(path)) {
+        nodes.remove(path);
+        return;
+      }
+      throw new KeeperException.NoNodeException(path);
+    }
+
+    /** Removes {@code path} and every node stored beneath it; a missing path is not an error. */
+    public void deleteNodeRecursive(String path) throws KeeperException {
+      String subtreePrefix = path.endsWith("/") ? path : path + "/";
+      nodes.keySet().removeIf(stored -> stored.equals(path) || stored.startsWith(subtreePrefix));
+    }
+
+    /** Returns a fresh Stat when {@code path} is stored, otherwise null. */
+    public Stat exists(String path) {
+      if (nodes.containsKey(path)) {
+        return new Stat();
+      }
+      return null;
+    }
+
+    /** Resets the simulator: restarts the sequence counter and drops all nodes. */
+    public void clear() {
+      sequenceCounter.set(0);
+      nodes.clear();
+    }
+  }
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/ErrorCodes.java 
b/amoro-common/src/main/java/org/apache/amoro/ErrorCodes.java
index 76a60f4a4..df91badb7 100644
--- a/amoro-common/src/main/java/org/apache/amoro/ErrorCodes.java
+++ b/amoro-common/src/main/java/org/apache/amoro/ErrorCodes.java
@@ -36,4 +36,6 @@ public final class ErrorCodes {
   public static final int PLUGIN_RETRY_AUTH_ERROR_CODE = 2006;
 
   public static final int BLOCKER_CONFLICT_ERROR_CODE = 3001;
+
+  public static final int BUCKET_ASSIGN_STORE_ERROR_CODE = 4000;
 }
diff --git a/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java b/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java
index 555c238c1..fd30c9de0 100644
--- a/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java
+++ b/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java
@@ -40,6 +40,7 @@ public class AmoroRuntimeException extends RuntimeException {
 
   static {
     CODE_MAP.put(PersistenceException.class, ErrorCodes.PERSISTENCE_ERROR_CODE);
+    CODE_MAP.put(BucketAssignStoreException.class, ErrorCodes.BUCKET_ASSIGN_STORE_ERROR_CODE);
     CODE_MAP.put(ObjectNotExistsException.class, ErrorCodes.OBJECT_NOT_EXISTS_ERROR_CODE);
     CODE_MAP.put(AlreadyExistsException.class, ErrorCodes.ALREADY_EXISTS_ERROR_CODE);
     CODE_MAP.put(IllegalMetadataException.class, ErrorCodes.ILLEGAL_METADATA_ERROR_CODE);
diff --git a/amoro-common/src/main/java/org/apache/amoro/exception/BucketAssignStoreException.java b/amoro-common/src/main/java/org/apache/amoro/exception/BucketAssignStoreException.java
new file mode 100644
index 000000000..ff02963cc
--- /dev/null
+++ b/amoro-common/src/main/java/org/apache/amoro/exception/BucketAssignStoreException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.exception;
+
+/**
+ * Exception thrown when bucket assignment store operations fail (e.g. save, get, remove
+ * assignments).
+ */
+public class BucketAssignStoreException extends AmoroRuntimeException {
+
+  public BucketAssignStoreException(String message) {
+    super(message);
+  }
+
+  public BucketAssignStoreException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public BucketAssignStoreException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
index 08b9ef04a..5e4f0a8d8 100644
--- a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
+++ b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
@@ -26,6 +26,7 @@ public class AmsHAProperties {
   private static final String TABLE_SERVICE_MASTER_PATH = "/master";
   private static final String OPTIMIZING_SERVICE_MASTER_PATH = "/optimizing-service-master";
   private static final String NODES_PATH = "/nodes";
+  private static final String BUCKET_ASSIGNMENTS_PATH = "/bucket-assignments";
   private static final String NAMESPACE_DEFAULT = "default";
 
   private static String getBasePath(String namespace) {
@@ -50,4 +51,8 @@ public class AmsHAProperties {
   public static String getNodesPath(String namespace) {
     return getBasePath(namespace) + NODES_PATH;
   }
+
+  public static String getBucketAssignmentsPath(String namespace) {
+    return getBasePath(namespace) + BUCKET_ASSIGNMENTS_PATH;
+  }
 }
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index 0f5f272da..da6cf20eb 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -70,11 +70,14 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
 | expire-snapshots.enabled | true | Enable snapshots expiring. |
 | expire-snapshots.interval | 1 h | Interval for expiring snapshots. |
 | expire-snapshots.thread-count | 10 | The number of threads used for snapshots expiring. |
+| ha.bucket-assign.interval | 1 min | Interval for bucket assignment service to detect node changes and redistribute bucket IDs. |
+| ha.bucket-id.total-count | 100 | Total count of bucket IDs for assignment. Bucket IDs range from 1 to this value. |
 | ha.cluster-name | default | Amoro management service cluster name. |
 | ha.connection-timeout | 5 min | The Zookeeper connection timeout in milliseconds. |
 | ha.enabled | false | Whether to enable high availability mode. |
 | ha.heartbeat-interval | 10 s | HA heartbeat interval. |
 | ha.lease-ttl | 30 s | TTL of HA lease. |
+| ha.node-offline.timeout | 5 min | Timeout duration to determine if a node is offline. After this duration, the node's bucket IDs will be reassigned. |
 | ha.session-timeout | 30 s | The Zookeeper session timeout in milliseconds. |
 | ha.type | zk | High availability implementation type: zk or database. |
 | ha.zookeeper-address |  | The Zookeeper address used for high availability. |

Reply via email to