This is an automated email from the ASF dual-hosted git repository.
czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 25f19da9a [AMORO-3921] Add AmsAssignService and ZkBucketAssignStore to
implement balanced bucket allocation in master-slave mode (#3922)
25f19da9a is described below
commit 25f19da9a6cf6aa4acea787a2834c864ea57edb0
Author: can <[email protected]>
AuthorDate: Mon Mar 16 14:19:47 2026 +0800
[AMORO-3921] Add AmsAssignService and ZkBucketAssignStore to implement
balanced bucket allocation in master-slave mode (#3922)
* [Subtask]: Add a registration function for table allocation in
master-slave mode. #3919
* [Subtask]: add AmsAssignService to implement balanced bucket allocation
in master-slave mode. #3921
* [Subtask]: Add a registration function for table allocation in
master-slave mode. #3919
* [Subtask]: Add a registration function for table allocation in
master-slave mode. #3919
* [Subtask]: Replace zk with mocking. #3919
* [Subtask]: Replace zk with mocking. #3919
* [Subtask]: add AmsAssignService to implement balanced bucket allocation
in master-slave mode. #3921
* [Subtask]: add AmsAssignService to implement balanced bucket allocation
in master-slave mode. #3921
* [Subtask]: add AmsAssignService to implement balanced bucket allocation
in master-slave mode. #3921
* [Subtask]: add AmsAssignService to implement balanced bucket allocation
in master-slave mode. #3921
* [Subtask]: Revised based on CR's comments. #3921
* [Subtask]: Revised based on CR's comments. #3921
* [Subtask]: Revised based on CR's comments. #3921
---------
Co-authored-by: wardli <[email protected]>
---
.../apache/amoro/server/AmoroManagementConf.java | 21 +
.../apache/amoro/server/AmoroServiceContainer.java | 20 +
.../org/apache/amoro/server/AmsAssignService.java | 515 ++++++++++++
.../org/apache/amoro/server/BucketAssignStore.java | 84 ++
.../amoro/server/BucketAssignStoreFactory.java | 75 ++
.../apache/amoro/server/ZkBucketAssignStore.java | 245 ++++++
.../server/ha/ZkHighAvailabilityContainer.java | 28 +-
.../apache/amoro/server/TestAmsAssignService.java | 894 +++++++++++++++++++++
.../server/TestHighAvailabilityContainer.java | 563 +++++++++++++
.../amoro/server/TestZkBucketAssignStore.java | 480 +++++++++++
.../src/main/java/org/apache/amoro/ErrorCodes.java | 2 +
.../amoro/exception/AmoroRuntimeException.java | 1 +
.../exception/BucketAssignStoreException.java | 38 +
.../apache/amoro/properties/AmsHAProperties.java | 5 +
docs/configuration/ams-config.md | 3 +
15 files changed, 2964 insertions(+), 10 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index e50e18a90..a3c5599be 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -286,6 +286,27 @@ public class AmoroManagementConf {
.defaultValue(java.time.Duration.ofSeconds(30))
.withDescription("TTL of HA lease.");
  // --- Master-slave bucket assignment options (consumed by AmsAssignService) ---

  // Number of bucket IDs distributed across alive AMS nodes; IDs are "1".."N" as strings.
  public static final ConfigOption<Integer> HA_BUCKET_ID_TOTAL_COUNT =
      ConfigOptions.key("ha.bucket-id.total-count")
          .intType()
          .defaultValue(100)
          .withDescription(
              "Total count of bucket IDs for assignment. Bucket IDs range from 1 to this value.");

  // A node whose stored last-update timestamp is older than this is treated as offline
  // and its buckets are redistributed to the remaining alive nodes.
  public static final ConfigOption<Duration> HA_NODE_OFFLINE_TIMEOUT =
      ConfigOptions.key("ha.node-offline.timeout")
          .durationType()
          .defaultValue(Duration.ofMinutes(5))
          .withDescription(
              "Timeout duration to determine if a node is offline. After this duration, the node's bucket IDs will be reassigned.");

  // Period of the leader's scheduled assignment pass (seconds granularity at runtime).
  public static final ConfigOption<Duration> HA_ASSIGN_INTERVAL =
      ConfigOptions.key("ha.bucket-assign.interval")
          .durationType()
          .defaultValue(Duration.ofSeconds(60))
          .withDescription(
              "Interval for bucket assignment service to detect node changes and redistribute bucket IDs.");
+
public static final ConfigOption<Integer> TABLE_SERVICE_THRIFT_BIND_PORT =
ConfigOptions.key("thrift-server.table-service.bind-port")
.intType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 25de6a7af..0b268f7bf 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -126,6 +126,7 @@ public class AmoroServiceContainer {
private Javalin httpServer;
private AmsServiceMetrics amsServiceMetrics;
private HAState haState = HAState.INITIALIZING;
+ private AmsAssignService amsAssignService;
public AmoroServiceContainer() throws Exception {
initConfig();
@@ -244,6 +245,20 @@ public class AmoroServiceContainer {
DefaultTableRuntimeFactory defaultRuntimeFactory = new
DefaultTableRuntimeFactory();
defaultRuntimeFactory.initialize(processFactories);
+ // In master-slave mode, create AmsAssignService for bucket assignment
+ if (IS_MASTER_SLAVE_MODE && haContainer != null) {
+ try {
+ // Create and start AmsAssignService for bucket assignment
+ // The factory will handle different HA types (ZK, database, etc.)
+ amsAssignService = new AmsAssignService(haContainer, serviceConfig);
+ amsAssignService.start();
+ LOG.info("AmsAssignService started for master-slave mode");
+ } catch (UnsupportedOperationException e) {
+ LOG.info("Skip AmsAssignService: {}", e.getMessage());
+ } catch (Exception e) {
+ LOG.error("Failed to start AmsAssignService", e);
+ }
+ }
List<ActionCoordinator> actionCoordinators =
defaultRuntimeFactory.supportedCoordinators();
ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
@@ -293,6 +308,11 @@ public class AmoroServiceContainer {
LOG.info("Stopping optimizing server[serving:{}] ...",
optimizingServiceServer.isServing());
optimizingServiceServer.stop();
}
+ if (amsAssignService != null) {
+ LOG.info("Stopping AmsAssignService...");
+ amsAssignService.stop();
+ amsAssignService = null;
+ }
if (tableService != null) {
LOG.info("Stopping table service...");
tableService.dispose();
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
new file mode 100644
index 000000000..a09445b54
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.exception.BucketAssignStoreException;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
+import
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
/**
 * Service for assigning bucket IDs to AMS nodes in master-slave mode. Periodically detects node
 * changes and redistributes bucket IDs evenly.
 *
 * <p>Only the node holding leadership performs assignment work; non-leader nodes skip each cycle.
 * Assignments are persisted through a {@link BucketAssignStore} obtained from
 * {@link BucketAssignStoreFactory}, so the state outlives any single leader.
 */
public class AmsAssignService {

  private static final Logger LOG = LoggerFactory.getLogger(AmsAssignService.class);

  // Single daemon thread: assignment passes never overlap, and the thread
  // does not prevent JVM shutdown.
  private final ScheduledExecutorService assignScheduler =
      Executors.newSingleThreadScheduledExecutor(
          new ThreadFactoryBuilder()
              .setNameFormat("ams-assign-scheduler-%d")
              .setDaemon(true)
              .build());

  private final HighAvailabilityContainer haContainer;
  private final BucketAssignStore assignStore;
  private final Configurations serviceConfig;
  // Total number of bucket IDs ("1".."bucketIdTotalCount") to spread over nodes.
  private final int bucketIdTotalCount;
  // A node whose last-update timestamp is older than this is considered offline.
  private final long nodeOfflineTimeoutMs;
  // Delay between scheduled doAssign() passes, in seconds.
  private final long assignIntervalSeconds;
  private volatile boolean running = false;

  /** Returns whether the scheduled assignment loop has been started and not stopped. */
  boolean isRunning() {
    return running;
  }

  /**
   * Creates the service and its backing store.
   *
   * @param haContainer HA container providing leadership state and the alive-node list
   * @param serviceConfig service configuration holding the bucket/timeout/interval options
   */
  public AmsAssignService(HighAvailabilityContainer haContainer, Configurations serviceConfig) {
    this.haContainer = haContainer;
    this.serviceConfig = serviceConfig;
    this.bucketIdTotalCount =
        serviceConfig.getInteger(AmoroManagementConf.HA_BUCKET_ID_TOTAL_COUNT);
    this.nodeOfflineTimeoutMs =
        serviceConfig.get(AmoroManagementConf.HA_NODE_OFFLINE_TIMEOUT).toMillis();
    this.assignIntervalSeconds =
        serviceConfig.get(AmoroManagementConf.HA_ASSIGN_INTERVAL).getSeconds();
    this.assignStore = BucketAssignStoreFactory.create(haContainer, serviceConfig);
  }

  /**
   * Start the assignment service. Only works in master-slave mode and when current node is leader.
   *
   * <p>Leadership is not checked here; it is re-checked at the start of every {@link #doAssign()}
   * pass, so a follower can safely run this and become active on failover.
   */
  public void start() {
    if (!serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE)) {
      LOG.info("Master-slave mode is not enabled, skip starting bucket assignment service");
      return;
    }
    if (running) {
      LOG.warn("Bucket assignment service is already running");
      return;
    }
    running = true;
    // Initial 10s delay lets the node registration settle before the first pass.
    assignScheduler.scheduleWithFixedDelay(
        this::doAssign, 10, assignIntervalSeconds, TimeUnit.SECONDS);
    LOG.info("Bucket assignment service started with interval: {} seconds", assignIntervalSeconds);
  }

  /**
   * Stop the assignment service.
   *
   * <p>NOTE(review): this shuts the scheduler down, so the service cannot be restarted with
   * {@link #start()} afterwards — confirm that matches the container's lifecycle expectations.
   */
  public void stop() {
    if (!running) {
      return;
    }
    running = false;
    assignScheduler.shutdown();
    try {
      if (!assignScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
        assignScheduler.shutdownNow();
      }
    } catch (InterruptedException e) {
      assignScheduler.shutdownNow();
      // Preserve the interrupt flag for the caller.
      Thread.currentThread().interrupt();
    }
    LOG.info("Bucket assignment service stopped");
  }

  /**
   * One assignment pass: leader-only. Reads the stored assignments, detects new/offline nodes,
   * and — only when membership changed — rebalances buckets and persists the result. When nothing
   * changed it just refreshes each alive node's last-update timestamp. All exceptions are caught
   * and logged so the scheduled loop keeps running.
   */
  @VisibleForTesting
  public void doAssign() {
    try {
      if (!haContainer.hasLeadership()) {
        LOG.debug("Current node is not leader, skip bucket assignment");
        return;
      }

      List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
      if (aliveNodes.isEmpty()) {
        LOG.debug("No alive nodes found, skip bucket assignment");
        return;
      }

      // Map stored assignment entries onto the live AmsServerInfo instances so that
      // stored nodes and alive nodes compare equal by host:port key.
      Map<AmsServerInfo, List<String>> currentAssignments = assignStore.getAllAssignments();
      Map<String, AmsServerInfo> aliveNodeMap = buildAliveNodeMap(aliveNodes);
      NormalizedAssignments normalized =
          normalizeCurrentAssignments(currentAssignments, aliveNodeMap);
      NodeChangeResult change =
          detectNodeChanges(aliveNodes, currentAssignments, aliveNodeMap, normalized.assignedNodes);

      if (!change.needReassign()) {
        // Membership unchanged: only keep heartbeat timestamps fresh.
        refreshLastUpdateTime(aliveNodes);
        return;
      }

      LOG.info(
          "Detected node changes - New nodes: {}, Offline nodes: {}, Performing incremental reassignment...",
          change.newNodes.size(),
          change.offlineNodes.size());

      // Order matters: collect buckets freed by offline nodes, build the surviving
      // assignment map, rebalance, then persist.
      List<String> bucketsToRedistribute =
          handleOfflineNodes(change.offlineNodes, currentAssignments);
      List<String> allBuckets = generateBucketIds();
      Map<AmsServerInfo, List<String>> newAssignments =
          buildNewAssignments(aliveNodes, change.offlineNodes, normalized.assignments);
      rebalance(aliveNodes, change.newNodes, bucketsToRedistribute, allBuckets, newAssignments);
      persistAssignments(newAssignments);
    } catch (Exception e) {
      LOG.error("Error during bucket assignment", e);
    }
  }

  /**
   * Redistribute buckets incrementally to alive nodes using round-robin. This minimizes bucket
   * migration by only redistributing buckets from offline nodes.
   *
   * @param aliveNodes List of alive nodes
   * @param bucketsToRedistribute Buckets to redistribute (from offline nodes)
   * @param currentAssignments Current assignments map (will be modified)
   */
  private void redistributeBucketsIncrementally(
      List<AmsServerInfo> aliveNodes,
      List<String> bucketsToRedistribute,
      Map<AmsServerInfo, List<String>> currentAssignments) {
    if (aliveNodes.isEmpty() || bucketsToRedistribute.isEmpty()) {
      return;
    }

    // Distribute buckets using round-robin to minimize migration.
    // Assumes every alive node already has an entry in currentAssignments
    // (guaranteed by buildNewAssignments).
    int nodeIndex = 0;
    for (String bucketId : bucketsToRedistribute) {
      AmsServerInfo node = aliveNodes.get(nodeIndex % aliveNodes.size());
      currentAssignments.get(node).add(bucketId);
      nodeIndex++;
    }
  }

  /**
   * Balance buckets for new nodes by taking buckets from existing nodes. This minimizes migration
   * by only moving necessary buckets to new nodes.
   *
   * <p>NOTE(review): which new nodes receive the "+1" remainder bucket depends on the iteration
   * order of {@code newNodes} (a Set) — confirm whether deterministic ordering is required here.
   *
   * @param aliveNodes All alive nodes
   * @param newNodes Newly added nodes
   * @param currentAssignments Current assignments map (will be modified)
   * @param targetBucketsPerNode Target number of buckets per node
   * @param remainder Remainder when dividing total buckets by node count
   */
  private void balanceBucketsForNewNodes(
      List<AmsServerInfo> aliveNodes,
      Set<AmsServerInfo> newNodes,
      Map<AmsServerInfo, List<String>> currentAssignments,
      int targetBucketsPerNode,
      int remainder) {
    if (newNodes.isEmpty()) {
      return;
    }

    // Calculate how many buckets each new node should get
    int bucketsPerNewNode = targetBucketsPerNode;
    int newNodeIndex = 0;
    for (AmsServerInfo newNode : newNodes) {
      // First 'remainder' nodes get one extra bucket
      int targetForNewNode = bucketsPerNewNode + (newNodeIndex < remainder ? 1 : 0);
      int currentCount = currentAssignments.get(newNode).size();
      int needed = targetForNewNode - currentCount;

      if (needed > 0) {
        // Collect buckets from existing nodes (prefer nodes with more buckets)
        List<String> bucketsToMove =
            collectBucketsFromExistingNodes(aliveNodes, newNodes, currentAssignments, needed);
        currentAssignments.get(newNode).addAll(bucketsToMove);
        LOG.info(
            "Moved {} buckets to new node {} (target: {})",
            bucketsToMove.size(),
            newNode,
            targetForNewNode);
      }
      newNodeIndex++;
    }
  }

  /**
   * Collect buckets from existing nodes to balance for new nodes. Prefer taking from nodes that
   * have more buckets than target.
   *
   * @param aliveNodes All alive nodes
   * @param newNodes New nodes (excluded from source)
   * @param currentAssignments Current assignments
   * @param needed Number of buckets needed
   * @return List of bucket IDs to move
   */
  private List<String> collectBucketsFromExistingNodes(
      List<AmsServerInfo> aliveNodes,
      Set<AmsServerInfo> newNodes,
      Map<AmsServerInfo, List<String>> currentAssignments,
      int needed) {
    List<String> bucketsToMove = new ArrayList<>();
    List<AmsServerInfo> existingNodes = new ArrayList<>();
    for (AmsServerInfo node : aliveNodes) {
      if (!newNodes.contains(node)) {
        existingNodes.add(node);
      }
    }

    if (existingNodes.isEmpty()) {
      return bucketsToMove;
    }

    // Sort existing nodes by current bucket count (descending)
    // This ensures we take from nodes with more buckets first
    existingNodes.sort(
        (n1, n2) -> {
          int count1 = currentAssignments.get(n1).size();
          int count2 = currentAssignments.get(n2).size();
          return Integer.compare(count2, count1);
        });

    // Collect buckets from existing nodes using round-robin
    int nodeIndex = 0;
    int collected = 0;
    while (collected < needed && !existingNodes.isEmpty()) {
      AmsServerInfo sourceNode = existingNodes.get(nodeIndex % existingNodes.size());
      List<String> sourceBuckets = currentAssignments.get(sourceNode);
      if (!sourceBuckets.isEmpty()) {
        // Take one bucket from this node
        String bucketToMove = sourceBuckets.remove(0);
        bucketsToMove.add(bucketToMove);
        collected++;
        LOG.debug("Moving bucket {} from node {} to new node", bucketToMove, sourceNode);
      } else {
        // This node has no more buckets, remove it from consideration
        existingNodes.remove(sourceNode);
        if (existingNodes.isEmpty()) {
          break;
        }
        // Re-normalize the index against the shrunken list before retrying.
        nodeIndex = nodeIndex % existingNodes.size();
        continue;
      }
      nodeIndex++;
    }

    return bucketsToMove;
  }

  /** Returns the full bucket universe as strings "1" .. bucketIdTotalCount (inclusive). */
  private List<String> generateBucketIds() {
    List<String> bucketIds = new ArrayList<>();
    for (int i = 1; i <= bucketIdTotalCount; i++) {
      bucketIds.add(String.valueOf(i));
    }
    return bucketIds;
  }

  /**
   * Get node key for matching nodes. Uses host:thriftBindPort format, consistent with
   * ZkBucketAssignStore.getNodeKey().
   */
  private String getNodeKey(AmsServerInfo nodeInfo) {
    return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
  }

  /** Indexes the alive nodes by their host:port key for O(1) lookups. */
  private Map<String, AmsServerInfo> buildAliveNodeMap(List<AmsServerInfo> aliveNodes) {
    Map<String, AmsServerInfo> map = new HashMap<>();
    for (AmsServerInfo node : aliveNodes) {
      map.put(getNodeKey(node), node);
    }
    return map;
  }

  /** Result of normalizing stored assignments onto the live node instances. */
  private static class NormalizedAssignments {
    // Stored assignments, re-keyed by the alive AmsServerInfo instance where one matches.
    final Map<AmsServerInfo, List<String>> assignments;
    // Every node (alive or stale) that currently has a stored assignment entry.
    final Set<AmsServerInfo> assignedNodes;

    NormalizedAssignments(
        Map<AmsServerInfo, List<String>> assignments, Set<AmsServerInfo> assignedNodes) {
      this.assignments = assignments;
      this.assignedNodes = assignedNodes;
    }
  }

  /**
   * Re-keys stored assignments by the live {@code AmsServerInfo} instance that shares the same
   * host:port key; entries with no live counterpart keep their stored key (these become
   * offline-node candidates in {@link #detectNodeChanges}).
   */
  private NormalizedAssignments normalizeCurrentAssignments(
      Map<AmsServerInfo, List<String>> currentAssignments,
      Map<String, AmsServerInfo> aliveNodeMap) {
    Map<AmsServerInfo, List<String>> normalized = new HashMap<>();
    Set<AmsServerInfo> assignedNodes = new HashSet<>();
    for (Map.Entry<AmsServerInfo, List<String>> entry : currentAssignments.entrySet()) {
      AmsServerInfo storedNode = entry.getKey();
      String nodeKey = getNodeKey(storedNode);
      AmsServerInfo aliveNode = aliveNodeMap.get(nodeKey);
      if (aliveNode != null) {
        normalized.put(aliveNode, entry.getValue());
        assignedNodes.add(aliveNode);
      } else {
        normalized.put(storedNode, entry.getValue());
        assignedNodes.add(storedNode);
      }
    }
    return new NormalizedAssignments(normalized, assignedNodes);
  }

  /** Nodes that joined or left since the last persisted assignment. */
  private static class NodeChangeResult {
    final Set<AmsServerInfo> newNodes;
    final Set<AmsServerInfo> offlineNodes;

    NodeChangeResult(Set<AmsServerInfo> newNodes, Set<AmsServerInfo> offlineNodes) {
      this.newNodes = newNodes;
      this.offlineNodes = offlineNodes;
    }

    /** A reassignment pass is needed whenever membership changed in either direction. */
    boolean needReassign() {
      return !newNodes.isEmpty() || !offlineNodes.isEmpty();
    }
  }

  /**
   * Detects membership changes. A node is "new" when alive but absent from the stored
   * assignments; "offline" when it has a stored assignment but either (a) is not in the alive
   * list, (b) its last-update timestamp is older than the configured timeout, or (c) its
   * timestamp cannot be read from the store.
   *
   * <p>Note: a timestamp of 0 (never recorded) deliberately does not trigger the timeout path —
   * only positive, stale timestamps do.
   */
  private NodeChangeResult detectNodeChanges(
      List<AmsServerInfo> aliveNodes,
      Map<AmsServerInfo, List<String>> currentAssignments,
      Map<String, AmsServerInfo> aliveNodeMap,
      Set<AmsServerInfo> currentAssignedNodes) {
    Set<AmsServerInfo> aliveNodeSet = new HashSet<>(aliveNodes);
    Set<AmsServerInfo> newNodes = new HashSet<>(aliveNodeSet);
    newNodes.removeAll(currentAssignedNodes);

    // Case (a): stored node no longer present in the alive list.
    Set<AmsServerInfo> offlineNodes = new HashSet<>();
    for (AmsServerInfo storedNode : currentAssignments.keySet()) {
      if (!aliveNodeMap.containsKey(getNodeKey(storedNode))) {
        offlineNodes.add(storedNode);
      }
    }

    long currentTime = System.currentTimeMillis();
    Set<String> aliveNodeKeys = new HashSet<>();
    for (AmsServerInfo node : aliveNodes) {
      aliveNodeKeys.add(getNodeKey(node));
    }
    for (AmsServerInfo node : currentAssignedNodes) {
      String nodeKey = getNodeKey(node);
      if (aliveNodeKeys.contains(nodeKey)) {
        try {
          long lastUpdateTime = assignStore.getLastUpdateTime(node);
          // Case (b): alive but heartbeat timestamp too old.
          if (lastUpdateTime > 0 && (currentTime - lastUpdateTime) > nodeOfflineTimeoutMs) {
            // offlineNodes must contain the *stored* key instance so that
            // handleOfflineNodes can look its buckets up in currentAssignments.
            for (AmsServerInfo storedNode : currentAssignments.keySet()) {
              if (getNodeKey(storedNode).equals(nodeKey)) {
                offlineNodes.add(storedNode);
                break;
              }
            }
            LOG.warn(
                "Node {} is considered offline due to timeout. Last update: {}",
                node,
                lastUpdateTime);
          }
        } catch (BucketAssignStoreException e) {
          // Case (c): store read failed — conservatively treat the node as offline.
          LOG.warn("Failed to get last update time for node {}, treating as offline", node, e);
          for (AmsServerInfo storedNode : currentAssignments.keySet()) {
            if (getNodeKey(storedNode).equals(nodeKey)) {
              offlineNodes.add(storedNode);
              break;
            }
          }
        }
      }
    }
    return new NodeChangeResult(newNodes, offlineNodes);
  }

  /**
   * Collects the buckets held by offline nodes for redistribution and removes their persisted
   * assignment entries. A failed removal is logged and skipped (best effort); the pass continues.
   */
  private List<String> handleOfflineNodes(
      Set<AmsServerInfo> offlineNodes, Map<AmsServerInfo, List<String>> currentAssignments) {
    List<String> bucketsToRedistribute = new ArrayList<>();
    for (AmsServerInfo offlineNode : offlineNodes) {
      try {
        List<String> offlineBuckets = currentAssignments.get(offlineNode);
        if (offlineBuckets != null && !offlineBuckets.isEmpty()) {
          bucketsToRedistribute.addAll(offlineBuckets);
          LOG.info(
              "Collected {} buckets from offline node {} for redistribution",
              offlineBuckets.size(),
              offlineNode);
        }
        assignStore.removeAssignments(offlineNode);
      } catch (BucketAssignStoreException e) {
        LOG.warn("Failed to remove assignments for offline node {}", offlineNode, e);
      }
    }
    return bucketsToRedistribute;
  }

  /**
   * Builds the working assignment map for the rebalance step: every alive node gets a mutable
   * entry — a copy of its existing buckets if any, otherwise an empty list. Alive nodes flagged
   * offline (timed out) start from an empty list so their buckets can be redistributed.
   */
  private Map<AmsServerInfo, List<String>> buildNewAssignments(
      List<AmsServerInfo> aliveNodes,
      Set<AmsServerInfo> offlineNodes,
      Map<AmsServerInfo, List<String>> normalizedAssignments) {
    Map<AmsServerInfo, List<String>> newAssignments = new HashMap<>();
    Set<String> offlineNodeKeys = new HashSet<>();
    for (AmsServerInfo offlineNode : offlineNodes) {
      offlineNodeKeys.add(getNodeKey(offlineNode));
    }
    for (AmsServerInfo node : aliveNodes) {
      String nodeKey = getNodeKey(node);
      if (!offlineNodeKeys.contains(nodeKey)) {
        List<String> existing = normalizedAssignments.get(node);
        if (existing != null && !existing.isEmpty()) {
          // Copy so later mutation never aliases the normalized map's lists.
          newAssignments.put(node, new ArrayList<>(existing));
        } else {
          newAssignments.put(node, new ArrayList<>());
        }
      } else {
        newAssignments.put(node, new ArrayList<>());
      }
    }
    return newAssignments;
  }

  /**
   * Performs the rebalance in three steps: (1) round-robin the buckets freed by offline nodes,
   * (2) top up new nodes toward the per-node target by pulling from existing nodes, (3) sweep
   * any bucket from the full universe that still has no owner and round-robin it as well.
   */
  private void rebalance(
      List<AmsServerInfo> aliveNodes,
      Set<AmsServerInfo> newNodes,
      List<String> bucketsToRedistribute,
      List<String> allBuckets,
      Map<AmsServerInfo, List<String>> newAssignments) {
    if (!bucketsToRedistribute.isEmpty()) {
      redistributeBucketsIncrementally(aliveNodes, bucketsToRedistribute, newAssignments);
    }

    int totalBuckets = allBuckets.size();
    int totalAliveNodes = aliveNodes.size();
    int targetBucketsPerNode = totalBuckets / totalAliveNodes;
    int remainder = totalBuckets % totalAliveNodes;
    if (!newNodes.isEmpty()) {
      balanceBucketsForNewNodes(
          aliveNodes, newNodes, newAssignments, targetBucketsPerNode, remainder);
    }

    // Safety net: assign any bucket not owned by anyone (e.g. first run, or lost entries).
    Set<String> allAssignedBuckets = new HashSet<>();
    for (List<String> buckets : newAssignments.values()) {
      allAssignedBuckets.addAll(buckets);
    }
    List<String> unassignedBuckets = new ArrayList<>();
    for (String bucket : allBuckets) {
      if (!allAssignedBuckets.contains(bucket)) {
        unassignedBuckets.add(bucket);
      }
    }
    if (!unassignedBuckets.isEmpty()) {
      redistributeBucketsIncrementally(aliveNodes, unassignedBuckets, newAssignments);
    }
  }

  /**
   * Writes every node's bucket list to the store. Each node is saved independently; a failure
   * for one node is logged and does not block the others.
   */
  private void persistAssignments(Map<AmsServerInfo, List<String>> newAssignments) {
    for (Map.Entry<AmsServerInfo, List<String>> entry : newAssignments.entrySet()) {
      try {
        assignStore.saveAssignments(entry.getKey(), entry.getValue());
        LOG.info(
            "Assigned {} buckets to node {}: {}",
            entry.getValue().size(),
            entry.getKey(),
            entry.getValue());
      } catch (BucketAssignStoreException e) {
        LOG.error("Failed to save assignments for node {}", entry.getKey(), e);
      }
    }
  }

  /** Refreshes the stored heartbeat timestamp for each alive node (best effort). */
  private void refreshLastUpdateTime(List<AmsServerInfo> aliveNodes) {
    for (AmsServerInfo node : aliveNodes) {
      try {
        assignStore.updateLastUpdateTime(node);
      } catch (BucketAssignStoreException e) {
        LOG.warn("Failed to update last update time for node {}", node, e);
      }
    }
  }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
new file mode 100644
index 000000000..8db61265b
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.exception.BucketAssignStoreException;
+
+import java.util.List;
+import java.util.Map;
+
/**
 * Interface for storing and retrieving bucket ID assignments to AMS nodes. Different
 * implementations can use different storage backends (e.g., ZooKeeper, database).
 *
 * <p>NOTE(review): callers such as AmsAssignService invoke these methods from a scheduled
 * thread — confirm implementations are safe for concurrent use across AMS nodes.
 */
public interface BucketAssignStore {

  /**
   * Save bucket ID assignments for a node.
   *
   * @param nodeInfo The node information
   * @param bucketIds List of bucket IDs assigned to this node
   * @throws BucketAssignStoreException If save operation fails
   */
  void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds)
      throws BucketAssignStoreException;

  /**
   * Get bucket ID assignments for a node.
   *
   * @param nodeInfo The node information
   * @return List of bucket IDs assigned to this node, empty list if not found
   * @throws BucketAssignStoreException If retrieval operation fails
   */
  List<String> getAssignments(AmsServerInfo nodeInfo) throws BucketAssignStoreException;

  /**
   * Remove bucket ID assignments for a node.
   *
   * @param nodeInfo The node information
   * @throws BucketAssignStoreException If removal operation fails
   */
  void removeAssignments(AmsServerInfo nodeInfo) throws BucketAssignStoreException;

  /**
   * Get all bucket ID assignments for all nodes.
   *
   * @return Map of node info to list of bucket IDs
   * @throws BucketAssignStoreException If retrieval operation fails
   */
  Map<AmsServerInfo, List<String>> getAllAssignments() throws BucketAssignStoreException;

  /**
   * Get the last update time for a node's assignments.
   *
   * @param nodeInfo The node information
   * @return Last update timestamp in milliseconds, 0 if not found
   * @throws BucketAssignStoreException If retrieval operation fails
   */
  long getLastUpdateTime(AmsServerInfo nodeInfo) throws BucketAssignStoreException;

  /**
   * Update the last update time for a node's assignments.
   *
   * @param nodeInfo The node information
   * @throws BucketAssignStoreException If update operation fails
   */
  void updateLastUpdateTime(AmsServerInfo nodeInfo) throws BucketAssignStoreException;
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
new file mode 100644
index 000000000..ec865db37
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.ha.HighAvailabilityContainer;
import org.apache.amoro.server.ha.ZkHighAvailabilityContainer;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Locale;
+
+/**
+ * Factory for creating BucketAssignStore implementations based on HA
configuration.
+ *
+ * <p>Supports different storage backends (ZK, database) according to HA type.
+ */
+public final class BucketAssignStoreFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(BucketAssignStoreFactory.class);
+
+ private BucketAssignStoreFactory() {}
+
+ /**
+ * Creates a BucketAssignStore based on the given HA configuration and
container.
+ *
+ * @param haContainer the HA container
+ * @param conf service configuration
+ * @return a BucketAssignStore implementation according to HA type
+ * @throws IllegalArgumentException if HA type is unsupported
+ * @throws RuntimeException if the ZK store cannot be created
+ */
+ public static BucketAssignStore create(
+ HighAvailabilityContainer haContainer, Configurations conf) {
+ String haType = conf.getString(AmoroManagementConf.HA_TYPE).toLowerCase();
+ String clusterName = conf.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+
+ switch (haType) {
+ case AmoroManagementConf.HA_TYPE_ZK:
+ if (haContainer instanceof ZkHighAvailabilityContainer) {
+ ZkHighAvailabilityContainer zkHaContainer =
(ZkHighAvailabilityContainer) haContainer;
+ CuratorFramework zkClient = zkHaContainer.getZkClient();
+ if (zkClient != null) {
+ LOG.info("Creating ZkBucketAssignStore for cluster: {}",
clusterName);
+ return new ZkBucketAssignStore(zkClient, clusterName);
+ }
+ }
+ throw new RuntimeException(
+ "Cannot create ZkBucketAssignStore: ZK client not available or
invalid container type");
+
+ case AmoroManagementConf.HA_TYPE_DATABASE:
+ LOG.info("Creating DataBaseBucketAssignStore for cluster: {}",
clusterName);
+ // TODO: Implement DataBaseBucketAssignStore when ready
+ throw new UnsupportedOperationException("DataBaseBucketAssignStore is
not yet implemented");
+
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported ha.type: " + haType + ", only 'zk' or 'database' are
allowed");
+ }
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
new file mode 100644
index 000000000..9ece14aa4
--- /dev/null
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.exception.BucketAssignStoreException;
+import org.apache.amoro.properties.AmsHAProperties;
+import
org.apache.amoro.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import
org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.amoro.utils.JacksonUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ZooKeeper-based implementation of BucketAssignStore. Stores bucket ID
assignments in ZooKeeper
+ * with the following structure:
/{namespace}/amoro/ams/bucket-assignments/{nodeKey}/assignments -
+ * bucket IDs
/{namespace}/amoro/ams/bucket-assignments/{nodeKey}/last-update-time - timestamp
+ */
+public class ZkBucketAssignStore implements BucketAssignStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ZkBucketAssignStore.class);
+ private static final String ASSIGNMENTS_SUFFIX = "/assignments";
+ private static final String LAST_UPDATE_TIME_SUFFIX = "/last-update-time";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final TypeReference<List<String>> LIST_STRING_TYPE =
+ new TypeReference<List<String>>() {};
+
+ private final CuratorFramework zkClient;
+ private final String assignmentsBasePath;
+
+ public ZkBucketAssignStore(CuratorFramework zkClient, String clusterName) {
+ this.zkClient = zkClient;
+ this.assignmentsBasePath =
AmsHAProperties.getBucketAssignmentsPath(clusterName);
+ try {
+ createPathIfNeeded(assignmentsBasePath);
+ } catch (Exception e) {
+ LOG.error("Failed to create bucket assignments path", e);
+ throw new RuntimeException("Failed to initialize ZkBucketAssignStore",
e);
+ }
+ }
+
+ @Override
+ public void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds)
+ throws BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ String assignmentsPath = assignmentsBasePath + "/" + nodeKey +
ASSIGNMENTS_SUFFIX;
+ String assignmentsJson = JacksonUtil.toJSONString(bucketIds);
+ try {
+ if (zkClient.checkExists().forPath(assignmentsPath) != null) {
+ zkClient
+ .setData()
+ .forPath(assignmentsPath,
assignmentsJson.getBytes(StandardCharsets.UTF_8));
+ } else {
+ zkClient
+ .create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(assignmentsPath,
assignmentsJson.getBytes(StandardCharsets.UTF_8));
+ }
+ updateLastUpdateTime(nodeInfo);
+ LOG.debug("Saved bucket assignments for node {}: {}", nodeKey,
bucketIds);
+ } catch (BucketAssignStoreException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Failed to save bucket assignments for node {}", nodeKey, e);
+ throw new BucketAssignStoreException(
+ "Failed to save bucket assignments for node " + nodeKey, e);
+ }
+ }
+
+ @Override
+ public List<String> getAssignments(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ String assignmentsPath = assignmentsBasePath + "/" + nodeKey +
ASSIGNMENTS_SUFFIX;
+ try {
+ if (zkClient.checkExists().forPath(assignmentsPath) == null) {
+ return new ArrayList<>();
+ }
+ byte[] data = zkClient.getData().forPath(assignmentsPath);
+ if (data == null || data.length == 0) {
+ return new ArrayList<>();
+ }
+ String assignmentsJson = new String(data, StandardCharsets.UTF_8);
+ return OBJECT_MAPPER.readValue(assignmentsJson, LIST_STRING_TYPE);
+ } catch (KeeperException.NoNodeException e) {
+ return new ArrayList<>();
+ } catch (BucketAssignStoreException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Failed to get bucket assignments for node {}", nodeKey, e);
+ throw new BucketAssignStoreException(
+ "Failed to get bucket assignments for node " + nodeKey, e);
+ }
+ }
+
+ @Override
+ public void removeAssignments(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ String nodePath = assignmentsBasePath + "/" + nodeKey;
+ try {
+ if (zkClient.checkExists().forPath(nodePath) != null) {
+ zkClient.delete().deletingChildrenIfNeeded().forPath(nodePath);
+ LOG.debug("Removed bucket assignments for node {}", nodeKey);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // Already deleted, ignore
+ } catch (BucketAssignStoreException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Failed to remove bucket assignments for node {}", nodeKey, e);
+ throw new BucketAssignStoreException(
+ "Failed to remove bucket assignments for node " + nodeKey, e);
+ }
+ }
+
+ @Override
+ public Map<AmsServerInfo, List<String>> getAllAssignments() throws
BucketAssignStoreException {
+ Map<AmsServerInfo, List<String>> allAssignments = new HashMap<>();
+ try {
+ if (zkClient.checkExists().forPath(assignmentsBasePath) == null) {
+ return allAssignments;
+ }
+ List<String> nodeKeys =
zkClient.getChildren().forPath(assignmentsBasePath);
+ for (String nodeKey : nodeKeys) {
+ try {
+ AmsServerInfo nodeInfo = parseNodeKey(nodeKey);
+ List<String> bucketIds = getAssignments(nodeInfo);
+ if (!bucketIds.isEmpty()) {
+ allAssignments.put(nodeInfo, bucketIds);
+ }
+ } catch (BucketAssignStoreException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.warn("Failed to parse node key or get assignments: {}", nodeKey,
e);
+ }
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // Path doesn't exist, return empty map
+ } catch (BucketAssignStoreException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Failed to get all bucket assignments", e);
+ throw new BucketAssignStoreException("Failed to get all bucket
assignments", e);
+ }
+ return allAssignments;
+ }
+
+ @Override
+ public long getLastUpdateTime(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ String timePath = assignmentsBasePath + "/" + nodeKey +
LAST_UPDATE_TIME_SUFFIX;
+ try {
+ if (zkClient.checkExists().forPath(timePath) == null) {
+ return 0;
+ }
+ byte[] data = zkClient.getData().forPath(timePath);
+ if (data == null || data.length == 0) {
+ return 0;
+ }
+ return Long.parseLong(new String(data, StandardCharsets.UTF_8));
+ } catch (KeeperException.NoNodeException e) {
+ return 0;
+ } catch (BucketAssignStoreException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Failed to get last update time for node {}", nodeKey, e);
+ throw new BucketAssignStoreException("Failed to get last update time for
node " + nodeKey, e);
+ }
+ }
+
+ @Override
+ public void updateLastUpdateTime(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ String timePath = assignmentsBasePath + "/" + nodeKey +
LAST_UPDATE_TIME_SUFFIX;
+ long currentTime = System.currentTimeMillis();
+ String timeStr = String.valueOf(currentTime);
+ try {
+ if (zkClient.checkExists().forPath(timePath) != null) {
+ zkClient.setData().forPath(timePath,
timeStr.getBytes(StandardCharsets.UTF_8));
+ } else {
+ zkClient
+ .create()
+ .creatingParentsIfNeeded()
+ .withMode(CreateMode.PERSISTENT)
+ .forPath(timePath, timeStr.getBytes(StandardCharsets.UTF_8));
+ }
+ } catch (BucketAssignStoreException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Failed to update last update time for node {}", nodeKey, e);
+ throw new BucketAssignStoreException(
+ "Failed to update last update time for node " + nodeKey, e);
+ }
+ }
+
+ private String getNodeKey(AmsServerInfo nodeInfo) {
+ return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
+ }
+
+ private AmsServerInfo parseNodeKey(String nodeKey) {
+ String[] parts = nodeKey.split(":");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Invalid node key format: " +
nodeKey);
+ }
+ AmsServerInfo nodeInfo = new AmsServerInfo();
+ nodeInfo.setHost(parts[0]);
+ nodeInfo.setThriftBindPort(Integer.parseInt(parts[1]));
+ return nodeInfo;
+ }
+
+ private void createPathIfNeeded(String path) throws
BucketAssignStoreException {
+ try {
+
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
+ } catch (KeeperException.NodeExistsException e) {
+ // ignore
+ } catch (Exception e) {
+ throw new BucketAssignStoreException("Failed to create path: " + path,
e);
+ }
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
index 7c070f3b6..0aa7b6e35 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
@@ -57,16 +57,6 @@ public class ZkHighAvailabilityContainer implements
HighAvailabilityContainer, L
private final LeaderLatch leaderLatch;
private final CuratorFramework zkClient;
-
- // Package-private accessors for testing
- CuratorFramework getZkClient() {
- return zkClient;
- }
-
- LeaderLatch getLeaderLatch() {
- return leaderLatch;
- }
-
private final String tableServiceMasterPath;
private final String optimizingServiceMasterPath;
private final String nodesPath;
@@ -295,6 +285,24 @@ public class ZkHighAvailabilityContainer implements
HighAvailabilityContainer, L
return leaderLatch.hasLeadership();
}
  /**
   * Get the current node's table service server info.
   *
   * @return The current node's server info, null if HA is not enabled
   */
  public AmsServerInfo getTableServiceServerInfo() {
    return tableServiceServerInfo;
  }
+
  /**
   * Get the ZooKeeper client. This is used for creating BucketAssignStore.
   *
   * <p>Replaces the former package-private test-only accessor; now part of the public surface.
   *
   * @return The ZooKeeper client, null if HA is not enabled
   */
  public CuratorFramework getZkClient() {
    return zkClient;
  }
+
private void createPathIfNeeded(String path) throws Exception {
try {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
new file mode 100644
index 000000000..57f678db8
--- /dev/null
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.exception.BucketAssignStoreException;
+import org.apache.amoro.properties.AmsHAProperties;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import org.apache.amoro.server.ha.ZkHighAvailabilityContainer;
+import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.data.Stat;
+import org.apache.amoro.utils.JacksonUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test for AmsAssignService using mocked ZK to avoid connection issues. */
+public class TestAmsAssignService {
+
+ private Configurations serviceConfig;
+ private HighAvailabilityContainer haContainer;
+ private AmsAssignService assignService;
+ private AmsServerInfo node1;
+ private AmsServerInfo node2;
+ private AmsServerInfo node3;
+ private MockZkState mockZkState;
+ private CuratorFramework mockZkClient;
+ private LeaderLatch mockLeaderLatch;
+ private MockBucketAssignStore mockAssignStore;
+
  /**
   * Builds the shared fixture: a mocked ZK state/client/leader-latch, an in-memory
   * {@code MockBucketAssignStore}, an HA-enabled master-slave config for node1
   * (127.0.0.1:1260), a container wired with the mocks, the assign service under test,
   * and three {@link AmsServerInfo} descriptors used across the tests.
   *
   * <p>Order matters: the mocks must exist before {@code createContainerWithMockZk()} and
   * {@code createAssignServiceWithMockStore()} inject them via reflection.
   */
  @Before
  public void setUp() throws Exception {
    mockZkState = new MockZkState();
    mockZkClient = createMockZkClient();
    mockLeaderLatch = createMockLeaderLatch(true); // Is leader by default
    mockAssignStore = new MockBucketAssignStore();

    serviceConfig = new Configurations();
    serviceConfig.setString(AmoroManagementConf.SERVER_EXPOSE_HOST, "127.0.0.1");
    serviceConfig.setInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT, 1260);
    serviceConfig.setInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT, 1261);
    serviceConfig.setInteger(AmoroManagementConf.HTTP_SERVER_PORT, 1630);
    serviceConfig.setBoolean(AmoroManagementConf.HA_ENABLE, true);
    serviceConfig.setString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS, "127.0.0.1:2181");
    serviceConfig.setString(AmoroManagementConf.HA_CLUSTER_NAME, "test-cluster");
    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
    serviceConfig.setInteger(AmoroManagementConf.HA_BUCKET_ID_TOTAL_COUNT, 100);
    serviceConfig.set(AmoroManagementConf.HA_NODE_OFFLINE_TIMEOUT, java.time.Duration.ofMinutes(5));

    haContainer = createContainerWithMockZk();

    // Create AmsAssignService with mock assign store
    assignService = createAssignServiceWithMockStore();

    node1 = new AmsServerInfo();
    node1.setHost("127.0.0.1");
    node1.setThriftBindPort(1260);
    node1.setRestBindPort(1630);

    node2 = new AmsServerInfo();
    node2.setHost("127.0.0.2");
    node2.setThriftBindPort(1262);
    node2.setRestBindPort(1632);

    node3 = new AmsServerInfo();
    node3.setHost("127.0.0.3");
    node3.setThriftBindPort(1263);
    node3.setRestBindPort(1633);
  }
+
  /** Stops the service and closes the primary container before clearing the mocked ZK state. */
  @After
  public void tearDown() throws Exception {
    if (assignService != null) {
      assignService.stop();
    }
    if (haContainer != null) {
      haContainer.close();
    }
    mockZkState.clear();
  }
+
  /**
   * Two registered nodes, one assignment round: all 100 buckets must be distributed, every node
   * must receive at least one bucket, and counts must differ by at most 1.
   *
   * <p>NOTE(review): the fixed {@code Thread.sleep(100)} assumes registration completes within
   * 100ms — a potential flake source; consider polling instead. TODO confirm.
   */
  @Test
  public void testInitialAssignment() throws Exception {
    // Register nodes
    haContainer.registerAndElect();

    // Create second node
    Configurations config2 = createNodeConfig("127.0.0.2", 1262, 1632);
    HighAvailabilityContainer haContainer2 = createContainerWithMockZk(config2);
    haContainer2.registerAndElect();

    try {
      // Wait a bit for registration
      Thread.sleep(100);

      // Trigger assignment manually
      assignService.doAssign();

      // Check assignments
      Map<AmsServerInfo, List<String>> assignments = mockAssignStore.getAllAssignments();
      Assert.assertEquals("Should have assignments for 2 nodes", 2, assignments.size());

      // Verify buckets are distributed
      int totalAssigned = 0;
      for (List<String> buckets : assignments.values()) {
        totalAssigned += buckets.size();
        Assert.assertTrue("Each node should have buckets", !buckets.isEmpty());
      }
      Assert.assertEquals("All buckets should be assigned", 100, totalAssigned);

      // Verify balance (difference should be at most 1)
      List<Integer> bucketCounts = new ArrayList<>();
      for (List<String> buckets : assignments.values()) {
        bucketCounts.add(buckets.size());
      }
      int max = bucketCounts.stream().mapToInt(Integer::intValue).max().orElse(0);
      int min = bucketCounts.stream().mapToInt(Integer::intValue).min().orElse(0);
      Assert.assertTrue("Difference should be at most 1", max - min <= 1);
    } finally {
      haContainer2.close();
    }
  }
+
  /**
   * Starts with a balanced two-node assignment, takes node2 offline in the mock ZK state, and
   * verifies a second assignment round hands all 100 buckets to the sole surviving node.
   */
  @Test
  public void testNodeOfflineReassignment() throws Exception {
    // Setup: 2 nodes with initial assignment
    haContainer.registerAndElect();
    Configurations config2 = createNodeConfig("127.0.0.2", 1262, 1632);
    HighAvailabilityContainer haContainer2 = createContainerWithMockZk(config2);
    haContainer2.registerAndElect();

    try {
      Thread.sleep(100);

      // Initial assignment
      assignService.doAssign();
      Map<AmsServerInfo, List<String>> initialAssignments = mockAssignStore.getAllAssignments();
      Assert.assertEquals("Should have 2 nodes", 2, initialAssignments.size());

      // Verify initial assignment is balanced
      List<Integer> initialCounts = new ArrayList<>();
      for (List<String> buckets : initialAssignments.values()) {
        initialCounts.add(buckets.size());
      }
      int maxInitial = initialCounts.stream().mapToInt(Integer::intValue).max().orElse(0);
      int minInitial = initialCounts.stream().mapToInt(Integer::intValue).min().orElse(0);
      Assert.assertTrue("Initial assignment should be balanced", maxInitial - minInitial <= 1);

      // Simulate node2 going offline by removing it from mock state
      mockZkState.deleteNodeByHost("127.0.0.2");
      Thread.sleep(100);

      // Trigger reassignment
      assignService.doAssign();

      // Check that node2's buckets are redistributed
      Map<AmsServerInfo, List<String>> newAssignments = mockAssignStore.getAllAssignments();
      Assert.assertEquals("Should have 1 node after offline", 1, newAssignments.size());

      // The only remaining node (node1) should have all buckets. ZK stores
      // optimizingServiceServerInfo (thrift port 1261), not table port (1260), so we
      // take the single entry instead of matching by node1's thriftBindPort.
      List<String> remainingBuckets = newAssignments.values().iterator().next();
      Assert.assertNotNull("Node1 should have assignments", remainingBuckets);
      Assert.assertEquals("Node1 should have all buckets", 100, remainingBuckets.size());
    } finally {
      try {
        haContainer2.close();
      } catch (Exception e) {
        // ignore
      }
    }
  }
+
  /**
   * Assigns all 100 buckets to a single node, then registers a second node and verifies the
   * rebalance is incremental: node1 keeps buckets, counts differ by at most 1, and the total
   * remains 100.
   */
  @Test
  public void testNewNodeIncrementalAssignment() throws Exception {
    // Setup: 1 node initially
    haContainer.registerAndElect();
    Thread.sleep(100);

    // Initial assignment - all buckets to node1
    assignService.doAssign();
    Map<AmsServerInfo, List<String>> initialAssignments = mockAssignStore.getAllAssignments();
    // ZK stores optimizing port (1261), not table port (1260); match by host only (single node).
    List<String> node1InitialBuckets = findBucketsByHost(initialAssignments, node1.getHost());
    Assert.assertNotNull("Node1 should have assignments", node1InitialBuckets);
    Assert.assertEquals("Node1 should have all buckets initially", 100, node1InitialBuckets.size());

    // Add new node
    Configurations config2 = createNodeConfig("127.0.0.2", 1262, 1632);
    HighAvailabilityContainer haContainer2 = createContainerWithMockZk(config2);
    haContainer2.registerAndElect();

    try {
      Thread.sleep(100);

      // Trigger reassignment
      assignService.doAssign();

      // Check assignments
      Map<AmsServerInfo, List<String>> newAssignments = mockAssignStore.getAllAssignments();
      Assert.assertEquals("Should have 2 nodes", 2, newAssignments.size());

      // Verify incremental assignment - node1 should keep most of its buckets.
      // ZK stores optimizing port, not table port; match by host.
      List<String> node1NewBuckets = findBucketsByHost(newAssignments, node1.getHost());
      Assert.assertNotNull("Node1 should still have assignments", node1NewBuckets);

      // Node1 should have kept most buckets (incremental assignment)
      Assert.assertTrue("Node1 should keep some buckets", node1NewBuckets.size() > 0);

      // Verify balance
      List<Integer> bucketCounts = new ArrayList<>();
      for (List<String> buckets : newAssignments.values()) {
        bucketCounts.add(buckets.size());
      }
      int max = bucketCounts.stream().mapToInt(Integer::intValue).max().orElse(0);
      int min = bucketCounts.stream().mapToInt(Integer::intValue).min().orElse(0);
      Assert.assertTrue("Difference should be at most 1", max - min <= 1);

      // Verify total
      int total = bucketCounts.stream().mapToInt(Integer::intValue).sum();
      Assert.assertEquals("Total buckets should be 100", 100, total);
    } finally {
      haContainer2.close();
    }
  }
+
  /**
   * Three registered nodes, one assignment round: all 100 buckets must be assigned and the
   * per-node counts must differ by at most 1.
   */
  @Test
  public void testBalanceAfterNodeChanges() throws Exception {
    // Setup: 3 nodes
    haContainer.registerAndElect();
    Configurations config2 = createNodeConfig("127.0.0.2", 1262, 1632);
    HighAvailabilityContainer haContainer2 = createContainerWithMockZk(config2);
    haContainer2.registerAndElect();
    Configurations config3 = createNodeConfig("127.0.0.3", 1263, 1633);
    HighAvailabilityContainer haContainer3 = createContainerWithMockZk(config3);
    haContainer3.registerAndElect();

    try {
      Thread.sleep(200);

      // Initial assignment
      assignService.doAssign();

      // Verify balance
      Map<AmsServerInfo, List<String>> assignments = mockAssignStore.getAllAssignments();
      Assert.assertEquals("Should have 3 nodes", 3, assignments.size());

      List<Integer> bucketCounts = new ArrayList<>();
      for (List<String> buckets : assignments.values()) {
        bucketCounts.add(buckets.size());
      }
      int max = bucketCounts.stream().mapToInt(Integer::intValue).max().orElse(0);
      int min = bucketCounts.stream().mapToInt(Integer::intValue).min().orElse(0);
      Assert.assertTrue("Difference should be at most 1", max - min <= 1);

      // Verify all buckets are assigned
      int total = bucketCounts.stream().mapToInt(Integer::intValue).sum();
      Assert.assertEquals("All buckets should be assigned", 100, total);
    } finally {
      haContainer2.close();
      haContainer3.close();
    }
  }
+
  /**
   * Verifies migration is minimized when a third node joins a balanced two-node assignment:
   * node1 and node2 must each retain more than half of their original buckets, and node3 must
   * receive some buckets.
   */
  @Test
  public void testIncrementalAssignmentMinimizesMigration() throws Exception {
    // Setup: 2 nodes initially
    haContainer.registerAndElect();
    Configurations config2 = createNodeConfig("127.0.0.2", 1262, 1632);
    HighAvailabilityContainer haContainer2 = createContainerWithMockZk(config2);
    haContainer2.registerAndElect();
    HighAvailabilityContainer haContainer3 = null;

    try {
      Thread.sleep(100);

      // Initial assignment
      assignService.doAssign();
      Map<AmsServerInfo, List<String>> initialAssignments = mockAssignStore.getAllAssignments();

      // Record initial assignments, partitioned by host
      Set<String> node1InitialBuckets = new HashSet<>();
      Set<String> node2InitialBuckets = new HashSet<>();
      for (Map.Entry<AmsServerInfo, List<String>> entry : initialAssignments.entrySet()) {
        if (entry.getKey().getHost().equals("127.0.0.1")) {
          node1InitialBuckets.addAll(entry.getValue());
        } else {
          node2InitialBuckets.addAll(entry.getValue());
        }
      }

      // Add new node
      Configurations config3 = createNodeConfig("127.0.0.3", 1263, 1633);
      haContainer3 = createContainerWithMockZk(config3);
      haContainer3.registerAndElect();

      Thread.sleep(100);

      // Trigger reassignment
      assignService.doAssign();

      // Check new assignments
      Map<AmsServerInfo, List<String>> newAssignments = mockAssignStore.getAllAssignments();

      // Calculate migration: buckets that moved from node1 or node2
      Set<String> node1NewBuckets = new HashSet<>();
      Set<String> node2NewBuckets = new HashSet<>();
      Set<String> node3Buckets = new HashSet<>();
      for (Map.Entry<AmsServerInfo, List<String>> entry : newAssignments.entrySet()) {
        if (entry.getKey().getHost().equals("127.0.0.1")) {
          node1NewBuckets.addAll(entry.getValue());
        } else if (entry.getKey().getHost().equals("127.0.0.2")) {
          node2NewBuckets.addAll(entry.getValue());
        } else {
          node3Buckets.addAll(entry.getValue());
        }
      }

      // Node1 and Node2 should keep most of their buckets (set intersection)
      Set<String> node1Kept = new HashSet<>(node1InitialBuckets);
      node1Kept.retainAll(node1NewBuckets);
      Set<String> node2Kept = new HashSet<>(node2InitialBuckets);
      node2Kept.retainAll(node2NewBuckets);

      // Verify incremental assignment: nodes should keep most buckets
      Assert.assertTrue(
          "Node1 should keep most buckets (incremental)",
          node1Kept.size() > node1InitialBuckets.size() / 2);
      Assert.assertTrue(
          "Node2 should keep most buckets (incremental)",
          node2Kept.size() > node2InitialBuckets.size() / 2);

      // Node3 should get buckets from both
      Assert.assertTrue("Node3 should have buckets", node3Buckets.size() > 0);
    } finally {
      haContainer2.close();
      if (haContainer3 != null) {
        try {
          haContainer3.close();
        } catch (Exception e) {
          // ignore
        }
      }
    }
  }
+
  /** The service lifecycle must toggle {@code isRunning()} cleanly on start and stop. */
  @Test
  public void testServiceStartStop() {
    // Test that service can start and stop without errors
    assignService.start();
    Assert.assertTrue("Service should be running", assignService.isRunning());

    assignService.stop();
    Assert.assertFalse("Service should be stopped", assignService.isRunning());
  }
+
+ @Test
+ public void testServiceSkipsWhenNotLeader() throws Exception {
+ // Create a non-leader container
+ mockLeaderLatch = createMockLeaderLatch(false); // Not leader
+ Configurations nonLeaderConfig = createNodeConfig("127.0.0.2", 1262, 1632);
+ HighAvailabilityContainer nonLeaderContainer =
createContainerWithMockZk(nonLeaderConfig);
+ nonLeaderContainer.registerAndElect();
+
+ try {
+ // Wait a bit
+ Thread.sleep(100);
+
+ AmsAssignService nonLeaderService =
createAssignServiceWithMockStore(nonLeaderContainer);
+
+ // Should not throw exception even if not leader
+ nonLeaderService.doAssign();
+
+ // Should not have assignments if not leader
+ Map<AmsServerInfo, List<String>> assignments =
mockAssignStore.getAllAssignments();
+ // Verify that non-leader doesn't create assignments
+ Assert.assertTrue(
+ "Non-leader should not create assignments",
+ assignments.isEmpty() || assignments.size() == 0);
+ } finally {
+ nonLeaderContainer.close();
+ }
+ }
+
+ private Configurations createNodeConfig(String host, int thriftPort, int
httpPort) {
+ Configurations config = new Configurations();
+ config.setString(AmoroManagementConf.SERVER_EXPOSE_HOST, host);
+ config.setInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT,
thriftPort);
+ config.setInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT,
thriftPort + 1);
+ config.setInteger(AmoroManagementConf.HTTP_SERVER_PORT, httpPort);
+ config.setBoolean(AmoroManagementConf.HA_ENABLE, true);
+ config.setString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS,
"127.0.0.1:2181");
+ config.setString(AmoroManagementConf.HA_CLUSTER_NAME, "test-cluster");
+ config.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ config.setInteger(AmoroManagementConf.HA_BUCKET_ID_TOTAL_COUNT, 100);
+ config.set(AmoroManagementConf.HA_NODE_OFFLINE_TIMEOUT,
java.time.Duration.ofMinutes(5));
+ return config;
+ }
+
+ /**
+ * Find bucket list by node host (assignments use ZK node info with
optimizing port, not table
+ * port).
+ */
+ private static List<String> findBucketsByHost(
+ Map<AmsServerInfo, List<String>> assignments, String host) {
+ for (Map.Entry<AmsServerInfo, List<String>> entry :
assignments.entrySet()) {
+ if (host.equals(entry.getKey().getHost())) {
+ return entry.getValue();
+ }
+ }
+ return null;
+ }
+
  /**
   * Create HighAvailabilityContainer with mocked ZK components using reflection; delegates to
   * the config-taking overload with the shared {@code serviceConfig}.
   */
  private HighAvailabilityContainer createContainerWithMockZk() throws Exception {
    return createContainerWithMockZk(serviceConfig);
  }
+
  /**
   * Create HighAvailabilityContainer with mocked ZK components using reflection.
   *
   * <p>The container is first built without any real ZK connection, then the private
   * {@code zkClient} and {@code leaderLatch} fields are overwritten with the shared mocks.
   * Field names are coupled to {@link ZkHighAvailabilityContainer}'s declaration; renaming
   * those fields in production code will break this helper at runtime.
   */
  private HighAvailabilityContainer createContainerWithMockZk(Configurations config)
      throws Exception {
    // Create container without ZK connection to avoid any connection attempts
    HighAvailabilityContainer container = createContainerWithoutZk(config);

    // Inject mock ZK client and leader latch
    java.lang.reflect.Field zkClientField =
        ZkHighAvailabilityContainer.class.getDeclaredField("zkClient");
    zkClientField.setAccessible(true);
    zkClientField.set(container, mockZkClient);

    java.lang.reflect.Field leaderLatchField =
        ZkHighAvailabilityContainer.class.getDeclaredField("leaderLatch");
    leaderLatchField.setAccessible(true);
    leaderLatchField.set(container, mockLeaderLatch);

    return container;
  }
+
+ /** Create a HighAvailabilityContainer without initializing ZK connection. */
+ private HighAvailabilityContainer createContainerWithoutZk(Configurations
config)
+ throws Exception {
+ java.lang.reflect.Constructor<ZkHighAvailabilityContainer> constructor =
+
ZkHighAvailabilityContainer.class.getDeclaredConstructor(Configurations.class);
+
+ // Create a minimal config that disables HA to avoid ZK connection
+ Configurations tempConfig = new Configurations(config);
+ tempConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false);
+
+ HighAvailabilityContainer container = constructor.newInstance(tempConfig);
+
+ // Now set all required fields using reflection
+ java.lang.reflect.Field isMasterSlaveModeField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("isMasterSlaveMode");
+ isMasterSlaveModeField.setAccessible(true);
+ isMasterSlaveModeField.set(
+ container,
config.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE));
+
+ if (config.getBoolean(AmoroManagementConf.HA_ENABLE)) {
+ String haClusterName =
config.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+
+ java.lang.reflect.Field tableServiceMasterPathField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("tableServiceMasterPath");
+ tableServiceMasterPathField.setAccessible(true);
+ tableServiceMasterPathField.set(
+ container, AmsHAProperties.getTableServiceMasterPath(haClusterName));
+
+ java.lang.reflect.Field optimizingServiceMasterPathField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("optimizingServiceMasterPath");
+ optimizingServiceMasterPathField.setAccessible(true);
+ optimizingServiceMasterPathField.set(
+ container,
AmsHAProperties.getOptimizingServiceMasterPath(haClusterName));
+
+ java.lang.reflect.Field nodesPathField =
+ ZkHighAvailabilityContainer.class.getDeclaredField("nodesPath");
+ nodesPathField.setAccessible(true);
+ nodesPathField.set(container,
AmsHAProperties.getNodesPath(haClusterName));
+
+ java.lang.reflect.Field tableServiceServerInfoField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("tableServiceServerInfo");
+ tableServiceServerInfoField.setAccessible(true);
+ AmsServerInfo tableServiceServerInfo =
+ buildServerInfo(
+ config.getString(AmoroManagementConf.SERVER_EXPOSE_HOST),
+
config.getInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT),
+ config.getInteger(AmoroManagementConf.HTTP_SERVER_PORT));
+ tableServiceServerInfoField.set(container, tableServiceServerInfo);
+
+ java.lang.reflect.Field optimizingServiceServerInfoField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("optimizingServiceServerInfo");
+ optimizingServiceServerInfoField.setAccessible(true);
+ AmsServerInfo optimizingServiceServerInfo =
+ buildServerInfo(
+ config.getString(AmoroManagementConf.SERVER_EXPOSE_HOST),
+
config.getInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT),
+ config.getInteger(AmoroManagementConf.HTTP_SERVER_PORT));
+ optimizingServiceServerInfoField.set(container,
optimizingServiceServerInfo);
+ }
+
+ return container;
+ }
+
+ /** Helper method to build AmsServerInfo. */
+ private AmsServerInfo buildServerInfo(String host, Integer thriftPort,
Integer httpPort) {
+ AmsServerInfo serverInfo = new AmsServerInfo();
+ serverInfo.setHost(host);
+ serverInfo.setThriftBindPort(thriftPort);
+ serverInfo.setRestBindPort(httpPort);
+ return serverInfo;
+ }
+
  /**
   * Convenience overload: create an AmsAssignService bound to the test's shared
   * {@code haContainer}, with the mock BucketAssignStore injected.
   */
  private AmsAssignService createAssignServiceWithMockStore() throws Exception {
    return createAssignServiceWithMockStore(haContainer);
  }
+
+ /** Create AmsAssignService with mock BucketAssignStore. */
+ private AmsAssignService
createAssignServiceWithMockStore(HighAvailabilityContainer container)
+ throws Exception {
+ AmsAssignService service = new AmsAssignService(container, serviceConfig);
+
+ // Use reflection to inject mock assign store
+ java.lang.reflect.Field assignStoreField =
+ AmsAssignService.class.getDeclaredField("assignStore");
+ assignStoreField.setAccessible(true);
+ assignStoreField.set(service, mockAssignStore);
+
+ return service;
+ }
+
  /**
   * Create a mock CuratorFramework whose fluent builders are backed by {@code mockZkState},
   * so tests exercise ZK read/write paths without a real ZooKeeper connection.
   */
  @SuppressWarnings("unchecked")
  private CuratorFramework createMockZkClient() throws Exception {
    CuratorFramework mockClient = mock(CuratorFramework.class);

    // Mock getChildren(): returns the direct children recorded in the in-memory state.
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetChildrenBuilder
        getChildrenBuilder =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
                    .GetChildrenBuilder.class);
    when(mockClient.getChildren()).thenReturn(getChildrenBuilder);
    when(getChildrenBuilder.forPath(anyString()))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              return mockZkState.getChildren(path);
            });

    // Mock getData(): returns the stored payload (NoNodeException when the node is absent).
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
        getDataBuilder =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
                    .class);
    when(mockClient.getData()).thenReturn(getDataBuilder);
    when(getDataBuilder.forPath(anyString()))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              return mockZkState.getData(path);
            });

    // Mock create() - manually create the entire fluent API chain; the SAME mock object
    // serves every step of the chain so stubbing stays consistent regardless of which
    // steps a caller goes through.
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder createBuilder =
        mock(
            org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder.class);

    @SuppressWarnings("unchecked")
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
            .ProtectACLCreateModeStatPathAndBytesable<String>
        pathAndBytesable =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
                    .ProtectACLCreateModeStatPathAndBytesable.class);

    when(mockClient.create()).thenReturn(createBuilder);
    when(createBuilder.creatingParentsIfNeeded()).thenReturn(pathAndBytesable);
    when(pathAndBytesable.withMode(any(CreateMode.class))).thenReturn(pathAndBytesable);

    // Mock forPath(path, data) - used by node registration and saveAssignments() to write
    // payloads; MockZkState.createNode also handles sequential "-" suffixed paths.
    when(pathAndBytesable.forPath(anyString(), any(byte[].class)))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              byte[] data = invocation.getArgument(1);
              return mockZkState.createNode(path, data);
            });

    // Mock forPath(path) - used by createPathIfNeeded(); store an empty node so that
    // getChildren() can later enumerate entries beneath it.
    when(pathAndBytesable.forPath(anyString()))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              if (mockZkState.exists(path) == null) {
                mockZkState.createNode(path, new byte[0]);
              }
              return null;
            });

    // Mock setData(): overwrite an existing node's payload.
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.SetDataBuilder
        setDataBuilder =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.SetDataBuilder
                    .class);
    when(mockClient.setData()).thenReturn(setDataBuilder);
    when(setDataBuilder.forPath(anyString(), any(byte[].class)))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              byte[] data = invocation.getArgument(1);
              mockZkState.setData(path, data);
              return null;
            });

    // Mock delete(): single-node removal (NoNodeException when missing, as in real ZK).
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder deleteBuilder =
        mock(
            org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder.class);
    when(mockClient.delete()).thenReturn(deleteBuilder);
    doAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              mockZkState.deleteNode(path);
              return null;
            })
        .when(deleteBuilder)
        .forPath(anyString());

    // Mock deletingChildrenIfNeeded(): recursive removal of a node and its subtree.
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ChildrenDeletable
        childrenDeletable =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ChildrenDeletable
                    .class);
    when(deleteBuilder.deletingChildrenIfNeeded()).thenReturn(childrenDeletable);
    doAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              mockZkState.deleteNodeRecursive(path);
              return null;
            })
        .when(childrenDeletable)
        .forPath(anyString());

    // Mock checkExists(): Stat when the node is present, null otherwise (ZK semantics).
    @SuppressWarnings("unchecked")
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
        checkExistsBuilder =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
                    .class);
    when(mockClient.checkExists()).thenReturn(checkExistsBuilder);
    when(checkExistsBuilder.forPath(anyString()))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              return mockZkState.exists(path);
            });

    // Mock start() and close(): lifecycle calls are no-ops.
    doAnswer(invocation -> null).when(mockClient).start();
    doAnswer(invocation -> null).when(mockClient).close();

    return mockClient;
  }
