xxubai commented on code in PR #3927:
URL: https://github.com/apache/amoro/pull/3927#discussion_r2981101992
##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -87,19 +94,46 @@ public class DefaultTableService extends PersistentBase implements TableService
private final Configurations serverConfiguration;
private final CatalogManager catalogManager;
private final TableRuntimeFactory tableRuntimeFactory;
+ private final HighAvailabilityContainer haContainer;
+ private final BucketAssignStore bucketAssignStore;
+ private final boolean isMasterSlaveMode;
private RuntimeHandlerChain headHandler;
private ExecutorService tableExplorerExecutors;
+ // Master-slave mode related fields
+ private ScheduledExecutorService bucketTableSyncScheduler;
+ private volatile List<String> assignedBucketIds = new ArrayList<>();
+ private final long bucketTableSyncInterval;
+ // Lock for bucketId assignment to prevent concurrent assignment conflicts
+ private final Object bucketIdAssignmentLock = new Object();
Review Comment:
The current bucketIdAssignmentLock serializes assignment only within a single
JVM. It cannot guarantee correctness across AMS nodes, yet it still forces the
whole DB-read + heap-build path to run single-threaded on each node.
I’d suggest narrowing the lock to local pending-counter bookkeeping only,
and moving correctness to persistence-layer atomicity instead, e.g. a dedicated
`UPDATE table_runtime SET bucket_id = ? WHERE table_id = ? AND bucket_id IS NULL`
path.
Longer term, bucket selection itself should be coordinated by the shared
HA/store layer rather than local in-memory state.
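A minimal sketch of the conditional-update idea, using an in-memory map as a
stand-in for `table_runtime`. The class and method names here
(`BucketAssignSketch`, `assignBucketIfAbsent`) are hypothetical and not part of
this PR; a real version would be a mapper statement whose affected-row count
tells the caller whether it won the assignment.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for table_runtime, for illustration only. */
final class BucketAssignSketch {
  // tableId -> bucketId; Optional.empty() models "bucket_id IS NULL".
  private final Map<Long, Optional<String>> rows = new ConcurrentHashMap<>();

  /** Models a follower persisting the runtime row without a bucket. */
  void insertRuntime(long tableId) {
    rows.putIfAbsent(tableId, Optional.empty());
  }

  /**
   * Models: UPDATE table_runtime SET bucket_id = ?
   *         WHERE table_id = ? AND bucket_id IS NULL
   * Returns the affected-row count: 1 if this caller assigned the bucket,
   * 0 if the row is missing or a bucket was already assigned.
   */
  int assignBucketIfAbsent(long tableId, String bucketId) {
    // replace(key, expected, new) is atomic, like the conditional UPDATE.
    return rows.replace(tableId, Optional.empty(), Optional.of(bucketId)) ? 1 : 0;
  }

  /** Returns the assigned bucket, or null if none (or no such row). */
  String bucketOf(long tableId) {
    Optional<String> value = rows.get(tableId);
    return value == null ? null : value.orElse(null);
  }

  public static void main(String[] args) {
    BucketAssignSketch store = new BucketAssignSketch();
    store.insertRuntime(1L);
    System.out.println("first attempt rows:  " + store.assignBucketIfAbsent(1L, "bucket-a"));
    System.out.println("second attempt rows: " + store.assignBucketIfAbsent(1L, "bucket-b"));
    System.out.println("final bucket:        " + store.bucketOf(1L));
  }
}
```

With this shape the caller needs no lock around the write itself: a zero row
count means another thread or node already assigned the bucket, and the caller
can simply re-read the row.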
##########
amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java:
##########
@@ -495,7 +821,78 @@ private boolean triggerTableAdded(
meta.setStatusCode(OptimizingStatus.IDLE.getCode());
meta.setGroupName(configuration.getOptimizingConfig().getOptimizerGroup());
meta.setTableSummary(new TableSummary());
- doAs(TableRuntimeMapper.class, mapper -> mapper.insertRuntime(meta));
+
+ // In master-slave mode, assign bucketId to the table if it's not assigned yet.
+ // Only leader node should assign bucketIds; follower may still persist the table with null
+ // bucketId (e.g. onTableCreated on follower), and leader will assign later via exploration.
+ String assignedBucketId = null;
+ if (isMasterSlaveMode) {
+ if (haContainer != null && haContainer.hasLeadership()) {
+ TableRuntimeMeta existingMeta =
+ getAs(
+ TableRuntimeMapper.class,
+ mapper -> mapper.selectRuntime(serverTableIdentifier.getId()));
+ if (existingMeta != null) {
+ // Runtime already exists (e.g. inserted by follower with null bucketId)
+ if (existingMeta.getBucketId() != null) {
+ return true; // already assigned
+ }
+ assignedBucketId = assignBucketIdForTable();
+ if (assignedBucketId != null) {
+ existingMeta.setBucketId(assignedBucketId);
+ doAs(TableRuntimeMapper.class, mapper -> mapper.updateRuntime(existingMeta));
Review Comment:
Backfilling bucket_id for an existing runtime uses
updateRuntime(existingMeta), which updates the full row, not just bucket_id.
That can overwrite concurrent changes to status/config/summary with stale
values read earlier.
This should be a dedicated `UPDATE ... SET bucket_id = ? WHERE table_id = ?
AND bucket_id IS NULL` instead.
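To illustrate the lost-update risk, here is a small in-memory sketch. All names
in it (`RuntimeRow`, `BackfillSketch`, `updateBucketIdIfNull`) are hypothetical
stand-ins rather than Amoro APIs, and the single `status` field stands in for
the status/config/summary columns.

```java
/** Hypothetical two-column view of a table_runtime row, for illustration only. */
final class RuntimeRow {
  String status;
  String bucketId;

  RuntimeRow(String status, String bucketId) {
    this.status = status;
    this.bucketId = bucketId;
  }

  RuntimeRow copy() {
    return new RuntimeRow(status, bucketId);
  }
}

/** Hypothetical store holding one row, contrasting the two update styles. */
final class BackfillSketch {
  private RuntimeRow stored = new RuntimeRow("IDLE", null);

  /** Returns a snapshot, like reading the row into a meta object. */
  synchronized RuntimeRow select() {
    return stored.copy();
  }

  /** Models a concurrent writer changing another column. */
  synchronized void updateStatus(String status) {
    stored.status = status;
  }

  /** Full-row update: writes every column from the (possibly stale) snapshot. */
  synchronized void updateRuntime(RuntimeRow row) {
    stored = row.copy();
  }

  /** Targeted update: touches bucket_id only, and only while it is still null. */
  synchronized int updateBucketIdIfNull(String bucketId) {
    if (stored.bucketId != null) {
      return 0;
    }
    stored.bucketId = bucketId;
    return 1;
  }

  public static void main(String[] args) {
    BackfillSketch store = new BackfillSketch();
    RuntimeRow snapshot = store.select(); // read before the concurrent change
    store.updateStatus("PENDING");        // concurrent writer
    snapshot.bucketId = "bucket-a";
    store.updateRuntime(snapshot);        // stale status is written back
    System.out.println("status after full-row update: " + store.select().status);
  }
}
```

Running the same interleaving with `updateBucketIdIfNull` instead of
`updateRuntime` leaves the concurrent status change in place, which is the
behaviour the dedicated statement is meant to provide.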
##########
amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java:
##########
@@ -62,6 +62,13 @@ public class AmoroManagementConf {
"This setting controls whether to enable the AMS horizontal scaling feature, "
+ "which is currently under development and testing.");
+ public static final ConfigOption<Duration> BUCKET_TABLE_SYNC_INTERVAL =
+ ConfigOptions.key("bucket-table-sync.interval")
Review Comment:
The new config naming is a bit misleading. `bucket-table-sync.interval` is
HA/master-slave-specific, but it is exposed as a top-level AMS config.
`ha.bucket-table-sync.interval` would better reflect its scope and align with
the rest of the HA settings.