This is an automated email from the ASF dual-hosted git repository.

czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 878d5b01e [Subtask]: In master-slave mode, each AMS should 
automatically senses the optimizer.  (#3937)
878d5b01e is described below

commit 878d5b01e2eb019f26ac3f5c6a55296d61d336cf
Author: can <[email protected]>
AuthorDate: Fri Apr 3 18:04:28 2026 +0800

    [Subtask]: In master-slave mode, each AMS should automatically senses the 
optimizer.  (#3937)
    
    * [Subtask]: add AmsAssignService to implement balanced bucket allocation 
in master-slave mode. #3921
    
    [Subtask]: Add a registration function for table allocation in master-slave 
mode. #3919
    
    [Subtask]: Add a registration function for table allocation in master-slave 
mode. #3919
    
    [Subtask]: Replace zk with mocking. #3919
    
    [Subtask]: Replace zk with mocking. #3919
    
    [Subtask]: add AmsAssignService to implement balanced bucket allocation in 
master-slave mode. #3921
    
    [Subtask]: add AmsAssignService to implement balanced bucket allocation in 
master-slave mode. #3921
    
    [Subtask]: add AmsAssignService to implement balanced bucket allocation in 
master-slave mode. #3921
    
    * [Subtask]: Modify DefaultTableService to be compatible with master-slave 
mode #3923
    
    [Subtask]: Modify DefaultTableService to be compatible with master-slave 
mode #3923
    
    [Subtask]: Modify DefaultTableService to be compatible with master-slave 
mode #3923
    
    [Subtask]:  Fix unit test failure issue #3923
    
    * [Subtask]: In master-slave mode, each AMS should automatically senses the 
optimizer. #3929
    
    * [Subtask]: Modify the optimizer to support obtaining tasks from each AMS 
node for processing. #3928
    
    [Subtask]: Modify the optimizer to support obtaining tasks from each AMS 
node for processing. #3928
    
    [Subtask]: Optimize the logic for retrieving the AMS list from ZooKeeper in 
master-slave mode. #3928
    
    This addresses the conflict issues with the latest main branch and 
introduces a new solution for storing allocation information based on a 
database.
    
    This addresses the conflict issues with the latest main branch and 
introduces a new solution for storing allocation information based on a 
database.
    
    Troubleshooting unit test failures.
    
    * [Subtask]: In master-slave mode, each AMS should automatically senses the 
optimizer. #3929
    
    Made-with: Cursor
    
    * [Subtask]: In master-slave mode, each AMS should automatically senses the 
optimizer. #3929
    
    * [Subtask]: In master-slave mode, each AMS should automatically senses the 
optimizer. #3929
    
    * [Subtask]: In master-slave mode, each AMS should automatically senses the 
optimizer. #3929
    
    * [Subtask]: Optimize based on CR's feedback. #3929
    
    ---------
    
    Co-authored-by: wardli <[email protected]>
    Co-authored-by: ZhouJinsong <[email protected]>
---
 .../apache/amoro/server/AmoroServiceContainer.java |   7 +-
 .../amoro/server/DefaultOptimizingService.java     | 136 +++++++++++++++++++--
 .../apache/amoro/server/AMSServiceTestBase.java    |   2 +-
 3 files changed, 132 insertions(+), 13 deletions(-)

diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 99108c5ff..0fa2b917e 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -278,7 +278,12 @@ public class AmoroServiceContainer {
     processService = new ProcessService(tableService, actionCoordinators, 
executeEngineManager);
     optimizingService =
         new DefaultOptimizingService(
-            serviceConfig, catalogManager, optimizerManager, tableService, 
bucketAssignStore);
+            serviceConfig,
+            catalogManager,
+            optimizerManager,
+            tableService,
+            bucketAssignStore,
+            haContainer);
 
     LOG.info("Setting up AMS table executors...");
     InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
diff --git 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 66734682e..e17672768 100644
--- 
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++ 
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -39,6 +39,7 @@ import org.apache.amoro.resource.ResourceGroup;
 import org.apache.amoro.resource.ResourceType;
 import org.apache.amoro.server.catalog.CatalogManager;
 import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
 import org.apache.amoro.server.manager.AbstractOptimizerContainer;
 import org.apache.amoro.server.optimizing.OptimizingProcess;
 import org.apache.amoro.server.optimizing.OptimizingQueue;
@@ -68,6 +69,8 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -119,21 +122,16 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
   private final RuntimeHandlerChain tableHandlerChain;
   private final ExecutorService planExecutor;
   private final BucketAssignStore bucketAssignStore;
-
-  public DefaultOptimizingService(
-      Configurations serviceConfig,
-      CatalogManager catalogManager,
-      OptimizerManager optimizerManager,
-      TableService tableService) {
-    this(serviceConfig, catalogManager, optimizerManager, tableService, null);
-  }
+  private final HighAvailabilityContainer haContainer;
+  private final boolean isMasterSlaveMode;
 
   public DefaultOptimizingService(
       Configurations serviceConfig,
       CatalogManager catalogManager,
       OptimizerManager optimizerManager,
       TableService tableService,
-      BucketAssignStore bucketAssignStore) {
+      BucketAssignStore bucketAssignStore,
+      HighAvailabilityContainer haContainer) {
     this.optimizerTouchTimeout =
         
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
     this.taskAckTimeout =
@@ -157,6 +155,9 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
     this.catalogManager = catalogManager;
     this.optimizerManager = optimizerManager;
     this.bucketAssignStore = bucketAssignStore;
+    this.haContainer = haContainer;
+    this.isMasterSlaveMode =
+        haContainer != null && 
serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
     this.tableHandlerChain = new TableRuntimeHandlerImpl();
     this.planExecutor =
         Executors.newCachedThreadPool(
@@ -565,10 +566,27 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
 
     @Override
     public void run() {
+      // Use 1/4 of optimizerTouchTimeout as sync interval (default ~30 
seconds), used for
+      // master-slave follower sync.
+      long syncInterval = Math.max(5000, optimizerTouchTimeout / 4);
+      // In non-master-slave mode, this node is always the leader.
+      boolean wasLeader = !isMasterSlaveMode;
       while (!stopped) {
         try {
-          T keepingTask = suspendingQueue.take();
-          this.processTask(keepingTask);
+          boolean isLeader = !isMasterSlaveMode || haContainer.hasLeadership();
+          if (!wasLeader && isLeader) {
+            // Follower → Leader transition: subclass takes over monitoring of 
inherited optimizers.
+            onBecomeLeader();
+          }
+          wasLeader = isLeader;
+
+          if (isLeader) {
+            T keepingTask = suspendingQueue.take();
+            this.processTask(keepingTask);
+          } else {
+            // Not leader: let subclass handle follower state (e.g. sync 
optimizer list from DB)
+            onFollowerTick(syncInterval);
+          }
         } catch (InterruptedException ignored) {
         } catch (Throwable t) {
           LOG.error("{} has encountered a problem.", 
this.getClass().getSimpleName(), t);
@@ -577,6 +595,12 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
     }
 
     protected abstract void processTask(T task) throws Exception;
+
+    protected void onFollowerTick(long syncInterval) throws 
InterruptedException {
+      Thread.sleep(syncInterval);
+    }
+
+    protected void onBecomeLeader() {}
   }
 
   private class OptimizerKeeper extends AbstractKeeper<OptimizerKeepingTask> {
@@ -610,6 +634,96 @@ public class DefaultOptimizingService extends 
StatedPersistentBase
       }
     }
 
+    @Override
+    protected void onFollowerTick(long syncInterval) throws 
InterruptedException {
+      loadOptimizersFromDatabase();
+      Thread.sleep(syncInterval);
+    }
+
+    @Override
+    protected void onBecomeLeader() {
+      LOG.info(
+          "Became leader, starting heartbeat monitoring for {} inherited 
optimizers",
+          authOptimizers.size());
+      // All optimizers in authOptimizers were loaded from DB by the follower 
sync loop.
+      // Their touchTime reflects the latest DB-persisted heartbeat, which is 
the correct
+      // baseline for the new leader's expiry detection.
+      authOptimizers.values().forEach(this::keepInTouch);
+    }
+
+    /**
+     * Load optimizer information from database. This is used in master-slave 
mode for follower
+     * nodes to sync optimizer state from database. This method performs 
incremental updates by
+     * comparing database state with local authOptimizers, only adding new 
optimizers and removing
+     * missing ones.
+     */
+    private void loadOptimizersFromDatabase() {
+      try {
+        List<OptimizerInstance> dbOptimizers =
+            getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
+
+        Map<String, OptimizerInstance> dbOptimizersByToken = new HashMap<>();
+        for (OptimizerInstance optimizer : dbOptimizers) {
+          String token = optimizer.getToken();
+          if (token != null) {
+            dbOptimizersByToken.put(token, optimizer);
+          }
+        }
+
+        Set<String> localTokens = new HashSet<>(authOptimizers.keySet());
+        Set<String> dbTokens = new HashSet<>(dbOptimizersByToken.keySet());
+        Set<String> tokensToAdd = new HashSet<>(dbTokens);
+        tokensToAdd.removeAll(localTokens);
+
+        Set<String> tokensToRemove = new HashSet<>(localTokens);
+        tokensToRemove.removeAll(dbTokens);
+
+        for (String token : tokensToAdd) {
+          OptimizerInstance optimizer = dbOptimizersByToken.get(token);
+          if (optimizer != null) {
+            registerOptimizerWithoutPersist(optimizer);
+            LOG.debug("Added optimizer {} from database", token);
+          }
+        }
+
+        for (String token : tokensToRemove) {
+          removeOptimizerFromLocal(token);
+          LOG.debug("Removed optimizer {} (not in database)", token);
+        }
+
+        LOG.debug(
+            "Synced optimizers from database: total={}, added={}, removed={}, 
current={}",
+            dbOptimizersByToken.size(),
+            tokensToAdd.size(),
+            tokensToRemove.size(),
+            authOptimizers.size());
+      } catch (Exception e) {
+        LOG.error("Failed to load optimizers from database", e);
+      }
+    }
+
+    private void registerOptimizerWithoutPersist(OptimizerInstance optimizer) {
+      OptimizingQueue optimizingQueue = 
optimizingQueueByGroup.get(optimizer.getGroupName());
+      if (optimizingQueue == null) {
+        LOG.warn(
+            "Cannot register optimizer {}: optimizing queue for group {} not 
found",
+            optimizer.getToken(),
+            optimizer.getGroupName());
+        return;
+      }
+      optimizingQueue.addOptimizer(optimizer);
+      authOptimizers.put(optimizer.getToken(), optimizer);
+      optimizingQueueByToken.put(optimizer.getToken(), optimizingQueue);
+    }
+
+    private void removeOptimizerFromLocal(String token) {
+      OptimizingQueue optimizingQueue = optimizingQueueByToken.remove(token);
+      OptimizerInstance optimizer = authOptimizers.remove(token);
+      if (optimizingQueue != null && optimizer != null) {
+        optimizingQueue.removeOptimizer(optimizer);
+      }
+    }
+
     private void retryTask(TaskRuntime<?> task, OptimizingQueue queue) {
       if (isTaskExecTimeout(task)) {
         LOG.warn(
diff --git 
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java 
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index 390c93446..27ad5e29a 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -52,7 +52,7 @@ public abstract class AMSServiceTestBase extends 
AMSManagerTestBase {
           new DefaultTableService(new Configurations(), CATALOG_MANAGER, 
runtimeFactory);
       OPTIMIZING_SERVICE =
           new DefaultOptimizingService(
-              configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, 
TABLE_SERVICE);
+              configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, 
TABLE_SERVICE, null, null);
       PROCESS_SERVICE = new ProcessService(TABLE_SERVICE);
 
       
TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler());

Reply via email to