+
+ /** Create a mock LeaderLatch with specified leadership status. */
+ private LeaderLatch createMockLeaderLatch(boolean hasLeadership) throws
Exception {
+ LeaderLatch mockLatch = mock(LeaderLatch.class);
+ when(mockLatch.hasLeadership()).thenReturn(hasLeadership);
+ doAnswer(invocation -> null).when(mockLatch).addListener(any());
+ doAnswer(invocation -> null).when(mockLatch).start();
+ doAnswer(invocation -> null).when(mockLatch).close();
+ doAnswer(
+ invocation -> {
+ // Mock implementation - doesn't actually wait
+ return null;
+ })
+ .when(mockLatch)
+ .await();
+ return mockLatch;
+ }
+
+ /** In-memory ZK state simulator. */
+ private static class MockZkState {
+ private final Map<String, byte[]> nodes = new HashMap<>();
+ private final AtomicInteger sequenceCounter = new AtomicInteger(0);
+
+ public List<String> getChildren(String path) throws KeeperException {
+ List<String> children = new ArrayList<>();
+ String prefix = path.endsWith("/") ? path : path + "/";
+ for (String nodePath : nodes.keySet()) {
+ if (nodePath.startsWith(prefix) && !nodePath.equals(path)) {
+ String relativePath = nodePath.substring(prefix.length());
+ if (!relativePath.contains("/")) {
+ children.add(relativePath);
+ }
+ }
+ }
+ children.sort(String::compareTo);
+ return children;
+ }
+
+ public byte[] getData(String path) throws KeeperException {
+ byte[] data = nodes.get(path);
+ if (data == null) {
+ throw new KeeperException.NoNodeException(path);
+ }
+ return data;
+ }
+
+ public void setData(String path, byte[] data) throws KeeperException {
+ if (!nodes.containsKey(path)) {
+ throw new KeeperException.NoNodeException(path);
+ }
+ nodes.put(path, data);
+ }
+
+ public String createNode(String path, byte[] data) {
+ // Handle sequential nodes
+ if (path.endsWith("-")) {
+ int seq = sequenceCounter.incrementAndGet();
+ path = path + String.format("%010d", seq);
+ }
+ nodes.put(path, data);
+ return path;
+ }
+
+ public void deleteNode(String path) throws KeeperException {
+ if (!nodes.containsKey(path)) {
+ throw new KeeperException.NoNodeException(path);
+ }
+ nodes.remove(path);
+ }
+
+ public void deleteNodeRecursive(String path) throws KeeperException {
+ // Delete the node and all its children
+ List<String> toDelete = new ArrayList<>();
+ String prefix = path.endsWith("/") ? path : path + "/";
+ for (String nodePath : nodes.keySet()) {
+ if (nodePath.equals(path) || nodePath.startsWith(prefix)) {
+ toDelete.add(nodePath);
+ }
+ }
+ for (String nodePath : toDelete) {
+ nodes.remove(nodePath);
+ }
+ }
+
+ public void deleteNodeByHost(String host) {
+ // Delete all nodes that have this host in their data (JSON)
+ List<String> toDelete = new ArrayList<>();
+ for (Map.Entry<String, byte[]> entry : nodes.entrySet()) {
+ String nodePath = entry.getKey();
+ byte[] data = entry.getValue();
+ // Check if this is a node registration path (contains "/node-")
+ if (nodePath.contains("/node-") && data != null && data.length > 0) {
+ try {
+ String nodeInfoJson = new String(data,
java.nio.charset.StandardCharsets.UTF_8);
+ // Parse JSON to check host
+ AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson,
AmsServerInfo.class);
+ if (nodeInfo != null && host.equals(nodeInfo.getHost())) {
+ toDelete.add(nodePath);
+ }
+ } catch (Exception e) {
+ // Ignore parsing errors
+ }
+ }
+ }
+ for (String nodePath : toDelete) {
+ nodes.remove(nodePath);
+ }
+ }
+
+ public Stat exists(String path) {
+ return nodes.containsKey(path) ? new Stat() : null;
+ }
+
+ public void clear() {
+ nodes.clear();
+ sequenceCounter.set(0);
+ }
+ }
+
+ /** In-memory implementation of BucketAssignStore for testing. */
+ private static class MockBucketAssignStore implements BucketAssignStore {
+ private final Map<String, List<String>> assignments = new HashMap<>();
+ private final Map<String, Long> lastUpdateTimes = new HashMap<>();
+ // Store full AmsServerInfo for proper matching
+ private final Map<String, AmsServerInfo> nodeInfoMap = new HashMap<>();
+
+ private String getNodeKey(AmsServerInfo nodeInfo) {
+ return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
+ }
+
+ @Override
+ public void saveAssignments(AmsServerInfo nodeInfo, List<String> bucketIds)
+ throws BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ assignments.put(nodeKey, new ArrayList<>(bucketIds));
+ // Store full node info for proper matching
+ nodeInfoMap.put(nodeKey, nodeInfo);
+ updateLastUpdateTime(nodeInfo);
+ }
+
+ @Override
+ public List<String> getAssignments(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ return new ArrayList<>(assignments.getOrDefault(nodeKey, new
ArrayList<>()));
+ }
+
+ @Override
+ public void removeAssignments(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ assignments.remove(nodeKey);
+ lastUpdateTimes.remove(nodeKey);
+ nodeInfoMap.remove(nodeKey);
+ }
+
+ @Override
+ public Map<AmsServerInfo, List<String>> getAllAssignments() throws
BucketAssignStoreException {
+ Map<AmsServerInfo, List<String>> result = new HashMap<>();
+ for (Map.Entry<String, List<String>> entry : assignments.entrySet()) {
+ String nodeKey = entry.getKey();
+ // Use stored full node info if available, otherwise parse from key
+ AmsServerInfo nodeInfo = nodeInfoMap.getOrDefault(nodeKey,
parseNodeKey(nodeKey));
+ result.put(nodeInfo, new ArrayList<>(entry.getValue()));
+ }
+ return result;
+ }
+
+ @Override
+ public long getLastUpdateTime(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ return lastUpdateTimes.getOrDefault(nodeKey, 0L);
+ }
+
+ @Override
+ public void updateLastUpdateTime(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
+ String nodeKey = getNodeKey(nodeInfo);
+ lastUpdateTimes.put(nodeKey, System.currentTimeMillis());
+ }
+
+ private AmsServerInfo parseNodeKey(String nodeKey) {
+ String[] parts = nodeKey.split(":");
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Invalid node key format: " +
nodeKey);
+ }
+ AmsServerInfo nodeInfo = new AmsServerInfo();
+ nodeInfo.setHost(parts[0]);
+ nodeInfo.setThriftBindPort(Integer.parseInt(parts[1]));
+ return nodeInfo;
+ }
+ }
+}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestHighAvailabilityContainer.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestHighAvailabilityContainer.java
new file mode 100644
index 000000000..19048896a
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestHighAvailabilityContainer.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.amoro.client.AmsServerInfo;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.properties.AmsHAProperties;
import org.apache.amoro.server.ha.HighAvailabilityContainer;
import org.apache.amoro.server.ha.ZkHighAvailabilityContainer;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.data.Stat;
import org.apache.amoro.utils.JacksonUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+
+/** Test for HighAvailabilityContainer using mocked ZK to avoid connection
issues. */
+public class TestHighAvailabilityContainer {
+
+ private Configurations serviceConfig;
+ private HighAvailabilityContainer haContainer;
+ private MockZkState mockZkState;
+ private CuratorFramework mockZkClient;
+ private LeaderLatch mockLeaderLatch;
+
  @Before
  public void setUp() throws Exception {
    // Fresh in-memory ZK state plus mocked Curator client / LeaderLatch for every test.
    mockZkState = new MockZkState();
    mockZkClient = createMockZkClient();
    mockLeaderLatch = createMockLeaderLatch();

    // Minimal HA-enabled configuration; ports are arbitrary since nothing actually binds.
    serviceConfig = new Configurations();
    serviceConfig.setString(AmoroManagementConf.SERVER_EXPOSE_HOST, "127.0.0.1");
    serviceConfig.setInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT, 1260);
    serviceConfig.setInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT, 1261);
    serviceConfig.setInteger(AmoroManagementConf.HTTP_SERVER_PORT, 1630);
    serviceConfig.setBoolean(AmoroManagementConf.HA_ENABLE, true);
    // The address is never dialed: the container's ZK client is replaced with a mock.
    serviceConfig.setString(AmoroManagementConf.HA_ZOOKEEPER_ADDRESS, "127.0.0.1:2181");
    serviceConfig.setString(AmoroManagementConf.HA_CLUSTER_NAME, "test-cluster");
  }
