This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new f6b1e683f [AMORO-3928] Modify the optimizer to support obtaining tasks 
from each AMS node for processing. (#3950)
f6b1e683f is described below

commit f6b1e683f4bf650b860e39a09273f52ba703a128
Author: can <[email protected]>
AuthorDate: Thu Apr 2 11:30:25 2026 +0800

    [AMORO-3928] Modify the optimizer to support obtaining tasks from each AMS 
node for processing. (#3950)
    
    * [Subtask]: Use a new configuration item to control whether master & slave 
mode is enabled. #3845
    
    * [Subtask]: add AmsAssignService to implement balanced bucket allocation 
in master-slave mode. #3921
    
    * [Subtask]: add AmsAssignService to implement balanced bucket allocation 
in master-slave mode. #3921
    
    * [Subtask]: Modify DefaultTableService to be compatible with master-slave 
mode #3923
    
    * [Subtask]: In master-slave mode, each AMS should automatically sense the optimizer. #3929
    
    * [Subtask]: Modify the optimizer to support obtaining tasks from each AMS 
node for processing. #3928
    
    * [Subtask]: Modify the optimizer to support obtaining tasks from each AMS 
node for processing. #3928
    
    * [Subtask]: Optimize the logic for retrieving the AMS list from ZooKeeper 
in master-slave mode. #3928
    
    * Resolve conflicts with the latest main branch and introduce a database-backed solution for storing allocation information.
    
    * [Subtask]: Modify the optimizer to support obtaining tasks from each AMS 
node for processing. #3928
    
    * [Subtask]: Modify the optimizer to support obtaining tasks from each AMS 
node for processing. #3928
    
    * [Subtask]: Modify the optimizer to support obtaining tasks from each AMS 
node for processing. #3928
    
    * [Subtask]: Fixed a legacy bug that could cause unit tests to fail during 
compilation. #3928
    
    * [Subtask]: Optimized based on CR feedback. #3928
    
    * [Subtask]: Fixed a legacy bug that could cause unit tests to fail during 
compilation. #3928
    
    * Revert "[Subtask]: Fixed a legacy bug that could cause unit tests to fail 
during compilation. #3928"
    
    This reverts commit d54c126fdc32a77b4d1a5f8cf648cb3bbbca4a65.
    
    * [Subtask]: Optimized based on CR feedback. #3928
    
    * [Subtask]: Optimized based on CR feedback. #3928
    
    * [Subtask]: Optimized based on CR feedback. #3928
    
    ---------
    
    Co-authored-by: wardli <[email protected]>
---
 .../apache/amoro/server/AmoroServiceContainer.java |   3 +-
 .../org/apache/amoro/server/BucketAssignStore.java |   9 +
 .../amoro/server/BucketAssignStoreFactory.java     |   3 +-
 .../apache/amoro/server/DBBucketAssignStore.java   |  30 +-
 .../amoro/server/DefaultOptimizingService.java     |  35 ++
 .../apache/amoro/server/ZkBucketAssignStore.java   |  29 ++
 .../ha/DataBaseHighAvailabilityContainer.java      | 132 ++++--
 .../amoro/server/ha/HighAvailabilityContainer.java |  11 +-
 .../server/ha/NoopHighAvailabilityContainer.java   |   5 +
 .../server/ha/ZkHighAvailabilityContainer.java     |  11 +
 .../server/manager/AbstractOptimizerContainer.java |   7 +
 .../server/persistence/BucketAssignmentMeta.java   |  26 ++
 .../persistence/mapper/BucketAssignMapper.java     |  23 +-
 .../amoro/server/table/DefaultTableService.java    |   4 +-
 .../src/main/resources/derby/ams-derby-init.sql    |   1 +
 .../src/main/resources/mysql/ams-mysql-init.sql    |   1 +
 amoro-ams/src/main/resources/mysql/upgrade.sql     |   6 +-
 .../main/resources/postgres/ams-postgres-init.sql  |   1 +
 amoro-ams/src/main/resources/postgres/upgrade.sql  |   7 +-
 .../apache/amoro/server/TestAmsAssignService.java  |   9 +
 .../server/TestInternalIcebergCatalogService.java  |   6 +-
 .../org/apache/amoro/api/OptimizingService.java    | 472 +++++++++++++++++++++
 .../java/org/apache/amoro/OptimizerProperties.java |   1 +
 .../java/org/apache/amoro/client/AmsThriftUrl.java |  91 ++++
 .../org/apache/amoro/client/ZookeeperService.java  |   5 +
 .../main/thrift/amoro_optimizing_service.thrift    |   3 +
 .../apache/amoro/MockAmoroManagementServer.java    |   5 +
 .../common/AbstractOptimizerOperator.java          | 174 +++++++-
 .../amoro/optimizer/common/AmsNodeManager.java     | 116 +++++
 .../amoro/optimizer/common/OptimizerConfig.java    |  15 +
 .../amoro/optimizer/common/OptimizerExecutor.java  | 143 +++++--
 .../amoro/optimizer/common/OptimizerToucher.java   |   2 +-
 .../optimizer/common/ThriftAmsNodeManager.java     | 110 +++++
 33 files changed, 1390 insertions(+), 106 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index c8144f714..99108c5ff 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -277,7 +277,8 @@ public class AmoroServiceContainer {
             serviceConfig, catalogManager, defaultRuntimeFactory, haContainer, 
bucketAssignStore);
     processService = new ProcessService(tableService, actionCoordinators, 
executeEngineManager);
     optimizingService =
-        new DefaultOptimizingService(serviceConfig, catalogManager, 
optimizerManager, tableService);
+        new DefaultOptimizingService(
+            serviceConfig, catalogManager, optimizerManager, tableService, 
bucketAssignStore);
 
     LOG.info("Setting up AMS table executors...");
     InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
index 8db61265b..b85751bf8 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
@@ -65,6 +65,15 @@ public interface BucketAssignStore {
    */
   Map<AmsServerInfo, List<String>> getAllAssignments() throws 
BucketAssignStoreException;
 
+  /**
+   * Get all alive AMS nodes known to this store (which may include nodes holding no buckets).
+   * Used by optimizers in master-slave mode to discover all AMS optimizing endpoints.
+   *
+   * @return List of AmsServerInfo for all alive nodes, empty list if none
+   * @throws BucketAssignStoreException If retrieval operation fails
+   */
+  List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException;
+
   /**
    * Get the last update time for a node's assignments.
    *
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
index 12a494450..18f6d00e3 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
@@ -63,8 +63,9 @@ public final class BucketAssignStoreFactory {
             "Cannot create ZkBucketAssignStore: ZK client not available or 
invalid container type");
 
       case AmoroManagementConf.HA_TYPE_DATABASE:
+        long nodeHeartbeatTtlMs = 
conf.get(AmoroManagementConf.HA_LEASE_TTL).toMillis();
         LOG.info("Creating DBBucketAssignStore for cluster: {}", clusterName);
-        return new DBBucketAssignStore(clusterName);
+        return new DBBucketAssignStore(clusterName, nodeHeartbeatTtlMs);
 
       default:
         throw new IllegalArgumentException(
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
index b044f7b6c..0727e850c 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
@@ -45,9 +45,11 @@ public class DBBucketAssignStore extends PersistentBase 
implements BucketAssignS
       new TypeReference<List<String>>() {};
 
   private final String clusterName;
+  private final long nodeHeartbeatTtlMs;
 
-  public DBBucketAssignStore(String clusterName) {
+  public DBBucketAssignStore(String clusterName, long nodeHeartbeatTtlMs) {
     this.clusterName = clusterName;
+    this.nodeHeartbeatTtlMs = nodeHeartbeatTtlMs;
   }
 
   @Override
@@ -180,6 +182,32 @@ public class DBBucketAssignStore extends PersistentBase 
implements BucketAssignS
     }
   }
 
+  @Override
+  public List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException 
{
+    try {
+      long cutoff = System.currentTimeMillis() - nodeHeartbeatTtlMs;
+      List<BucketAssignmentMeta> rows =
+          getAs(BucketAssignMapper.class, mapper -> 
mapper.selectAllByCluster(clusterName));
+      List<AmsServerInfo> nodes = new ArrayList<>();
+      for (BucketAssignmentMeta meta : rows) {
+        Long heartbeatTs = meta.getNodeHeartbeatTs();
+        if (heartbeatTs == null || heartbeatTs < cutoff) {
+          LOG.debug(
+              "Skipping stale node key={}, node_heartbeat_ts={}", 
meta.getNodeKey(), heartbeatTs);
+          continue;
+        }
+        AmsServerInfo nodeInfo = parseNodeInfo(meta);
+        if (nodeInfo.getThriftBindPort() != null && 
nodeInfo.getThriftBindPort() > 0) {
+          nodes.add(nodeInfo);
+        }
+      }
+      return nodes;
+    } catch (Exception e) {
+      LOG.error("Failed to get alive nodes", e);
+      throw new BucketAssignStoreException("Failed to get alive nodes", e);
+    }
+  }
+
   private static String getNodeKey(AmsServerInfo nodeInfo) {
     return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
   }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 73ee0dda8..66734682e 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -26,6 +26,7 @@ import org.apache.amoro.api.OptimizingService;
 import org.apache.amoro.api.OptimizingTask;
 import org.apache.amoro.api.OptimizingTaskId;
 import org.apache.amoro.api.OptimizingTaskResult;
+import org.apache.amoro.client.AmsServerInfo;
 import org.apache.amoro.config.Configurations;
 import org.apache.amoro.config.TableConfiguration;
 import org.apache.amoro.exception.ForbiddenException;
@@ -66,6 +67,7 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -116,12 +118,22 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
   private final TableService tableService;
   private final RuntimeHandlerChain tableHandlerChain;
   private final ExecutorService planExecutor;
+  private final BucketAssignStore bucketAssignStore;
 
   public DefaultOptimizingService(
       Configurations serviceConfig,
       CatalogManager catalogManager,
       OptimizerManager optimizerManager,
       TableService tableService) {
+    this(serviceConfig, catalogManager, optimizerManager, tableService, null);
+  }
+
+  public DefaultOptimizingService(
+      Configurations serviceConfig,
+      CatalogManager catalogManager,
+      OptimizerManager optimizerManager,
+      TableService tableService,
+      BucketAssignStore bucketAssignStore) {
     this.optimizerTouchTimeout =
         
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
     this.taskAckTimeout =
@@ -144,6 +156,7 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
     this.tableService = tableService;
     this.catalogManager = catalogManager;
     this.optimizerManager = optimizerManager;
+    this.bucketAssignStore = bucketAssignStore;
     this.tableHandlerChain = new TableRuntimeHandlerImpl();
     this.planExecutor =
         Executors.newCachedThreadPool(
@@ -322,6 +335,28 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
     return true;
   }
 
+  @Override
+  public List<String> getOptimizingNodeUrls() {
+    if (bucketAssignStore == null) {
+      return Collections.emptyList();
+    }
+    try {
+      List<AmsServerInfo> nodes = bucketAssignStore.getAliveNodes();
+      List<String> urls = new ArrayList<>(nodes.size());
+      for (AmsServerInfo node : nodes) {
+        if (node.getHost() != null
+            && node.getThriftBindPort() != null
+            && node.getThriftBindPort() > 0) {
+          urls.add(String.format("thrift://%s:%d", node.getHost(), 
node.getThriftBindPort()));
+        }
+      }
+      return urls;
+    } catch (Exception e) {
+      LOG.warn("Failed to get optimizing node URLs from bucket assign store", 
e);
+      return Collections.emptyList();
+    }
+  }
+
   /**
    * Get optimizing queue.
    *
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
index 9ece14aa4..ed498c3e3 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
@@ -170,6 +170,35 @@ public class ZkBucketAssignStore implements 
BucketAssignStore {
     return allAssignments;
   }
 
+  @Override
+  public List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException 
{
+    List<AmsServerInfo> nodes = new ArrayList<>();
+    try {
+      if (zkClient.checkExists().forPath(assignmentsBasePath) == null) {
+        return nodes;
+      }
+      List<String> nodeKeys = 
zkClient.getChildren().forPath(assignmentsBasePath);
+      for (String nodeKey : nodeKeys) {
+        try {
+          AmsServerInfo nodeInfo = parseNodeKey(nodeKey);
+          if (nodeInfo.getThriftBindPort() != null && 
nodeInfo.getThriftBindPort() > 0) {
+            nodes.add(nodeInfo);
+          }
+        } catch (Exception e) {
+          LOG.warn("Failed to parse node key: {}", nodeKey, e);
+        }
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // path doesn't exist
+    } catch (BucketAssignStoreException e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.error("Failed to get alive nodes", e);
+      throw new BucketAssignStoreException("Failed to get alive nodes", e);
+    }
+    return nodes;
+  }
+
   @Override
   public long getLastUpdateTime(AmsServerInfo nodeInfo) throws 
BucketAssignStoreException {
     String nodeKey = getNodeKey(nodeInfo);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
index 4edcd74fa..7d6b6b37e 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
@@ -21,8 +21,10 @@ package org.apache.amoro.server.ha;
 import org.apache.amoro.client.AmsServerInfo;
 import org.apache.amoro.config.Configurations;
 import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.persistence.BucketAssignmentMeta;
 import org.apache.amoro.server.persistence.HaLeaseMeta;
 import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.mapper.BucketAssignMapper;
 import org.apache.amoro.server.persistence.mapper.HaLeaseMapper;
 import org.apache.amoro.utils.JacksonUtil;
 import org.slf4j.Logger;
@@ -144,29 +146,50 @@ public class DataBaseHighAvailabilityContainer extends 
PersistentBase
       LOG.debug("Master-slave mode is not enabled, skip node registration");
       return;
     }
-    // In master-slave mode, register node to database by writing 
OPTIMIZING_SERVICE info
-    // This is similar to ZK mode registering ephemeral nodes
+    // Register this node in bucket_assignments so that all nodes can be 
discovered via
+    // getAliveNodes(). ha_lease has PK (cluster_name, service_name) and 
cannot store multiple
+    // nodes for the same service, so we use the per-node bucket_assignments 
table instead.
+    upsertNodeHeartbeat();
+    LOG.info(
+        "Registered AMS node to bucket_assignments: nodeKey={}, 
optimizingService={}",
+        getNodeKey(),
+        optimizingServiceServerInfo);
+  }
+
+  /** Returns nodeKey used as the bucket_assignments row identifier: 
host:optimizingPort. */
+  private String getNodeKey() {
+    return optimizingServiceServerInfo.getHost()
+        + ":"
+        + optimizingServiceServerInfo.getThriftBindPort();
+  }
+
+  /**
+   * Upsert this node's row in bucket_assignments: update only node_heartbeat_ts if it exists,
+   * else insert it with server_info_json, null assignments_json and both timestamps = now.
+   * An existing assignments_json is never modified, so the leader's assignments are preserved.
+   */
+  private void upsertNodeHeartbeat() {
     long now = System.currentTimeMillis();
-    String optimizingInfoJson = 
JacksonUtil.toJSONString(optimizingServiceServerInfo);
+    String nodeKey = getNodeKey();
+    String serverInfoJson = 
JacksonUtil.toJSONString(optimizingServiceServerInfo);
     try {
-      doAsIgnoreError(
-          HaLeaseMapper.class,
-          mapper -> {
-            int updated =
-                mapper.updateServerInfo(
-                    clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, 
optimizingInfoJson, now);
-            if (updated == 0) {
-              mapper.insertServerInfoIfAbsent(
-                  clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, 
optimizingInfoJson, now);
-            }
-          });
-      LOG.info(
-          "Registered AMS node to database: nodeId={}, optimizingService={}",
-          nodeId,
-          optimizingServiceServerInfo);
+      int updated =
+          updateAs(
+                  BucketAssignMapper.class,
+                  mapper -> mapper.updateNodeHeartbeat(clusterName, nodeKey, 
now))
+              .intValue();
+      if (updated == 0) {
+        // First registration: insert a new row with empty assignments
+        doAsIgnoreError(
+            BucketAssignMapper.class,
+            mapper ->
+                mapper.insert(
+                    new BucketAssignmentMeta(
+                        clusterName, nodeKey, serverInfoJson, null, now, 
now)));
+      }
     } catch (Exception e) {
-      LOG.error("Failed to register node to database", e);
-      throw e;
+      LOG.error("Failed to upsert node heartbeat for nodeKey={}", nodeKey, e);
+      throw new RuntimeException("Failed to register node in 
bucket_assignments", e);
     }
   }
 
@@ -180,7 +203,12 @@ public class DataBaseHighAvailabilityContainer extends 
PersistentBase
     return tableServiceServerInfo;
   }
 
-  /** Closes the heartbeat executor safely. */
+  @Override
+  public AmsServerInfo getOptimizingServiceServerInfo() {
+    return optimizingServiceServerInfo;
+  }
+
+  /** Closes the heartbeat executor; in master-slave mode, also unregisters this node's row. */
   @Override
   public void close() {
     try {
@@ -190,6 +218,18 @@ public class DataBaseHighAvailabilityContainer extends 
PersistentBase
     } catch (Exception e) {
       LOG.error("Close Database HighAvailabilityContainer failed", e);
     }
+    // Remove this node from bucket_assignments so the leader immediately 
stops seeing it
+    // as alive without waiting for the heartbeat TTL to expire.
+    boolean isMasterSlaveMode = 
serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
+    if (isMasterSlaveMode) {
+      try {
+        String nodeKey = getNodeKey();
+        doAs(BucketAssignMapper.class, mapper -> 
mapper.deleteByNode(clusterName, nodeKey));
+        LOG.info("Unregistered AMS node from bucket_assignments: nodeKey={}", 
nodeKey);
+      } catch (Exception e) {
+        LOG.warn("Failed to unregister node from bucket_assignments on close", 
e);
+      }
+    }
   }
 
   private class HeartbeatRunnable implements Runnable {
@@ -204,6 +244,14 @@ public class DataBaseHighAvailabilityContainer extends 
PersistentBase
           return;
         }
 
+        // Each node independently refreshes its own heartbeat in 
bucket_assignments so that
+        // getAliveNodes() can correctly detect liveness without relying on 
ha_lease.
+        try {
+          upsertNodeHeartbeat();
+        } catch (Exception e) {
+          LOG.warn("Failed to refresh node heartbeat in bucket_assignments", 
e);
+        }
+
         if (!isLeader.get()) {
           // First attempt to acquire the lease (similar to candidate/await)
           boolean success = tryAcquireLease(newExpireTs, now);
@@ -353,29 +401,35 @@ public class DataBaseHighAvailabilityContainer extends 
PersistentBase
       LOG.warn("Only leader node can get alive nodes list");
       return aliveNodes;
     }
+    // Read alive nodes from bucket_assignments, filtered by node_heartbeat_ts. ha_lease has
+    // PK (cluster_name, service_name) which only allows one row per service 
and cannot
+    // represent multiple AMS nodes. bucket_assignments has PK (cluster_name, 
node_key) and
+    // stores one row per node; node_heartbeat_ts is updated exclusively by 
the owning node
+    // so the leader's refreshLastUpdateTime calls cannot mask a dead node's 
staleness.
     try {
-      long currentTime = System.currentTimeMillis();
-      List<HaLeaseMeta> leases =
-          getAs(
-              HaLeaseMapper.class,
-              mapper -> mapper.selectLeasesByService(clusterName, 
OPTIMIZING_SERVICE));
-      for (HaLeaseMeta lease : leases) {
-        // Only include nodes with valid (non-expired) leases
-        if (lease.getLeaseExpireTs() != null && lease.getLeaseExpireTs() > 
currentTime) {
-          if (lease.getServerInfoJson() != null && 
!lease.getServerInfoJson().isEmpty()) {
-            try {
-              AmsServerInfo nodeInfo =
-                  JacksonUtil.parseObject(lease.getServerInfoJson(), 
AmsServerInfo.class);
-              aliveNodes.add(nodeInfo);
-            } catch (Exception e) {
-              LOG.warn("Failed to parse server info for node {}", 
lease.getNodeId(), e);
-            }
+      long cutoff = System.currentTimeMillis() - 
TimeUnit.SECONDS.toMillis(ttlSeconds);
+      List<BucketAssignmentMeta> rows =
+          getAs(BucketAssignMapper.class, mapper -> 
mapper.selectAllByCluster(clusterName));
+      for (BucketAssignmentMeta meta : rows) {
+        Long heartbeatTs = meta.getNodeHeartbeatTs();
+        if (heartbeatTs == null || heartbeatTs < cutoff) {
+          LOG.debug(
+              "Skipping stale node key={}, node_heartbeat_ts={}", 
meta.getNodeKey(), heartbeatTs);
+          continue;
+        }
+        if (meta.getServerInfoJson() != null && 
!meta.getServerInfoJson().isEmpty()) {
+          try {
+            AmsServerInfo nodeInfo =
+                JacksonUtil.parseObject(meta.getServerInfoJson(), 
AmsServerInfo.class);
+            aliveNodes.add(nodeInfo);
+          } catch (Exception e) {
+            LOG.warn("Failed to parse server_info_json for node {}", 
meta.getNodeKey(), e);
           }
         }
       }
     } catch (Exception e) {
-      LOG.error("Failed to get alive nodes from database", e);
-      throw e;
+      LOG.error("Failed to get alive nodes from bucket_assignments", e);
+      throw new RuntimeException("Failed to get alive nodes", e);
     }
     return aliveNodes;
   }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
index 1afeb73f5..30d01a606 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
@@ -70,9 +70,18 @@ public interface HighAvailabilityContainer {
   boolean hasLeadership();
 
   /**
-   * Get current AMS node information.
+   * Get current AMS node's table service server info (host + table-service 
Thrift port).
    *
    * @return {@link AmsServerInfo}
    */
   AmsServerInfo getTableServiceServerInfo();
+
+  /**
+   * Get current AMS node's optimizing service server info (host + optimizing 
Thrift port). This
+   * must be consistent with the {@link AmsServerInfo} written into {@link
+   * org.apache.amoro.server.BucketAssignStore} so that bucket lookups can 
match the stored nodeKey.
+   *
+   * @return {@link AmsServerInfo}
+   */
+  AmsServerInfo getOptimizingServiceServerInfo();
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
index 4638e5a17..48282f2a9 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
@@ -66,4 +66,9 @@ public class NoopHighAvailabilityContainer implements 
HighAvailabilityContainer
   public AmsServerInfo getTableServiceServerInfo() {
     return null;
   }
+
+  @Override
+  public AmsServerInfo getOptimizingServiceServerInfo() {
+    return null;
+  }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
index 0aa7b6e35..5c93a6777 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
@@ -294,6 +294,17 @@ public class ZkHighAvailabilityContainer implements 
HighAvailabilityContainer, L
     return tableServiceServerInfo;
   }
 
+  /**
+   * Get the current node's optimizing service server info. ZK-registered node 
data uses this info,
+   * so bucket-assignment lookups must use the same port.
+   *
+   * @return The current node's optimizing service server info, null if HA is 
not enabled
+   */
+  @Override
+  public AmsServerInfo getOptimizingServiceServerInfo() {
+    return optimizingServiceServerInfo;
+  }
+
   /**
    * Get the ZooKeeper client. This is used for creating BucketAssignStore.
    *
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
index 16628406d..05745c79b 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
@@ -136,6 +136,13 @@ public abstract class AbstractOptimizerContainer 
implements InternalResourceCont
     if (StringUtils.isNotEmpty(resource.getResourceId())) {
       stringBuilder.append(" -id ").append(resource.getResourceId());
     }
+    // Add master-slave mode flag if enabled
+    if (containerProperties != null
+        && "true"
+            .equalsIgnoreCase(
+                
containerProperties.get(OptimizerProperties.OPTIMIZER_MASTER_SLAVE_MODE_ENABLED)))
 {
+      stringBuilder.append(" -msm");
+    }
     return stringBuilder.toString();
   }
 
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
index 4f6ab7fb4..5cdb59b90 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
@@ -28,6 +28,8 @@ public class BucketAssignmentMeta {
   private String serverInfoJson;
   private String assignmentsJson;
   private Long lastUpdateTime;
+  /** Per-node heartbeat timestamp. Updated only by the owning node, never by 
the leader. */
+  private Long nodeHeartbeatTs;
 
   public BucketAssignmentMeta() {}
 
@@ -42,6 +44,22 @@ public class BucketAssignmentMeta {
     this.serverInfoJson = serverInfoJson;
     this.assignmentsJson = assignmentsJson;
     this.lastUpdateTime = lastUpdateTime;
+    this.nodeHeartbeatTs = lastUpdateTime;
+  }
+
+  public BucketAssignmentMeta(
+      String clusterName,
+      String nodeKey,
+      String serverInfoJson,
+      String assignmentsJson,
+      Long lastUpdateTime,
+      Long nodeHeartbeatTs) {
+    this.clusterName = clusterName;
+    this.nodeKey = nodeKey;
+    this.serverInfoJson = serverInfoJson;
+    this.assignmentsJson = assignmentsJson;
+    this.lastUpdateTime = lastUpdateTime;
+    this.nodeHeartbeatTs = nodeHeartbeatTs;
   }
 
   public String getClusterName() {
@@ -83,4 +101,12 @@ public class BucketAssignmentMeta {
   public void setLastUpdateTime(Long lastUpdateTime) {
     this.lastUpdateTime = lastUpdateTime;
   }
+
+  public Long getNodeHeartbeatTs() {
+    return nodeHeartbeatTs;
+  }
+
+  public void setNodeHeartbeatTs(Long nodeHeartbeatTs) {
+    this.nodeHeartbeatTs = nodeHeartbeatTs;
+  }
 }
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
index 55fab0175..e53a3a4ed 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
@@ -34,8 +34,8 @@ import java.util.List;
 public interface BucketAssignMapper {
 
   @Insert(
-      "INSERT INTO bucket_assignments (cluster_name, node_key, 
server_info_json, assignments_json, last_update_time) "
-          + "VALUES (#{meta.clusterName}, #{meta.nodeKey}, 
#{meta.serverInfoJson}, #{meta.assignmentsJson}, #{meta.lastUpdateTime})")
+      "INSERT INTO bucket_assignments (cluster_name, node_key, 
server_info_json, assignments_json, last_update_time, node_heartbeat_ts) "
+          + "VALUES (#{meta.clusterName}, #{meta.nodeKey}, 
#{meta.serverInfoJson}, #{meta.assignmentsJson}, #{meta.lastUpdateTime}, 
#{meta.nodeHeartbeatTs})")
   int insert(@Param("meta") BucketAssignmentMeta meta);
 
   @Update(
@@ -56,8 +56,20 @@ public interface BucketAssignMapper {
       @Param("nodeKey") String nodeKey,
       @Param("lastUpdateTime") Long lastUpdateTime);
 
+  /**
+   * Update only the per-node heartbeat timestamp. Must be called only by the 
owning node, never by
+   * the leader on behalf of other nodes.
+   */
+  @Update(
+      "UPDATE bucket_assignments SET node_heartbeat_ts = #{nodeHeartbeatTs} "
+          + "WHERE cluster_name = #{clusterName} AND node_key = #{nodeKey}")
+  int updateNodeHeartbeat(
+      @Param("clusterName") String clusterName,
+      @Param("nodeKey") String nodeKey,
+      @Param("nodeHeartbeatTs") Long nodeHeartbeatTs);
+
   @Select(
-      "SELECT cluster_name, node_key, server_info_json, assignments_json, 
last_update_time "
+      "SELECT cluster_name, node_key, server_info_json, assignments_json, 
last_update_time, node_heartbeat_ts "
           + "FROM bucket_assignments WHERE cluster_name = #{clusterName} AND 
node_key = #{nodeKey}")
   @Results(
       id = "BucketAssignmentMetaMap",
@@ -66,13 +78,14 @@ public interface BucketAssignMapper {
         @Result(column = "node_key", property = "nodeKey"),
         @Result(column = "server_info_json", property = "serverInfoJson"),
         @Result(column = "assignments_json", property = "assignmentsJson"),
-        @Result(column = "last_update_time", property = "lastUpdateTime")
+        @Result(column = "last_update_time", property = "lastUpdateTime"),
+        @Result(column = "node_heartbeat_ts", property = "nodeHeartbeatTs")
       })
   BucketAssignmentMeta selectByNode(
       @Param("clusterName") String clusterName, @Param("nodeKey") String 
nodeKey);
 
   @Select(
-      "SELECT cluster_name, node_key, server_info_json, assignments_json, 
last_update_time "
+      "SELECT cluster_name, node_key, server_info_json, assignments_json, 
last_update_time, node_heartbeat_ts "
           + "FROM bucket_assignments WHERE cluster_name = #{clusterName}")
   @ResultMap("BucketAssignmentMetaMap")
   List<BucketAssignmentMeta> selectAllByCluster(@Param("clusterName") String 
clusterName);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index 27acb89d1..c64b7d923 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -349,7 +349,9 @@ public class DefaultTableService extends PersistentBase 
implements TableService
       return;
     }
     try {
-      AmsServerInfo currentServerInfo = 
haContainer.getTableServiceServerInfo();
+      // Must use optimizingServiceServerInfo because AmsAssignService stores 
bucket assignments
+      // keyed by host:optimizingPort (from haContainer.getAliveNodes()), not 
host:tableServicePort.
+      AmsServerInfo currentServerInfo = 
haContainer.getOptimizingServiceServerInfo();
       if (currentServerInfo == null) {
         LOG.warn("Cannot get current server info, skip updating assigned 
bucketIds");
         return;
diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql 
b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
index 6f4be2ae5..32d663288 100644
--- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
+++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
@@ -272,5 +272,6 @@ CREATE TABLE bucket_assignments (
   server_info_json   VARCHAR(32672),
   assignments_json   VARCHAR(32672),
   last_update_time   BIGINT        NOT NULL DEFAULT 0,
+  node_heartbeat_ts  BIGINT        NOT NULL DEFAULT 0,
   PRIMARY KEY (cluster_name, node_key)
 );
\ No newline at end of file
diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql 
b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
index e9fd1a244..f202695c8 100644
--- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
@@ -288,5 +288,6 @@ CREATE TABLE IF NOT EXISTS bucket_assignments (
   server_info_json   TEXT         NULL COMMENT 'JSON encoded AmsServerInfo',
   assignments_json   TEXT         NULL COMMENT 'JSON array of bucket IDs',
   last_update_time   BIGINT       NOT NULL DEFAULT 0 COMMENT 'Last update 
timestamp (ms since epoch)',
+  node_heartbeat_ts  BIGINT       NOT NULL DEFAULT 0 COMMENT 'Per-node 
heartbeat timestamp updated only by the owning node (ms since epoch)',
   PRIMARY KEY (cluster_name, node_key)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Bucket ID assignments per AMS 
node for master-slave mode';
diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql 
b/amoro-ams/src/main/resources/mysql/upgrade.sql
index dd3c85cb2..c67807751 100644
--- a/amoro-ams/src/main/resources/mysql/upgrade.sql
+++ b/amoro-ams/src/main/resources/mysql/upgrade.sql
@@ -164,12 +164,14 @@ ADD COLUMN `process_parameters` mediumtext COMMENT 'Table 
process parameters';
 
 -- ADD table bucket_assignments for storing assigned info
 CREATE TABLE IF NOT EXISTS bucket_assignments (
-                                                  cluster_name       
VARCHAR(64)   NOT NULL COMMENT 'AMS cluster name',
+    cluster_name       VARCHAR(64)   NOT NULL COMMENT 'AMS cluster name',
     node_key           VARCHAR(256) NOT NULL COMMENT 'Node key 
(host:thriftBindPort)',
     server_info_json   TEXT         NULL COMMENT 'JSON encoded AmsServerInfo',
     assignments_json   TEXT         NULL COMMENT 'JSON array of bucket IDs',
     last_update_time   BIGINT       NOT NULL DEFAULT 0 COMMENT 'Last update 
timestamp (ms since epoch)',
     PRIMARY KEY (cluster_name, node_key)
-    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Bucket ID assignments per 
AMS node for master-slave mode';
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Bucket ID assignments per AMS 
node for master-slave mode';
 
+-- ADD node_heartbeat_ts to table bucket_assignments
+ALTER TABLE `bucket_assignments` ADD COLUMN `node_heartbeat_ts` BIGINT NOT 
NULL DEFAULT 0 COMMENT 'Per-node heartbeat timestamp updated only by the owning 
node (ms since epoch)';
 
diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql 
b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
index eeb1348bb..245e37744 100644
--- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
+++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
@@ -462,6 +462,7 @@ CREATE TABLE IF NOT EXISTS bucket_assignments (
   server_info_json   TEXT         NULL,
   assignments_json   TEXT         NULL,
   last_update_time   BIGINT       NOT NULL DEFAULT 0,
+  node_heartbeat_ts  BIGINT       NOT NULL DEFAULT 0,
   PRIMARY KEY (cluster_name, node_key)
 );
 
diff --git a/amoro-ams/src/main/resources/postgres/upgrade.sql 
b/amoro-ams/src/main/resources/postgres/upgrade.sql
index 5797d704e..8f26023f0 100644
--- a/amoro-ams/src/main/resources/postgres/upgrade.sql
+++ b/amoro-ams/src/main/resources/postgres/upgrade.sql
@@ -224,12 +224,15 @@ ADD COLUMN process_parameters text;
 
 -- ADD table bucket_assignments for storing assigned info
 CREATE TABLE IF NOT EXISTS bucket_assignments (
-                                                  cluster_name       
VARCHAR(64)  NOT NULL,
+    cluster_name       VARCHAR(64)  NOT NULL,
     node_key           VARCHAR(256) NOT NULL,
     server_info_json   TEXT         NULL,
     assignments_json   TEXT         NULL,
     last_update_time   BIGINT       NOT NULL DEFAULT 0,
     PRIMARY KEY (cluster_name, node_key)
-    );
+);
 
 CREATE INDEX IF NOT EXISTS idx_bucket_assignments_cluster ON 
bucket_assignments (cluster_name);
+
+-- ADD node_heartbeat_ts to table bucket_assignments
+ALTER TABLE bucket_assignments ADD COLUMN node_heartbeat_ts BIGINT NOT NULL 
DEFAULT 0;
\ No newline at end of file
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
index 4b283c668..e441a6d1f 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
@@ -872,6 +872,15 @@ public class TestAmsAssignService {
       lastUpdateTimes.put(nodeKey, System.currentTimeMillis());
     }
 
+    @Override
+    public List<AmsServerInfo> getAliveNodes() throws 
BucketAssignStoreException {
+      List<AmsServerInfo> nodes = new ArrayList<>();
+      for (String nodeKey : assignments.keySet()) {
+        nodes.add(nodeInfoMap.getOrDefault(nodeKey, parseNodeKey(nodeKey)));
+      }
+      return nodes;
+    }
+
     private AmsServerInfo parseNodeKey(String nodeKey) {
       String[] parts = nodeKey.split(":");
       if (parts.length != 2) {
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
index 936b54c5b..19d4ace8a 100644
--- 
a/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
+++ 
b/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
@@ -76,7 +76,11 @@ public class TestInternalIcebergCatalogService extends 
RestCatalogServiceTestBas
       meta.putToCatalogProperties("cache-enabled", "false");
       meta.putToCatalogProperties("cache.expiration-interval-ms", "10000");
       catalogManager.updateCatalog(meta);
-      String warehouseInAMS = 
meta.getCatalogProperties().get(CatalogMetaProperties.KEY_WAREHOUSE);
+      // Force a cache reload after invalidation to prevent the background 
catalog-scan task from
+      // overwriting the cache with a stale DB snapshot it read before the 
update completed.
+      CatalogMeta updatedMeta = catalogManager.getCatalogMeta(catalogName());
+      String warehouseInAMS =
+          
updatedMeta.getCatalogProperties().get(CatalogMetaProperties.KEY_WAREHOUSE);
 
       Map<String, String> clientSideConfiguration = Maps.newHashMap();
       clientSideConfiguration.put("cache-enabled", "true");
diff --git 
a/amoro-common/src/main/gen-java/org/apache/amoro/api/OptimizingService.java 
b/amoro-common/src/main/gen-java/org/apache/amoro/api/OptimizingService.java
index 331e69388..a13600c51 100644
--- a/amoro-common/src/main/gen-java/org/apache/amoro/api/OptimizingService.java
+++ b/amoro-common/src/main/gen-java/org/apache/amoro/api/OptimizingService.java
@@ -26,6 +26,8 @@ public class OptimizingService {
 
     public boolean cancelProcess(long processId) throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException;
 
+    public java.util.List<java.lang.String> getOptimizingNodeUrls() throws 
org.apache.amoro.api.AmoroException, 
org.apache.amoro.shade.thrift.org.apache.thrift.TException;
+
   }
 
   public interface AsyncIface {
@@ -44,6 +46,8 @@ public class OptimizingService {
 
     public void cancelProcess(long processId, 
org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean>
 resultHandler) throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException;
 
+    public void 
getOptimizingNodeUrls(org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>
 resultHandler) throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException;
+
   }
 
   public static class Client extends 
org.apache.amoro.shade.thrift.org.apache.thrift.TServiceClient implements Iface 
{
@@ -242,6 +246,32 @@ public class OptimizingService {
       throw new 
org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException(org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException.MISSING_RESULT,
 "cancelProcess failed: unknown result");
     }
 
+    @Override
+    public java.util.List<java.lang.String> getOptimizingNodeUrls() throws 
org.apache.amoro.api.AmoroException, 
org.apache.amoro.shade.thrift.org.apache.thrift.TException
+    {
+      send_getOptimizingNodeUrls();
+      return recv_getOptimizingNodeUrls();
+    }
+
+    public void send_getOptimizingNodeUrls() throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException
+    {
+      getOptimizingNodeUrls_args args = new getOptimizingNodeUrls_args();
+      sendBase("getOptimizingNodeUrls", args);
+    }
+
+    public java.util.List<java.lang.String> recv_getOptimizingNodeUrls() 
throws org.apache.amoro.api.AmoroException, 
org.apache.amoro.shade.thrift.org.apache.thrift.TException
+    {
+      getOptimizingNodeUrls_result result = new getOptimizingNodeUrls_result();
+      receiveBase(result, "getOptimizingNodeUrls");
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      if (result.e1 != null) {
+        throw result.e1;
+      }
+      throw new 
org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException(org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException.MISSING_RESULT,
 "getOptimizingNodeUrls failed: unknown result");
+    }
+
   }
   public static class AsyncClient extends 
org.apache.amoro.shade.thrift.org.apache.thrift.async.TAsyncClient implements 
AsyncIface {
     public static class Factory implements 
org.apache.amoro.shade.thrift.org.apache.thrift.async.TAsyncClientFactory<AsyncClient>
 {
@@ -519,6 +549,38 @@ public class OptimizingService {
       }
     }
 
+    @Override
+    public void 
getOptimizingNodeUrls(org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>
 resultHandler) throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+      checkReady();
+      getOptimizingNodeUrls_call method_call = new 
getOptimizingNodeUrls_call(resultHandler, this, ___protocolFactory, 
___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class getOptimizingNodeUrls_call extends 
org.apache.amoro.shade.thrift.org.apache.thrift.async.TAsyncMethodCall<java.util.List<java.lang.String>>
 {
+      public 
getOptimizingNodeUrls_call(org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>
 resultHandler, 
org.apache.amoro.shade.thrift.org.apache.thrift.async.TAsyncClient client, 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolFactory 
protocolFactory, 
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TNonblockingTransport 
transport) throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+      }
+
+      @Override
+      public void 
write_args(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol 
prot) throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+        prot.writeMessageBegin(new 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessage("getOptimizingNodeUrls",
 org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessageType.CALL, 
0));
+        getOptimizingNodeUrls_args args = new getOptimizingNodeUrls_args();
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      @Override
+      public java.util.List<java.lang.String> getResult() throws 
org.apache.amoro.api.AmoroException, 
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+        if (getState() != 
org.apache.amoro.shade.thrift.org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ)
 {
+          throw new java.lang.IllegalStateException("Method call not 
finished!");
+        }
+        
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TMemoryInputTransport 
memoryTransport = new 
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol 
prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        return (new Client(prot)).recv_getOptimizingNodeUrls();
+      }
+    }
+
   }
 
   public static class Processor<I extends Iface> extends 
org.apache.amoro.shade.thrift.org.apache.thrift.TBaseProcessor<I> implements 
org.apache.amoro.shade.thrift.org.apache.thrift.TProcessor {
@@ -539,6 +601,7 @@ public class OptimizingService {
       processMap.put("completeTask", new completeTask());
       processMap.put("authenticate", new authenticate());
       processMap.put("cancelProcess", new cancelProcess());
+      processMap.put("getOptimizingNodeUrls", new getOptimizingNodeUrls());
       return processMap;
     }
 
@@ -759,6 +822,38 @@ public class OptimizingService {
       }
     }
 
+    public static class getOptimizingNodeUrls<I extends Iface> extends 
org.apache.amoro.shade.thrift.org.apache.thrift.ProcessFunction<I, 
getOptimizingNodeUrls_args> {
+      public getOptimizingNodeUrls() {
+        super("getOptimizingNodeUrls");
+      }
+
+      @Override
+      public getOptimizingNodeUrls_args getEmptyArgsInstance() {
+        return new getOptimizingNodeUrls_args();
+      }
+
+      @Override
+      protected boolean isOneway() {
+        return false;
+      }
+
+      @Override
+      protected boolean rethrowUnhandledExceptions() {
+        return false;
+      }
+
+      @Override
+      public getOptimizingNodeUrls_result getResult(I iface, 
getOptimizingNodeUrls_args args) throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+        getOptimizingNodeUrls_result result = new 
getOptimizingNodeUrls_result();
+        try {
+          result.success = iface.getOptimizingNodeUrls();
+        } catch (org.apache.amoro.api.AmoroException e1) {
+          result.e1 = e1;
+        }
+        return result;
+      }
+    }
+
   }
 
   public static class AsyncProcessor<I extends AsyncIface> extends 
org.apache.amoro.shade.thrift.org.apache.thrift.TBaseAsyncProcessor<I> {
@@ -779,6 +874,7 @@ public class OptimizingService {
       processMap.put("completeTask", new completeTask());
       processMap.put("authenticate", new authenticate());
       processMap.put("cancelProcess", new cancelProcess());
+      processMap.put("getOptimizingNodeUrls", new getOptimizingNodeUrls());
       return processMap;
     }
 
@@ -1268,6 +1364,77 @@ public class OptimizingService {
       }
     }
 
+    public static class getOptimizingNodeUrls<I extends AsyncIface> extends 
org.apache.amoro.shade.thrift.org.apache.thrift.AsyncProcessFunction<I, 
getOptimizingNodeUrls_args, java.util.List<java.lang.String>> {
+      public getOptimizingNodeUrls() {
+        super("getOptimizingNodeUrls");
+      }
+
+      @Override
+      public getOptimizingNodeUrls_args getEmptyArgsInstance() {
+        return new getOptimizingNodeUrls_args();
+      }
+
+      @Override
+      public 
org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>
 getResultHandler(final 
org.apache.amoro.shade.thrift.org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer
 fb, final int seqid) {
+        final 
org.apache.amoro.shade.thrift.org.apache.thrift.AsyncProcessFunction fcall = 
this;
+        return new 
org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>()
 {
+          @Override
+          public void onComplete(java.util.List<java.lang.String> o) {
+            getOptimizingNodeUrls_result result = new 
getOptimizingNodeUrls_result();
+            result.success = o;
+            try {
+              fcall.sendResponse(fb, result, 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessageType.REPLY, 
seqid);
+            } catch 
(org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException 
e) {
+              _LOGGER.error("TTransportException writing to internal frame 
buffer", e);
+              fb.close();
+            } catch (java.lang.Exception e) {
+              _LOGGER.error("Exception writing to internal frame buffer", e);
+              onError(e);
+            }
+          }
+          @Override
+          public void onError(java.lang.Exception e) {
+            byte msgType = 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.amoro.shade.thrift.org.apache.thrift.TSerializable msg;
+            getOptimizingNodeUrls_result result = new 
getOptimizingNodeUrls_result();
+            if (e instanceof org.apache.amoro.api.AmoroException) {
+              result.e1 = (org.apache.amoro.api.AmoroException) e;
+              result.setE1IsSet(true);
+              msg = result;
+            } else if (e instanceof 
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException) {
+              _LOGGER.error("TTransportException inside handler", e);
+              fb.close();
+              return;
+            } else if (e instanceof 
org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException) {
+              _LOGGER.error("TApplicationException inside handler", e);
+              msgType = 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = 
(org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException) e;
+            } else {
+              _LOGGER.error("Exception inside handler", e);
+              msgType = 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = new 
org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException(org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException.INTERNAL_ERROR,
 e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb, msg, msgType, seqid);
+            } catch (java.lang.Exception ex) {
+              _LOGGER.error("Exception writing to internal frame buffer", ex);
+              fb.close();
+            }
+          }
+        };
+      }
+
+      @Override
+      protected boolean isOneway() {
+        return false;
+      }
+
+      @Override
+      public void start(I iface, getOptimizingNodeUrls_args args, 
org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>
 resultHandler) throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+        iface.getOptimizingNodeUrls(resultHandler);
+      }
+    }
+
   }
 
   @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@@ -6985,4 +7152,309 @@ public class OptimizingService {
     }
   }
 
+  @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+  public static class getOptimizingNodeUrls_args implements 
org.apache.amoro.shade.thrift.org.apache.thrift.TBase<getOptimizingNodeUrls_args,
 getOptimizingNodeUrls_args._Fields>, java.io.Serializable, Cloneable, 
Comparable<getOptimizingNodeUrls_args>   {
+    private static final 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TStruct STRUCT_DESC = 
new 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TStruct("getOptimizingNodeUrls_args");
+
+    private static final 
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory 
STANDARD_SCHEME_FACTORY = new getOptimizingNodeUrls_argsStandardSchemeFactory();
+    private static final 
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory 
TUPLE_SCHEME_FACTORY = new getOptimizingNodeUrls_argsTupleSchemeFactory();
+
+    /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+    public enum _Fields implements 
org.apache.amoro.shade.thrift.org.apache.thrift.TFieldIdEnum {
+;
+      private static final java.util.Map<java.lang.String, _Fields> byName = 
new java.util.HashMap<java.lang.String, _Fields>();
+      static {
+        for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+      @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          default: return null;
+        }
+      }
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new 
java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+      @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+      public static _Fields findByName(java.lang.String name) { return 
byName.get(name); }
+      private final short _thriftId;
+      private final java.lang.String _fieldName;
+      _Fields(short thriftId, java.lang.String fieldName) { _thriftId = 
thriftId; _fieldName = fieldName; }
+      @Override public short getThriftFieldId() { return _thriftId; }
+      @Override public java.lang.String getFieldName() { return _fieldName; }
+    }
+    public static final java.util.Map<_Fields, 
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
+    static {
+      java.util.Map<_Fields, 
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData> tmpMap 
= new java.util.EnumMap<_Fields, 
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+      
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getOptimizingNodeUrls_args.class,
 metaDataMap);
+    }
+    public getOptimizingNodeUrls_args() {}
+    public getOptimizingNodeUrls_args(getOptimizingNodeUrls_args other) {}
+    @Override public getOptimizingNodeUrls_args deepCopy() { return new 
getOptimizingNodeUrls_args(this); }
+    @Override public void clear() {}
+    @Override public void setFieldValue(_Fields field, 
@org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable 
java.lang.Object value) { switch (field) {} }
+    @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+    @Override public java.lang.Object getFieldValue(_Fields field) { switch 
(field) {} throw new java.lang.IllegalStateException(); }
+    @Override public boolean isSet(_Fields field) { if (field == null) throw 
new java.lang.IllegalArgumentException(); switch (field) {} throw new 
java.lang.IllegalStateException(); }
+    @Override public boolean equals(java.lang.Object that) { if (that 
instanceof getOptimizingNodeUrls_args) return 
this.equals((getOptimizingNodeUrls_args)that); return false; }
+    public boolean equals(getOptimizingNodeUrls_args that) { if (that == null) 
return false; if (this == that) return true; return true; }
+    @Override public int hashCode() { int hashCode = 1; return hashCode; }
+    @Override public int compareTo(getOptimizingNodeUrls_args other) { if 
(!getClass().equals(other.getClass())) { return 
getClass().getName().compareTo(other.getClass().getName()); } int 
lastComparison = 0; return 0; }
+    @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+    @Override public _Fields fieldForId(int fieldId) { return 
_Fields.findByThriftId(fieldId); }
+    @Override public void 
read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol iprot) 
throws org.apache.amoro.shade.thrift.org.apache.thrift.TException { 
scheme(iprot).read(iprot, this); }
+    @Override public void 
write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol oprot) 
throws org.apache.amoro.shade.thrift.org.apache.thrift.TException { 
scheme(oprot).write(oprot, this); }
+    @Override public java.lang.String toString() { java.lang.StringBuilder sb 
= new java.lang.StringBuilder("getOptimizingNodeUrls_args("); sb.append(")"); 
return sb.toString(); }
+    public void validate() throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException {}
+    private void writeObject(java.io.ObjectOutputStream out) throws 
java.io.IOException { try { write(new 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TIOStreamTransport(out)));
 } catch (org.apache.amoro.shade.thrift.org.apache.thrift.TException te) { 
throw new java.io.IOException(te); } }
+    private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, java.lang.ClassNotFoundException { try { read(new 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TIOStreamTransport(in)));
 } catch (org.apache.amoro.shade.thrift.org.apache.thrift.TException te) { 
throw new java.io.IOException(te); } }
+    private static class getOptimizingNodeUrls_argsStandardSchemeFactory 
implements org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory 
{
+      @Override public getOptimizingNodeUrls_argsStandardScheme getScheme() { 
return new getOptimizingNodeUrls_argsStandardScheme(); }
+    }
+    private static class getOptimizingNodeUrls_argsStandardScheme extends 
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.StandardScheme<getOptimizingNodeUrls_args>
 {
+      @Override public void 
read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol iprot, 
getOptimizingNodeUrls_args struct) throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+        org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField 
schemeField;
+        iprot.readStructBegin();
+        while (true) { schemeField = iprot.readFieldBegin(); if 
(schemeField.type == 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STOP) { break; } 
switch (schemeField.id) { default: 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
 schemeField.type); } iprot.readFieldEnd(); }
+        iprot.readStructEnd();
+        struct.validate();
+      }
+      @Override public void 
write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol oprot, 
getOptimizingNodeUrls_args struct) throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+        struct.validate();
+        oprot.writeStructBegin(STRUCT_DESC);
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+    }
+    private static class getOptimizingNodeUrls_argsTupleSchemeFactory 
implements org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory 
{
+      @Override public getOptimizingNodeUrls_argsTupleScheme getScheme() { 
return new getOptimizingNodeUrls_argsTupleScheme(); }
+    }
+    private static class getOptimizingNodeUrls_argsTupleScheme extends 
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.TupleScheme<getOptimizingNodeUrls_args>
 {
+      @Override public void 
write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol prot, 
getOptimizingNodeUrls_args struct) throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException { 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol oprot = 
(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol) prot; 
}
+      @Override public void 
read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol prot, 
getOptimizingNodeUrls_args struct) throws 
org.apache.amoro.shade.thrift.org.apache.thrift.TException { 
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol iprot = 
(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol) prot; 
}
+    }
+    private static <S extends 
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.IScheme> S 
scheme(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol 
proto) {
+      return 
(org.apache.amoro.shade.thrift.org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme())
 ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+    }
