This is an automated email from the ASF dual-hosted git repository.
xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new bf222b7b6 [Subtask]: Modify DefaultTableService to be compatible with
master-slave mode. (#3927)
bf222b7b6 is described below
commit bf222b7b6b57b01b3d46a6ad469ea9afce89fb16
Author: can <[email protected]>
AuthorDate: Wed Mar 25 16:17:42 2026 +0800
[Subtask]: Modify DefaultTableService to be compatible with master-slave
mode. (#3927)
* [Subtask]: Use a new configuration item to control whether master & slave
mode is enabled. #3845
* [Subtask]: add AmsAssignService to implement balanced bucket allocation
in master-slave mode. #3921
* [Subtask]: add AmsAssignService to implement balanced bucket allocation
in master-slave mode. #3921
* [Subtask]: Modify DefaultTableService to be compatible with master-slave
mode #3923
* [Subtask]: Modify DefaultTableService to be compatible with master-slave
mode #3923
* [Subtask]: Modify DefaultTableService to be compatible with master-slave
mode #3923
* [Subtask]: Fix unit test failure issue #3923
* [Subtask]: Modify DefaultTableService to be compatible with master-slave
mode #3923
* [Subtask]: Modify DefaultTableService to be compatible with master-slave
mode #3923
* [Subtask]: Modify DefaultTableService to be compatible with master-slave
mode #3923
* [Subtask]: Optimized based on CR feedback. #3923
* [Subtask]: Optimized based on CR feedback. #3923
* [Subtask]: Optimized based on CR feedback. #3923
* [Subtask]: Fixed a legacy bug that could cause unit tests to fail during
compilation. #3923
* [Subtask]: Optimized based on CR feedback. #3923
---
.../apache/amoro/server/AmoroManagementConf.java | 7 +
.../apache/amoro/server/AmoroServiceContainer.java | 17 +-
.../org/apache/amoro/server/AmsAssignService.java | 17 +-
.../ha/DataBaseHighAvailabilityContainer.java | 5 +
.../amoro/server/ha/HighAvailabilityContainer.java | 7 +
.../server/ha/NoopHighAvailabilityContainer.java | 5 +
.../amoro/server/persistence/BucketIdCount.java | 41 +++
.../persistence/mapper/TableRuntimeMapper.java | 50 +++
.../amoro/server/table/DefaultTableService.java | 364 ++++++++++++++++++++-
amoro-ams/src/main/resources/mysql/upgrade.sql | 12 +
amoro-ams/src/main/resources/postgres/upgrade.sql | 14 +-
.../apache/amoro/server/AMSServiceTestBase.java | 5 +-
.../apache/amoro/server/TestAmsAssignService.java | 10 +-
.../server/optimizing/TestOptimizingQueue.java | 38 ++-
.../optimizing/maintainer/TestDataExpire.java | 2 +-
.../optimizing/maintainer/TestSnapshotExpire.java | 2 +-
.../iceberg/maintainer/IcebergTableMaintainer.java | 9 +-
docs/configuration/ams-config.md | 1 +
18 files changed, 581 insertions(+), 25 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index a3c5599be..fd0e70cf2 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -307,6 +307,13 @@ public class AmoroManagementConf {
.withDescription(
"Interval for bucket assignment service to detect node changes
and redistribute bucket IDs.");
+ /**
+ * Interval between bucket-table syncs in master-slave mode. {@code DefaultTableService} reads
+ * this value in milliseconds and uses it as both the initial delay and the period of its
+ * bucket-table sync scheduler. Defaults to 60 seconds.
+ */
+ public static final ConfigOption<Duration> HA_BUCKET_TABLE_SYNC_INTERVAL =
+ ConfigOptions.key("ha.bucket-table-sync.interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(60))
+ .withDescription(
+ "Interval for syncing tables assigned to bucket IDs in
master-slave mode. Each node periodically loads tables from database based on
its assigned bucket IDs.");
+
public static final ConfigOption<Integer> TABLE_SERVICE_THRIFT_BIND_PORT =
ConfigOptions.key("thrift-server.table-service.bind-port")
.intType()
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 4b38c9842..c8144f714 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -250,12 +250,17 @@ public class AmoroServiceContainer {
DefaultTableRuntimeFactory defaultRuntimeFactory = new
DefaultTableRuntimeFactory();
defaultRuntimeFactory.initialize(processFactories);
- // In master-slave mode, create AmsAssignService for bucket assignment
+
+ BucketAssignStore bucketAssignStore = null;
if (IS_MASTER_SLAVE_MODE && haContainer != null) {
+ bucketAssignStore = BucketAssignStoreFactory.create(haContainer,
serviceConfig);
+ }
+
+ // In master-slave mode, create AmsAssignService for bucket assignment
(shares BucketAssignStore
+ // with DefaultTableService).
+ if (IS_MASTER_SLAVE_MODE && haContainer != null && bucketAssignStore !=
null) {
try {
- // Create and start AmsAssignService for bucket assignment
- // The factory will handle different HA types (ZK, database, etc.)
- amsAssignService = new AmsAssignService(haContainer, serviceConfig);
+ amsAssignService = new AmsAssignService(haContainer, serviceConfig,
bucketAssignStore);
amsAssignService.start();
LOG.info("AmsAssignService started for master-slave mode");
} catch (UnsupportedOperationException e) {
@@ -267,7 +272,9 @@ public class AmoroServiceContainer {
List<ActionCoordinator> actionCoordinators =
defaultRuntimeFactory.supportedCoordinators();
- tableService = new DefaultTableService(serviceConfig, catalogManager,
defaultRuntimeFactory);
+ tableService =
+ new DefaultTableService(
+ serviceConfig, catalogManager, defaultRuntimeFactory, haContainer,
bucketAssignStore);
processService = new ProcessService(tableService, actionCoordinators,
executeEngineManager);
optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager,
optimizerManager, tableService);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
index dfee1ce8d..186ba1bcf 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
@@ -67,6 +67,18 @@ public class AmsAssignService {
}
public AmsAssignService(HighAvailabilityContainer haContainer,
Configurations serviceConfig) {
+ this(haContainer, serviceConfig, null);
+ }
+
+ /**
+ * @param assignStore if non-null, used as the bucket assignment store;
otherwise one is created
+ * via {@link BucketAssignStoreFactory} (same instance can be shared
with {@code
+ * DefaultTableService}).
+ */
+ public AmsAssignService(
+ HighAvailabilityContainer haContainer,
+ Configurations serviceConfig,
+ BucketAssignStore assignStore) {
this.haContainer = haContainer;
this.serviceConfig = serviceConfig;
this.bucketIdTotalCount =
@@ -75,7 +87,10 @@ public class AmsAssignService {
serviceConfig.get(AmoroManagementConf.HA_NODE_OFFLINE_TIMEOUT).toMillis();
this.assignIntervalSeconds =
serviceConfig.get(AmoroManagementConf.HA_ASSIGN_INTERVAL).getSeconds();
- this.assignStore = BucketAssignStoreFactory.create(haContainer,
serviceConfig);
+ this.assignStore =
+ assignStore != null
+ ? assignStore
+ : BucketAssignStoreFactory.create(haContainer, serviceConfig);
}
/**
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
index c97951515..4edcd74fa 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
@@ -175,6 +175,11 @@ public class DataBaseHighAvailabilityContainer extends
PersistentBase
return isLeader.get();
}
+ /** Returns the {@code tableServiceServerInfo} held by this container. */
+ @Override
+ public AmsServerInfo getTableServiceServerInfo() {
+ return tableServiceServerInfo;
+ }
+
/** Closes the heartbeat executor safely. */
@Override
public void close() {
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
index 7139bb679..1afeb73f5 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
@@ -68,4 +68,11 @@ public interface HighAvailabilityContainer {
* @return true if the current AMS node is the primary node, false otherwise
*/
boolean hasLeadership();
+
+ /**
+ * Get the table-service server information of the current AMS node.
+ *
+ * @return the {@link AmsServerInfo} of this node; may be {@code null} (the no-op container
+ *     returns {@code null}), so callers must null-check the result
+ */
+ AmsServerInfo getTableServiceServerInfo();
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
index f5fd040af..4638e5a17 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
@@ -61,4 +61,9 @@ public class NoopHighAvailabilityContainer implements
HighAvailabilityContainer
public boolean hasLeadership() {
return false;
}
+
+ /** The no-op container has no server info to report and always returns {@code null}. */
+ @Override
+ public AmsServerInfo getTableServiceServerInfo() {
+ return null;
+ }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketIdCount.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketIdCount.java
new file mode 100644
index 000000000..6441b67fd
--- /dev/null
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketIdCount.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence;
+
+/**
+ * Simple mutable holder for one bucketId and the number of tables carrying that bucketId. Used
+ * as the row type of {@code TableRuntimeMapper#countTablesByBucketId()}, which maps the
+ * {@code bucket_id} column to {@code bucketId} and the {@code table_count} column to
+ * {@code count}.
+ */
+public class BucketIdCount {
+ // Bucket id as stored in the bucket_id column.
+ private String bucketId;
+ // Number of table runtime rows that carry this bucket id.
+ private Integer count;
+
+ /** Returns the bucket id, or {@code null} if it has not been set. */
+ public String getBucketId() {
+ return bucketId;
+ }
+
+ /** Sets the bucket id. */
+ public void setBucketId(String bucketId) {
+ this.bucketId = bucketId;
+ }
+
+ /** Returns the table count, or {@code null} if it has not been set. */
+ public Integer getCount() {
+ return count;
+ }
+
+ /** Sets the table count. */
+ public void setCount(Integer count) {
+ this.count = count;
+ }
+}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
index 3cfb69f2e..8509c485c 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
@@ -18,6 +18,7 @@
package org.apache.amoro.server.persistence.mapper;
+import org.apache.amoro.server.persistence.BucketIdCount;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.TableRuntimeState;
import org.apache.ibatis.annotations.Delete;
@@ -59,6 +60,17 @@ public interface TableRuntimeMapper {
+ " WHERE table_id = #{tableId}")
int updateRuntime(TableRuntimeMeta meta);
+ /**
+ * Sets bucket_id only when it is still null. Only the bucket_id column is written, which
+ * avoids overwriting status/config/summary with stale snapshots from a prior read.
+ *
+ * @param tableId value matched against the table_id column
+ * @param bucketId bucket id to store (bound as VARCHAR)
+ * @return the update result; the caller in {@code DefaultTableService} treats {@code 1} as
+ *     "bucket id was set" and any other value as "row missing or bucket_id already set"
+ */
+ @Update(
+ "UPDATE "
+ + TABLE_NAME
+ + " SET bucket_id = #{bucketId, jdbcType=VARCHAR} "
+ + "WHERE table_id = #{tableId} AND bucket_id IS NULL")
+ int updateBucketIdIfNull(@Param("tableId") Long tableId, @Param("bucketId")
String bucketId);
+
/* ---------- delete ---------- */
@Delete("DELETE FROM " + TABLE_NAME + " WHERE table_id = #{tableId}")
int deleteRuntime(@Param("tableId") Long tableId);
@@ -102,6 +114,29 @@ public interface TableRuntimeMapper {
@ResultMap("tableRuntimeMeta")
List<TableRuntimeMeta> selectAllRuntimes();
+ /**
+ * Select runtimes by bucket ids.
+ *
+ * <p>The doc comment is placed before the annotations so that it is attached to the method
+ * declaration.
+ *
+ * @param bucketIds bucket ids expanded into the {@code IN (...)} list. Callers in
+ *     {@code DefaultTableService} only invoke this with a non-empty list. NOTE(review): an
+ *     empty list would presumably produce an invalid {@code IN} clause - confirm before
+ *     calling without that guard.
+ * @param includeNullBucketId false = only rows with bucket_id in list (master-slave); true =
+ *     also include bucket_id IS NULL (e.g. for non-master-slave compatibility)
+ * @return the matching table runtime rows, mapped with the {@code tableRuntimeMeta} result map
+ */
+ @Select(
+ "<script>"
+ + "SELECT "
+ + SELECT_COLS
+ + "FROM "
+ + TABLE_NAME
+ + " WHERE bucket_id IN "
+ + "<foreach item='item' collection='bucketIds' open='('
separator=',' close=')'>"
+ + "#{item}"
+ + "</foreach>"
+ + "<if test='includeNullBucketId'> OR bucket_id IS NULL </if>"
+ + "</script>")
+ @ResultMap("tableRuntimeMeta")
+ List<TableRuntimeMeta> selectRuntimesByBucketIds(
+ @Param("bucketIds") List<String> bucketIds,
+ @Param("includeNullBucketId") boolean includeNullBucketId);
+
@Select(
"<script>"
+ "<bind name=\"isMySQL\" value=\"_databaseId == 'mysql'\" />"
@@ -180,4 +215,19 @@ public interface TableRuntimeMapper {
@Delete("DELETE FROM " + STATE_TABLE_NAME + " WHERE table_id = #{tableId}")
void removeAllTableStates(@Param("tableId") long tableId);
+
+ /**
+ * Count tables per bucketId. Only rows with a non-null and non-empty bucket_id are counted.
+ *
+ * @return one {@link BucketIdCount} per distinct bucket_id, holding the bucket id and the
+ *     number of rows with that id (a list, not a map); bucket ids that have no rows do not
+ *     appear in the result
+ */
+ @Select(
+ "SELECT bucket_id, COUNT(*) as table_count FROM "
+ + TABLE_NAME
+ + " WHERE bucket_id IS NOT NULL AND bucket_id != '' "
+ + "GROUP BY bucket_id")
+ @Results({
+ @Result(column = "bucket_id", property = "bucketId"),
+ @Result(column = "table_count", property = "count")
+ })
+ List<BucketIdCount> countTablesByBucketId();
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index fa0ee873c..27acb89d1 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -24,15 +24,19 @@ import org.apache.amoro.TableFormat;
import org.apache.amoro.TableIDWithFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.api.CatalogMeta;
+import org.apache.amoro.client.AmsServerInfo;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.BucketAssignStore;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.ExternalCatalog;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.optimizing.OptimizingStatus;
+import org.apache.amoro.server.persistence.BucketIdCount;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.TableRuntimeState;
@@ -50,11 +54,16 @@ import org.apache.amoro.utils.TablePropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -87,19 +96,41 @@ public class DefaultTableService extends PersistentBase
implements TableService
private final Configurations serverConfiguration;
private final CatalogManager catalogManager;
private final TableRuntimeFactory tableRuntimeFactory;
+ private final HighAvailabilityContainer haContainer;
+ private final BucketAssignStore bucketAssignStore;
+ private final boolean isMasterSlaveMode;
private RuntimeHandlerChain headHandler;
private ExecutorService tableExplorerExecutors;
+ // Master-slave mode related fields
+ private ScheduledExecutorService bucketTableSyncScheduler;
+ private volatile List<String> assignedBucketIds = new ArrayList<>();
+ private final long bucketTableSyncInterval;
+
+ /**
+ * Creates a table service without HA wiring. Delegates to the five-argument constructor with
+ * a {@code null} HA container and a {@code null} bucket assign store.
+ */
public DefaultTableService(
Configurations configuration,
CatalogManager catalogManager,
TableRuntimeFactory tableRuntimeFactory) {
+ this(configuration, catalogManager, tableRuntimeFactory, null, null);
+ }
+
+ /**
+ * Creates a table service that can run in master-slave mode.
+ *
+ * <p>The bucket-based code paths in this class are only taken when
+ * {@code USE_MASTER_SLAVE_MODE} is enabled and both {@code haContainer} and
+ * {@code bucketAssignStore} are non-null; otherwise the service behaves as before.
+ *
+ * @param configuration server configuration; also supplies the external catalog refresh
+ *     interval, the master-slave switch and the bucket-table sync interval
+ * @param catalogManager catalog manager
+ * @param tableRuntimeFactory factory for table runtimes; this service registers itself as its
+ *     table loader
+ * @param haContainer HA container, may be {@code null}
+ * @param bucketAssignStore store of bucket assignments, may be {@code null}
+ */
+ public DefaultTableService(
+ Configurations configuration,
+ CatalogManager catalogManager,
+ TableRuntimeFactory tableRuntimeFactory,
+ HighAvailabilityContainer haContainer,
+ BucketAssignStore bucketAssignStore) {
this.catalogManager = catalogManager;
this.externalCatalogRefreshingInterval =
configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis();
this.serverConfiguration = configuration;
this.tableRuntimeFactory = tableRuntimeFactory;
this.tableRuntimeFactory.withTableLoader(this::loadTable);
+ this.haContainer = haContainer;
+ this.bucketAssignStore = bucketAssignStore;
+ this.isMasterSlaveMode =
configuration.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
+ this.bucketTableSyncInterval =
+
configuration.get(AmoroManagementConf.HA_BUCKET_TABLE_SYNC_INTERVAL).toMillis();
}
@Override
@@ -156,8 +187,33 @@ public class DefaultTableService extends PersistentBase
implements TableService
public void initialize() {
checkNotStarted();
- List<TableRuntimeMeta> tableRuntimeMetaList =
- getAs(TableRuntimeMapper.class, TableRuntimeMapper::selectAllRuntimes);
+ List<TableRuntimeMeta> tableRuntimeMetaList;
+ if (isMasterSlaveMode && haContainer != null && bucketAssignStore != null)
{
+ // In master-slave mode, load only tables assigned to this node
+ try {
+ updateAssignedBucketIds();
+ if (!assignedBucketIds.isEmpty()) {
+ tableRuntimeMetaList =
+ getAs(
+ TableRuntimeMapper.class,
+ mapper ->
mapper.selectRuntimesByBucketIds(assignedBucketIds, false));
+ LOG.info(
+ "Master-slave mode: Loaded {} tables for assigned bucketIds: {}",
+ tableRuntimeMetaList.size(),
+ assignedBucketIds);
+ } else {
+ tableRuntimeMetaList = new ArrayList<>();
+ LOG.info("Master-slave mode: No bucketIds assigned to this node
yet");
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to load tables for assigned bucketIds in
master-slave mode", e);
+ tableRuntimeMetaList = new ArrayList<>();
+ }
+ } else {
+ // Non-master-slave mode: load all tables
+ tableRuntimeMetaList = getAs(TableRuntimeMapper.class,
TableRuntimeMapper::selectAllRuntimes);
+ }
+
Map<Long, ServerTableIdentifier> identifierMap =
getAs(TableMetaMapper.class,
TableMetaMapper::selectAllTableIdentifiers).stream()
.collect(Collectors.toMap(ServerTableIdentifier::getId,
Function.identity()));
@@ -222,6 +278,26 @@ public class DefaultTableService extends PersistentBase
implements TableService
initialized.complete(true);
tableExplorerScheduler.scheduleAtFixedRate(
this::exploreTableRuntimes, 0, externalCatalogRefreshingInterval,
TimeUnit.MILLISECONDS);
+
+ if (isMasterSlaveMode && haContainer != null && bucketAssignStore != null)
{
+ // In master-slave mode, start periodic sync for assigned bucket tables
+ // Delay the first sync to allow AmsAssignService to assign bucketIds
first
+ bucketTableSyncScheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setNameFormat("bucket-table-sync-scheduler-%d")
+ .setDaemon(true)
+ .build());
+ bucketTableSyncScheduler.scheduleAtFixedRate(
+ this::syncBucketTables,
+ bucketTableSyncInterval,
+ bucketTableSyncInterval,
+ TimeUnit.MILLISECONDS);
+ LOG.info(
+ "Master-slave mode: Started bucket table sync scheduler with
interval {} ms (first sync delayed by {} ms)",
+ bucketTableSyncInterval,
+ bucketTableSyncInterval);
+ }
}
@Override
@@ -250,6 +326,9 @@ public class DefaultTableService extends PersistentBase
implements TableService
@Override
public void dispose() {
tableExplorerScheduler.shutdown();
+ if (bucketTableSyncScheduler != null) {
+ bucketTableSyncScheduler.shutdown();
+ }
if (tableExplorerExecutors != null) {
tableExplorerExecutors.shutdown();
}
@@ -259,8 +338,212 @@ public class DefaultTableService extends PersistentBase
implements TableService
tableRuntimeMap.values().forEach(TableRuntime::unregisterMetric);
}
+ /**
+ * Refreshes {@code assignedBucketIds} from the bucket assign store for the current node.
+ *
+ * <p>The field is replaced with a fresh copy only when the stored assignment differs from the
+ * current value. If the HA container or the store is missing, the current server info is
+ * {@code null}, or any exception is thrown, the previous value is kept and the problem is only
+ * logged; this method never throws.
+ */
+ private void updateAssignedBucketIds() {
+ if (haContainer == null || bucketAssignStore == null) {
+ LOG.warn(
+ "No assigned bucket ids found. check if haContainer == null or
bucketAssignStore == null");
+ return;
+ }
+ try {
+ AmsServerInfo currentServerInfo =
haContainer.getTableServiceServerInfo();
+ if (currentServerInfo == null) {
+ LOG.warn("Cannot get current server info, skip updating assigned
bucketIds");
+ return;
+ }
+ // NOTE(review): a null result from getAssignments would fail on equals() below and be
+ // reported by the catch block as a generic failure - confirm the store never returns null.
+ List<String> newBucketIds =
bucketAssignStore.getAssignments(currentServerInfo);
+ if (!newBucketIds.equals(assignedBucketIds)) {
+ LOG.info("Assigned bucketIds changed from {} to {}",
assignedBucketIds, newBucketIds);
+ // Publish a private copy so later changes to the store's list cannot affect the field.
+ assignedBucketIds = new ArrayList<>(newBucketIds);
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to update assigned bucketIds", e);
+ }
+ }
+
+ /**
+ * Syncs the in-memory table runtimes with the tables whose bucket_id is assigned to this
+ * node. Scheduled periodically in master-slave mode; a no-op unless master-slave mode is
+ * enabled and both the HA container and the bucket assign store are present.
+ *
+ * <p>Steps: refresh the assigned bucket ids, load the matching runtime rows plus all table
+ * identifiers and all runtime states, create and register runtimes that exist in the database
+ * but not in memory, then dispose and drop in-memory runtimes whose row is gone, has a null
+ * bucket_id, or belongs to a bucket that is no longer assigned here. All exceptions are caught
+ * and logged so that a failure does not propagate to the scheduler.
+ */
+ private void syncBucketTables() {
+ if (!isMasterSlaveMode || haContainer == null || bucketAssignStore ==
null) {
+ return;
+ }
+ try {
+ updateAssignedBucketIds();
+ if (assignedBucketIds.isEmpty()) {
+ // In master-slave mode it is normal to have no bucketIds assigned during startup;
+ // the AmsAssignService will assign bucketIds later.
+ // NOTE(review): this early return also skips the removal pass below, so runtimes that
+ // are already loaded stay in memory if the assignment later becomes empty - confirm
+ // that this is intended.
+ LOG.debug("No bucketIds assigned to this node yet, skip syncing tables
(will retry later)");
+ return;
+ }
+
+ LOG.info("syncBucketTables assignedBucketIds:{}", assignedBucketIds);
+ // Load tables from database for assigned bucketIds
+ List<TableRuntimeMeta> tableRuntimeMetaList =
+ getAs(
+ TableRuntimeMapper.class,
+ mapper -> mapper.selectRuntimesByBucketIds(assignedBucketIds,
false));
+
+ // Table id -> identifier, for every table identifier in the database.
+ Map<Long, ServerTableIdentifier> identifierMap =
+ getAs(TableMetaMapper.class,
TableMetaMapper::selectAllTableIdentifiers).stream()
+ .collect(Collectors.toMap(ServerTableIdentifier::getId,
Function.identity()));
+
+ // Table id -> all runtime states of that table; states of the same table are merged.
+ Map<Long, List<TableRuntimeState>> statesMap =
+ getAs(TableRuntimeMapper.class,
TableRuntimeMapper::selectAllStates).stream()
+ .collect(
+ Collectors.toMap(
+ TableRuntimeState::getTableId,
+ Lists::newArrayList,
+ (a, b) -> {
+ a.addAll(b);
+ return a;
+ }));
+
+ // Snapshot the ids currently in memory and the ids the database assigns to this node.
+ Set<Long> currentTableIds = new HashSet<>(tableRuntimeMap.keySet());
+ Set<Long> dbTableIds =
+ tableRuntimeMetaList.stream()
+ .map(TableRuntimeMeta::getTableId)
+ .collect(Collectors.toSet());
+
+ // Add new tables (in DB but not in memory)
+ for (TableRuntimeMeta tableRuntimeMeta : tableRuntimeMetaList) {
+ Long tableId = tableRuntimeMeta.getTableId();
+ if (!currentTableIds.contains(tableId)) {
+ ServerTableIdentifier identifier = identifierMap.get(tableId);
+ if (identifier == null) {
+ LOG.warn("No available table identifier found for table runtime
meta id={}", tableId);
+ continue;
+ }
+ List<TableRuntimeState> states = statesMap.get(tableId);
+ // Use empty list if states is null to avoid NullPointerException
+ if (states == null) {
+ states = Collections.emptyList();
+ }
+ Optional<TableRuntime> tableRuntime =
+ createTableRuntime(identifier, tableRuntimeMeta, states);
+ if (tableRuntime.isPresent()) {
+ TableRuntime runtime = tableRuntime.get();
+
runtime.registerMetric(MetricManager.getInstance().getGlobalRegistry());
+ tableRuntimeMap.put(tableId, runtime);
+ // Notify the handler chain only when one is set and the table can be loaded.
+ if (headHandler != null) {
+ AmoroTable<?> table = loadTable(identifier);
+ if (table != null) {
+ headHandler.fireTableAdded(table, runtime);
+ }
+ }
+ LOG.info("Added table {} for bucketId {}", tableId,
tableRuntimeMeta.getBucketId());
+ }
+ }
+ }
+
+ // Remove tables that are no longer assigned to this node
+ List<Long> tablesToRemove = new ArrayList<>();
+ for (Long tableId : currentTableIds) {
+ if (!dbTableIds.contains(tableId)) {
+ // Check if this table's bucketId is still assigned to this node
+ TableRuntime tableRuntime = tableRuntimeMap.get(tableId);
+ if (tableRuntime != null) {
+ // Get bucketId from database
+ // NOTE(review): one lookup per in-memory table that is missing from dbTableIds.
+ TableRuntimeMeta meta =
+ getAs(TableRuntimeMapper.class, mapper ->
mapper.selectRuntime(tableId));
+ if (meta != null && meta.getBucketId() != null) {
+ if (!assignedBucketIds.contains(meta.getBucketId())) {
+ tablesToRemove.add(tableId);
+ }
+ // NOTE(review): the condition below is the exact negation of the preceding if, so
+ // it is always true here; a plain else would be equivalent.
+ } else if (meta == null || meta.getBucketId() == null) {
+ // Table removed from database or bucketId is null
+ tablesToRemove.add(tableId);
+ }
+ }
+ }
+ }
+
+ // Dispose each runtime to remove; a failure is logged and does not stop the others.
+ for (Long tableId : tablesToRemove) {
+ TableRuntime tableRuntime = tableRuntimeMap.get(tableId);
+ if (tableRuntime != null) {
+ try {
+ if (headHandler != null) {
+ headHandler.fireTableRemoved(tableRuntime);
+ }
+ tableRuntime.dispose();
+ tableRuntimeMap.remove(tableId);
+ LOG.info("Removed table {} as it's no longer assigned to this
node", tableId);
+ } catch (Exception e) {
+ LOG.error("Error occurred while removing tableRuntime of table
{}", tableId, e);
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error during bucket table sync", e);
+ }
+ }
+
+ /**
+ * Picks the bucketId that currently has the fewest tables, so that tables are spread evenly
+ * across bucketIds.
+ *
+ * <p>Candidate ids are the strings "1" to "N", where N is {@code HA_BUCKET_ID_TOTAL_COUNT},
+ * each starting at a count of 0 and then overwritten with the counts read from the database.
+ * The candidate with the smallest count wins; ties are broken by comparing the id strings.
+ *
+ * @return The assigned bucketId, or null if assignment fails (an exception was thrown, or
+ *     there was no candidate at all)
+ */
+ private String assignBucketIdForTable() {
+ try {
+ List<BucketIdCount> bucketIdCounts =
+ getAs(
+ TableRuntimeMapper.class,
+ (TableRuntimeMapper mapper) -> mapper.countTablesByBucketId());
+
+ // Seed every configured bucket with 0 so that buckets without tables are candidates too.
+ Map<String, Integer> bucketTableCount = new HashMap<>();
+ int bucketIdTotalCount =
+
serverConfiguration.getInteger(AmoroManagementConf.HA_BUCKET_ID_TOTAL_COUNT);
+ for (int i = 1; i <= bucketIdTotalCount; i++) {
+ bucketTableCount.put(String.valueOf(i), 0);
+ }
+
+ // NOTE(review): a bucket id found in the database but outside "1".."N" is added as a
+ // candidate as well - confirm that such ids cannot exist or should be eligible.
+ for (BucketIdCount bucketIdCount : bucketIdCounts) {
+ String bucketId = bucketIdCount.getBucketId();
+ if (bucketId != null && !bucketId.trim().isEmpty()) {
+ bucketTableCount.put(bucketId, bucketIdCount.getCount());
+ }
+ }
+
+ // Order by table count, then by id. The ids are strings, so the tie-break is
+ // lexicographic ("10" sorts before "2").
+ PriorityQueue<Map.Entry<String, Integer>> minHeap =
+ new PriorityQueue<>(
+ Comparator.<Map.Entry<String,
Integer>>comparingInt(Map.Entry::getValue)
+ .thenComparing(Map.Entry::getKey));
+
+ for (Map.Entry<String, Integer> entry : bucketTableCount.entrySet()) {
+ minHeap.offer(new AbstractMap.SimpleEntry<>(entry.getKey(),
entry.getValue()));
+ }
+
+ // Only the head of the heap is used: the bucket with the fewest tables.
+ if (!minHeap.isEmpty()) {
+ Map.Entry<String, Integer> selected = minHeap.poll();
+ String assignedBucketId = selected.getKey();
+ int tableCount = selected.getValue();
+ LOG.debug(
+ "Assigned bucketId {} to new table (min table count: {})",
+ assignedBucketId,
+ tableCount);
+ return assignedBucketId;
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to assign bucketId for table", e);
+ }
+ return null;
+ }
+
@VisibleForTesting
void exploreTableRuntimes() {
+ // In master-slave mode (fully wired), only leader node should explore
table runtimes
+ if (isMasterSlaveMode
+ && haContainer != null
+ && bucketAssignStore != null
+ && !haContainer.hasLeadership()) {
+ LOG.debug("Not the leader node in master-slave mode, skip exploring
table runtimes");
+ return;
+ }
if (!initialized.isDone()) {
throw new IllegalStateException("TableService is not initialized");
}
@@ -289,12 +572,30 @@ public class DefaultTableService extends PersistentBase
implements TableService
// timely manner during the process of dropping the catalog due to
concurrency considerations.
// It is permissible to have some erroneous states in the middle, as long
as the final data is
// consistent.
+ // In master-slave mode, only clean up tables assigned to this node
Set<String> catalogNames =
catalogManager.listCatalogMetas().stream()
.map(CatalogMeta::getCatalogName)
.collect(Collectors.toSet());
for (TableRuntime tableRuntime : tableRuntimeMap.values()) {
if
(!catalogNames.contains(tableRuntime.getTableIdentifier().getCatalog())) {
+ // In master-slave mode, only dispose tables assigned to this node
+ if (isMasterSlaveMode && haContainer != null && bucketAssignStore !=
null) {
+ try {
+ TableRuntimeMeta meta =
+ getAs(
+ TableRuntimeMapper.class,
+ mapper ->
mapper.selectRuntime(tableRuntime.getTableIdentifier().getId()));
+ if (meta != null && meta.getBucketId() != null) {
+ if (!assignedBucketIds.contains(meta.getBucketId())) {
+ // Not assigned to this node, skip
+ continue;
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to check bucketId for table {}",
tableRuntime.getTableIdentifier(), e);
+ }
+ }
disposeTable(tableRuntime.getTableIdentifier());
}
}
@@ -495,8 +796,67 @@ public class DefaultTableService extends PersistentBase
implements TableService
meta.setStatusCode(OptimizingStatus.IDLE.getCode());
meta.setGroupName(configuration.getOptimizingConfig().getOptimizerGroup());
meta.setTableSummary(new TableSummary());
+
+ // In master-slave mode (fully wired), assign bucketId to the table if
it's not assigned yet.
+ // Only leader node should assign bucketIds; follower may still persist
the table with null
+ // bucketId (e.g. onTableCreated on follower), and leader will assign
later via exploration.
+ String assignedBucketId = null;
+ if (isMasterSlaveMode && haContainer != null && bucketAssignStore != null)
{
+ if (haContainer.hasLeadership()) {
+ TableRuntimeMeta existingMeta =
+ getAs(
+ TableRuntimeMapper.class,
+ mapper -> mapper.selectRuntime(serverTableIdentifier.getId()));
+ if (existingMeta != null) {
+ // Runtime already exists (e.g. inserted by follower with null
bucketId)
+ if (existingMeta.getBucketId() != null) {
+ return true; // already assigned
+ }
+ assignedBucketId = assignBucketIdForTable();
+ if (assignedBucketId != null) {
+ String finalAssignedBucketId = assignedBucketId;
+ long updated =
+ updateAs(
+ TableRuntimeMapper.class,
+ mapper ->
+ mapper.updateBucketIdIfNull(
+ serverTableIdentifier.getId(),
finalAssignedBucketId));
+ if (updated == 1) {
+ LOG.info(
+ "Assigned bucketId {} to existing table {} (was null)",
+ assignedBucketId,
+ serverTableIdentifier);
+ } else {
+ LOG.debug(
+ "Skipped backfill bucketId {} for table {} (row missing or
bucket_id already set)",
+ assignedBucketId,
+ serverTableIdentifier);
+ }
+ }
+ return true; // handled existing row (assigned or failed to assign)
+ }
+ assignedBucketId = assignBucketIdForTable();
+ if (assignedBucketId != null) {
+ meta.setBucketId(assignedBucketId);
+ LOG.info("Assigned bucketId {} to table {}", assignedBucketId,
serverTableIdentifier);
+ } else {
+ LOG.warn(
+ "Failed to assign bucketId to table {}, will be assigned later",
+ serverTableIdentifier);
+ }
+ }
+ // else: follower does not assign; meta.bucketId remains null until
leader assigns
+ }
+
doAs(TableRuntimeMapper.class, mapper -> mapper.insertRuntime(meta));
+ // Only skip local runtime creation when master-slave mode is fully wired
(bucketAssignStore
+ // is non-null). When bucketAssignStore is null (e.g. 3-arg test
constructor), fall through
+ // and create the runtime in memory as in non-master-slave mode.
+ if (isMasterSlaveMode && haContainer != null && bucketAssignStore != null)
{
+ return true;
+ }
+
Optional<TableRuntime> tableRuntimeOpt =
createTableRuntime(serverTableIdentifier, meta,
Collections.emptyList());
if (!tableRuntimeOpt.isPresent()) {
diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql
b/amoro-ams/src/main/resources/mysql/upgrade.sql
index b36064109..dd3c85cb2 100644
--- a/amoro-ams/src/main/resources/mysql/upgrade.sql
+++ b/amoro-ams/src/main/resources/mysql/upgrade.sql
@@ -161,3 +161,15 @@ ALTER TABLE `table_process`
ADD COLUMN `external_process_identifier` varchar(256) DEFAULT NULL COMMENT 'Table optimizing external process identifier',
ADD COLUMN `retry_number` int(11) NOT NULL DEFAULT 0 COMMENT 'Retry times',
ADD COLUMN `process_parameters` mediumtext COMMENT 'Table process parameters';
+
+-- ADD table bucket_assignments for storing assigned info
+CREATE TABLE IF NOT EXISTS bucket_assignments (
+ cluster_name VARCHAR(64) NOT NULL COMMENT 'AMS cluster name',
+ node_key VARCHAR(256) NOT NULL COMMENT 'Node key (host:thriftBindPort)',
+ server_info_json TEXT NULL COMMENT 'JSON encoded AmsServerInfo',
+ assignments_json TEXT NULL COMMENT 'JSON array of bucket IDs',
+ last_update_time BIGINT NOT NULL DEFAULT 0 COMMENT 'Last update timestamp (ms since epoch)',
+ PRIMARY KEY (cluster_name, node_key)
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Bucket ID assignments per AMS node for master-slave mode';
+
+
diff --git a/amoro-ams/src/main/resources/postgres/upgrade.sql b/amoro-ams/src/main/resources/postgres/upgrade.sql
index 5cae2d34f..5797d704e 100644
--- a/amoro-ams/src/main/resources/postgres/upgrade.sql
+++ b/amoro-ams/src/main/resources/postgres/upgrade.sql
@@ -220,4 +220,16 @@ ALTER TABLE table_runtime ADD COLUMN bucket_id varchar(4);
ALTER TABLE table_process
ADD COLUMN external_process_identifier varchar(256) DEFAULT NULL,
ADD COLUMN retry_number int NOT NULL,
-ADD COLUMN process_parameters text;
\ No newline at end of file
+ADD COLUMN process_parameters text;
+
+-- ADD table bucket_assignments for storing assigned info
+CREATE TABLE IF NOT EXISTS bucket_assignments (
+ cluster_name VARCHAR(64) NOT NULL,
+ node_key VARCHAR(256) NOT NULL,
+ server_info_json TEXT NULL,
+ assignments_json TEXT NULL,
+ last_update_time BIGINT NOT NULL DEFAULT 0,
+ PRIMARY KEY (cluster_name, node_key)
+ );
+
+CREATE INDEX IF NOT EXISTS idx_bucket_assignments_cluster ON bucket_assignments (cluster_name);
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index 34f8f4685..390c93446 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -58,9 +58,12 @@ public abstract class AMSServiceTestBase extends AMSManagerTestBase {
TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler());
TABLE_SERVICE.addHandlerChain(PROCESS_SERVICE.getTableHandlerChain());
TABLE_SERVICE.initialize();
+ ResourceGroup group = defaultResourceGroup();
try {
- ResourceGroup group = defaultResourceGroup();
OPTIMIZER_MANAGER.createResourceGroup(group);
+ } catch (Throwable ignored) {
+ }
+ try {
OPTIMIZING_SERVICE.createResourceGroup(group);
} catch (Throwable ignored) {
}
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
index 57f678db8..4b283c668 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
@@ -553,15 +553,7 @@ public class TestAmsAssignService {
/** Create AmsAssignService with mock BucketAssignStore. */
private AmsAssignService
createAssignServiceWithMockStore(HighAvailabilityContainer container)
throws Exception {
- AmsAssignService service = new AmsAssignService(container, serviceConfig);
-
- // Use reflection to inject mock assign store
- java.lang.reflect.Field assignStoreField =
- AmsAssignService.class.getDeclaredField("assignStore");
- assignStoreField.setAccessible(true);
- assignStoreField.set(service, mockAssignStore);
-
- return service;
+ return new AmsAssignService(container, serviceConfig, mockAssignStore);
}
/** Create a mock CuratorFramework that uses MockZkState for storage. */
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index 85316660b..014189f64 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -64,6 +64,7 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -116,6 +117,24 @@ public class TestOptimizingQueue extends AMSTableTestBase {
return new ResourceGroup.Builder("test", "local").build();
}
+ @Before
+ public void setUp() {
+ // Clean up any existing metrics for the test resource group before each test
+ // to avoid "Metric is already been registered" errors
+ MetricRegistry registry = MetricManager.getInstance().getGlobalRegistry();
+ String testGroupName = testResourceGroup().getName();
+
+ // Unregister all metrics for the test resource group
+ List<MetricKey> keysToRemove = new ArrayList<>();
+ for (MetricKey key : registry.getMetrics().keySet()) {
+ if (key.getDefine().getName().startsWith("optimizer_group_")
+ && testGroupName.equals(key.valueOfTag(GROUP_TAG))) {
+ keysToRemove.add(key);
+ }
+ }
+ keysToRemove.forEach(registry::unregister);
+ }
+
protected OptimizingQueue buildOptimizingGroupService(DefaultTableRuntime
tableRuntime) {
return new OptimizingQueue(
CATALOG_MANAGER,
@@ -289,12 +308,27 @@ public class TestOptimizingQueue extends AMSTableTestBase {
queue.refreshTable(tableRuntime2);
queue.refreshTable(tableRuntime);
+ // Poll two tasks and verify they come from different tables
+ // The order may vary due to async planning, so we check both possibilities
TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(task2);
- Assert.assertTrue(tableRuntime2.getTableIdentifier().getId() ==
task2.getTableId());
TaskRuntime<?> task3 = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
Assert.assertNotNull(task3);
- Assert.assertTrue(tableRuntime.getTableIdentifier().getId() ==
task3.getTableId());
+
+ // Verify that the two tasks come from the two different tables
+ long tableId1 = tableRuntime.getTableIdentifier().getId();
+ long tableId2 = tableRuntime2.getTableIdentifier().getId();
+ long task2TableId = task2.getTableId();
+ long task3TableId = task3.getTableId();
+
+ Assert.assertTrue(
+ "Task2 should come from one of the two tables",
+ task2TableId == tableId1 || task2TableId == tableId2);
+ Assert.assertTrue(
+ "Task3 should come from one of the two tables",
+ task3TableId == tableId1 || task3TableId == tableId2);
+ Assert.assertNotEquals(
+ "Task2 and Task3 should come from different tables", task2TableId, task3TableId);
queue.dispose();
tableRuntime2.dispose();
}
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
index 737bd3878..9b25ba19d 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
@@ -542,7 +542,7 @@ public class TestDataExpire extends ExecutorTestBase {
@Test
public void testExpireByPartitionWhenMetricsModeIsNone() {
- assumeTrue(getMixedTable().format().in(TableFormat.MIXED_ICEBERG, TableFormat.ICEBERG));
+ assumeTrue(getTestFormat().in(TableFormat.MIXED_ICEBERG, TableFormat.ICEBERG));
getMixedTable()
.updateProperties()
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
index 1a391a404..fcd9373a2 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
@@ -471,7 +471,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
@Test
public void testBaseTableGcDisabled() {
- Assume.assumeFalse(isKeyedTable());
+ Assume.assumeFalse(getMixedTable().isKeyedTable());
UnkeyedTable testUnkeyedTable = getMixedTable().asUnkeyedTable();
testUnkeyedTable.updateProperties().set("gc.enabled", "false").commit();
diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
index 3d1d4bfb1..2a96a0e9a 100644
--- a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
+++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
@@ -685,7 +685,7 @@ public class IcebergTableMaintainer implements TableMaintainer {
Comparable<?> expireValue = getExpireValue(expirationConfig, field,
expireTimestamp);
return CloseableIterable.transform(
CloseableIterable.withNoopClose(Iterables.concat(dataFiles,
deleteFiles)),
- contentFile -> {
+ (ContentFile<?> contentFile) -> {
Literal<Long> literal =
getExpireTimestampLiteral(
contentFile,
@@ -694,7 +694,12 @@ public class IcebergTableMaintainer implements
TableMaintainer {
expirationConfig.getDateTimePattern(),
Locale.getDefault()),
expirationConfig.getNumberDateFormat(),
expireValue);
- return new FileEntry(contentFile.copyWithoutStats(), literal);
+ // copyWithoutStats() returns ContentFile<?> but with a captured wildcard type
+ // that needs explicit casting for type inference
+ @SuppressWarnings("rawtypes")
+ ContentFile<?> fileWithoutStats =
+ (ContentFile<?>) ((ContentFile) contentFile.copyWithoutStats());
+ return new FileEntry(fileWithoutStats, literal);
});
}
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index da6cf20eb..ec4f34c39 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -72,6 +72,7 @@ table td:last-child, table th:last-child { width: 40%; word-break: break-all; }
| expire-snapshots.thread-count | 10 | The number of threads used for snapshots expiring. |
| ha.bucket-assign.interval | 1 min | Interval for bucket assignment service to detect node changes and redistribute bucket IDs. |
| ha.bucket-id.total-count | 100 | Total count of bucket IDs for assignment. Bucket IDs range from 1 to this value. |
+| ha.bucket-table-sync.interval | 1 min | Interval for syncing tables assigned to bucket IDs in master-slave mode. Each node periodically loads tables from database based on its assigned bucket IDs. |
| ha.cluster-name | default | Amoro management service cluster name. |
| ha.connection-timeout | 5 min | The Zookeeper connection timeout in milliseconds. |
| ha.enabled | false | Whether to enable high availability mode. |