This is an automated email from the ASF dual-hosted git repository.

xuba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new bf222b7b6 [Subtask]: Modify DefaultTableService to be compatible with 
master-slave mode. (#3927)
bf222b7b6 is described below

commit bf222b7b6b57b01b3d46a6ad469ea9afce89fb16
Author: can <[email protected]>
AuthorDate: Wed Mar 25 16:17:42 2026 +0800

    [Subtask]: Modify DefaultTableService to be compatible with master-slave 
mode. (#3927)
    
    * [Subtask]: Use a new configuration item to control whether master & slave 
mode is enabled. #3845
    
    * [Subtask]: add AmsAssignService to implement balanced bucket allocation 
in master-slave mode. #3921
    
    * [Subtask]: add AmsAssignService to implement balanced bucket allocation 
in master-slave mode. #3921
    
    * [Subtask]: Modify DefaultTableService to be compatible with master-slave 
mode #3923
    
    * [Subtask]: Modify DefaultTableService to be compatible with master-slave 
mode #3923
    
    * [Subtask]: Modify DefaultTableService to be compatible with master-slave 
mode #3923
    
    * [Subtask]: Fix unit test failure issue #3923
    
    * [Subtask]: Modify DefaultTableService to be compatible with master-slave 
mode #3923
    
    * [Subtask]: Modify DefaultTableService to be compatible with master-slave 
mode #3923
    
    * [Subtask]: Modify DefaultTableService to be compatible with master-slave 
mode #3923
    
    * [Subtask]: Optimized based on CR feedback. #3923
    
    * [Subtask]: Optimized based on CR feedback. #3923
    
    * [Subtask]: Optimized based on CR feedback. #3923
    
    * [Subtask]: Fixed a legacy bug that could cause unit tests to fail during 
compilation. #3923
    
    * [Subtask]: Optimized based on CR feedback. #3923
---
 .../apache/amoro/server/AmoroManagementConf.java   |   7 +
 .../apache/amoro/server/AmoroServiceContainer.java |  17 +-
 .../org/apache/amoro/server/AmsAssignService.java  |  17 +-
 .../ha/DataBaseHighAvailabilityContainer.java      |   5 +
 .../amoro/server/ha/HighAvailabilityContainer.java |   7 +
 .../server/ha/NoopHighAvailabilityContainer.java   |   5 +
 .../amoro/server/persistence/BucketIdCount.java    |  41 +++
 .../persistence/mapper/TableRuntimeMapper.java     |  50 +++
 .../amoro/server/table/DefaultTableService.java    | 364 ++++++++++++++++++++-
 amoro-ams/src/main/resources/mysql/upgrade.sql     |  12 +
 amoro-ams/src/main/resources/postgres/upgrade.sql  |  14 +-
 .../apache/amoro/server/AMSServiceTestBase.java    |   5 +-
 .../apache/amoro/server/TestAmsAssignService.java  |  10 +-
 .../server/optimizing/TestOptimizingQueue.java     |  38 ++-
 .../optimizing/maintainer/TestDataExpire.java      |   2 +-
 .../optimizing/maintainer/TestSnapshotExpire.java  |   2 +-
 .../iceberg/maintainer/IcebergTableMaintainer.java |   9 +-
 docs/configuration/ams-config.md                   |   1 +
 18 files changed, 581 insertions(+), 25 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
index a3c5599be..fd0e70cf2 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java
@@ -307,6 +307,13 @@ public class AmoroManagementConf {
           .withDescription(
               "Interval for bucket assignment service to detect node changes 
and redistribute bucket IDs.");
 
+  public static final ConfigOption<Duration> HA_BUCKET_TABLE_SYNC_INTERVAL =
+      ConfigOptions.key("ha.bucket-table-sync.interval")
+          .durationType()
+          .defaultValue(Duration.ofSeconds(60))
+          .withDescription(
+              "Interval for syncing tables assigned to bucket IDs in 
master-slave mode. Each node periodically loads tables from database based on 
its assigned bucket IDs.");
+
   public static final ConfigOption<Integer> TABLE_SERVICE_THRIFT_BIND_PORT =
       ConfigOptions.key("thrift-server.table-service.bind-port")
           .intType()
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 4b38c9842..c8144f714 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -250,12 +250,17 @@ public class AmoroServiceContainer {
 
     DefaultTableRuntimeFactory defaultRuntimeFactory = new 
DefaultTableRuntimeFactory();
     defaultRuntimeFactory.initialize(processFactories);
-    // In master-slave mode, create AmsAssignService for bucket assignment
+
+    BucketAssignStore bucketAssignStore = null;
     if (IS_MASTER_SLAVE_MODE && haContainer != null) {
+      bucketAssignStore = BucketAssignStoreFactory.create(haContainer, 
serviceConfig);
+    }
+
+    // In master-slave mode, create AmsAssignService for bucket assignment 
(shares BucketAssignStore
+    // with DefaultTableService).
+    if (IS_MASTER_SLAVE_MODE && haContainer != null && bucketAssignStore != 
null) {
       try {
-        // Create and start AmsAssignService for bucket assignment
-        // The factory will handle different HA types (ZK, database, etc.)
-        amsAssignService = new AmsAssignService(haContainer, serviceConfig);
+        amsAssignService = new AmsAssignService(haContainer, serviceConfig, 
bucketAssignStore);
         amsAssignService.start();
         LOG.info("AmsAssignService started for master-slave mode");
       } catch (UnsupportedOperationException e) {
@@ -267,7 +272,9 @@ public class AmoroServiceContainer {
 
     List<ActionCoordinator> actionCoordinators = 
defaultRuntimeFactory.supportedCoordinators();
 
-    tableService = new DefaultTableService(serviceConfig, catalogManager, 
defaultRuntimeFactory);
+    tableService =
+        new DefaultTableService(
+            serviceConfig, catalogManager, defaultRuntimeFactory, haContainer, 
bucketAssignStore);
     processService = new ProcessService(tableService, actionCoordinators, 
executeEngineManager);
     optimizingService =
         new DefaultOptimizingService(serviceConfig, catalogManager, 
optimizerManager, tableService);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
index dfee1ce8d..186ba1bcf 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmsAssignService.java
@@ -67,6 +67,18 @@ public class AmsAssignService {
   }
 
   public AmsAssignService(HighAvailabilityContainer haContainer, 
Configurations serviceConfig) {
+    this(haContainer, serviceConfig, null);
+  }
+
+  /**
+   * @param assignStore if non-null, used as the bucket assignment store; 
otherwise one is created
+   *     via {@link BucketAssignStoreFactory} (same instance can be shared 
with {@code
+   *     DefaultTableService}).
+   */
+  public AmsAssignService(
+      HighAvailabilityContainer haContainer,
+      Configurations serviceConfig,
+      BucketAssignStore assignStore) {
     this.haContainer = haContainer;
     this.serviceConfig = serviceConfig;
     this.bucketIdTotalCount =
@@ -75,7 +87,10 @@ public class AmsAssignService {
         
serviceConfig.get(AmoroManagementConf.HA_NODE_OFFLINE_TIMEOUT).toMillis();
     this.assignIntervalSeconds =
         serviceConfig.get(AmoroManagementConf.HA_ASSIGN_INTERVAL).getSeconds();
-    this.assignStore = BucketAssignStoreFactory.create(haContainer, 
serviceConfig);
+    this.assignStore =
+        assignStore != null
+            ? assignStore
+            : BucketAssignStoreFactory.create(haContainer, serviceConfig);
   }
 
   /**
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
index c97951515..4edcd74fa 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
@@ -175,6 +175,11 @@ public class DataBaseHighAvailabilityContainer extends 
PersistentBase
     return isLeader.get();
   }
 
+  @Override
+  public AmsServerInfo getTableServiceServerInfo() {
+    return tableServiceServerInfo;
+  }
+
   /** Closes the heartbeat executor safely. */
   @Override
   public void close() {
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
index 7139bb679..1afeb73f5 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
@@ -68,4 +68,11 @@ public interface HighAvailabilityContainer {
    * @return true if the current AMS node is the primary node, false otherwise
    */
   boolean hasLeadership();
+
+  /**
+   * Get current AMS node information.
+   *
+   * @return {@link AmsServerInfo}
+   */
+  AmsServerInfo getTableServiceServerInfo();
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
index f5fd040af..4638e5a17 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
@@ -61,4 +61,9 @@ public class NoopHighAvailabilityContainer implements 
HighAvailabilityContainer
   public boolean hasLeadership() {
     return false;
   }
+
+  @Override
+  public AmsServerInfo getTableServiceServerInfo() {
+    return null;
+  }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketIdCount.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketIdCount.java
new file mode 100644
index 000000000..6441b67fd
--- /dev/null
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketIdCount.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.server.persistence;
+
+/** Simple class to hold bucketId and its table count. */
+public class BucketIdCount {
+  private String bucketId;
+  private Integer count;
+
+  public String getBucketId() {
+    return bucketId;
+  }
+
+  public void setBucketId(String bucketId) {
+    this.bucketId = bucketId;
+  }
+
+  public Integer getCount() {
+    return count;
+  }
+
+  public void setCount(Integer count) {
+    this.count = count;
+  }
+}
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
index 3cfb69f2e..8509c485c 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableRuntimeMapper.java
@@ -18,6 +18,7 @@
 
 package org.apache.amoro.server.persistence.mapper;
 
+import org.apache.amoro.server.persistence.BucketIdCount;
 import org.apache.amoro.server.persistence.TableRuntimeMeta;
 import org.apache.amoro.server.persistence.TableRuntimeState;
 import org.apache.ibatis.annotations.Delete;
@@ -59,6 +60,17 @@ public interface TableRuntimeMapper {
           + " WHERE table_id = #{tableId}")
   int updateRuntime(TableRuntimeMeta meta);
 
+  /**
+   * Sets bucket_id only when it is still null. Avoids overwriting 
status/config/summary with stale
+   * snapshots from a prior read.
+   */
+  @Update(
+      "UPDATE "
+          + TABLE_NAME
+          + " SET bucket_id = #{bucketId, jdbcType=VARCHAR} "
+          + "WHERE table_id = #{tableId} AND bucket_id IS NULL")
+  int updateBucketIdIfNull(@Param("tableId") Long tableId, @Param("bucketId") 
String bucketId);
+
   /* ---------- delete ---------- */
   @Delete("DELETE FROM " + TABLE_NAME + " WHERE table_id = #{tableId}")
   int deleteRuntime(@Param("tableId") Long tableId);
@@ -102,6 +114,29 @@ public interface TableRuntimeMapper {
   @ResultMap("tableRuntimeMeta")
   List<TableRuntimeMeta> selectAllRuntimes();
 
+  @Select(
+      "<script>"
+          + "SELECT "
+          + SELECT_COLS
+          + "FROM "
+          + TABLE_NAME
+          + " WHERE bucket_id IN "
+          + "<foreach item='item' collection='bucketIds' open='(' 
separator=',' close=')'>"
+          + "#{item}"
+          + "</foreach>"
+          + "<if test='includeNullBucketId'> OR bucket_id IS NULL </if>"
+          + "</script>")
+  /**
+   * Select runtimes by bucket ids.
+   *
+   * @param includeNullBucketId false = only rows with bucket_id in list 
(master-slave); true = also
+   *     include bucket_id IS NULL (e.g. for non-master-slave compatibility)
+   */
+  @ResultMap("tableRuntimeMeta")
+  List<TableRuntimeMeta> selectRuntimesByBucketIds(
+      @Param("bucketIds") List<String> bucketIds,
+      @Param("includeNullBucketId") boolean includeNullBucketId);
+
   @Select(
       "<script>"
           + "<bind name=\"isMySQL\" value=\"_databaseId == 'mysql'\" />"
@@ -180,4 +215,19 @@ public interface TableRuntimeMapper {
 
   @Delete("DELETE FROM " + STATE_TABLE_NAME + " WHERE table_id = #{tableId}")
   void removeAllTableStates(@Param("tableId") long tableId);
+
+  /**
+   * Count tables per bucketId. Returns one BucketIdCount per bucketId holding 
the count of tables
+   * for that bucketId. Only counts non-null and non-empty bucketIds.
+   */
+  @Select(
+      "SELECT bucket_id, COUNT(*) as table_count FROM "
+          + TABLE_NAME
+          + " WHERE bucket_id IS NOT NULL AND bucket_id != '' "
+          + "GROUP BY bucket_id")
+  @Results({
+    @Result(column = "bucket_id", property = "bucketId"),
+    @Result(column = "table_count", property = "count")
+  })
+  List<BucketIdCount> countTablesByBucketId();
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index fa0ee873c..27acb89d1 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -24,15 +24,19 @@ import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableIDWithFormat;
 import org.apache.amoro.TableRuntime;
 import org.apache.amoro.api.CatalogMeta;
+import org.apache.amoro.client.AmsServerInfo;
 import org.apache.amoro.config.Configurations;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.BucketAssignStore;
 import org.apache.amoro.server.catalog.CatalogManager;
 import org.apache.amoro.server.catalog.ExternalCatalog;
 import org.apache.amoro.server.catalog.InternalCatalog;
 import org.apache.amoro.server.catalog.ServerCatalog;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
 import org.apache.amoro.server.manager.MetricManager;
 import org.apache.amoro.server.optimizing.OptimizingStatus;
+import org.apache.amoro.server.persistence.BucketIdCount;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.TableRuntimeMeta;
 import org.apache.amoro.server.persistence.TableRuntimeState;
@@ -50,11 +54,16 @@ import org.apache.amoro.utils.TablePropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -87,19 +96,41 @@ public class DefaultTableService extends PersistentBase 
implements TableService
   private final Configurations serverConfiguration;
   private final CatalogManager catalogManager;
   private final TableRuntimeFactory tableRuntimeFactory;
+  private final HighAvailabilityContainer haContainer;
+  private final BucketAssignStore bucketAssignStore;
+  private final boolean isMasterSlaveMode;
   private RuntimeHandlerChain headHandler;
   private ExecutorService tableExplorerExecutors;
 
+  // Master-slave mode related fields
+  private ScheduledExecutorService bucketTableSyncScheduler;
+  private volatile List<String> assignedBucketIds = new ArrayList<>();
+  private final long bucketTableSyncInterval;
+
   public DefaultTableService(
       Configurations configuration,
       CatalogManager catalogManager,
       TableRuntimeFactory tableRuntimeFactory) {
+    this(configuration, catalogManager, tableRuntimeFactory, null, null);
+  }
+
+  public DefaultTableService(
+      Configurations configuration,
+      CatalogManager catalogManager,
+      TableRuntimeFactory tableRuntimeFactory,
+      HighAvailabilityContainer haContainer,
+      BucketAssignStore bucketAssignStore) {
     this.catalogManager = catalogManager;
     this.externalCatalogRefreshingInterval =
         
configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis();
     this.serverConfiguration = configuration;
     this.tableRuntimeFactory = tableRuntimeFactory;
     this.tableRuntimeFactory.withTableLoader(this::loadTable);
+    this.haContainer = haContainer;
+    this.bucketAssignStore = bucketAssignStore;
+    this.isMasterSlaveMode = 
configuration.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
+    this.bucketTableSyncInterval =
+        
configuration.get(AmoroManagementConf.HA_BUCKET_TABLE_SYNC_INTERVAL).toMillis();
   }
 
   @Override
@@ -156,8 +187,33 @@ public class DefaultTableService extends PersistentBase 
implements TableService
   public void initialize() {
     checkNotStarted();
 
-    List<TableRuntimeMeta> tableRuntimeMetaList =
-        getAs(TableRuntimeMapper.class, TableRuntimeMapper::selectAllRuntimes);
+    List<TableRuntimeMeta> tableRuntimeMetaList;
+    if (isMasterSlaveMode && haContainer != null && bucketAssignStore != null) 
{
+      // In master-slave mode, load only tables assigned to this node
+      try {
+        updateAssignedBucketIds();
+        if (!assignedBucketIds.isEmpty()) {
+          tableRuntimeMetaList =
+              getAs(
+                  TableRuntimeMapper.class,
+                  mapper -> 
mapper.selectRuntimesByBucketIds(assignedBucketIds, false));
+          LOG.info(
+              "Master-slave mode: Loaded {} tables for assigned bucketIds: {}",
+              tableRuntimeMetaList.size(),
+              assignedBucketIds);
+        } else {
+          tableRuntimeMetaList = new ArrayList<>();
+          LOG.info("Master-slave mode: No bucketIds assigned to this node 
yet");
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to load tables for assigned bucketIds in 
master-slave mode", e);
+        tableRuntimeMetaList = new ArrayList<>();
+      }
+    } else {
+      // Non-master-slave mode: load all tables
+      tableRuntimeMetaList = getAs(TableRuntimeMapper.class, 
TableRuntimeMapper::selectAllRuntimes);
+    }
+
     Map<Long, ServerTableIdentifier> identifierMap =
         getAs(TableMetaMapper.class, 
TableMetaMapper::selectAllTableIdentifiers).stream()
             .collect(Collectors.toMap(ServerTableIdentifier::getId, 
Function.identity()));
@@ -222,6 +278,26 @@ public class DefaultTableService extends PersistentBase 
implements TableService
     initialized.complete(true);
     tableExplorerScheduler.scheduleAtFixedRate(
         this::exploreTableRuntimes, 0, externalCatalogRefreshingInterval, 
TimeUnit.MILLISECONDS);
+
+    if (isMasterSlaveMode && haContainer != null && bucketAssignStore != null) 
{
+      // In master-slave mode, start periodic sync for assigned bucket tables
+      // Delay the first sync to allow AmsAssignService to assign bucketIds 
first
+      bucketTableSyncScheduler =
+          Executors.newSingleThreadScheduledExecutor(
+              new ThreadFactoryBuilder()
+                  .setNameFormat("bucket-table-sync-scheduler-%d")
+                  .setDaemon(true)
+                  .build());
+      bucketTableSyncScheduler.scheduleAtFixedRate(
+          this::syncBucketTables,
+          bucketTableSyncInterval,
+          bucketTableSyncInterval,
+          TimeUnit.MILLISECONDS);
+      LOG.info(
+          "Master-slave mode: Started bucket table sync scheduler with 
interval {} ms (first sync delayed by {} ms)",
+          bucketTableSyncInterval,
+          bucketTableSyncInterval);
+    }
   }
 
   @Override
@@ -250,6 +326,9 @@ public class DefaultTableService extends PersistentBase 
implements TableService
   @Override
   public void dispose() {
     tableExplorerScheduler.shutdown();
+    if (bucketTableSyncScheduler != null) {
+      bucketTableSyncScheduler.shutdown();
+    }
     if (tableExplorerExecutors != null) {
       tableExplorerExecutors.shutdown();
     }
@@ -259,8 +338,212 @@ public class DefaultTableService extends PersistentBase 
implements TableService
     tableRuntimeMap.values().forEach(TableRuntime::unregisterMetric);
   }
 
+  /**
+   * Update assigned bucket IDs from AssignStore. This should be called 
periodically to refresh the
+   * bucket assignments.
+   */
+  private void updateAssignedBucketIds() {
+    if (haContainer == null || bucketAssignStore == null) {
+      LOG.warn(
+          "No assigned bucket ids found. check if haContainer == null or 
bucketAssignStore == null");
+      return;
+    }
+    try {
+      AmsServerInfo currentServerInfo = 
haContainer.getTableServiceServerInfo();
+      if (currentServerInfo == null) {
+        LOG.warn("Cannot get current server info, skip updating assigned 
bucketIds");
+        return;
+      }
+      List<String> newBucketIds = 
bucketAssignStore.getAssignments(currentServerInfo);
+      if (!newBucketIds.equals(assignedBucketIds)) {
+        LOG.info("Assigned bucketIds changed from {} to {}", 
assignedBucketIds, newBucketIds);
+        assignedBucketIds = new ArrayList<>(newBucketIds);
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to update assigned bucketIds", e);
+    }
+  }
+
+  /**
+   * Sync tables for assigned bucket IDs. This method is called periodically 
in master-slave mode.
+   */
+  private void syncBucketTables() {
+    if (!isMasterSlaveMode || haContainer == null || bucketAssignStore == 
null) {
+      return;
+    }
+    try {
+      updateAssignedBucketIds();
+      if (assignedBucketIds.isEmpty()) {
+        // In master-slave mode, if no bucketIds are assigned yet, it's normal 
during startup
+        // The AmsAssignService will assign bucketIds later
+        LOG.debug("No bucketIds assigned to this node yet, skip syncing tables 
(will retry later)");
+        return;
+      }
+
+      LOG.info("syncBucketTables assignedBucketIds:{}", assignedBucketIds);
+      // Load tables from database for assigned bucketIds
+      List<TableRuntimeMeta> tableRuntimeMetaList =
+          getAs(
+              TableRuntimeMapper.class,
+              mapper -> mapper.selectRuntimesByBucketIds(assignedBucketIds, 
false));
+
+      Map<Long, ServerTableIdentifier> identifierMap =
+          getAs(TableMetaMapper.class, 
TableMetaMapper::selectAllTableIdentifiers).stream()
+              .collect(Collectors.toMap(ServerTableIdentifier::getId, 
Function.identity()));
+
+      Map<Long, List<TableRuntimeState>> statesMap =
+          getAs(TableRuntimeMapper.class, 
TableRuntimeMapper::selectAllStates).stream()
+              .collect(
+                  Collectors.toMap(
+                      TableRuntimeState::getTableId,
+                      Lists::newArrayList,
+                      (a, b) -> {
+                        a.addAll(b);
+                        return a;
+                      }));
+
+      // Find tables that should be added (in DB but not in memory)
+      Set<Long> currentTableIds = new HashSet<>(tableRuntimeMap.keySet());
+      Set<Long> dbTableIds =
+          tableRuntimeMetaList.stream()
+              .map(TableRuntimeMeta::getTableId)
+              .collect(Collectors.toSet());
+
+      // Add new tables
+      for (TableRuntimeMeta tableRuntimeMeta : tableRuntimeMetaList) {
+        Long tableId = tableRuntimeMeta.getTableId();
+        if (!currentTableIds.contains(tableId)) {
+          ServerTableIdentifier identifier = identifierMap.get(tableId);
+          if (identifier == null) {
+            LOG.warn("No available table identifier found for table runtime 
meta id={}", tableId);
+            continue;
+          }
+          List<TableRuntimeState> states = statesMap.get(tableId);
+          // Use empty list if states is null to avoid NullPointerException
+          if (states == null) {
+            states = Collections.emptyList();
+          }
+          Optional<TableRuntime> tableRuntime =
+              createTableRuntime(identifier, tableRuntimeMeta, states);
+          if (tableRuntime.isPresent()) {
+            TableRuntime runtime = tableRuntime.get();
+            
runtime.registerMetric(MetricManager.getInstance().getGlobalRegistry());
+            tableRuntimeMap.put(tableId, runtime);
+            if (headHandler != null) {
+              AmoroTable<?> table = loadTable(identifier);
+              if (table != null) {
+                headHandler.fireTableAdded(table, runtime);
+              }
+            }
+            LOG.info("Added table {} for bucketId {}", tableId, 
tableRuntimeMeta.getBucketId());
+          }
+        }
+      }
+
+      // Remove tables that are no longer assigned to this node
+      List<Long> tablesToRemove = new ArrayList<>();
+      for (Long tableId : currentTableIds) {
+        if (!dbTableIds.contains(tableId)) {
+          // Check if this table's bucketId is still assigned to this node
+          TableRuntime tableRuntime = tableRuntimeMap.get(tableId);
+          if (tableRuntime != null) {
+            // Get bucketId from database
+            TableRuntimeMeta meta =
+                getAs(TableRuntimeMapper.class, mapper -> 
mapper.selectRuntime(tableId));
+            if (meta != null && meta.getBucketId() != null) {
+              if (!assignedBucketIds.contains(meta.getBucketId())) {
+                tablesToRemove.add(tableId);
+              }
+            } else if (meta == null || meta.getBucketId() == null) {
+              // Table removed from database or bucketId is null
+              tablesToRemove.add(tableId);
+            }
+          }
+        }
+      }
+
+      for (Long tableId : tablesToRemove) {
+        TableRuntime tableRuntime = tableRuntimeMap.get(tableId);
+        if (tableRuntime != null) {
+          try {
+            if (headHandler != null) {
+              headHandler.fireTableRemoved(tableRuntime);
+            }
+            tableRuntime.dispose();
+            tableRuntimeMap.remove(tableId);
+            LOG.info("Removed table {} as it's no longer assigned to this 
node", tableId);
+          } catch (Exception e) {
+            LOG.error("Error occurred while removing tableRuntime of table 
{}", tableId, e);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Error during bucket table sync", e);
+    }
+  }
+
+  /**
+   * Assign bucketId to a table using min-heap strategy. This ensures tables 
are evenly distributed
+   * across bucketIds.
+   *
+   * @return The assigned bucketId, or null if assignment fails
+   */
+  private String assignBucketIdForTable() {
+    try {
+      List<BucketIdCount> bucketIdCounts =
+          getAs(
+              TableRuntimeMapper.class,
+              (TableRuntimeMapper mapper) -> mapper.countTablesByBucketId());
+
+      Map<String, Integer> bucketTableCount = new HashMap<>();
+      int bucketIdTotalCount =
+          
serverConfiguration.getInteger(AmoroManagementConf.HA_BUCKET_ID_TOTAL_COUNT);
+      for (int i = 1; i <= bucketIdTotalCount; i++) {
+        bucketTableCount.put(String.valueOf(i), 0);
+      }
+
+      for (BucketIdCount bucketIdCount : bucketIdCounts) {
+        String bucketId = bucketIdCount.getBucketId();
+        if (bucketId != null && !bucketId.trim().isEmpty()) {
+          bucketTableCount.put(bucketId, bucketIdCount.getCount());
+        }
+      }
+
+      PriorityQueue<Map.Entry<String, Integer>> minHeap =
+          new PriorityQueue<>(
+              Comparator.<Map.Entry<String, 
Integer>>comparingInt(Map.Entry::getValue)
+                  .thenComparing(Map.Entry::getKey));
+
+      for (Map.Entry<String, Integer> entry : bucketTableCount.entrySet()) {
+        minHeap.offer(new AbstractMap.SimpleEntry<>(entry.getKey(), 
entry.getValue()));
+      }
+
+      if (!minHeap.isEmpty()) {
+        Map.Entry<String, Integer> selected = minHeap.poll();
+        String assignedBucketId = selected.getKey();
+        int tableCount = selected.getValue();
+        LOG.debug(
+            "Assigned bucketId {} to new table (min table count: {})",
+            assignedBucketId,
+            tableCount);
+        return assignedBucketId;
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to assign bucketId for table", e);
+    }
+    return null;
+  }
+
   @VisibleForTesting
   void exploreTableRuntimes() {
+    // In master-slave mode (fully wired), only leader node should explore 
table runtimes
+    if (isMasterSlaveMode
+        && haContainer != null
+        && bucketAssignStore != null
+        && !haContainer.hasLeadership()) {
+      LOG.debug("Not the leader node in master-slave mode, skip exploring 
table runtimes");
+      return;
+    }
     if (!initialized.isDone()) {
       throw new IllegalStateException("TableService is not initialized");
     }
@@ -289,12 +572,30 @@ public class DefaultTableService extends PersistentBase 
implements TableService
     // timely manner during the process of dropping the catalog due to 
concurrency considerations.
     // It is permissible to have some erroneous states in the middle, as long 
as the final data is
     // consistent.
+    // In master-slave mode, only clean up tables assigned to this node
     Set<String> catalogNames =
         catalogManager.listCatalogMetas().stream()
             .map(CatalogMeta::getCatalogName)
             .collect(Collectors.toSet());
     for (TableRuntime tableRuntime : tableRuntimeMap.values()) {
       if 
(!catalogNames.contains(tableRuntime.getTableIdentifier().getCatalog())) {
+        // In master-slave mode, only dispose tables assigned to this node
+        if (isMasterSlaveMode && haContainer != null && bucketAssignStore != 
null) {
+          try {
+            TableRuntimeMeta meta =
+                getAs(
+                    TableRuntimeMapper.class,
+                    mapper -> 
mapper.selectRuntime(tableRuntime.getTableIdentifier().getId()));
+            if (meta != null && meta.getBucketId() != null) {
+              if (!assignedBucketIds.contains(meta.getBucketId())) {
+                // Not assigned to this node, skip
+                continue;
+              }
+            }
+          } catch (Exception e) {
+            LOG.warn("Failed to check bucketId for table {}", 
tableRuntime.getTableIdentifier(), e);
+          }
+        }
         disposeTable(tableRuntime.getTableIdentifier());
       }
     }
@@ -495,8 +796,67 @@ public class DefaultTableService extends PersistentBase 
implements TableService
     meta.setStatusCode(OptimizingStatus.IDLE.getCode());
     meta.setGroupName(configuration.getOptimizingConfig().getOptimizerGroup());
     meta.setTableSummary(new TableSummary());
+
+    // In master-slave mode (fully wired), assign bucketId to the table if it's not assigned yet.
+    // Only leader node should assign bucketIds; follower may still persist the table with null
+    // bucketId (e.g. onTableCreated on follower), and leader will assign later via exploration.
+    String assignedBucketId = null;
+    if (isMasterSlaveMode && haContainer != null && bucketAssignStore != null) 
{
+      if (haContainer.hasLeadership()) {
+        TableRuntimeMeta existingMeta =
+            getAs(
+                TableRuntimeMapper.class,
+                mapper -> mapper.selectRuntime(serverTableIdentifier.getId()));
+        if (existingMeta != null) {
+          // Runtime already exists (e.g. inserted by follower with null bucketId)
+          if (existingMeta.getBucketId() != null) {
+            return true; // already assigned
+          }
+          assignedBucketId = assignBucketIdForTable();
+          if (assignedBucketId != null) {
+            String finalAssignedBucketId = assignedBucketId;
+            long updated =
+                updateAs(
+                    TableRuntimeMapper.class,
+                    mapper ->
+                        mapper.updateBucketIdIfNull(
+                            serverTableIdentifier.getId(), 
finalAssignedBucketId));
+            if (updated == 1) {
+              LOG.info(
+                  "Assigned bucketId {} to existing table {} (was null)",
+                  assignedBucketId,
+                  serverTableIdentifier);
+            } else {
+              LOG.debug(
+                  "Skipped backfill bucketId {} for table {} (row missing or 
bucket_id already set)",
+                  assignedBucketId,
+                  serverTableIdentifier);
+            }
+          }
+          return true; // handled existing row (assigned or failed to assign)
+        }
+        assignedBucketId = assignBucketIdForTable();
+        if (assignedBucketId != null) {
+          meta.setBucketId(assignedBucketId);
+          LOG.info("Assigned bucketId {} to table {}", assignedBucketId, 
serverTableIdentifier);
+        } else {
+          LOG.warn(
+              "Failed to assign bucketId to table {}, will be assigned later",
+              serverTableIdentifier);
+        }
+      }
+      // else: follower does not assign; meta.bucketId remains null until leader assigns
+    }
+
     doAs(TableRuntimeMapper.class, mapper -> mapper.insertRuntime(meta));
 
+    // Only skip local runtime creation when master-slave mode is fully wired (bucketAssignStore
+    // is non-null). When bucketAssignStore is null (e.g. 3-arg test constructor), fall through
+    // and create the runtime in memory as in non-master-slave mode.
+    if (isMasterSlaveMode && haContainer != null && bucketAssignStore != null) 
{
+      return true;
+    }
+
     Optional<TableRuntime> tableRuntimeOpt =
         createTableRuntime(serverTableIdentifier, meta, 
Collections.emptyList());
     if (!tableRuntimeOpt.isPresent()) {
diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql 
b/amoro-ams/src/main/resources/mysql/upgrade.sql
index b36064109..dd3c85cb2 100644
--- a/amoro-ams/src/main/resources/mysql/upgrade.sql
+++ b/amoro-ams/src/main/resources/mysql/upgrade.sql
@@ -161,3 +161,15 @@ ALTER TABLE `table_process`
 ADD COLUMN `external_process_identifier` varchar(256) DEFAULT NULL COMMENT 
'Table optimizing external process identifier',
 ADD COLUMN `retry_number` int(11) NOT NULL DEFAULT 0 COMMENT 'Retry times',
 ADD COLUMN `process_parameters` mediumtext COMMENT 'Table process parameters';
+
+-- Add table bucket_assignments to store the bucket IDs assigned to each AMS node (master-slave mode)
+CREATE TABLE IF NOT EXISTS bucket_assignments (
+                                                  cluster_name       
VARCHAR(64)   NOT NULL COMMENT 'AMS cluster name',
+    node_key           VARCHAR(256) NOT NULL COMMENT 'Node key 
(host:thriftBindPort)',
+    server_info_json   TEXT         NULL COMMENT 'JSON encoded AmsServerInfo',
+    assignments_json   TEXT         NULL COMMENT 'JSON array of bucket IDs',
+    last_update_time   BIGINT       NOT NULL DEFAULT 0 COMMENT 'Last update 
timestamp (ms since epoch)',
+    PRIMARY KEY (cluster_name, node_key)
+    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Bucket ID assignments per 
AMS node for master-slave mode';
+
+
diff --git a/amoro-ams/src/main/resources/postgres/upgrade.sql 
b/amoro-ams/src/main/resources/postgres/upgrade.sql
index 5cae2d34f..5797d704e 100644
--- a/amoro-ams/src/main/resources/postgres/upgrade.sql
+++ b/amoro-ams/src/main/resources/postgres/upgrade.sql
@@ -220,4 +220,16 @@ ALTER TABLE table_runtime ADD COLUMN bucket_id varchar(4);
 ALTER TABLE table_process
 ADD COLUMN external_process_identifier varchar(256) DEFAULT NULL,
 ADD COLUMN retry_number int NOT NULL,
-ADD COLUMN process_parameters text;
\ No newline at end of file
+ADD COLUMN process_parameters text;
+
+-- Add table bucket_assignments to store the bucket IDs assigned to each AMS node
+CREATE TABLE IF NOT EXISTS bucket_assignments (
+                                                  cluster_name       
VARCHAR(64)  NOT NULL,
+    node_key           VARCHAR(256) NOT NULL,
+    server_info_json   TEXT         NULL,
+    assignments_json   TEXT         NULL,
+    last_update_time   BIGINT       NOT NULL DEFAULT 0,
+    PRIMARY KEY (cluster_name, node_key)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_bucket_assignments_cluster ON 
bucket_assignments (cluster_name);
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index 34f8f4685..390c93446 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -58,9 +58,12 @@ public abstract class AMSServiceTestBase extends 
AMSManagerTestBase {
       
TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler());
       TABLE_SERVICE.addHandlerChain(PROCESS_SERVICE.getTableHandlerChain());
       TABLE_SERVICE.initialize();
+      ResourceGroup group = defaultResourceGroup();
       try {
-        ResourceGroup group = defaultResourceGroup();
         OPTIMIZER_MANAGER.createResourceGroup(group);
+      } catch (Throwable ignored) {
+      }
+      try {
         OPTIMIZING_SERVICE.createResourceGroup(group);
       } catch (Throwable ignored) {
       }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
index 57f678db8..4b283c668 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
@@ -553,15 +553,7 @@ public class TestAmsAssignService {
   /** Create AmsAssignService with mock BucketAssignStore. */
   private AmsAssignService 
createAssignServiceWithMockStore(HighAvailabilityContainer container)
       throws Exception {
-    AmsAssignService service = new AmsAssignService(container, serviceConfig);
-
-    // Use reflection to inject mock assign store
-    java.lang.reflect.Field assignStoreField =
-        AmsAssignService.class.getDeclaredField("assignStore");
-    assignStoreField.setAccessible(true);
-    assignStoreField.set(service, mockAssignStore);
-
-    return service;
+    return new AmsAssignService(container, serviceConfig, mockAssignStore);
   }
 
   /** Create a mock CuratorFramework that uses MockZkState for storage. */
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index 85316660b..014189f64 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -64,6 +64,7 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -116,6 +117,24 @@ public class TestOptimizingQueue extends AMSTableTestBase {
     return new ResourceGroup.Builder("test", "local").build();
   }
 
+  @Before
+  public void setUp() {
+    // Clean up any existing metrics for the test resource group before each test
+    // to avoid "Metric is already been registered" errors
+    MetricRegistry registry = MetricManager.getInstance().getGlobalRegistry();
+    String testGroupName = testResourceGroup().getName();
+
+    // Unregister all metrics for the test resource group
+    List<MetricKey> keysToRemove = new ArrayList<>();
+    for (MetricKey key : registry.getMetrics().keySet()) {
+      if (key.getDefine().getName().startsWith("optimizer_group_")
+          && testGroupName.equals(key.valueOfTag(GROUP_TAG))) {
+        keysToRemove.add(key);
+      }
+    }
+    keysToRemove.forEach(registry::unregister);
+  }
+
   protected OptimizingQueue buildOptimizingGroupService(DefaultTableRuntime 
tableRuntime) {
     return new OptimizingQueue(
         CATALOG_MANAGER,
@@ -289,12 +308,27 @@ public class TestOptimizingQueue extends AMSTableTestBase 
{
     queue.refreshTable(tableRuntime2);
     queue.refreshTable(tableRuntime);
 
+    // Poll two tasks and verify they come from different tables
+    // The order may vary due to async planning, so we check both possibilities
     TaskRuntime<?> task2 = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
     Assert.assertNotNull(task2);
-    Assert.assertTrue(tableRuntime2.getTableIdentifier().getId() == 
task2.getTableId());
     TaskRuntime<?> task3 = queue.pollTask(optimizerThread, MAX_POLLING_TIME);
     Assert.assertNotNull(task3);
-    Assert.assertTrue(tableRuntime.getTableIdentifier().getId() == 
task3.getTableId());
+
+    // Verify that the two tasks come from the two different tables
+    long tableId1 = tableRuntime.getTableIdentifier().getId();
+    long tableId2 = tableRuntime2.getTableIdentifier().getId();
+    long task2TableId = task2.getTableId();
+    long task3TableId = task3.getTableId();
+
+    Assert.assertTrue(
+        "Task2 should come from one of the two tables",
+        task2TableId == tableId1 || task2TableId == tableId2);
+    Assert.assertTrue(
+        "Task3 should come from one of the two tables",
+        task3TableId == tableId1 || task3TableId == tableId2);
+    Assert.assertNotEquals(
+        "Task2 and Task3 should come from different tables", task2TableId, 
task3TableId);
     queue.dispose();
     tableRuntime2.dispose();
   }
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
index 737bd3878..9b25ba19d 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestDataExpire.java
@@ -542,7 +542,7 @@ public class TestDataExpire extends ExecutorTestBase {
 
   @Test
   public void testExpireByPartitionWhenMetricsModeIsNone() {
-    assumeTrue(getMixedTable().format().in(TableFormat.MIXED_ICEBERG, 
TableFormat.ICEBERG));
+    assumeTrue(getTestFormat().in(TableFormat.MIXED_ICEBERG, 
TableFormat.ICEBERG));
 
     getMixedTable()
         .updateProperties()
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
index 1a391a404..fcd9373a2 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/maintainer/TestSnapshotExpire.java
@@ -471,7 +471,7 @@ public class TestSnapshotExpire extends ExecutorTestBase {
 
   @Test
   public void testBaseTableGcDisabled() {
-    Assume.assumeFalse(isKeyedTable());
+    Assume.assumeFalse(getMixedTable().isKeyedTable());
     UnkeyedTable testUnkeyedTable = getMixedTable().asUnkeyedTable();
     testUnkeyedTable.updateProperties().set("gc.enabled", "false").commit();
 
diff --git 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
index 3d1d4bfb1..2a96a0e9a 100644
--- 
a/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
+++ 
b/amoro-format-iceberg/src/main/java/org/apache/amoro/formats/iceberg/maintainer/IcebergTableMaintainer.java
@@ -685,7 +685,7 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
     Comparable<?> expireValue = getExpireValue(expirationConfig, field, 
expireTimestamp);
     return CloseableIterable.transform(
         CloseableIterable.withNoopClose(Iterables.concat(dataFiles, 
deleteFiles)),
-        contentFile -> {
+        (ContentFile<?> contentFile) -> {
           Literal<Long> literal =
               getExpireTimestampLiteral(
                   contentFile,
@@ -694,7 +694,12 @@ public class IcebergTableMaintainer implements 
TableMaintainer {
                       expirationConfig.getDateTimePattern(), 
Locale.getDefault()),
                   expirationConfig.getNumberDateFormat(),
                   expireValue);
-          return new FileEntry(contentFile.copyWithoutStats(), literal);
+          // copyWithoutStats() returns ContentFile<?> but with a captured wildcard type
+          // that needs explicit casting for type inference
+          @SuppressWarnings("rawtypes")
+          ContentFile<?> fileWithoutStats =
+              (ContentFile<?>) ((ContentFile) contentFile.copyWithoutStats());
+          return new FileEntry(fileWithoutStats, literal);
         });
   }
 
diff --git a/docs/configuration/ams-config.md b/docs/configuration/ams-config.md
index da6cf20eb..ec4f34c39 100644
--- a/docs/configuration/ams-config.md
+++ b/docs/configuration/ams-config.md
@@ -72,6 +72,7 @@ table td:last-child, table th:last-child { width: 40%; 
word-break: break-all; }
 | expire-snapshots.thread-count | 10 | The number of threads used for 
snapshots expiring. |
 | ha.bucket-assign.interval | 1 min | Interval for bucket assignment service 
to detect node changes and redistribute bucket IDs. |
 | ha.bucket-id.total-count | 100 | Total count of bucket IDs for assignment. 
Bucket IDs range from 1 to this value. |
+| ha.bucket-table-sync.interval | 1 min | Interval for syncing tables assigned 
to bucket IDs in master-slave mode. Each node periodically loads tables from 
database based on its assigned bucket IDs. |
 | ha.cluster-name | default | Amoro management service cluster name. |
 | ha.connection-timeout | 5 min | The Zookeeper connection timeout in 
milliseconds. |
 | ha.enabled | false | Whether to enable high availability mode. |


Reply via email to