+
  @After
  public void tearDown() throws Exception {
    // Close the container (which unregisters its node) and wipe the simulated ZK tree.
    if (haContainer != null) {
      haContainer.close();
    }
    mockZkState.clear();
  }
+
+ @Test
+ public void testRegistAndElectWithoutMasterSlaveMode() throws Exception {
+ // Test that node registration is skipped when master-slave mode is
disabled
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, false);
+ haContainer = createContainerWithMockZk();
+
+ // Should not throw exception and should not register node
+ haContainer.registerAndElect();
+
+ // Verify no node was registered
+ String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+ List<String> children = mockZkState.getChildren(nodesPath);
+ Assert.assertEquals(
+ "No nodes should be registered when master-slave mode is disabled", 0,
children.size());
+ }
+
+ @Test
+ public void testRegistAndElectWithMasterSlaveMode() throws Exception {
+ // Test that node registration works when master-slave mode is enabled
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ haContainer = createContainerWithMockZk();
+
+ // Register node
+ haContainer.registerAndElect();
+
+ // Verify node was registered
+ String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+ List<String> children = mockZkState.getChildren(nodesPath);
+ Assert.assertEquals("One node should be registered", 1, children.size());
+
+ // Verify node data
+ String nodePath = nodesPath + "/" + children.get(0);
+ byte[] data = mockZkState.getData(nodePath);
+ Assert.assertNotNull("Node data should not be null", data);
+ Assert.assertTrue("Node data should not be empty", data.length > 0);
+
+ // Verify node info
+ String nodeInfoJson = new String(data, StandardCharsets.UTF_8);
+ AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson,
AmsServerInfo.class);
+ Assert.assertEquals("Host should match", "127.0.0.1", nodeInfo.getHost());
+ Assert.assertEquals(
+ "Thrift port should match", Integer.valueOf(1261),
nodeInfo.getThriftBindPort());
+ }
+
  @Test
  public void testGetAliveNodesWithoutMasterSlaveMode() throws Exception {
    // Test that getAliveNodes returns empty list when master-slave mode is disabled;
    // the container should not consult ZK at all in this configuration.
    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, false);
    haContainer = createContainerWithMockZk();

    List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
    Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
    Assert.assertEquals(
        "Alive nodes list should be empty when master-slave mode is disabled",
        0,
        aliveNodes.size());
  }
