xxubai commented on code in PR #3927:
URL: https://github.com/apache/amoro/pull/3927#discussion_r2981101992


##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -87,19 +94,46 @@ public class DefaultTableService extends PersistentBase implements TableService
   private final Configurations serverConfiguration;
   private final CatalogManager catalogManager;
   private final TableRuntimeFactory tableRuntimeFactory;
+  private final HighAvailabilityContainer haContainer;
+  private final BucketAssignStore bucketAssignStore;
+  private final boolean isMasterSlaveMode;
   private RuntimeHandlerChain headHandler;
   private ExecutorService tableExplorerExecutors;
 
+  // Master-slave mode related fields
+  private ScheduledExecutorService bucketTableSyncScheduler;
+  private volatile List<String> assignedBucketIds = new ArrayList<>();
+  private final long bucketTableSyncInterval;
+  // Lock for bucketId assignment to prevent concurrent assignment conflicts
+  private final Object bucketIdAssignmentLock = new Object();

Review Comment:
   `bucketIdAssignmentLock` only serializes assignment within a single JVM, so it cannot guarantee correctness across AMS nodes. At the same time, it makes the whole DB-read + heap-build path single-threaded locally.

   I'd suggest narrowing the lock to the local pending-counter bookkeeping only, and moving correctness to persistence-layer atomicity instead, e.g. a dedicated `UPDATE table_runtime SET bucket_id = ? WHERE table_id = ? AND bucket_id IS NULL` path.

   Longer term, bucket selection itself should be coordinated by the shared HA/store layer rather than by local in-memory state.
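
To illustrate the conditional-update idea, here is a minimal sketch, not the actual Amoro code. `BucketAssignmentSketch` and `assignIfUnassigned` are hypothetical names, and an in-memory map stands in for the `table_runtime` table: an absent key models `bucket_id IS NULL`. The point is that the affected-row count, not a JVM-local lock, decides which node wins.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for table_runtime; a real path would issue the SQL. */
class BucketAssignmentSketch {
  // table_id -> bucket_id; an absent key models "bucket_id IS NULL".
  private final Map<Long, String> bucketByTableId = new ConcurrentHashMap<>();

  /**
   * Models: UPDATE table_runtime SET bucket_id = ? WHERE table_id = ? AND bucket_id IS NULL.
   * Returns the affected-row count: 1 if this caller assigned the bucket, 0 if it was
   * already assigned by someone else.
   */
  int assignIfUnassigned(long tableId, String bucketId) {
    return bucketByTableId.putIfAbsent(tableId, bucketId) == null ? 1 : 0;
  }

  /** Returns the persisted bucket id, or null if none has been assigned yet. */
  String bucketOf(long tableId) {
    return bucketByTableId.get(tableId);
  }
}
```

With something along these lines, a caller would treat a return value of 0 as "another writer already assigned this table" and adjust its local pending counter only when the update reports 1 row.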



##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -495,7 +821,78 @@ private boolean triggerTableAdded(
     meta.setStatusCode(OptimizingStatus.IDLE.getCode());
     meta.setGroupName(configuration.getOptimizingConfig().getOptimizerGroup());
     meta.setTableSummary(new TableSummary());
-    doAs(TableRuntimeMapper.class, mapper -> mapper.insertRuntime(meta));
+
+    // In master-slave mode, assign bucketId to the table if it's not assigned yet.
+    // Only leader node should assign bucketIds; follower may still persist the table with null
+    // bucketId (e.g. onTableCreated on follower), and leader will assign later via exploration.
+    String assignedBucketId = null;
+    if (isMasterSlaveMode) {
+      if (haContainer != null && haContainer.hasLeadership()) {
+        TableRuntimeMeta existingMeta =
+            getAs(
+                TableRuntimeMapper.class,
+                mapper -> mapper.selectRuntime(serverTableIdentifier.getId()));
+        if (existingMeta != null) {
+          // Runtime already exists (e.g. inserted by follower with null bucketId)
+          if (existingMeta.getBucketId() != null) {
+            return true; // already assigned
+          }
+          assignedBucketId = assignBucketIdForTable();
+          if (assignedBucketId != null) {
+            existingMeta.setBucketId(assignedBucketId);
+            doAs(TableRuntimeMapper.class, mapper -> mapper.updateRuntime(existingMeta));

Review Comment:
   Backfilling `bucket_id` for an existing runtime goes through `updateRuntime(existingMeta)`, which updates the full row, not just `bucket_id`.
   That can overwrite concurrent changes to status/config/summary with the stale values read earlier.
   This should be a dedicated `UPDATE ... SET bucket_id = ? WHERE table_id = ? AND bucket_id IS NULL` instead.
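
A small model of the lost-update risk, as a sketch under stated assumptions: `RuntimeRowSketch` is a hypothetical two-column simplification of the runtime row (not the real `TableRuntimeMeta`), and the status strings are placeholders. `updateFullRow` mimics a full-row write from an earlier snapshot, while `updateBucketIdIfNull` mimics the dedicated single-column update.

```java
/** Hypothetical, simplified model of one table_runtime row. */
class RuntimeRowSketch {
  String statusCode;
  String bucketId;

  RuntimeRowSketch(String statusCode, String bucketId) {
    this.statusCode = statusCode;
    this.bucketId = bucketId;
  }

  /** Models reading the row into a local object, which may go stale afterwards. */
  RuntimeRowSketch snapshot() {
    return new RuntimeRowSketch(statusCode, bucketId);
  }

  /** Models a full-row update: every column is overwritten from the caller's snapshot. */
  void updateFullRow(RuntimeRowSketch snapshot) {
    this.statusCode = snapshot.statusCode;
    this.bucketId = snapshot.bucketId;
  }

  /**
   * Models UPDATE ... SET bucket_id = ? WHERE table_id = ? AND bucket_id IS NULL.
   * Touches only bucket_id and reports whether a row was changed.
   */
  boolean updateBucketIdIfNull(String newBucketId) {
    if (bucketId != null) {
      return false;
    }
    bucketId = newBucketId;
    return true;
  }
}
```

In this model, if the status changes between `snapshot()` and `updateFullRow(...)`, the full-row write silently restores the old status, whereas `updateBucketIdIfNull(...)` leaves the newer status in place.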



##########
amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java:
##########
@@ -62,6 +62,13 @@ public class AmoroManagementConf {
               "This setting controls whether to enable the AMS horizontal scaling feature, "
                   + "which is currently under development and testing.");
 
+  public static final ConfigOption<Duration> BUCKET_TABLE_SYNC_INTERVAL =
+      ConfigOptions.key("bucket-table-sync.interval")

Review Comment:
   The new config name is a bit misleading. `bucket-table-sync.interval` is HA/master-slave-specific, but it is exposed as a top-level AMS config. `ha.bucket-table-sync.interval` would better reflect its scope and align with the rest of the HA settings.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