+  }
+
+  // Result struct for OptimizingService.getOptimizingNodeUrls(): field 0 ("success") carries the
+  // returned list<string>, field 1 ("e1") the AmoroException declared in the IDL throws clause.
+  // NOTE(review): presumably produced by the Thrift compiler from amoro_optimizing_service.thrift —
+  // confirm; if so, regenerate instead of hand-editing, otherwise keep it in sync with the IDL.
+  @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+  public static class getOptimizingNodeUrls_result implements org.apache.amoro.shade.thrift.org.apache.thrift.TBase<getOptimizingNodeUrls_result, getOptimizingNodeUrls_result._Fields>, java.io.Serializable, Cloneable, Comparable<getOptimizingNodeUrls_result>   {
+    // Wire descriptors for the struct and its two fields (ids 0 and 1).
+    private static final org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TStruct("getOptimizingNodeUrls_result");
+
+    private static final org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField("success", org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.LIST, (short)0);
+    private static final org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField E1_FIELD_DESC = new org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField("e1", org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new getOptimizingNodeUrls_resultStandardSchemeFactory();
+    private static final org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new getOptimizingNodeUrls_resultTupleSchemeFactory();
+
+    // The RPC return value (field 0); null means "not set".
+    public @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> success;
+    // The declared exception (field 1); null means "not set".
+    public @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable org.apache.amoro.api.AmoroException e1;
+
+    // Field identifiers, resolvable by Thrift id or by name.
+    public enum _Fields implements org.apache.amoro.shade.thrift.org.apache.thrift.TFieldIdEnum {
+      SUCCESS((short)0, "success"),
+      E1((short)1, "e1");
+      private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+      static { for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { byName.put(field.getFieldName(), field); } }
+      @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+      public static _Fields findByThriftId(int fieldId) { switch(fieldId) { case 0: return SUCCESS; case 1: return E1; default: return null; } }
+      public static _Fields findByThriftIdOrThrow(int fieldId) { _Fields fields = findByThriftId(fieldId); if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); return fields; }
+      @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+      public static _Fields findByName(java.lang.String name) { return byName.get(name); }
+      private final short _thriftId;
+      private final java.lang.String _fieldName;
+      _Fields(short thriftId, java.lang.String fieldName) { _thriftId = thriftId; _fieldName = fieldName; }
+      @Override public short getThriftFieldId() { return _thriftId; }
+      @Override public java.lang.String getFieldName() { return _fieldName; }
+    }
+
+    // Immutable per-field metadata, also registered globally with FieldMetaData.
+    public static final java.util.Map<_Fields, org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      java.util.Map<_Fields, org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.SUCCESS, new org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData("success", org.apache.amoro.shade.thrift.org.apache.thrift.TFieldRequirementType.DEFAULT,
+          new org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.ListMetaData(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.LIST,
+              new org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldValueMetaData(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STRING))));
+      tmpMap.put(_Fields.E1, new org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData("e1", org.apache.amoro.shade.thrift.org.apache.thrift.TFieldRequirementType.DEFAULT,
+          new org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.StructMetaData(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STRUCT, org.apache.amoro.api.AmoroException.class)));
+      metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+      org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getOptimizingNodeUrls_result.class, metaDataMap);
+    }
+
+    // Constructors: empty, all-fields, and copy (the copy clones the list and the exception).
+    public getOptimizingNodeUrls_result() {}
+    public getOptimizingNodeUrls_result(java.util.List<java.lang.String> success, org.apache.amoro.api.AmoroException e1) { this(); this.success = success; this.e1 = e1; }
+    public getOptimizingNodeUrls_result(getOptimizingNodeUrls_result other) {
+      if (other.isSetSuccess()) { this.success = new java.util.ArrayList<java.lang.String>(other.success); }
+      if (other.isSetE1()) { this.e1 = new org.apache.amoro.api.AmoroException(other.e1); }
+    }
+    @Override public getOptimizingNodeUrls_result deepCopy() { return new getOptimizingNodeUrls_result(this); }
+    @Override public void clear() { this.success = null; this.e1 = null; }
+
+    // Accessors: a field counts as "set" exactly when it is non-null.
+    @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+    public java.util.List<java.lang.String> getSuccess() { return this.success; }
+    public getOptimizingNodeUrls_result setSuccess(@org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> success) { this.success = success; return this; }
+    public void unsetSuccess() { this.success = null; }
+    public boolean isSetSuccess() { return this.success != null; }
+    public void setSuccessIsSet(boolean value) { if (!value) { this.success = null; } }
+
+    @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+    public org.apache.amoro.api.AmoroException getE1() { return this.e1; }
+    public getOptimizingNodeUrls_result setE1(@org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable org.apache.amoro.api.AmoroException e1) { this.e1 = e1; return this; }
+    public void unsetE1() { this.e1 = null; }
+    public boolean isSetE1() { return this.e1 != null; }
+    public void setE1IsSet(boolean value) { if (!value) { this.e1 = null; } }
+
+    // Generic (reflective) field access keyed by _Fields.
+    @Override
+    public void setFieldValue(_Fields field, @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable java.lang.Object value) {
+      switch (field) {
+      case SUCCESS: if (value == null) { unsetSuccess(); } else { setSuccess((java.util.List<java.lang.String>)value); } break;
+      case E1: if (value == null) { unsetE1(); } else { setE1((org.apache.amoro.api.AmoroException)value); } break;
+      }
+    }
+    @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+    @Override
+    public java.lang.Object getFieldValue(_Fields field) {
+      switch (field) { case SUCCESS: return getSuccess(); case E1: return getE1(); }
+      throw new java.lang.IllegalStateException();
+    }
+    @Override
+    public boolean isSet(_Fields field) {
+      if (field == null) { throw new java.lang.IllegalArgumentException(); }
+      switch (field) { case SUCCESS: return isSetSuccess(); case E1: return isSetE1(); }
+      throw new java.lang.IllegalStateException();
+    }
+    // Value semantics: equal when both fields have the same set-ness and equal values.
+    @Override public boolean equals(java.lang.Object that) { if (that instanceof getOptimizingNodeUrls_result) return this.equals((getOptimizingNodeUrls_result)that); return false; }
+    public boolean equals(getOptimizingNodeUrls_result that) {
+      if (that == null) return false;
+      if (this == that) return true;
+      boolean this_present_success = true && this.isSetSuccess();
+      boolean that_present_success = true && that.isSetSuccess();
+      if (this_present_success || that_present_success) { if (!(this_present_success && that_present_success)) return false; if (!this.success.equals(that.success)) return false; }
+      boolean this_present_e1 = true && this.isSetE1();
+      boolean that_present_e1 = true && that.isSetE1();
+      if (this_present_e1 || that_present_e1) { if (!(this_present_e1 && that_present_e1)) return false; if (!this.e1.equals(that.e1)) return false; }
+      return true;
+    }
+    @Override public int hashCode() { int hashCode = 1; hashCode = hashCode * 8191 + ((isSetSuccess()) ? 131071 : 524287); if (isSetSuccess()) hashCode = hashCode * 8191 + success.hashCode(); hashCode = hashCode * 8191 + ((isSetE1()) ? 131071 : 524287); if (isSetE1()) hashCode = hashCode * 8191 + e1.hashCode(); return hashCode; }
+    // Orders by set-ness first, then by value, field by field in id order.
+    @Override
+    public int compareTo(getOptimizingNodeUrls_result other) {
+      if (!getClass().equals(other.getClass())) { return getClass().getName().compareTo(other.getClass().getName()); }
+      int lastComparison = 0;
+      lastComparison = java.lang.Boolean.compare(isSetSuccess(), other.isSetSuccess()); if (lastComparison != 0) return lastComparison;
+      if (isSetSuccess()) { lastComparison = org.apache.amoro.shade.thrift.org.apache.thrift.TBaseHelper.compareTo(this.success, other.success); if (lastComparison != 0) return lastComparison; }
+      lastComparison = java.lang.Boolean.compare(isSetE1(), other.isSetE1()); if (lastComparison != 0) return lastComparison;
+      if (isSetE1()) { lastComparison = org.apache.amoro.shade.thrift.org.apache.thrift.TBaseHelper.compareTo(this.e1, other.e1); if (lastComparison != 0) return lastComparison; }
+      return 0;
+    }
+    @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+    @Override public _Fields fieldForId(int fieldId) { return _Fields.findByThriftId(fieldId); }
+    // (De)serialization delegates to the scheme selected by the protocol (see scheme()).
+    @Override public void read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol iprot) throws org.apache.amoro.shade.thrift.org.apache.thrift.TException { scheme(iprot).read(iprot, this); }
+    public void write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol oprot) throws org.apache.amoro.shade.thrift.org.apache.thrift.TException { scheme(oprot).write(oprot, this); }
+    @Override
+    public java.lang.String toString() {
+      java.lang.StringBuilder sb = new java.lang.StringBuilder("getOptimizingNodeUrls_result(");
+      boolean first = true;
+      sb.append("success:"); if (this.success == null) { sb.append("null"); } else { sb.append(this.success); } first = false;
+      if (!first) sb.append(", "); sb.append("e1:"); if (this.e1 == null) { sb.append("null"); } else { sb.append(this.e1); } first = false;
+      sb.append(")"); return sb.toString();
+    }
+    // No required fields, so validation is a no-op.
+    public void validate() throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {}
+    // Java serialization is routed through Thrift's TCompactProtocol.
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { try { write(new org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TCompactProtocol(new org.apache.amoro.shade.thrift.org.apache.thrift.transport.TIOStreamTransport(out))); } catch (org.apache.amoro.shade.thrift.org.apache.thrift.TException te) { throw new java.io.IOException(te); } }
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { try { read(new org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TCompactProtocol(new org.apache.amoro.shade.thrift.org.apache.thrift.transport.TIOStreamTransport(in))); } catch (org.apache.amoro.shade.thrift.org.apache.thrift.TException te) { throw new java.io.IOException(te); } }
+
+    // Standard scheme: field-tagged encoding; unknown or type-mismatched fields are skipped.
+    private static class getOptimizingNodeUrls_resultStandardSchemeFactory implements org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory {
+      @Override public getOptimizingNodeUrls_resultStandardScheme getScheme() { return new getOptimizingNodeUrls_resultStandardScheme(); }
+    }
+    private static class getOptimizingNodeUrls_resultStandardScheme extends org.apache.amoro.shade.thrift.org.apache.thrift.scheme.StandardScheme<getOptimizingNodeUrls_result> {
+      @Override
+      public void read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol iprot, getOptimizingNodeUrls_result struct) throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+        org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true) {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STOP) { break; }
+          switch (schemeField.id) {
+            case 0: // SUCCESS
+              if (schemeField.type == org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.LIST) {
+                org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
+                struct.success = new java.util.ArrayList<java.lang.String>(_list0.size);
+                java.lang.String _elem0;
+                for (int _i0 = 0; _i0 < _list0.size; ++_i0) { _elem0 = iprot.readString(); struct.success.add(_elem0); }
+                iprot.readListEnd();
+                struct.setSuccessIsSet(true);
+              } else { org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); }
+              break;
+            case 1: // E1
+              if (schemeField.type == org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STRUCT) {
+                struct.e1 = new org.apache.amoro.api.AmoroException();
+                struct.e1.read(iprot);
+                struct.setE1IsSet(true);
+              } else { org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); }
+              break;
+            default: org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+        struct.validate();
+      }
+      @Override
+      public void write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol oprot, getOptimizingNodeUrls_result struct) throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+        struct.validate();
+        oprot.writeStructBegin(STRUCT_DESC);
+        // Only non-null fields are written.
+        if (struct.success != null) {
+          oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+          oprot.writeListBegin(new org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TList(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STRING, struct.success.size()));
+          for (java.lang.String _iter : struct.success) { oprot.writeString(_iter); }
+          oprot.writeListEnd();
+          oprot.writeFieldEnd();
+        }
+        if (struct.e1 != null) {
+          oprot.writeFieldBegin(E1_FIELD_DESC);
+          struct.e1.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+    }
+    // Tuple scheme: a 2-bit presence bitset followed by the set fields in id order (no field tags).
+    private static class getOptimizingNodeUrls_resultTupleSchemeFactory implements org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory {
+      @Override public getOptimizingNodeUrls_resultTupleScheme getScheme() { return new getOptimizingNodeUrls_resultTupleScheme(); }
+    }
+    private static class getOptimizingNodeUrls_resultTupleScheme extends org.apache.amoro.shade.thrift.org.apache.thrift.scheme.TupleScheme<getOptimizingNodeUrls_result> {
+      @Override
+      public void write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol prot, getOptimizingNodeUrls_result struct) throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+        org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet optionals = new java.util.BitSet();
+        if (struct.isSetSuccess()) { optionals.set(0); }
+        if (struct.isSetE1()) { optionals.set(1); }
+        oprot.writeBitSet(optionals, 2);
+        if (struct.isSetSuccess()) {
+          oprot.writeI32(struct.success.size());
+          for (java.lang.String _iter : struct.success) { oprot.writeString(_iter); }
+        }
+        if (struct.isSetE1()) { struct.e1.write(oprot); }
+      }
+      @Override
+      public void read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol prot, getOptimizingNodeUrls_result struct) throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+        org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol) prot;
+        java.util.BitSet incoming = iprot.readBitSet(2);
+        if (incoming.get(0)) {
+          // NOTE(review): the list size comes straight off the wire and is used as the ArrayList
+          // capacity (a negative value throws IllegalArgumentException) — confirm this matches the
+          // Thrift compiler version in use.
+          int _size1 = iprot.readI32();
+          struct.success = new java.util.ArrayList<java.lang.String>(_size1);
+          for (int _i = 0; _i < _size1; ++_i) { struct.success.add(iprot.readString()); }
+          struct.setSuccessIsSet(true);
+        }
+        if (incoming.get(1)) { struct.e1 = new org.apache.amoro.api.AmoroException(); struct.e1.read(iprot); struct.setE1IsSet(true); }
+      }
+    }
+    // Picks the standard scheme when the protocol reports StandardScheme, otherwise the tuple scheme.
+    private static <S extends org.apache.amoro.shade.thrift.org.apache.thrift.scheme.IScheme> S scheme(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol proto) {
+      return (org.apache.amoro.shade.thrift.org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+    }
+  }
+
 }
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java 
b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
index a8d04a123..dd4a6f624 100644
--- a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
+++ b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
@@ -48,4 +48,5 @@ public class OptimizerProperties {
   public static final String OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT = "128mb";
   public static final String OPTIMIZER_CACHE_TIMEOUT = "cache-timeout";
   public static final String OPTIMIZER_CACHE_TIMEOUT_DEFAULT = "10min";
+  /**
+   * Property key that enables master-slave mode, in which the optimizer polls tasks from every AMS
+   * node instead of a single one. NOTE(review): presumably backs OptimizerConfig#isMasterSlaveMode
+   * and no {@code _DEFAULT} constant is defined here — confirm the default where it is read.
+   */
+  public static final String OPTIMIZER_MASTER_SLAVE_MODE_ENABLED = "master-slave-mode-enabled";
 }
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java 
b/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
index 233fb6e69..860d5c6dc 100644
--- a/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
+++ b/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
@@ -32,6 +32,8 @@ import javax.security.auth.login.LoginException;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Locale;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -207,6 +209,95 @@ public class AmsThriftUrl {
     }
   }
 