+
+ @Test
+ public void testGetAliveNodesWhenNotLeader() throws Exception {
+ // Test that getAliveNodes returns empty list when not leader
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ mockLeaderLatch = createMockLeaderLatch(false); // Not leader
+ haContainer = createContainerWithMockZk();
+
+ // Register node
+ haContainer.registerAndElect();
+
+ // Since we're not the leader, should return empty list
+ List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+ Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+ Assert.assertEquals("Alive nodes list should be empty when not leader", 0,
aliveNodes.size());
+ }
+
+ @Test
+ public void testGetAliveNodesAsLeader() throws Exception {
+ // Test that getAliveNodes returns nodes when leader
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ mockLeaderLatch = createMockLeaderLatch(true); // Is leader
+ haContainer = createContainerWithMockZk();
+
+ // Register node
+ haContainer.registerAndElect();
+
+ // Verify we are leader
+ Assert.assertTrue("Should be leader", haContainer.hasLeadership());
+
+ // Get alive nodes
+ List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+ Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+ Assert.assertEquals("Should have one alive node", 1, aliveNodes.size());
+
+ // Verify node info
+ AmsServerInfo nodeInfo = aliveNodes.get(0);
+ Assert.assertEquals("Host should match", "127.0.0.1", nodeInfo.getHost());
+ Assert.assertEquals(
+ "Thrift port should match", Integer.valueOf(1261),
nodeInfo.getThriftBindPort());
+ Assert.assertEquals(
+ "HTTP port should match", Integer.valueOf(1630),
nodeInfo.getRestBindPort());
+ }
+
+ @Test
+ public void testGetAliveNodesWithMultipleNodes() throws Exception {
+ // Test that getAliveNodes returns all registered nodes
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ mockLeaderLatch = createMockLeaderLatch(true); // Is leader
+ haContainer = createContainerWithMockZk();
+
+ // Register first node
+ haContainer.registerAndElect();
+
+ // Verify first node was registered
+ String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+ List<String> childrenAfterFirst = mockZkState.getChildren(nodesPath);
+ Assert.assertEquals("First node should be registered", 1,
childrenAfterFirst.size());
+
+ // Register second node manually in mock state
+ // Use createNode with sequential path to get the correct sequence number
+ AmsServerInfo nodeInfo2 = new AmsServerInfo();
+ nodeInfo2.setHost("127.0.0.2");
+ nodeInfo2.setThriftBindPort(1262);
+ nodeInfo2.setRestBindPort(1631);
+ String nodeInfo2Json = JacksonUtil.toJSONString(nodeInfo2);
+ // Use sequential path ending with "-" to let createNode generate the
sequence number
+ // This ensures the second node gets the correct sequence number
(0000000001)
+ mockZkState.createNode(nodesPath + "/node-",
nodeInfo2Json.getBytes(StandardCharsets.UTF_8));
+
+ // Get alive nodes
+ List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+ Assert.assertNotNull("Alive nodes list should not be null", aliveNodes);
+ Assert.assertEquals("Should have two alive nodes", 2, aliveNodes.size());
+ }
+
+ @Test
+ public void testCloseUnregistersNode() throws Exception {
+ // Test that close() unregisters the node
+ serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
+ haContainer = createContainerWithMockZk();
+
+ // Register node
+ haContainer.registerAndElect();
+
+ // Verify node was registered
+ String nodesPath = AmsHAProperties.getNodesPath("test-cluster");
+ List<String> children = mockZkState.getChildren(nodesPath);
+ Assert.assertEquals("One node should be registered", 1, children.size());
+
+ // Close container
+ haContainer.close();
+ haContainer = null;
+
+ // Verify node was unregistered
+ List<String> childrenAfterClose = mockZkState.getChildren(nodesPath);
+ Assert.assertEquals("No nodes should be registered after close", 0,
childrenAfterClose.size());
+ }
+
  @Test
  public void testHasLeadership() throws Exception {
    // Test hasLeadership() method: the container should simply report the latch state.
    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
    mockLeaderLatch = createMockLeaderLatch(false); // Not leader initially
    haContainer = createContainerWithMockZk();

    // Initially should not be leader
    Assert.assertFalse("Should not be leader initially", haContainer.hasLeadership());

    // Change to leader by rebuilding the latch and container.
    // NOTE(review): the first container is replaced without being closed; tearDown only
    // closes the most recent one. Harmless with mocks, but consider closing it explicitly.
    mockLeaderLatch = createMockLeaderLatch(true);
    haContainer = createContainerWithMockZk();

    // Should be leader now
    Assert.assertTrue("Should be leader", haContainer.hasLeadership());
  }
