wardlican commented on code in PR #3922:
URL: https://github.com/apache/amoro/pull/3922#discussion_r2923888908


##########
amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java:
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.config.Configurations;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
+import 
org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service for assigning bucket IDs to AMS nodes in master-slave mode. Periodically detects node
+ * changes and redistributes bucket IDs evenly.
+ */
+public class AmsAssignService {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AmsAssignService.class);
+
+  private final ScheduledExecutorService assignScheduler =
+      Executors.newSingleThreadScheduledExecutor(
+          new ThreadFactoryBuilder()
+              .setNameFormat("ams-assign-scheduler-%d")
+              .setDaemon(true)
+              .build());
+
+  private final HighAvailabilityContainer haContainer;
+  private final BucketAssignStore assignStore;
+  private final Configurations serviceConfig;
+  private final int bucketIdTotalCount;
+  private final long nodeOfflineTimeoutMs;
+  private final long assignIntervalSeconds;
+  private volatile boolean running = false;
+
+  boolean isRunning() { // package-private accessor for the volatile flag set in start() and cleared in stop()
+    return running;
+  }
+
+  void doAssignForTest() { // runs a single doAssign() round directly on the calling thread, bypassing the scheduler
+    doAssign();
+  }
+
+  public AmsAssignService(HighAvailabilityContainer haContainer, 
Configurations serviceConfig) { // settings are read once here; nothing is scheduled until start()
+    this.haContainer = haContainer;
+    this.serviceConfig = serviceConfig;
+    this.bucketIdTotalCount =
+        serviceConfig.getInteger(AmoroManagementConf.HA_BUCKET_ID_TOTAL_COUNT);
+    this.nodeOfflineTimeoutMs =
+        serviceConfig.get(AmoroManagementConf.NODE_OFFLINE_TIMEOUT).toMillis(); // compared with currentTimeMillis() deltas in doAssign()
+    this.assignIntervalSeconds =
+        serviceConfig.get(AmoroManagementConf.ASSIGN_INTERVAL).getSeconds(); // delay between doAssign() runs scheduled by start()
+    this.assignStore = BucketAssignStoreFactory.create(haContainer, 
serviceConfig);
+  }
+
+  /**
+   * Start the assignment service. Does nothing beyond logging when master-slave mode is disabled
+   * or when the service is already running. Otherwise marks the service as running and schedules
+   * {@code doAssign} on the single-threaded scheduler with an initial delay of 10 seconds and a
+   * fixed delay of {@code assignIntervalSeconds} seconds between runs.
+   *
+   * <p>Leadership is not checked here: every {@code doAssign} run checks it and skips the round
+   * when the current node is not the leader.
+   */
+  public void start() {
+    if (!serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE)) {
+      LOG.info("Master-slave mode is not enabled, skip starting bucket 
assignment service");
+      return;
+    }
+    if (running) {
+      LOG.warn("Bucket assignment service is already running");
+      return;
+    }
+    running = true;
+    assignScheduler.scheduleWithFixedDelay(
+        this::doAssign, 10, assignIntervalSeconds, TimeUnit.SECONDS);
+    LOG.info("Bucket assignment service started with interval: {} seconds", 
assignIntervalSeconds);
+  }
+
+  /**
+   * Stop the assignment service. No-op when the service is not running. Otherwise clears the
+   * running flag, shuts the scheduler down and waits up to 5 seconds for it to terminate; if the
+   * wait times out or is interrupted, {@code shutdownNow()} is called (and the interrupt flag is
+   * restored in the interrupted case).
+   *
+   * <p>NOTE(review): the scheduler is a final field and is not recreated here, so a later call to
+   * {@code start()} would schedule on an executor that is already shut down - confirm that a
+   * restart after stop is never required.
+   */
+  public void stop() {
+    if (!running) {
+      return;
+    }
+    running = false;
+    assignScheduler.shutdown();
+    try {
+      if (!assignScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+        assignScheduler.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      assignScheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+    LOG.info("Bucket assignment service stopped");
+  }
+
+  private void doAssign() { // one assignment round: leader-only; detects new/offline nodes and reassigns buckets
+    try { // every exception is caught and logged at the bottom instead of propagating to the scheduler
+      if (!haContainer.hasLeadership()) {
+        LOG.debug("Current node is not leader, skip bucket assignment");
+        return;
+      }
+
+      List<AmsServerInfo> aliveNodes = haContainer.getAliveNodes();
+      if (aliveNodes.isEmpty()) {
+        LOG.debug("No alive nodes found, skip bucket assignment");
+        return;
+      }
+
+      Map<AmsServerInfo, List<String>> currentAssignments = 
assignStore.getAllAssignments();
+
+      // Create a mapping from stored nodes (may have null restBindPort) to 
alive nodes (complete
+      // info)
+      // Use host:thriftBindPort as the key for matching
+      Map<String, AmsServerInfo> aliveNodeMap = new java.util.HashMap<>();
+      for (AmsServerInfo node : aliveNodes) {
+        String key = getNodeKey(node);
+        aliveNodeMap.put(key, node);
+      }
+
+      // Normalize current assignments: map stored nodes to their 
corresponding alive nodes
+      Map<AmsServerInfo, List<String>> normalizedAssignments = new 
java.util.HashMap<>();
+      Set<AmsServerInfo> currentAssignedNodes = new HashSet<>();
+      for (Map.Entry<AmsServerInfo, List<String>> entry : 
currentAssignments.entrySet()) {
+        AmsServerInfo storedNode = entry.getKey();
+        String nodeKey = getNodeKey(storedNode);
+        AmsServerInfo aliveNode = aliveNodeMap.get(nodeKey);
+        if (aliveNode != null) {
+          // Node is alive, use the complete node info from aliveNodes
+          normalizedAssignments.put(aliveNode, entry.getValue());
+          currentAssignedNodes.add(aliveNode);
+        } else {
+          // Node is not in alive list, keep the stored node info for offline 
detection
+          normalizedAssignments.put(storedNode, entry.getValue());
+          currentAssignedNodes.add(storedNode);
+        }
+      }
+
+      Set<AmsServerInfo> aliveNodeSet = new HashSet<>(aliveNodes);
+
+      // Detect new nodes and offline nodes
+      Set<AmsServerInfo> newNodes = new HashSet<>(aliveNodeSet);
+      newNodes.removeAll(currentAssignedNodes); // what remains: alive nodes with no matching stored assignment
+
+      Set<AmsServerInfo> offlineNodes = new HashSet<>();
+      for (AmsServerInfo storedNode : currentAssignments.keySet()) {
+        String nodeKey = getNodeKey(storedNode);
+        if (!aliveNodeMap.containsKey(nodeKey)) {
+          offlineNodes.add(storedNode);
+        }
+      }
+
+      // Check for nodes that haven't updated for a long time
+      long currentTime = System.currentTimeMillis();
+      Set<String> aliveNodeKeys = new HashSet<>(); // holds the same keys as aliveNodeMap.keySet()
+      for (AmsServerInfo node : aliveNodes) {
+        aliveNodeKeys.add(getNodeKey(node));
+      }
+      for (AmsServerInfo node : currentAssignedNodes) {
+        String nodeKey = getNodeKey(node);
+        if (aliveNodeKeys.contains(nodeKey)) {
+          long lastUpdateTime = assignStore.getLastUpdateTime(node);
+          if (lastUpdateTime > 0 && (currentTime - lastUpdateTime) > 
nodeOfflineTimeoutMs) {
+            // Find the stored node for this alive node to add to offlineNodes
+            for (AmsServerInfo storedNode : currentAssignments.keySet()) {
+              if (getNodeKey(storedNode).equals(nodeKey)) {
+                offlineNodes.add(storedNode);
+                break;
+              }
+            }
+            LOG.warn(
+                "Node {} is considered offline due to timeout. Last update: 
{}",
+                node,
+                lastUpdateTime);
+          }
+        }
+      }
+
+      boolean needReassign = !newNodes.isEmpty() || !offlineNodes.isEmpty(); // false: only last-update times are refreshed (else branch)
+
+      if (needReassign) {
+        LOG.info(
+            "Detected node changes - New nodes: {}, Offline nodes: {}, 
Performing incremental reassignment...",
+            newNodes.size(),
+            offlineNodes.size());
+
+        // Step 1: Handle offline nodes - collect their buckets for 
redistribution
+        List<String> bucketsToRedistribute = new ArrayList<>();
+        for (AmsServerInfo offlineNode : offlineNodes) {
+          try {
+            List<String> offlineBuckets = currentAssignments.get(offlineNode);
+            if (offlineBuckets != null && !offlineBuckets.isEmpty()) {
+              bucketsToRedistribute.addAll(offlineBuckets);
+              LOG.info(
+                  "Collected {} buckets from offline node {} for 
redistribution",
+                  offlineBuckets.size(),
+                  offlineNode);
+            }
+            assignStore.removeAssignments(offlineNode); // a failure here is only logged; the buckets above stay collected
+          } catch (Exception e) {
+            LOG.warn("Failed to remove assignments for offline node {}", 
offlineNode, e);
+          }
+        }
+
+        // Step 2: Calculate target assignment for balanced distribution
+        List<String> allBuckets = generateBucketIds();
+        int totalBuckets = allBuckets.size();
+        int totalAliveNodes = aliveNodes.size(); // non-zero: the empty case returned early above
+        int targetBucketsPerNode = totalBuckets / totalAliveNodes;
+        int remainder = totalBuckets % totalAliveNodes;
+
+        // Step 3: Incremental reassignment
+        // Keep existing assignments for nodes that are still alive
+        Map<AmsServerInfo, List<String>> newAssignments = new 
java.util.HashMap<>();
+        Set<String> offlineNodeKeys = new HashSet<>();
+        for (AmsServerInfo offlineNode : offlineNodes) {
+          offlineNodeKeys.add(getNodeKey(offlineNode));
+        }
+        for (AmsServerInfo node : aliveNodes) {
+          String nodeKey = getNodeKey(node);
+          if (!offlineNodeKeys.contains(nodeKey)) {
+            // Node is alive and not offline, check if it has existing 
assignments
+            List<String> existingBuckets = normalizedAssignments.get(node);
+            if (existingBuckets != null && !existingBuckets.isEmpty()) {
+              // Keep existing buckets for alive nodes (not offline)
+              newAssignments.put(node, new ArrayList<>(existingBuckets));
+            } else {
+              // New node
+              newAssignments.put(node, new ArrayList<>());
+            }
+          } else {
+            // Node was offline, start with empty assignment
+            newAssignments.put(node, new ArrayList<>()); // alive by key but timed out: its buckets were collected in Step 1
+          }
+        }
+
+        // Step 4: Redistribute buckets from offline nodes to alive nodes
+        if (!bucketsToRedistribute.isEmpty()) {
+          redistributeBucketsIncrementally(aliveNodes, bucketsToRedistribute, 
newAssignments);
+        }
+
+        // Step 5: Handle new nodes - balance buckets from existing nodes
+        if (!newNodes.isEmpty()) {
+          balanceBucketsForNewNodes(
+              aliveNodes, newNodes, newAssignments, targetBucketsPerNode, 
remainder);
+        }
+
+        // Step 6: Handle unassigned buckets (if any)
+        Set<String> allAssignedBuckets = new HashSet<>();
+        for (List<String> buckets : newAssignments.values()) {
+          allAssignedBuckets.addAll(buckets);
+        }
+        List<String> unassignedBuckets = new ArrayList<>();
+        for (String bucket : allBuckets) {
+          if (!allAssignedBuckets.contains(bucket)) {
+            unassignedBuckets.add(bucket);
+          }
+        }
+        if (!unassignedBuckets.isEmpty()) {
+          redistributeBucketsIncrementally(aliveNodes, unassignedBuckets, 
newAssignments);
+        }
+
+        // Step 7: Save all new assignments
+        for (Map.Entry<AmsServerInfo, List<String>> entry : 
newAssignments.entrySet()) {
+          try { // saved per node, so one failed save does not stop the remaining nodes
+            assignStore.saveAssignments(entry.getKey(), entry.getValue());
+            LOG.info(
+                "Assigned {} buckets to node {}: {}",
+                entry.getValue().size(),
+                entry.getKey(),
+                entry.getValue());
+          } catch (Exception e) {
+            LOG.error("Failed to save assignments for node {}", 
entry.getKey(), e);
+          }
+        }
+      } else {
+        // Update last update time for alive nodes
+        for (AmsServerInfo node : aliveNodes) {
+          assignStore.updateLastUpdateTime(node);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Error during bucket assignment", e);
+    }
+  }
+
+  /**
+   * Redistribute buckets incrementally to alive nodes using round-robin. This 
minimizes bucket
+   * migration by only redistributing buckets from offline nodes.
+   *
+   * @param aliveNodes List of alive nodes
+   * @param bucketsToRedistribute Buckets to redistribute (from offline nodes)
+   * @param currentAssignments Current assignments map (will be modified)
+   */
+  private void redistributeBucketsIncrementally(
+      List<AmsServerInfo> aliveNodes,
+      List<String> bucketsToRedistribute,
+      Map<AmsServerInfo, List<String>> currentAssignments) {
+    if (aliveNodes.isEmpty() || bucketsToRedistribute.isEmpty()) {
+      return;
+    }
+
+    // Distribute buckets using round-robin to minimize migration
+    int nodeIndex = 0;
+    for (String bucketId : bucketsToRedistribute) {
+      AmsServerInfo node = aliveNodes.get(nodeIndex % aliveNodes.size());
+      currentAssignments.get(node).add(bucketId);

Review Comment:
   This problem will not occur.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to