+  /**
+   * Lists every AMS node registered for a master-slave deployment.
+   *
+   * @param url ZooKeeper-style AMS URL, e.g. {@code zookeeper://host:port/cluster}
+   * @return the AMS nodes discovered for the URL's cluster; may be empty
+   * @throws IllegalArgumentException if {@code url} is {@code null} or is not a ZooKeeper URL
+   */
+  public static List<AmsServerInfo> parseMasterSlaveAmsNodes(String url) {
+    if (url != null) {
+      return parserZookeeperUrlListForMasterSlaveMode(url);
+    }
+    throw new IllegalArgumentException("thrift url is null");
+  }
+
+  /**
+   * Reads the registered AMS nodes for a {@code zookeeper://} URL from ZooKeeper.
+   *
+   * <p>Every child of {@code AmsHAProperties.getNodesPath(cluster)} is read and parsed as an
+   * {@link AmsServerInfo}. Children that cannot be read or parsed are logged and skipped. A missing
+   * nodes path yields an empty list.
+   *
+   * @param url AMS URL starting with the ZooKeeper flag; any query string is ignored
+   * @return the AMS nodes found, possibly empty
+   * @throws IllegalArgumentException if {@code url} is not a ZooKeeper URL
+   * @throws RuntimeException if the URL does not match the expected pattern, or if ZooKeeper keeps
+   *     failing (authentication failures included) for {@code MAX_RETRIES} attempts
+   */
+  private static List<AmsServerInfo> parserZookeeperUrlListForMasterSlaveMode(String url) {
+    if (!url.startsWith(ZOOKEEPER_FLAG)) {
+      throw new IllegalArgumentException(
+          "parseMasterSlaveAmsNodes only supports ZooKeeper URL format: zookeeper://host:port/cluster");
+    }
+    // Only the part before '?' identifies the ZooKeeper address and the cluster.
+    String thriftUrl = url.contains("?") ? url.substring(0, url.indexOf("?")) : url;
+    Matcher m = PATTERN.matcher(thriftUrl);
+    if (!m.matches()) {
+      throw new RuntimeException(String.format("invalid ams url %s", url));
+    }
+    String zkServerAddress;
+    String cluster;
+    if (m.group(1).contains("/")) {
+      zkServerAddress = m.group(1).substring(0, m.group(1).indexOf("/"));
+      cluster = m.group(1).substring(m.group(1).indexOf("/") + 1);
+    } else {
+      zkServerAddress = m.group(1);
+      cluster = m.group(2);
+    }
+    int retryCount = 0;
+    while (retryCount < MAX_RETRIES) {
+      // Fresh list per attempt, so a failed attempt never leaks partial results into the next one.
+      List<AmsServerInfo> serverInfoList = new ArrayList<>();
+      try {
+        ZookeeperService zkService = ZookeeperService.getInstance(zkServerAddress);
+        String nodesPath = AmsHAProperties.getNodesPath(cluster);
+        List<String> nodePaths = zkService.getChildren(nodesPath);
+        for (String nodePath : nodePaths) {
+          try {
+            String fullPath = nodesPath + "/" + nodePath;
+            String nodeInfoJson = zkService.getData(fullPath);
+            if (nodeInfoJson != null && !nodeInfoJson.isEmpty()) {
+              AmsServerInfo nodeInfo = JacksonUtil.parseObject(nodeInfoJson, AmsServerInfo.class);
+              // Never hand callers a null entry.
+              if (nodeInfo != null) {
+                serverInfoList.add(nodeInfo);
+              }
+            }
+          } catch (Exception e) {
+            // Best effort: one unreadable node must not hide the others.
+            logger.warn("Failed to get node info for path: {}", nodePath, e);
+          }
+        }
+        if (serverInfoList.isEmpty()) {
+          logger.warn("No AMS nodes found in ZooKeeper path: {}", nodesPath);
+        } else {
+          logger.info("Found {} AMS nodes from ZooKeeper", serverInfoList.size());
+        }
+        return serverInfoList;
+      } catch (KeeperException.AuthFailedException authFailedException) {
+        // If kerberos authentication is not enabled on the zk, an error occurs when the thread
+        // carrying kerberos authentication information accesses the zk. Therefore, clear the
+        // authentication information and try again.
+        retryCount++;
+        logger.error(
+            String.format("Caught exception, retrying... (retry count: %s)", retryCount),
+            authFailedException);
+        try {
+          Subject subject = Subject.getSubject(java.security.AccessController.getContext());
+          if (subject != null) {
+            LoginContext loginContext = new LoginContext("", subject);
+            loginContext.logout();
+          }
+        } catch (LoginException e) {
+          logger.error("Failed to logout", e);
+        }
+        if (retryCount >= MAX_RETRIES) {
+          // Same contract as the generic failure path: a persistent authentication failure must
+          // not be reported to callers as "no AMS nodes registered".
+          throw new RuntimeException(
+              String.format("Failed to get AMS nodes from ZooKeeper for url %s", url),
+              authFailedException);
+        }
+      } catch (KeeperException.NoNodeException e) {
+        logger.debug("Nodes path does not exist: {}", AmsHAProperties.getNodesPath(cluster));
+        return serverInfoList;
+      } catch (Exception e) {
+        retryCount++;
+        logger.error(
+            String.format("Caught exception, retrying... (retry count: %s)", retryCount), e);
+        if (retryCount >= MAX_RETRIES) {
+          throw new RuntimeException(
+              String.format("Failed to get AMS nodes from ZooKeeper for url %s", url), e);
+        }
+      }
+    }
+    // Only reachable when MAX_RETRIES <= 0: no attempt was made.
+    return new ArrayList<>();
+  }
+
+  /** Returns this URL's {@code schema} value (presumably the parsed URL scheme — confirm). */
   public String schema() {
     return schema;
   }