+
  @Test
  public void testRegistAndElectWithoutHAEnabled() throws Exception {
    // registerAndElect() must be a no-op (no exception) when HA is disabled, so a real
    // ZkHighAvailabilityContainer is safe to construct here without any ZooKeeper server.
    serviceConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false);
    serviceConfig.setBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE, true);
    haContainer = new ZkHighAvailabilityContainer(serviceConfig);

    // Should not throw exception
    haContainer.registerAndElect();
  }
+
+ /** Create HighAvailabilityContainer with mocked ZK components using
reflection. */
+ private HighAvailabilityContainer createContainerWithMockZk() throws
Exception {
+ // Create container without ZK connection to avoid any connection attempts
+ HighAvailabilityContainer container = createContainerWithoutZk();
+
+ // Inject mock ZK client and leader latch
+ java.lang.reflect.Field zkClientField =
+ ZkHighAvailabilityContainer.class.getDeclaredField("zkClient");
+ zkClientField.setAccessible(true);
+ zkClientField.set(container, mockZkClient);
+
+ java.lang.reflect.Field leaderLatchField =
+ ZkHighAvailabilityContainer.class.getDeclaredField("leaderLatch");
+ leaderLatchField.setAccessible(true);
+ leaderLatchField.set(container, mockLeaderLatch);
+
+ // Note: We don't need to create the paths themselves as nodes in ZK
+ // ZK paths are logical containers, not actual nodes
+ // The createPathIfNeeded() calls will be handled by the mock when needed
+
+ return container;
+ }
+
+ /**
+ * Create a HighAvailabilityContainer without initializing ZK connection.
This is used when we
+ * want to completely avoid ZK connection attempts.
+ */
+ private HighAvailabilityContainer createContainerWithoutZk() throws
Exception {
+ // Use reflection to create ZkHighAvailabilityContainer without calling
constructor
+ java.lang.reflect.Constructor<ZkHighAvailabilityContainer> constructor =
+
ZkHighAvailabilityContainer.class.getDeclaredConstructor(Configurations.class);
+
+ // Create a minimal config that disables HA to avoid ZK connection
+ Configurations tempConfig = new Configurations(serviceConfig);
+ tempConfig.setBoolean(AmoroManagementConf.HA_ENABLE, false);
+
+ HighAvailabilityContainer container = constructor.newInstance(tempConfig);
+
+ // Now set all required fields using reflection
+ java.lang.reflect.Field isMasterSlaveModeField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("isMasterSlaveMode");
+ isMasterSlaveModeField.setAccessible(true);
+ isMasterSlaveModeField.set(
+ container,
serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE));
+
+ if (serviceConfig.getBoolean(AmoroManagementConf.HA_ENABLE)) {
+ String haClusterName =
serviceConfig.getString(AmoroManagementConf.HA_CLUSTER_NAME);
+
+ java.lang.reflect.Field tableServiceMasterPathField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("tableServiceMasterPath");
+ tableServiceMasterPathField.setAccessible(true);
+ tableServiceMasterPathField.set(
+ container, AmsHAProperties.getTableServiceMasterPath(haClusterName));
+
+ java.lang.reflect.Field optimizingServiceMasterPathField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("optimizingServiceMasterPath");
+ optimizingServiceMasterPathField.setAccessible(true);
+ optimizingServiceMasterPathField.set(
+ container,
AmsHAProperties.getOptimizingServiceMasterPath(haClusterName));
+
+ java.lang.reflect.Field nodesPathField =
+ ZkHighAvailabilityContainer.class.getDeclaredField("nodesPath");
+ nodesPathField.setAccessible(true);
+ nodesPathField.set(container,
AmsHAProperties.getNodesPath(haClusterName));
+
+ java.lang.reflect.Field tableServiceServerInfoField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("tableServiceServerInfo");
+ tableServiceServerInfoField.setAccessible(true);
+ AmsServerInfo tableServiceServerInfo =
+ buildServerInfo(
+ serviceConfig.getString(AmoroManagementConf.SERVER_EXPOSE_HOST),
+
serviceConfig.getInteger(AmoroManagementConf.TABLE_SERVICE_THRIFT_BIND_PORT),
+ serviceConfig.getInteger(AmoroManagementConf.HTTP_SERVER_PORT));
+ tableServiceServerInfoField.set(container, tableServiceServerInfo);
+
+ java.lang.reflect.Field optimizingServiceServerInfoField =
+
ZkHighAvailabilityContainer.class.getDeclaredField("optimizingServiceServerInfo");
+ optimizingServiceServerInfoField.setAccessible(true);
+ AmsServerInfo optimizingServiceServerInfo =
+ buildServerInfo(
+ serviceConfig.getString(AmoroManagementConf.SERVER_EXPOSE_HOST),
+
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZING_SERVICE_THRIFT_BIND_PORT),
+ serviceConfig.getInteger(AmoroManagementConf.HTTP_SERVER_PORT));
+ optimizingServiceServerInfoField.set(container,
optimizingServiceServerInfo);
+ }
+
+ return container;
+ }
+
+ /** Helper method to build AmsServerInfo (copied from
HighAvailabilityContainer). */
+ private AmsServerInfo buildServerInfo(String host, Integer thriftPort,
Integer httpPort) {
+ AmsServerInfo serverInfo = new AmsServerInfo();
+ serverInfo.setHost(host);
+ serverInfo.setThriftBindPort(thriftPort);
+ serverInfo.setRestBindPort(httpPort);
+ return serverInfo;
+ }
+
+ /** Create a mock CuratorFramework that uses MockZkState for storage. */
+ @SuppressWarnings("unchecked")
+ private CuratorFramework createMockZkClient() throws Exception {
+ CuratorFramework mockClient = mock(CuratorFramework.class);
+
+ // Mock getChildren() - create a chain of mocks
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetChildrenBuilder
+ getChildrenBuilder =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+ .GetChildrenBuilder.class);
+ when(mockClient.getChildren()).thenReturn(getChildrenBuilder);
+ when(getChildrenBuilder.forPath(anyString()))
+ .thenAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ return mockZkState.getChildren(path);
+ });
+
+ // Mock getData()
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+ getDataBuilder =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
+ .class);
+ when(mockClient.getData()).thenReturn(getDataBuilder);
+ when(getDataBuilder.forPath(anyString()))
+ .thenAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ return mockZkState.getData(path);
+ });
+
+ // Mock create() - manually create the entire fluent API chain to ensure
consistency
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder
createBuilder =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder.class);
+
+ @SuppressWarnings("unchecked")
+ org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+ .ProtectACLCreateModeStatPathAndBytesable<
+ String>
+ pathAndBytesable =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
+ .ProtectACLCreateModeStatPathAndBytesable.class);
+
+ when(mockClient.create()).thenReturn(createBuilder);
+
+ // Mock the chain: creatingParentsIfNeeded() -> withMode() -> forPath()
+ // Use the same mock object for the entire chain
+ when(createBuilder.creatingParentsIfNeeded()).thenReturn(pathAndBytesable);
+
when(pathAndBytesable.withMode(any(CreateMode.class))).thenReturn(pathAndBytesable);
+
+ // Mock forPath(path, data) - used by registAndElect()
+ when(pathAndBytesable.forPath(anyString(), any(byte[].class)))
+ .thenAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ byte[] data = invocation.getArgument(1);
+ return mockZkState.createNode(path, data);
+ });
+
+ // Mock forPath(path) - used by createPathIfNeeded()
+ // Note: createPathIfNeeded() creates paths without data, but we still
need to store them
+ // so that getChildren() can work correctly
+ when(pathAndBytesable.forPath(anyString()))
+ .thenAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ // Create the path as an empty node (this simulates ZK path
creation)
+ // In real ZK, paths are logical containers, but we need to
store them
+ // to make getChildren() work correctly
+ if (mockZkState.exists(path) == null) {
+ mockZkState.createNode(path, new byte[0]);
+ }
+ return null;
+ });
+
+ // Mock delete()
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder
deleteBuilder =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder.class);
+ when(mockClient.delete()).thenReturn(deleteBuilder);
+ doAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ mockZkState.deleteNode(path);
+ return null;
+ })
+ .when(deleteBuilder)
+ .forPath(anyString());
+
+ // Mock checkExists()
+ @SuppressWarnings("unchecked")
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+ checkExistsBuilder =
+ mock(
+
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
+ .class);
+ when(mockClient.checkExists()).thenReturn(checkExistsBuilder);
+ when(checkExistsBuilder.forPath(anyString()))
+ .thenAnswer(
+ invocation -> {
+ String path = invocation.getArgument(0);
+ return mockZkState.exists(path);
+ });
+
+ // Mock start() and close()
+ doAnswer(invocation -> null).when(mockClient).start();
+ doAnswer(invocation -> null).when(mockClient).close();
+
+ return mockClient;
+ }
+
  /** Create a mock LeaderLatch that holds leadership (the common case in these tests). */
  private LeaderLatch createMockLeaderLatch() throws Exception {
    return createMockLeaderLatch(true);
  }
