wardlican commented on code in PR #3927:
URL: https://github.com/apache/amoro/pull/3927#discussion_r2985304677


##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -259,8 +341,234 @@ public void dispose() {
     tableRuntimeMap.values().forEach(TableRuntime::unregisterMetric);
   }
 
+  /**
+   * Refreshes {@code assignedBucketIds} with the assignments that the bucket assign store reports
+   * for the current server. Intended to be invoked periodically.
+   *
+   * <p>Nothing is changed when the HA container or the assign store is absent, when the current
+   * server info cannot be resolved, or when the lookup throws; each of those cases is only logged.
+   */
+  private void updateAssignedBucketIds() {
+    boolean missingDependency = haContainer == null || bucketAssignStore == null;
+    if (missingDependency) {
+      LOG.warn(
+          "No assigned bucket ids found. check if haContainer == null or bucketAssignStore == null");
+      return;
+    }
+    try {
+      AmsServerInfo selfInfo = haContainer.getTableServiceServerInfo();
+      if (selfInfo == null) {
+        LOG.warn("Cannot get current server info, skip updating assigned bucketIds");
+        return;
+      }
+      List<String> latest = bucketAssignStore.getAssignments(selfInfo);
+      if (latest.equals(assignedBucketIds)) {
+        // Assignments are unchanged; keep the current list instance.
+        return;
+      }
+      LOG.info("Assigned bucketIds changed from {} to {}", assignedBucketIds, latest);
+      // Keep a private copy so the list handed back by the store is not retained.
+      assignedBucketIds = new ArrayList<>(latest);
+    } catch (Exception e) {
+      LOG.error("Failed to update assigned bucketIds", e);
+    }
+  }
+
+  /**
+   * Reconciles the in-memory table runtimes with the tables stored for the bucket IDs assigned to
+   * this node. This method is called periodically in master-slave mode.
+   *
+   * <p>The sync is skipped unless master-slave mode is enabled and both the HA container and the
+   * bucket assign store are available. Otherwise it refreshes the assigned bucket IDs, creates a
+   * runtime for every stored table of those buckets that is not loaded yet, and drops loaded
+   * runtimes whose table is gone or whose bucket is not assigned to this node. A failure while
+   * adding or removing a single table is logged and does not stop the remaining tables from being
+   * processed; any other failure is logged and ends the current round.
+   */
+  private void syncBucketTables() {
+    if (!isMasterSlaveMode || haContainer == null || bucketAssignStore == null) {
+      return;
+    }
+    try {
+      updateAssignedBucketIds();
+      if (assignedBucketIds.isEmpty()) {
+        // In master-slave mode, if no bucketIds are assigned yet, it's normal during startup
+        // The AmsAssignService will assign bucketIds later
+        LOG.debug("No bucketIds assigned to this node yet, skip syncing tables (will retry later)");
+        return;
+      }
+
+      LOG.info("syncBucketTables assignedBucketIds:{}", assignedBucketIds);
+      // Load tables from database for assigned bucketIds
+      List<TableRuntimeMeta> tableRuntimeMetaList =
+          getAs(
+              TableRuntimeMapper.class,
+              mapper -> mapper.selectRuntimesByBucketIds(assignedBucketIds, false));
+
+      Map<Long, ServerTableIdentifier> identifierMap =
+          getAs(TableMetaMapper.class, TableMetaMapper::selectAllTableIdentifiers).stream()
+              .collect(Collectors.toMap(ServerTableIdentifier::getId, Function.identity()));
+
+      // Group the persisted states by table id; states of the same table are merged into one list.
+      Map<Long, List<TableRuntimeState>> statesMap =
+          getAs(TableRuntimeMapper.class, TableRuntimeMapper::selectAllStates).stream()
+              .collect(
+                  Collectors.toMap(
+                      TableRuntimeState::getTableId,
+                      Lists::newArrayList,
+                      (a, b) -> {
+                        a.addAll(b);
+                        return a;
+                      }));
+
+      // Snapshot the loaded table ids so the add and remove phases work on a stable view.
+      Set<Long> currentTableIds = new HashSet<>(tableRuntimeMap.keySet());
+      Set<Long> dbTableIds =
+          tableRuntimeMetaList.stream()
+              .map(TableRuntimeMeta::getTableId)
+              .collect(Collectors.toSet());
+
+      // Add new tables (in DB but not in memory)
+      for (TableRuntimeMeta tableRuntimeMeta : tableRuntimeMetaList) {
+        Long tableId = tableRuntimeMeta.getTableId();
+        if (currentTableIds.contains(tableId)) {
+          continue;
+        }
+        ServerTableIdentifier identifier = identifierMap.get(tableId);
+        if (identifier == null) {
+          LOG.warn("No available table identifier found for table runtime meta id={}", tableId);
+          continue;
+        }
+        // A table without persisted states gets an empty list instead of null.
+        List<TableRuntimeState> states = statesMap.getOrDefault(tableId, Collections.emptyList());
+        try {
+          Optional<TableRuntime> tableRuntime =
+              createTableRuntime(identifier, tableRuntimeMeta, states);
+          if (tableRuntime.isPresent()) {
+            TableRuntime runtime = tableRuntime.get();
+            runtime.registerMetric(MetricManager.getInstance().getGlobalRegistry());
+            tableRuntimeMap.put(tableId, runtime);
+            if (headHandler != null) {
+              AmoroTable<?> table = loadTable(identifier);
+              if (table != null) {
+                headHandler.fireTableAdded(table, runtime);
+              }
+            }
+            LOG.info("Added table {} for bucketId {}", tableId, tableRuntimeMeta.getBucketId());
+          }
+        } catch (Exception e) {
+          // Isolate the failure to this table so the other tables are still added and the removal
+          // phase below still runs, mirroring the per-table handling of the removal loop.
+          // NOTE(review): if the failure happens after tableRuntimeMap.put, the runtime stays
+          // registered without fireTableAdded having completed - confirm that this is acceptable.
+          LOG.error("Error occurred while adding tableRuntime of table {}", tableId, e);
+        }
+      }
+
+      // Remove tables that are no longer assigned to this node
+      List<Long> tablesToRemove = new ArrayList<>();
+      for (Long tableId : currentTableIds) {
+        if (dbTableIds.contains(tableId)) {
+          continue;
+        }
+        // Skip ids that were dropped from the map since the snapshot was taken.
+        if (tableRuntimeMap.get(tableId) == null) {
+          continue;
+        }
+        // Re-read the bucketId from the database before deciding to drop the runtime.
+        TableRuntimeMeta meta =
+            getAs(TableRuntimeMapper.class, mapper -> mapper.selectRuntime(tableId));
+        boolean stillAssigned =
+            meta != null
+                && meta.getBucketId() != null
+                && assignedBucketIds.contains(meta.getBucketId());
+        if (!stillAssigned) {
+          // Table removed from database, bucketId is null, or bucket belongs to another node
+          tablesToRemove.add(tableId);
+        }
+      }
+
+      for (Long tableId : tablesToRemove) {
+        TableRuntime tableRuntime = tableRuntimeMap.get(tableId);
+        if (tableRuntime != null) {
+          try {
+            if (headHandler != null) {
+              headHandler.fireTableRemoved(tableRuntime);
+            }
+            // NOTE(review): dispose() is also invoked for tables that merely moved to another
+            // node; confirm that it only releases local resources and does not delete persisted
+            // runtime state.
+            tableRuntime.dispose();
+            tableRuntimeMap.remove(tableId);
+            LOG.info("Removed table {} as it's no longer assigned to this node", tableId);
+          } catch (Exception e) {
+            LOG.error("Error occurred while removing tableRuntime of table {}", tableId, e);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Error during bucket table sync", e);
+    }
+  }
+
+  /**
+   * Assign bucketId to a table using min-heap strategy. This ensures tables 
are evenly distributed
+   * across bucketIds.
+   *
+   * @return The assigned bucketId, or null if assignment fails
+   */
+  private String assignBucketIdForTable() {
+    // Synchronize to prevent concurrent assignments from selecting the same 
bucketId
+    synchronized (bucketIdAssignmentLock) {
+      try {
+        // Get bucketId distribution statistics from database (optimized query)
+        List<BucketIdCount> bucketIdCounts =
+            getAs(
+                TableRuntimeMapper.class,
+                (TableRuntimeMapper mapper) -> mapper.countTablesByBucketId());
+
+        // Initialize bucketId count map with all possible bucketIds
+        Map<String, Integer> bucketTableCount = new ConcurrentHashMap<>();
+        int bucketIdTotalCount =
+            
serverConfiguration.getInteger(AmoroManagementConf.HA_BUCKET_ID_TOTAL_COUNT);
+        for (int i = 1; i <= bucketIdTotalCount; i++) {
+          bucketTableCount.put(String.valueOf(i), 0);
+        }
+
+        // Fill in counts from database query results
+        for (BucketIdCount bucketIdCount : bucketIdCounts) {
+          String bucketId = bucketIdCount.getBucketId();
+          if (bucketId != null && !bucketId.trim().isEmpty()) {
+            bucketTableCount.put(bucketId, bucketIdCount.getCount());
+          }
+        }
+
+        // Add pending assignments (assigned but not yet saved to DB) to the 
count
+        // This ensures concurrent table creation gets different bucketIds
+        // Note: Pending counts are decremented when tables are successfully 
saved to DB
+        for (Map.Entry<String, Integer> pendingEntry : 
pendingBucketIdCounts.entrySet()) {
+          String bucketId = pendingEntry.getKey();
+          int pendingCount = pendingEntry.getValue();
+          bucketTableCount.put(bucketId, 
bucketTableCount.getOrDefault(bucketId, 0) + pendingCount);
+        }
+
+        // Use min-heap to find bucketId with minimum table count
+        // Create new entries instead of using Map.Entry directly to avoid 
reference issues
+        PriorityQueue<Map.Entry<String, Integer>> minHeap =
+            new PriorityQueue<>(
+                Comparator.<Map.Entry<String, 
Integer>>comparingInt(Map.Entry::getValue)
+                    .thenComparing(Map.Entry::getKey)); // Use bucketId as 
tie-breaker
+
+        // Create independent entries for the heap to avoid reference issues
+        for (Map.Entry<String, Integer> entry : bucketTableCount.entrySet()) {
+          minHeap.offer(new 
java.util.AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue()));

Review Comment:
   Okay, I will fix this issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to