diff --git 
a/amoro-common/src/main/java/org/apache/amoro/client/ZookeeperService.java 
b/amoro-common/src/main/java/org/apache/amoro/client/ZookeeperService.java
index ee683335d..8d413606d 100644
--- a/amoro-common/src/main/java/org/apache/amoro/client/ZookeeperService.java
+++ b/amoro-common/src/main/java/org/apache/amoro/client/ZookeeperService.java
@@ -26,6 +26,7 @@ import 
org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.client.ZKClientCon
 import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.data.Stat;
 
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 
 /** Provides ZooKeeper clients and operations. */
 public class ZookeeperService {
@@ -92,6 +93,10 @@ public class ZookeeperService {
     return new String(zkClient.getData().forPath(path), 
StandardCharsets.UTF_8);
   }
 
+  /**
+   * Lists the direct children of a ZooKeeper node.
+   *
+   * @param path znode path whose children are listed
+   * @return child node names (relative, not full paths), as returned by the client
+   * @throws Exception any error from the underlying client, e.g. when the node does not exist
+   */
+  public List<String> getChildren(String path) throws Exception {
+    List<String> children = zkClient.getChildren().forPath(path);
+    return children;
+  }
+
+  /**
+   * Deletes the znode at {@code path} through the ZooKeeper client.
+   *
+   * @throws Exception any error raised by the client
+   */
   public void delete(String path) throws Exception {
     zkClient.delete().forPath(path);
   }
diff --git a/amoro-common/src/main/thrift/amoro_optimizing_service.thrift 
b/amoro-common/src/main/thrift/amoro_optimizing_service.thrift
index 77b6b7dce..3417fed31 100644
--- a/amoro-common/src/main/thrift/amoro_optimizing_service.thrift
+++ b/amoro-common/src/main/thrift/amoro_optimizing_service.thrift
@@ -67,4 +67,7 @@ service OptimizingService {
             throws (1: amoro_commons.AmoroException e1)
 
     bool cancelProcess(1:i64 processId)
+
+    // Returns the URLs of the AMS nodes optimizers may fetch tasks from in master-slave mode.
+    // Optimizers in DB HA mode (non-zookeeper:// AMS URL) use it for node discovery.
+    list<string> getOptimizingNodeUrls()
+            throws (1: amoro_commons.AmoroException e1)
 }
diff --git 
a/amoro-common/src/test/java/org/apache/amoro/MockAmoroManagementServer.java 
b/amoro-common/src/test/java/org/apache/amoro/MockAmoroManagementServer.java
index c4775b548..ae8f46285 100644
--- a/amoro-common/src/test/java/org/apache/amoro/MockAmoroManagementServer.java
+++ b/amoro-common/src/test/java/org/apache/amoro/MockAmoroManagementServer.java
@@ -469,6 +469,11 @@ public class MockAmoroManagementServer implements Runnable 
{
       return false;
     }
 
+    /** Mock implementation: this test server reports no optimizing nodes (an immutable empty list). */
+    @Override
+    public java.util.List<String> getOptimizingNodeUrls() throws TException {
+      java.util.List<String> noNodes = java.util.Collections.emptyList();
+      return noNodes;
+    }
+
+    /** Returns the {@code registeredOptimizers} map itself (live, not a copy). */
     public Map<String, OptimizerRegisterInfo> getRegisteredOptimizers() {
       return registeredOptimizers;
     }
diff --git 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java
 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java
index 22faae012..bda30c27f 100644
--- 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java
+++ 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java
@@ -29,6 +29,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -38,24 +40,100 @@ public class AbstractOptimizerOperator implements 
Serializable {
   // Call ams every 5 seconds by default
+  // Value is in milliseconds. It is static, so one value is shared by all operators; see setCallAmsInterval.
   private static long callAmsInterval = TimeUnit.SECONDS.toMillis(5);
 
+  /**
+   * In master-slave mode, the maximum number of consecutive shouldRetryLater errors allowed per
+   * node before failing fast and letting the caller try the next node. Prevents the multi-node
+   * polling loop in OptimizerExecutor from getting stuck indefinitely on a single unreachable node.
+   * The budget only applies while a node manager is active (see hasAmsNodeManager()).
+   */
+  static final int MAX_RETRIES_PER_NODE_IN_MASTER_SLAVE = 5;
+
   private final OptimizerConfig config;
   private final AtomicReference<String> token = new AtomicReference<>();
   private volatile boolean stopped = false;
+  // Node managers for master-slave mode. At most one is created: amsNodeManager when the AMS URL
+  // starts with "zookeeper://" (ZK HA), thriftAmsNodeManager otherwise (DB HA). Both stay null
+  // outside master-slave mode or when initialization fails. They are transient, so default Java
+  // deserialization leaves them null.
+  private transient volatile AmsNodeManager amsNodeManager;
+  private transient volatile ThriftAmsNodeManager thriftAmsNodeManager;
 
   public AbstractOptimizerOperator(OptimizerConfig config) {
     Preconditions.checkNotNull(config);
     this.config = config;
+    initAmsNodeManagers();
+  }
+
+  /**
+   * Creates the node manager used for master-slave polling, if master-slave mode is enabled.
+   *
+   * <p>A {@code zookeeper://} AMS URL selects {@link AmsNodeManager} (nodes discovered from
+   * ZooKeeper). Any other URL selects {@link ThriftAmsNodeManager} (nodes discovered through the
+   * {@code getOptimizingNodeUrls()} RPC). Failures are logged and leave the manager unset, so the
+   * operator falls back to the single configured AMS URL.
+   */
+  private void initAmsNodeManagers() {
+    if (!config.isMasterSlaveMode()) {
+      return;
+    }
+    String amsUrl = config.getAmsUrl();
+    if (amsUrl.startsWith("zookeeper://")) {
+      // ZK HA mode: discover nodes from ZooKeeper ephemeral nodes
+      try {
+        this.amsNodeManager = new AmsNodeManager(amsUrl);
+        LOG.info("Initialized ZK AmsNodeManager for master-slave mode");
+      } catch (Exception e) {
+        LOG.warn("Failed to initialize AmsNodeManager, will use single AMS URL", e);
+      }
+    } else {
+      // DB HA mode: discover nodes via getOptimizingNodeUrls() Thrift RPC
+      try {
+        this.thriftAmsNodeManager = new ThriftAmsNodeManager(amsUrl);
+        LOG.info("Initialized ThriftAmsNodeManager for master-slave mode (DB HA)");
+      } catch (Exception e) {
+        LOG.warn("Failed to initialize ThriftAmsNodeManager, will use single AMS URL", e);
+      }
+    }
+  }
+
+  /**
+   * Restores the transient node managers after Java deserialization.
+   *
+   * <p>This class is {@link Serializable} and both node-manager fields are {@code transient}.
+   * Without this hook, a deserialized operator would come back with them {@code null} and silently
+   * lose master-slave polling.
+   */
+  private void readObject(java.io.ObjectInputStream in)
+      throws java.io.IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    initAmsNodeManagers();
+  }
+
+  /**
+   * Returns the ZooKeeper-backed node manager.
+   *
+   * @return the manager, or {@code null} when master-slave mode is off, the AMS URL is not a
+   *     {@code zookeeper://} URL, or initialization failed
+   */
+  protected AmsNodeManager getAmsNodeManager() {
+    return this.amsNodeManager;
+  }
+
+  /**
+   * Returns the Thrift-RPC-backed node manager used in DB HA mode.
+   *
+   * @return the manager, or {@code null} when master-slave mode is off, the AMS URL is a
+   *     {@code zookeeper://} URL, or initialization failed
+   */
+  protected ThriftAmsNodeManager getThriftAmsNodeManager() {
+    return this.thriftAmsNodeManager;
+  }
+
+  /**
+   * Tells whether master-slave node discovery is active, i.e. whether either the ZooKeeper or the
+   * Thrift node manager exists. Used by OptimizerExecutor to decide whether to use master-slave
+   * polling logic.
+   */
+  protected boolean hasAmsNodeManager() {
+    boolean noneAvailable = amsNodeManager == null && thriftAmsNodeManager == null;
+    return !noneAvailable;
+  }
+
+  /**
+   * Returns the AMS URLs to poll in master-slave mode.
+   *
+   * <p>The ZooKeeper node manager is consulted first, then the Thrift (DB HA) node manager. Falls
+   * back to the single configured URL when no node manager is available or the active one reports
+   * no nodes, so callers always get at least one URL to poll.
+   *
+   * @return a non-empty list of AMS URLs
+   */
+  protected List<String> getAmsNodeUrls() {
+    // Read each volatile field once so the null check and the use see the same instance.
+    AmsNodeManager zkManager = amsNodeManager;
+    if (zkManager != null) {
+      List<String> urls = zkManager.getAllAmsUrls();
+      if (urls != null && !urls.isEmpty()) {
+        return urls;
+      }
+    }
+    ThriftAmsNodeManager thriftManager = thriftAmsNodeManager;
+    if (thriftManager != null) {
+      List<String> urls = thriftManager.getAllAmsUrls();
+      // An empty node list falls through to the configured URL, exactly like the ZK branch.
+      if (urls != null && !urls.isEmpty()) {
+        return urls;
+      }
+    }
+    return Collections.singletonList(getConfig().getAmsUrl());
   }
 
   protected <T> T callAms(AmsCallOperation<T> operation) throws TException {
+    return callAms(getConfig().getAmsUrl(), operation);
+  }
+
+  /**
+   * Call AMS with a specific AMS URL.
+   *
+   * @param amsUrl The AMS node URL to call
+   * @param operation The operation to execute
+   * @return The result of the operation
+   * @throws TException If the operation fails
+   */
+  protected <T> T callAms(String amsUrl, AmsCallOperation<T> operation) throws 
TException {
     while (isStarted()) {
       try {
-        return 
operation.call(OptimizingClientPools.getClient(config.getAmsUrl()));
+        return operation.call(OptimizingClientPools.getClient(amsUrl));
       } catch (Throwable t) {
         if (shouldReturnNull(t)) {
           return null;
         } else if (shouldRetryLater(t)) {
-          LOG.error("Call ams got an error and will try again later", t);
+          LOG.error("Call ams {} got an error and will try again later", 
amsUrl, t);
           waitAShortTime();
         } else {
           throw t;
@@ -86,25 +164,95 @@ public class AbstractOptimizerOperator implements 
Serializable {
     return false;
   }
 
-  protected <T> T callAuthenticatedAms(AmsAuthenticatedCallOperation<T> 
operation)
+  /**
+   * Call authenticated AMS with a specific AMS URL.
+   *
+   * @param amsUrl The AMS node URL to call
+   * @param operation The operation to execute
+   * @return The result of the operation
+   * @throws TException If the operation fails
+   */
+  protected <T> T callAuthenticatedAms(String amsUrl, 
AmsAuthenticatedCallOperation<T> operation)
       throws TException {
+    // Maximum retry time window for auth errors in master-slave mode (30 
seconds)
+    long maxAuthRetryTimeWindow = TimeUnit.SECONDS.toMillis(30);
+    Long firstAuthErrorTime = null;
+
+    // Per-node retry budget: in master-slave mode, limit consecutive 
shouldRetryLater retries so
+    // that a permanently unreachable node does not block the multi-node 
polling loop indefinitely.
+    int consecutiveRetries = 0;
+
     while (isStarted()) {
       if (tokenIsReady()) {
         String token = getToken();
         try {
-          return 
operation.call(OptimizingClientPools.getClient(config.getAmsUrl()), token);
+          return operation.call(OptimizingClientPools.getClient(amsUrl), 
token);
         } catch (Throwable t) {
           if (t instanceof AmoroException
               && ErrorCodes.PLUGIN_RETRY_AUTH_ERROR_CODE == ((AmoroException) 
(t)).getErrorCode()) {
-            // Reset the token when got a authorization error
-            LOG.error(
-                "Got a authorization error while calling ams, reset token and 
wait for a new one",
-                t);
-            resetToken(token);
+            // In master-slave mode, the slave node may not have completed 
optimizer
+            // auto-registration
+            // yet, so we should retry within a time window before resetting 
the token
+            boolean isMasterSlaveMode = config.isMasterSlaveMode() && 
hasAmsNodeManager();
+            long currentTime = System.currentTimeMillis();
+
+            if (isMasterSlaveMode) {
+              if (firstAuthErrorTime == null) {
+                // First auth error, record the time
+                firstAuthErrorTime = currentTime;
+              }
+
+              long elapsedTime = currentTime - firstAuthErrorTime;
+              if (elapsedTime < maxAuthRetryTimeWindow) {
+                // Still within retry time window, retry after waiting
+                LOG.warn(
+                    "Got an authorization error while calling ams {} in 
master-slave mode (elapsed: {}ms/{}ms). "
+                        + "This may be because the slave node hasn't completed 
optimizer auto-registration yet. "
+                        + "Will retry after waiting.",
+                    amsUrl,
+                    elapsedTime,
+                    maxAuthRetryTimeWindow,
+                    t);
+                waitAShortTime();
+              } else {
+                // Exceeded retry time window, reset token
+                LOG.error(
+                    "Got authorization errors from ams {} in master-slave mode 
for {}ms, exceeded retry time window ({}ms). "
+                        + "Reset token and wait for a new one",
+                    amsUrl,
+                    elapsedTime,
+                    maxAuthRetryTimeWindow,
+                    t);
+                resetToken(token);
+                firstAuthErrorTime = null;
+              }
+            } else {
+              // Non-master-slave mode, reset token immediately
+              LOG.error(
+                  "Got a authorization error while calling ams {}, reset token 
and wait for a new one",
+                  amsUrl,
+                  t);
+              resetToken(token);
+              firstAuthErrorTime = null;
+            }
           } else if (shouldReturnNull(t)) {
             return null;
           } else if (shouldRetryLater(t)) {
-            LOG.error("Call ams got an error and will try again later", t);
+            LOG.error("Call ams {} got an error and will try again later", 
amsUrl, t);
+            if (hasAmsNodeManager()) {
+              consecutiveRetries++;
+              if (consecutiveRetries > MAX_RETRIES_PER_NODE_IN_MASTER_SLAVE) {
+                LOG.warn(
+                    "Per-node retry budget ({}) exhausted for AMS {}, failing 
fast to try next node",
+                    MAX_RETRIES_PER_NODE_IN_MASTER_SLAVE,
+                    amsUrl);
+                if (t instanceof TException) {
+                  throw (TException) t;
+                }
+                throw new TException(
+                    "Per-node retry budget exhausted for " + amsUrl + ": " + 
t.getMessage(), t);
+              }
+            }
             waitAShortTime();
           } else {
             throw t;
@@ -118,6 +266,12 @@ public class AbstractOptimizerOperator implements 
Serializable {
     throw new IllegalStateException("Operator is stopped");
   }
 
+  /** Backward-compatible overload that targets the AMS at {@code getConfig().getAmsUrl()}. */
+  protected <T> T callAuthenticatedAms(AmsAuthenticatedCallOperation<T> 
operation)
+      throws TException {
+    return callAuthenticatedAms(getConfig().getAmsUrl(), operation);
+  }
+
   public static void setCallAmsInterval(long callAmsInterval) {
     AbstractOptimizerOperator.callAmsInterval = callAmsInterval;
   }
diff --git a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AmsNodeManager.java b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AmsNodeManager.java
new file mode 100644
index 000000000..94af2ca13
--- /dev/null
+++ b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AmsNodeManager.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.common;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.client.AmsThriftUrl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Tracks the AMS nodes of a master-slave deployment. The node list is read from ZooKeeper,
+ * cached, and re-read on access at most once every 30 seconds.
+ */
+public class AmsNodeManager implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(AmsNodeManager.class);
+  private static final Pattern ZOOKEEPER_PATTERN = Pattern.compile("zookeeper://(\\S+)/([\\w-]+)");
+  private static final long REFRESH_INTERVAL_MS =
+      TimeUnit.SECONDS.toMillis(30); // Refresh every 30 seconds
+
+  private final String zkServerAddress;
+  private final AtomicReference<List<String>> amsUrls =
+      new AtomicReference<>(Collections.emptyList());
+  private volatile long lastRefreshTime = 0;
+  // Object is not Serializable, so the lock is transient and re-created in readObject().
+  private transient Object refreshLock;
+
+  /**
+   * Creates a manager for the given ZooKeeper URL and attempts an initial load of the node list.
+   *
+   * @param amsUrl ZooKeeper URL of the form {@code zookeeper://host:port/cluster}
+   * @throws IllegalArgumentException if {@code amsUrl} does not match that format
+   */
+  public AmsNodeManager(String amsUrl) {
+    Matcher m = ZOOKEEPER_PATTERN.matcher(amsUrl);
+    if (!m.matches()) {
+      throw new IllegalArgumentException(
+          "AmsNodeManager only supports ZooKeeper URL format: zookeeper://host:port/cluster");
+    }
+    zkServerAddress = amsUrl;
+    refreshLock = new Object();
+    refreshNodes();
+  }
+
+  /**
+   * Returns a copy of the cached AMS URLs, re-reading ZooKeeper first when the cache is older than
+   * the refresh interval. The list is empty while no node has been discovered.
+   */
+  public List<String> getAllAmsUrls() {
+    refreshNodesIfNeeded();
+    return new ArrayList<>(amsUrls.get());
+  }
+
+  /** Manually refresh node list from ZooKeeper. */
+  public void refreshNodes() {
+    synchronized (refreshLock) {
+      refreshNodesInternal();
+      lastRefreshTime = System.currentTimeMillis();
+    }
+  }
+
+  /** Refresh node list from ZooKeeper if needed. */
+  private void refreshNodesIfNeeded() {
+    long now = System.currentTimeMillis();
+    if (now - lastRefreshTime > REFRESH_INTERVAL_MS) {
+      synchronized (refreshLock) {
+        // Re-check under the lock: another thread may have refreshed while we waited.
+        if (now - lastRefreshTime > REFRESH_INTERVAL_MS) {
+          refreshNodesInternal();
+          lastRefreshTime = now;
+        }
+      }
+    }
+  }
+
+  /** Refresh node list from ZooKeeper (internal implementation). */
+  private void refreshNodesInternal() {
+    try {
+      LOG.info("Refreshing nodes from {}", zkServerAddress);
+      List<String> nodeUrls = new ArrayList<>();
+      List<AmsServerInfo> amsServerInfos = AmsThriftUrl.parseMasterSlaveAmsNodes(zkServerAddress);
+      LOG.debug("Fetched AMS server infos from {}: {}", zkServerAddress, amsServerInfos);
+      for (AmsServerInfo amsServerInfo : amsServerInfos) {
+        nodeUrls.add(buildAmsUrl(amsServerInfo));
+      }
+
+      if (!nodeUrls.isEmpty()) {
+        amsUrls.set(nodeUrls);
+        LOG.info("Refreshed AMS nodes, found {} nodes: {}", nodeUrls.size(), nodeUrls);
+      } else {
+        LOG.warn("No AMS nodes found in ZooKeeper");
+        amsUrls.set(Collections.emptyList());
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to refresh AMS nodes from ZooKeeper", e);
+      // Keep existing nodes on error
+    }
+  }
+
+  private String buildAmsUrl(AmsServerInfo serverInfo) {
+    return String.format("thrift://%s:%d", serverInfo.getHost(), serverInfo.getThriftBindPort());
+  }
+
+  /** Restores the transient refresh lock, which default deserialization leaves {@code null}. */
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    refreshLock = new Object();
+  }
+}
diff --git 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
index 544d31f24..2c70a2652 100644
--- 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
+++ 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
@@ -103,6 +103,12 @@ public class OptimizerConfig implements Serializable {
       usage = "Timeout in cache, default 10minutes")
   private String cacheTimeout = 
OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT_DEFAULT;
 
+  @Option(
+      name = "-msm",
+      aliases = "--" + OptimizerProperties.OPTIMIZER_MASTER_SLAVE_MODE_ENABLED,
+      usage = "Enable master-slave mode")
+  private boolean masterSlaveMode = false; // off unless -msm (or its long alias) is given
+
   public OptimizerConfig() {}
 
   public OptimizerConfig(String[] args) throws CmdLineException {
@@ -214,6 +220,14 @@ public class OptimizerConfig implements Serializable {
     this.cacheTimeout = cacheTimeout;
   }
 
+  public boolean isMasterSlaveMode() { // value bound to the -msm option
+    return masterSlaveMode;
+  }
+
+  public void setMasterSlaveMode(boolean masterSlaveMode) { // programmatic override of -msm
+    this.masterSlaveMode = masterSlaveMode;
+  }
+
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
@@ -230,6 +244,7 @@ public class OptimizerConfig implements Serializable {
         .add("cacheMaxTotalSize", cacheMaxTotalSize)
         .add("cacheMaxEntrySize", cacheMaxEntrySize)
         .add("cacheTimeout", cacheTimeout)
+        .add("masterSlaveMode", masterSlaveMode)
         .toString();
   }
 }
diff --git a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
index 6b1dc0a6c..f83da12da 100644
--- a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
+++ b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
@@ -34,7 +34,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 public class OptimizerExecutor extends AbstractOptimizerOperator {
 
@@ -49,33 +53,87 @@ public class OptimizerExecutor extends AbstractOptimizerOperator {
   }
 
   public void start() {
+    // getAmsNodeUrls() returns all nodes in master-slave mode, or the single configured URL in
+    // active-standby mode, so both modes are handled uniformly via the same loop.
+    long basePollInterval = TimeUnit.SECONDS.toMillis(1);
+    long maxPollInterval = TimeUnit.SECONDS.toMillis(30);
+    int consecutiveEmptyPolls = 0;
+    final int emptyPollThreshold = 5;
+    // Caps the back-off exponent. basePollInterval << 5 already exceeds maxPollInterval; an
+    // uncapped shift/multiply overflows after ~60 empty rounds (negative, zero or wrapped waits).
+    final int maxBackoffExponent = 16;
+    final Random random = new Random();
+
     while (isStarted()) {
-      OptimizingTask ackTask = null;
-      OptimizingTaskResult result = null;
-      try {
-        OptimizingTask task = pollTask();
-        if (task != null && ackTask(task)) {
-          ackTask = task;
-          result = executeTask(task);
-        }
-      } catch (Throwable t) {
-        if (ackTask != null) {
-          LOG.error(
-              "Optimizer executor[{}] handling task[{}] failed and got an unknown error",
-              threadId,
-              ackTask.getTaskId(),
-              t);
-          String errorMessage = ExceptionUtil.getErrorMessage(t, ERROR_MESSAGE_MAX_LENGTH);
-          result = new OptimizingTaskResult(ackTask.getTaskId(), threadId);
-          result.setErrorMessage(errorMessage);
-        } else {
-          LOG.error("Optimizer executor[{}] got an unexpected error", threadId, t);
+      List<String> amsUrls = getAmsNodeUrls();
+
+      // Shuffle to prevent all optimizers from hitting the same AMS node simultaneously
+      if (amsUrls.size() > 1) {
+        Collections.shuffle(amsUrls, random);
+      }
+
+      boolean hasTask = false;
+      for (String amsUrl : amsUrls) {
+        if (!isStarted()) {
+          break;
         }
-      } finally {
-        if (result != null) {
-          completeTask(result);
+
+        OptimizingTask ackTask = null;
+        OptimizingTaskResult result = null;
+        try {
+          OptimizingTask task = pollTask(amsUrl);
+          if (task != null && ackTask(amsUrl, task)) {
+            ackTask = task;
+            hasTask = true;
+            result = executeTask(task);
+          }
+        } catch (Throwable t) {
+          if (ackTask != null) {
+            LOG.error(
+                "Optimizer executor[{}] handling task[{}] from AMS {} failed and got an unknown error",
+                threadId,
+                ackTask.getTaskId(),
+                amsUrl,
+                t);
+            String errorMessage = ExceptionUtil.getErrorMessage(t, ERROR_MESSAGE_MAX_LENGTH);
+            result = new OptimizingTaskResult(ackTask.getTaskId(), threadId);
+            result.setErrorMessage(errorMessage);
+          } else {
+            LOG.error(
+                "Optimizer executor[{}] got an unexpected error from AMS {}", threadId, amsUrl, t);
+          }
+        } finally {
+          if (result != null) {
+            completeTask(amsUrl, result);
+          }
         }
       }
+
+      if (hasTask) {
+        // Work was handled in this round: poll again right away instead of sleeping, so a busy
+        // optimizer does not pay a fixed delay after every task.
+        consecutiveEmptyPolls = 0;
+        continue;
+      }
+      consecutiveEmptyPolls++;
+
+      long waitTime;
+      if (amsUrls.isEmpty()) {
+        waitTime = basePollInterval;
+      } else if (consecutiveEmptyPolls >= emptyPollThreshold) {
+        int backoffFactor =
+            Math.min(consecutiveEmptyPolls - emptyPollThreshold + 1, maxBackoffExponent);
+        waitTime = Math.min(maxPollInterval, basePollInterval * (1L << backoffFactor));
+        LOG.debug(
+            "Optimizer executor[{}] no tasks found for {} consecutive polls, using increased interval: {}ms",
+            threadId,
+            consecutiveEmptyPolls,
+            waitTime);
+      } else {
+        waitTime = basePollInterval;
+      }
+
+      waitAShortTime(waitTime);
     }
   }
 
@@ -83,38 +141,46 @@ public class OptimizerExecutor extends AbstractOptimizerOperator {
     return threadId;
   }
 
-  private OptimizingTask pollTask() {
+  /** Polls a task from {@code amsUrl}; returns null when none is returned or the call fails. */
+  private OptimizingTask pollTask(String amsUrl) {
     OptimizingTask task = null;
-    while (isStarted()) {
-      try {
-        task = callAuthenticatedAms((client, token) -> client.pollTask(token, threadId));
-      } catch (TException exception) {
-        LOG.error("Optimizer executor[{}] polled task failed", threadId, exception);
-      }
+    try {
+      task = callAuthenticatedAms(amsUrl, (client, token) -> client.pollTask(token, threadId));
       if (task != null) {
-        LOG.info("Optimizer executor[{}] polled task[{}] from ams", threadId, task.getTaskId());
-        break;
-      } else {
-        waitAShortTime();
+        LOG.info(
+            "Optimizer executor[{}] polled task[{}] from AMS {}",
+            threadId,
+            task.getTaskId(),
+            amsUrl);
       }
+    } catch (TException exception) {
+      LOG.error(
+          "Optimizer executor[{}] polled task from AMS {} failed", threadId, amsUrl, exception);
     }
     return task;
   }
 
-  private boolean ackTask(OptimizingTask task) {
+  /** Acks {@code task} to {@code amsUrl}; returns false (after logging) if the call fails. */
+  private boolean ackTask(String amsUrl, OptimizingTask task) {
     try {
       callAuthenticatedAms(
+          amsUrl,
           (client, token) -> {
             client.ackTask(token, threadId, task.getTaskId());
             return null;
           });
-      LOG.info("Optimizer executor[{}] acknowledged task[{}] to ams", threadId, task.getTaskId());
+      LOG.info(
+          "Optimizer executor[{}] acknowledged task[{}] to AMS {}",
+          threadId,
+          task.getTaskId(),
+          amsUrl);
       return true;
     } catch (TException exception) {
       LOG.error(
-          "Optimizer executor[{}] acknowledged task[{}] failed",
+          "Optimizer executor[{}] acknowledged task[{}] to AMS {} failed",
           threadId,
           task.getTaskId(),
+          amsUrl,
           exception);
       return false;
     }
@@ -124,24 +190,28 @@ public class OptimizerExecutor extends AbstractOptimizerOperator {
     return executeTask(getConfig(), getThreadId(), task, LOG);
   }
 
-  protected void completeTask(OptimizingTaskResult optimizingTaskResult) {
+  /** Reports {@code optimizingTaskResult} to {@code amsUrl}; failures are logged, not thrown. */
+  protected void completeTask(String amsUrl, OptimizingTaskResult optimizingTaskResult) {
     try {
       callAuthenticatedAms(
+          amsUrl,
           (client, token) -> {
             client.completeTask(token, optimizingTaskResult);
             return null;
           });
       LOG.info(
-          "Optimizer executor[{}] completed task[{}](status: {}) to ams",
+          "Optimizer executor[{}] completed task[{}](status: {}) to AMS {}",
           threadId,
           optimizingTaskResult.getTaskId(),
-          optimizingTaskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL");
+          optimizingTaskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL",
+          amsUrl);
     } catch (Exception exception) {
       LOG.error(
-          "Optimizer executor[{}] completed task[{}](status: {}) failed",
+          "Optimizer executor[{}] completed task[{}](status: {}) to AMS {} failed",
           threadId,
           optimizingTaskResult.getTaskId(),
           optimizingTaskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL",
+          amsUrl,
           exception);
     }
   }
diff --git 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java
 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java
index ce91e5501..48b8a7e07 100644
--- 
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java
+++ 
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java
@@ -32,7 +32,7 @@ import java.util.Map;
 public class OptimizerToucher extends AbstractOptimizerOperator {
   private static final Logger LOG = 
LoggerFactory.getLogger(OptimizerToucher.class);
 
-  private TokenChangeListener tokenChangeListener;
+  private transient TokenChangeListener tokenChangeListener; // skipped by Java serialization
   private final Map<String, String> registerProperties = Maps.newHashMap();
   private final long startTime;
 
diff --git a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/ThriftAmsNodeManager.java b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/ThriftAmsNodeManager.java
new file mode 100644
index 000000000..f325d5c6e
--- /dev/null
+++ b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/ThriftAmsNodeManager.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.common;
+
+import org.apache.amoro.client.OptimizingClientPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Manages multiple AMS nodes in master-slave mode for DB-backed HA. Discovers the node list by
+ * calling {@code getOptimizingNodeUrls()} via the Thrift API on the configured AMS URL, then
+ * caches and periodically refreshes the list. This is the DB-mode counterpart of {@link
+ * AmsNodeManager} (which reads node addresses directly from ZooKeeper).
+ */
+public class ThriftAmsNodeManager implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(ThriftAmsNodeManager.class);
+  private static final long REFRESH_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
+
+  private final String initialAmsUrl;
+  private final AtomicReference<List<String>> amsUrls =
+      new AtomicReference<>(Collections.emptyList());
+  private volatile long lastRefreshTime = 0;
+  // Object is not Serializable, so the lock is transient and re-created in readObject(). A lazy
+  // null-check getter is unsafe here: racing threads could each install their own lock object.
+  private transient Object refreshLock;
+
+  /**
+   * Creates a manager that discovers nodes through {@code initialAmsUrl} and loads the list once.
+   *
+   * @param initialAmsUrl AMS Thrift URL queried for the node list; also returned as the only node
+   *     while no list is cached
+   */
+  public ThriftAmsNodeManager(String initialAmsUrl) {
+    this.initialAmsUrl = initialAmsUrl;
+    this.refreshLock = new Object();
+    refreshNodes();
+  }
+
+  /** Returns a mutable copy of the cached AMS URLs, or just the initial URL if none are cached. */
+  public List<String> getAllAmsUrls() {
+    refreshNodesIfNeeded();
+    List<String> current = amsUrls.get();
+    if (current.isEmpty()) {
+      return new ArrayList<>(Collections.singletonList(initialAmsUrl));
+    }
+    return new ArrayList<>(current);
+  }
+
+  /** Manually refresh the node list from the AMS Thrift API. */
+  public void refreshNodes() {
+    synchronized (refreshLock) {
+      refreshNodesInternal();
+      lastRefreshTime = System.currentTimeMillis();
+    }
+  }
+
+  private void refreshNodesIfNeeded() {
+    long now = System.currentTimeMillis();
+    if (now - lastRefreshTime > REFRESH_INTERVAL_MS) {
+      synchronized (refreshLock) {
+        // Re-check under the lock: another thread may have refreshed while we waited.
+        if (now - lastRefreshTime > REFRESH_INTERVAL_MS) {
+          refreshNodesInternal();
+          lastRefreshTime = now;
+        }
+      }
+    }
+  }
+
+  private void refreshNodesInternal() {
+    try {
+      List<String> nodeUrls =
+          OptimizingClientPools.getClient(initialAmsUrl).getOptimizingNodeUrls();
+      if (nodeUrls != null && !nodeUrls.isEmpty()) {
+        amsUrls.set(new ArrayList<>(nodeUrls));
+        LOG.info(
+            "Refreshed AMS nodes via Thrift ({}), found {} nodes: {}",
+            initialAmsUrl,
+            nodeUrls.size(),
+            nodeUrls);
+      } else {
+        LOG.warn("No AMS nodes returned by getOptimizingNodeUrls() from {}", initialAmsUrl);
+        amsUrls.set(Collections.emptyList());
+      }
+    } catch (Exception e) {
+      // Keep the cached nodes; the next access after the refresh interval retries.
+      LOG.warn("Failed to refresh AMS nodes from {} via Thrift", initialAmsUrl, e);
+    }
+  }
+
+  /** Restores the transient refresh lock, which default deserialization leaves {@code null}. */
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    refreshLock = new Object();
+  }
+}

Reply via email to