+
+ /** Create a mock LeaderLatch with specified leadership status. */
+ private LeaderLatch createMockLeaderLatch(boolean hasLeadership) throws
Exception {
+ LeaderLatch mockLatch = mock(LeaderLatch.class);
+ when(mockLatch.hasLeadership()).thenReturn(hasLeadership);
+ doAnswer(invocation -> null).when(mockLatch).addListener(any());
+ doAnswer(invocation -> null).when(mockLatch).start();
+ doAnswer(invocation -> null).when(mockLatch).close();
+ // Mock await() - it throws IOException and InterruptedException
+ doAnswer(
+ invocation -> {
+ // Mock implementation - doesn't actually wait
+ return null;
+ })
+ .when(mockLatch)
+ .await();
+ return mockLatch;
+ }
+
+ /** In-memory ZK state simulator. */
+ private static class MockZkState {
+ private final Map<String, byte[]> nodes = new HashMap<>();
+ private final AtomicInteger sequenceCounter = new AtomicInteger(0);
+
+ public List<String> getChildren(String path) throws KeeperException {
+ List<String> children = new ArrayList<>();
+ String prefix = path.endsWith("/") ? path : path + "/";
+ for (String nodePath : nodes.keySet()) {
+ // Only include direct children (not the path itself, and not nested
paths)
+ if (nodePath.startsWith(prefix) && !nodePath.equals(path)) {
+ String relativePath = nodePath.substring(prefix.length());
+ // Only add direct children (no additional slashes)
+ // This means the path should be exactly: prefix + relativePath
+ if (!relativePath.contains("/")) {
+ children.add(relativePath);
+ }
+ }
+ }
+ // Sort to ensure consistent ordering
+ children.sort(String::compareTo);
+ return children;
+ }
+
+ public byte[] getData(String path) throws KeeperException {
+ byte[] data = nodes.get(path);
+ if (data == null) {
+ throw new KeeperException.NoNodeException(path);
+ }
+ return data;
+ }
+
+ public String createNode(String path, byte[] data) {
+ // Handle sequential nodes
+ if (path.endsWith("-")) {
+ int seq = sequenceCounter.incrementAndGet();
+ path = path + String.format("%010d", seq);
+ }
+ nodes.put(path, data);
+ return path;
+ }
+
+ public void deleteNode(String path) throws KeeperException {
+ if (!nodes.containsKey(path)) {
+ throw new KeeperException.NoNodeException(path);
+ }
+ nodes.remove(path);
+ }
+
+ public Stat exists(String path) {
+ return nodes.containsKey(path) ? new Stat() : null;
+ }
+
+ public void clear() {
+ nodes.clear();
+ sequenceCounter.set(0);
+ }
+ }
+}
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestZkBucketAssignStore.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestZkBucketAssignStore.java
new file mode 100644
index 000000000..a331001ed
--- /dev/null
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestZkBucketAssignStore.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.amoro.client.AmsServerInfo;
+import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.CuratorFramework;
+import
org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.CreateMode;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.KeeperException;
+import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.data.Stat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
/** Test for ZkBucketAssignStore using mocked ZK to avoid connection issues. */
public class TestZkBucketAssignStore {

  // Mocked Curator client; all calls are routed into mockZkState (see createMockZkClient).
  private CuratorFramework mockZkClient;
  // System under test, wired against the mocked client.
  private ZkBucketAssignStore assignStore;
  private AmsServerInfo node1;
  private AmsServerInfo node2;
  // In-memory simulation of the ZK tree shared by all mocked builders.
  private MockZkState mockZkState;

  /** Builds the mocked ZK client, the store under test, and two distinct server nodes. */
  @Before
  public void setUp() throws Exception {
    mockZkState = new MockZkState();
    mockZkClient = createMockZkClient();

    assignStore = new ZkBucketAssignStore(mockZkClient, "test-cluster");

    node1 = new AmsServerInfo();
    node1.setHost("127.0.0.1");
    node1.setThriftBindPort(1260);
    node1.setRestBindPort(1630);

    node2 = new AmsServerInfo();
    node2.setHost("127.0.0.2");
    node2.setThriftBindPort(1261);
    node2.setRestBindPort(1631);
  }

  /** Best-effort cleanup of any assignments a test left behind, then resets the mock tree. */
  @After
  public void tearDown() throws Exception {
    if (assignStore != null) {
      try {
        assignStore.removeAssignments(node1);
        assignStore.removeAssignments(node2);
      } catch (Exception e) {
        // ignore
      }
    }
    mockZkState.clear();
  }

  /** Round-trip: buckets saved for a node are read back unchanged. */
  @Test
  public void testSaveAndGetAssignments() throws Exception {
    List<String> bucketIds = Arrays.asList("1", "2", "3", "4", "5");

    // Save assignments
    assignStore.saveAssignments(node1, bucketIds);

    // Get assignments
    List<String> retrieved = assignStore.getAssignments(node1);
    Assert.assertEquals("Bucket IDs should match", bucketIds, retrieved);
  }

  /** A node that was never saved yields an empty (not null) list. */
  @Test
  public void testGetAssignmentsForNonExistentNode() throws Exception {
    List<String> retrieved = assignStore.getAssignments(node1);
    Assert.assertNotNull("Should return empty list", retrieved);
    Assert.assertTrue("Should return empty list", retrieved.isEmpty());
  }

  /** Saving again for the same node fully replaces the previous bucket list. */
  @Test
  public void testUpdateAssignments() throws Exception {
    List<String> initialBuckets = Arrays.asList("1", "2", "3");
    List<String> updatedBuckets = Arrays.asList("4", "5", "6", "7");

    // Save initial assignments
    assignStore.saveAssignments(node1, initialBuckets);
    Assert.assertEquals(initialBuckets, assignStore.getAssignments(node1));

    // Update assignments
    assignStore.saveAssignments(node1, updatedBuckets);
    Assert.assertEquals(updatedBuckets, assignStore.getAssignments(node1));
  }

  /** Removal leaves the node with an empty assignment list. */
  @Test
  public void testRemoveAssignments() throws Exception {
    List<String> bucketIds = Arrays.asList("1", "2", "3");

    // Save assignments
    assignStore.saveAssignments(node1, bucketIds);
    Assert.assertFalse("Should have assignments", assignStore.getAssignments(node1).isEmpty());

    // Remove assignments
    assignStore.removeAssignments(node1);
    Assert.assertTrue("Should be empty after removal", assignStore.getAssignments(node1).isEmpty());
  }

  /** getAllAssignments returns every node's buckets keyed by the parsed node identity. */
  @Test
  public void testGetAllAssignments() throws Exception {
    List<String> buckets1 = Arrays.asList("1", "2", "3");
    List<String> buckets2 = Arrays.asList("4", "5", "6");

    // Save assignments for multiple nodes
    assignStore.saveAssignments(node1, buckets1);
    assignStore.saveAssignments(node2, buckets2);

    // Get all assignments
    Map<AmsServerInfo, List<String>> allAssignments = assignStore.getAllAssignments();
    Assert.assertEquals("Should have 2 nodes", 2, allAssignments.size());

    // Find nodes by host and port since parseNodeKey doesn't set restBindPort
    List<String> foundBuckets1 = null;
    List<String> foundBuckets2 = null;
    for (Map.Entry<AmsServerInfo, List<String>> entry : allAssignments.entrySet()) {
      AmsServerInfo node = entry.getKey();
      if (node1.getHost().equals(node.getHost())
          && node1.getThriftBindPort().equals(node.getThriftBindPort())) {
        foundBuckets1 = entry.getValue();
      } else if (node2.getHost().equals(node.getHost())
          && node2.getThriftBindPort().equals(node.getThriftBindPort())) {
        foundBuckets2 = entry.getValue();
      }
    }
    Assert.assertEquals(buckets1, foundBuckets1);
    Assert.assertEquals(buckets2, foundBuckets2);
  }

  /** With nothing stored, getAllAssignments returns an empty (not null) map. */
  @Test
  public void testGetAllAssignmentsEmpty() throws Exception {
    Map<AmsServerInfo, List<String>> allAssignments = assignStore.getAllAssignments();
    Assert.assertNotNull("Should return empty map", allAssignments);
    Assert.assertTrue("Should be empty", allAssignments.isEmpty());
  }

  /**
   * Last-update timestamp is 0 before any save, falls inside the save window, and
   * moves forward after an explicit updateLastUpdateTime call.
   */
  @Test
  public void testLastUpdateTime() throws Exception {
    List<String> bucketIds = Arrays.asList("1", "2", "3");

    // Initially no update time
    long initialTime = assignStore.getLastUpdateTime(node1);
    Assert.assertEquals("Should be 0 initially", 0, initialTime);

    // Save assignments (should update time)
    long beforeSave = System.currentTimeMillis();
    assignStore.saveAssignments(node1, bucketIds);
    long afterSave = System.currentTimeMillis();

    long updateTime = assignStore.getLastUpdateTime(node1);
    Assert.assertTrue(
        "Update time should be between before and after",
        updateTime >= beforeSave && updateTime <= afterSave);

    // Manually update time
    // Sleep so the clock advances past the previous millisecond-resolution timestamp.
    Thread.sleep(10);
    assignStore.updateLastUpdateTime(node1);
    long newUpdateTime = assignStore.getLastUpdateTime(node1);
    Assert.assertTrue("New update time should be later", newUpdateTime > updateTime);
  }

  /** Saving an empty bucket list is allowed and reads back as empty. */
  @Test
  public void testEmptyBucketList() throws Exception {
    List<String> emptyList = new ArrayList<>();
    assignStore.saveAssignments(node1, emptyList);
    List<String> retrieved = assignStore.getAssignments(node1);
    Assert.assertNotNull("Should return empty list", retrieved);
    Assert.assertTrue("Should be empty", retrieved.isEmpty());
  }

  /** Nodes sharing a host but differing in port are treated as distinct. */
  @Test
  public void testMultipleNodesWithSameHostDifferentPort() throws Exception {
    AmsServerInfo node3 = new AmsServerInfo();
    node3.setHost("127.0.0.1");
    node3.setThriftBindPort(1262);
    node3.setRestBindPort(1632);

    List<String> buckets1 = Arrays.asList("1", "2");
    List<String> buckets3 = Arrays.asList("3", "4");

    assignStore.saveAssignments(node1, buckets1);
    assignStore.saveAssignments(node3, buckets3);

    Assert.assertEquals(buckets1, assignStore.getAssignments(node1));
    Assert.assertEquals(buckets3, assignStore.getAssignments(node3));

    Map<AmsServerInfo, List<String>> allAssignments = assignStore.getAllAssignments();
    Assert.assertEquals("Should have 2 nodes", 2, allAssignments.size());
  }

  /**
   * Create a mock CuratorFramework that uses MockZkState for storage.
   *
   * <p>Each Curator fluent builder (getChildren/getData/create/setData/delete/checkExists)
   * is stubbed so its terminal forPath(...) call reads or writes {@link #mockZkState},
   * letting the store run without a real ZooKeeper connection.
   */
  @SuppressWarnings("unchecked")
  private CuratorFramework createMockZkClient() throws Exception {
    CuratorFramework mockClient = mock(CuratorFramework.class);

    // Mock getChildren()
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetChildrenBuilder
        getChildrenBuilder =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
                    .GetChildrenBuilder.class);
    when(mockClient.getChildren()).thenReturn(getChildrenBuilder);
    when(getChildrenBuilder.forPath(anyString()))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              return mockZkState.getChildren(path);
            });

    // Mock getData()
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
        getDataBuilder =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.GetDataBuilder
                    .class);
    when(mockClient.getData()).thenReturn(getDataBuilder);
    when(getDataBuilder.forPath(anyString()))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              return mockZkState.getData(path);
            });

    // Mock create() - manually create the entire fluent API chain
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder createBuilder =
        mock(
            org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.CreateBuilder.class);

    @SuppressWarnings("unchecked")
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
            .ProtectACLCreateModeStatPathAndBytesable<
        String>
        pathAndBytesable =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api
                    .ProtectACLCreateModeStatPathAndBytesable.class);

    when(mockClient.create()).thenReturn(createBuilder);
    // The same stub is returned from withMode(...) so the chain can be called in any order.
    when(createBuilder.creatingParentsIfNeeded()).thenReturn(pathAndBytesable);
    when(pathAndBytesable.withMode(any(CreateMode.class))).thenReturn(pathAndBytesable);

    // Mock forPath(path, data) - used by saveAssignments()
    when(pathAndBytesable.forPath(anyString(), any(byte[].class)))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              byte[] data = invocation.getArgument(1);
              return mockZkState.createNode(path, data);
            });

    // Mock forPath(path) - used by createPathIfNeeded()
    when(pathAndBytesable.forPath(anyString()))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              if (mockZkState.exists(path) == null) {
                mockZkState.createNode(path, new byte[0]);
              }
              return null;
            });

    // Mock setData()
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.SetDataBuilder
        setDataBuilder =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.SetDataBuilder
                    .class);
    when(mockClient.setData()).thenReturn(setDataBuilder);
    when(setDataBuilder.forPath(anyString(), any(byte[].class)))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              byte[] data = invocation.getArgument(1);
              mockZkState.setData(path, data);
              return null;
            });

    // Mock delete()
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder deleteBuilder =
        mock(
            org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.DeleteBuilder.class);
    when(mockClient.delete()).thenReturn(deleteBuilder);
    // delete().forPath(...) returns void, so doAnswer(...).when(...) is used instead of when(...).
    doAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              mockZkState.deleteNode(path);
              return null;
            })
        .when(deleteBuilder)
        .forPath(anyString());

    // Mock deletingChildrenIfNeeded()
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ChildrenDeletable
        childrenDeletable =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ChildrenDeletable
                    .class);
    when(deleteBuilder.deletingChildrenIfNeeded()).thenReturn(childrenDeletable);
    doAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              mockZkState.deleteNodeRecursive(path);
              return null;
            })
        .when(childrenDeletable)
        .forPath(anyString());

    // Mock checkExists()
    @SuppressWarnings("unchecked")
    org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
        checkExistsBuilder =
            mock(
                org.apache.amoro.shade.zookeeper3.org.apache.curator.framework.api.ExistsBuilder
                    .class);
    when(mockClient.checkExists()).thenReturn(checkExistsBuilder);
    when(checkExistsBuilder.forPath(anyString()))
        .thenAnswer(
            invocation -> {
              String path = invocation.getArgument(0);
              return mockZkState.exists(path);
            });

    // Mock start() and close()
    doAnswer(invocation -> null).when(mockClient).start();
    doAnswer(invocation -> null).when(mockClient).close();

    return mockClient;
  }

  /**
   * Create a mock LeaderLatch with specified leadership status.
   *
   * <p>NOTE(review): not referenced by the tests above — presumably kept for future
   * leadership-related tests; confirm before removing.
   */
  private LeaderLatch createMockLeaderLatch(boolean hasLeadership) throws Exception {
    LeaderLatch mockLatch = mock(LeaderLatch.class);
    when(mockLatch.hasLeadership()).thenReturn(hasLeadership);
    doAnswer(invocation -> null).when(mockLatch).addListener(any());
    doAnswer(invocation -> null).when(mockLatch).start();
    doAnswer(invocation -> null).when(mockLatch).close();
    doAnswer(
            invocation -> {
              // Mock implementation - doesn't actually wait
              return null;
            })
        .when(mockLatch)
        .await();
    return mockLatch;
  }

  /** In-memory ZK state simulator. */
  private static class MockZkState {
    // Flat map of full node path -> payload; parent/child structure is derived from the paths.
    private final Map<String, byte[]> nodes = new HashMap<>();
    // Counter used to suffix sequential nodes (paths ending in '-').
    private final AtomicInteger sequenceCounter = new AtomicInteger(0);

    /** Returns the sorted names of the direct children of {@code path}. */
    public List<String> getChildren(String path) throws KeeperException {
      List<String> children = new ArrayList<>();
      String prefix = path.endsWith("/") ? path : path + "/";
      for (String nodePath : nodes.keySet()) {
        if (nodePath.startsWith(prefix) && !nodePath.equals(path)) {
          String relativePath = nodePath.substring(prefix.length());
          // A remaining slash means a deeper descendant, not a direct child.
          if (!relativePath.contains("/")) {
            children.add(relativePath);
          }
        }
      }
      // Sort to ensure consistent ordering across test runs.
      children.sort(String::compareTo);
      return children;
    }

    /** Returns the payload at {@code path}; throws NoNodeException if absent. */
    public byte[] getData(String path) throws KeeperException {
      byte[] data = nodes.get(path);
      if (data == null) {
        throw new KeeperException.NoNodeException(path);
      }
      return data;
    }

    /** Overwrites the payload of an existing node; throws NoNodeException if absent. */
    public void setData(String path, byte[] data) throws KeeperException {
      if (!nodes.containsKey(path)) {
        throw new KeeperException.NoNodeException(path);
      }
      nodes.put(path, data);
    }

    /** Creates a node (appending a sequence suffix for '-'-terminated paths) and its parents. */
    public String createNode(String path, byte[] data) {
      // Handle sequential nodes
      if (path.endsWith("-")) {
        int seq = sequenceCounter.incrementAndGet();
        path = path + String.format("%010d", seq);
      }
      // Create parent paths if they don't exist (simulating creatingParentsIfNeeded)
      createParentPaths(path);
      nodes.put(path, data);
      return path;
    }

    /** Creates every ancestor of {@code path} as an empty node, mirroring Curator's behavior. */
    private void createParentPaths(String path) {
      // Create all parent paths as empty nodes
      // Handle absolute paths (starting with "/")
      boolean isAbsolute = path.startsWith("/");
      String[] parts = path.split("/");
      StringBuilder currentPath = new StringBuilder();
      if (isAbsolute) {
        currentPath.append("/");
      }
      for (int i = 0; i < parts.length - 1; i++) {
        if (parts[i].isEmpty()) {
          continue; // Skip empty parts from split
        }
        if (currentPath.length() > 0 && !currentPath.toString().endsWith("/")) {
          currentPath.append("/");
        }
        currentPath.append(parts[i]);
        String parentPath = currentPath.toString();
        // Only create if it doesn't exist
        if (!nodes.containsKey(parentPath)) {
          nodes.put(parentPath, new byte[0]);
        }
      }
    }

    /** Removes a single node; throws NoNodeException if absent. */
    public void deleteNode(String path) throws KeeperException {
      if (!nodes.containsKey(path)) {
        throw new KeeperException.NoNodeException(path);
      }
      nodes.remove(path);
    }

    /** Removes {@code path} and every descendant beneath it; absent paths are a no-op. */
    public void deleteNodeRecursive(String path) throws KeeperException {
      // Delete the node and all its children
      // Collect first to avoid mutating the map while iterating its key set.
      List<String> toDelete = new ArrayList<>();
      String prefix = path.endsWith("/") ? path : path + "/";
      for (String nodePath : nodes.keySet()) {
        if (nodePath.equals(path) || nodePath.startsWith(prefix)) {
          toDelete.add(nodePath);
        }
      }
      for (String nodePath : toDelete) {
        nodes.remove(nodePath);
      }
    }

    /** Returns a fresh Stat when the node exists, null otherwise. */
    public Stat exists(String path) {
      return nodes.containsKey(path) ? new Stat() : null;
    }

    /** Resets the simulated tree and the sequence counter between tests. */
    public void clear() {
      nodes.clear();
      sequenceCounter.set(0);
    }
  }
}
diff --git a/amoro-common/src/main/java/org/apache/amoro/ErrorCodes.java
b/amoro-common/src/main/java/org/apache/amoro/ErrorCodes.java
index 76a60f4a4..df91badb7 100644
--- a/amoro-common/src/main/java/org/apache/amoro/ErrorCodes.java
+++ b/amoro-common/src/main/java/org/apache/amoro/ErrorCodes.java
@@ -36,4 +36,6 @@ public final class ErrorCodes {
public static final int PLUGIN_RETRY_AUTH_ERROR_CODE = 2006;
public static final int BLOCKER_CONFLICT_ERROR_CODE = 3001;
+
+ public static final int BUCKET_ASSIGN_STORE_ERROR_CODE = 4000;
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java
index 555c238c1..fd30c9de0 100644
---
a/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java
@@ -40,6 +40,7 @@ public class AmoroRuntimeException extends RuntimeException {
static {
CODE_MAP.put(PersistenceException.class,
ErrorCodes.PERSISTENCE_ERROR_CODE);
+ CODE_MAP.put(BucketAssignStoreException.class,
ErrorCodes.BUCKET_ASSIGN_STORE_ERROR_CODE);
CODE_MAP.put(ObjectNotExistsException.class,
ErrorCodes.OBJECT_NOT_EXISTS_ERROR_CODE);
CODE_MAP.put(AlreadyExistsException.class,
ErrorCodes.ALREADY_EXISTS_ERROR_CODE);
CODE_MAP.put(IllegalMetadataException.class,
ErrorCodes.ILLEGAL_METADATA_ERROR_CODE);
diff --git
a/amoro-common/src/main/java/org/apache/amoro/exception/BucketAssignStoreException.java
b/amoro-common/src/main/java/org/apache/amoro/exception/BucketAssignStoreException.java
new file mode 100644
index 000000000..ff02963cc
--- /dev/null
+++
b/amoro-common/src/main/java/org/apache/amoro/exception/BucketAssignStoreException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.exception;
+
+/**
+ * Exception thrown when bucket assignment store operations fail (e.g. save,
get, remove
+ * assignments).
+ */
+public class BucketAssignStoreException extends AmoroRuntimeException {
+
+ public BucketAssignStoreException(String message) {
+ super(message);
+ }
+
+ public BucketAssignStoreException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public BucketAssignStoreException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
index 08b9ef04a..5e4f0a8d8 100644
---
a/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
+++
b/amoro-common/src/main/java/org/apache/amoro/properties/AmsHAProperties.java
@@ -26,6 +26,7 @@ public class AmsHAProperties {
private static final String TABLE_SERVICE_MASTER_PATH = "/master";
private static final String OPTIMIZING_SERVICE_MASTER_PATH =
"/optimizing-service-master";
private static final String NODES_PATH = "/nodes";
+ private static final String BUCKET_ASSIGNMENTS_PATH = "/bucket-assignments";
private static final String NAMESPACE_DEFAULT = "default";
private static String getBasePath(String namespace) {
@@ -50,4 +51,8 @@ public class AmsHAProperties {
public static String getNodesPath(String namespace) {
return getBasePath(namespace) + NODES_PATH;
}
+
+ public static String getBucketAssignmentsPath(String namespace) {
+ return getBasePath(namespace) + BUCKET_ASSIGNMENTS_PATH;
+ }
}
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index 0f5f272da..da6cf20eb 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -70,11 +70,14 @@ table td:last-child, table th:last-child { width: 40%;
word-break: break-all; }
| expire-snapshots.enabled | true | Enable snapshots expiring. |
| expire-snapshots.interval | 1 h | Interval for expiring snapshots. |
| expire-snapshots.thread-count | 10 | The number of threads used for
snapshots expiring. |
+| ha.bucket-assign.interval | 1 min | Interval for bucket assignment service
to detect node changes and redistribute bucket IDs. |
+| ha.bucket-id.total-count | 100 | Total count of bucket IDs for assignment.
Bucket IDs range from 1 to this value. |
| ha.cluster-name | default | Amoro management service cluster name. |
| ha.connection-timeout | 5 min | The Zookeeper connection timeout in
milliseconds. |
| ha.enabled | false | Whether to enable high availability mode. |
| ha.heartbeat-interval | 10 s | HA heartbeat interval. |
| ha.lease-ttl | 30 s | TTL of HA lease. |
+| ha.node-offline.timeout | 5 min | Timeout duration to determine if a node is
offline. After this duration, the node's bucket IDs will be reassigned. |
| ha.session-timeout | 30 s | The Zookeeper session timeout in milliseconds. |
| ha.type | zk | High availability implementation type: zk or database. |
| ha.zookeeper-address | | The Zookeeper address used for high availability. |