This is an automated email from the ASF dual-hosted git repository.
czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 878d5b01e [Subtask]: In master-slave mode, each AMS should
automatically sense the optimizer. (#3937)
878d5b01e is described below
commit 878d5b01e2eb019f26ac3f5c6a55296d61d336cf
Author: can <[email protected]>
AuthorDate: Fri Apr 3 18:04:28 2026 +0800
[Subtask]: In master-slave mode, each AMS should automatically sense the
optimizer. (#3937)
* [Subtask]: add AmsAssignService to implement balanced bucket allocation
in master-slave mode. #3921
[Subtask]: Add a registration function for table allocation in master-slave
mode. #3919
[Subtask]: Add a registration function for table allocation in master-slave
mode. #3919
[Subtask]: Replace ZooKeeper with mocks. #3919
[Subtask]: Replace ZooKeeper with mocks. #3919
[Subtask]: add AmsAssignService to implement balanced bucket allocation in
master-slave mode. #3921
[Subtask]: add AmsAssignService to implement balanced bucket allocation in
master-slave mode. #3921
[Subtask]: add AmsAssignService to implement balanced bucket allocation in
master-slave mode. #3921
* [Subtask]: Modify DefaultTableService to be compatible with master-slave
mode #3923
[Subtask]: Modify DefaultTableService to be compatible with master-slave
mode #3923
[Subtask]: Modify DefaultTableService to be compatible with master-slave
mode #3923
[Subtask]: Fix unit test failure issue #3923
* [Subtask]: In master-slave mode, each AMS should automatically sense the
optimizer. #3929
* [Subtask]: Modify the optimizer to support obtaining tasks from each AMS
node for processing. #3928
[Subtask]: Modify the optimizer to support obtaining tasks from each AMS
node for processing. #3928
[Subtask]: Optimize the logic for retrieving the AMS list from ZooKeeper in
master-slave mode. #3928
This resolves conflicts with the latest main branch and
introduces a new database-backed solution for storing allocation
information.
This resolves conflicts with the latest main branch and
introduces a new database-backed solution for storing allocation
information.
Troubleshooting unit test failures.
* [Subtask]: In master-slave mode, each AMS should automatically sense the
optimizer. #3929
Made-with: Cursor
* [Subtask]: In master-slave mode, each AMS should automatically sense the
optimizer. #3929
* [Subtask]: In master-slave mode, each AMS should automatically sense the
optimizer. #3929
* [Subtask]: In master-slave mode, each AMS should automatically sense the
optimizer. #3929
* [Subtask]: Optimize based on code-review feedback. #3929
---------
Co-authored-by: wardli <[email protected]>
Co-authored-by: ZhouJinsong <[email protected]>
---
.../apache/amoro/server/AmoroServiceContainer.java | 7 +-
.../amoro/server/DefaultOptimizingService.java | 136 +++++++++++++++++++--
.../apache/amoro/server/AMSServiceTestBase.java | 2 +-
3 files changed, 132 insertions(+), 13 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index 99108c5ff..0fa2b917e 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -278,7 +278,12 @@ public class AmoroServiceContainer {
processService = new ProcessService(tableService, actionCoordinators,
executeEngineManager);
optimizingService =
new DefaultOptimizingService(
- serviceConfig, catalogManager, optimizerManager, tableService,
bucketAssignStore);
+ serviceConfig,
+ catalogManager,
+ optimizerManager,
+ tableService,
+ bucketAssignStore,
+ haContainer);
LOG.info("Setting up AMS table executors...");
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 66734682e..e17672768 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -39,6 +39,7 @@ import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.resource.ResourceType;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.dashboard.model.OptimizerResourceInfo;
+import org.apache.amoro.server.ha.HighAvailabilityContainer;
import org.apache.amoro.server.manager.AbstractOptimizerContainer;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingQueue;
@@ -68,6 +69,8 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -119,21 +122,16 @@ public class DefaultOptimizingService extends
StatedPersistentBase
private final RuntimeHandlerChain tableHandlerChain;
private final ExecutorService planExecutor;
private final BucketAssignStore bucketAssignStore;
-
- public DefaultOptimizingService(
- Configurations serviceConfig,
- CatalogManager catalogManager,
- OptimizerManager optimizerManager,
- TableService tableService) {
- this(serviceConfig, catalogManager, optimizerManager, tableService, null);
- }
+ private final HighAvailabilityContainer haContainer;
+ private final boolean isMasterSlaveMode;
public DefaultOptimizingService(
Configurations serviceConfig,
CatalogManager catalogManager,
OptimizerManager optimizerManager,
TableService tableService,
- BucketAssignStore bucketAssignStore) {
+ BucketAssignStore bucketAssignStore,
+ HighAvailabilityContainer haContainer) {
this.optimizerTouchTimeout =
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
this.taskAckTimeout =
@@ -157,6 +155,9 @@ public class DefaultOptimizingService extends
StatedPersistentBase
this.catalogManager = catalogManager;
this.optimizerManager = optimizerManager;
this.bucketAssignStore = bucketAssignStore;
+ this.haContainer = haContainer;
+ this.isMasterSlaveMode =
+ haContainer != null &&
serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
this.tableHandlerChain = new TableRuntimeHandlerImpl();
this.planExecutor =
Executors.newCachedThreadPool(
@@ -565,10 +566,27 @@ public class DefaultOptimizingService extends
StatedPersistentBase
@Override
public void run() {
+ // Use 1/4 of optimizerTouchTimeout as sync interval (default ~30
seconds), used for
+ // master-slave follower sync.
+ long syncInterval = Math.max(5000, optimizerTouchTimeout / 4);
+ // In non-master-slave mode, this node is always the leader.
+ boolean wasLeader = !isMasterSlaveMode;
while (!stopped) {
try {
- T keepingTask = suspendingQueue.take();
- this.processTask(keepingTask);
+ boolean isLeader = !isMasterSlaveMode || haContainer.hasLeadership();
+ if (!wasLeader && isLeader) {
+ // Follower → Leader transition: subclass takes over monitoring of
inherited optimizers.
+ onBecomeLeader();
+ }
+ wasLeader = isLeader;
+
+ if (isLeader) {
+ T keepingTask = suspendingQueue.take();
+ this.processTask(keepingTask);
+ } else {
+ // Not leader: let subclass handle follower state (e.g. sync
optimizer list from DB)
+ onFollowerTick(syncInterval);
+ }
} catch (InterruptedException ignored) {
} catch (Throwable t) {
LOG.error("{} has encountered a problem.",
this.getClass().getSimpleName(), t);
@@ -577,6 +595,12 @@ public class DefaultOptimizingService extends
StatedPersistentBase
}
protected abstract void processTask(T task) throws Exception;
+
+ protected void onFollowerTick(long syncInterval) throws
InterruptedException { // Called from run() on each iteration while this node is not the leader.
+ Thread.sleep(syncInterval); // Default: just wait syncInterval ms; subclasses may also sync state.
+ }
+
+ protected void onBecomeLeader() {} // Hook called from run() on a follower-to-leader transition; no-op here.
}
private class OptimizerKeeper extends AbstractKeeper<OptimizerKeepingTask> {
@@ -610,6 +634,96 @@ public class DefaultOptimizingService extends
StatedPersistentBase
}
}
+ @Override
+ protected void onFollowerTick(long syncInterval) throws
InterruptedException { // Follower: mirror the DB optimizer list locally, then wait.
+ loadOptimizersFromDatabase(); // Logs and swallows its own errors, so the sleep below still runs.
+ Thread.sleep(syncInterval);
+ }
+
+ @Override
+ protected void onBecomeLeader() {
+ LOG.info(
+ "Became leader, starting heartbeat monitoring for {} inherited
optimizers",
+ authOptimizers.size());
+ // Start heartbeat monitoring (keepInTouch) for every optimizer currently in authOptimizers.
+ // NOTE(review): the follower DB sync never refreshes known entries, so touchTime may be stale.
+ // NOTE(review): confirm keepInTouch tolerates optimizers that already have a pending task.
+ authOptimizers.values().forEach(this::keepInTouch);
+ }
+
+ /**
+ * Syncs the local optimizer view (authOptimizers) with the optimizer rows in the database.
+ * Used in master-slave mode by follower nodes: tokens found only in the DB are registered
+ * locally without persisting, and local tokens missing from the DB are removed. DB rows with a
+ * null token are skipped, entries present on both sides are not refreshed, and errors are logged.
+ */
+ private void loadOptimizersFromDatabase() {
+ try {
+ List<OptimizerInstance> dbOptimizers =
+ getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
+
+ Map<String, OptimizerInstance> dbOptimizersByToken = new HashMap<>();
+ for (OptimizerInstance optimizer : dbOptimizers) {
+ String token = optimizer.getToken();
+ if (token != null) {
+ dbOptimizersByToken.put(token, optimizer);
+ }
+ }
+
+ Set<String> localTokens = new HashSet<>(authOptimizers.keySet()); // snapshot of local state
+ Set<String> dbTokens = new HashSet<>(dbOptimizersByToken.keySet());
+ Set<String> tokensToAdd = new HashSet<>(dbTokens);
+ tokensToAdd.removeAll(localTokens);
+
+ Set<String> tokensToRemove = new HashSet<>(localTokens);
+ tokensToRemove.removeAll(dbTokens);
+
+ for (String token : tokensToAdd) {
+ OptimizerInstance optimizer = dbOptimizersByToken.get(token);
+ if (optimizer != null) { // defensive: every token here is a key of dbOptimizersByToken
+ registerOptimizerWithoutPersist(optimizer);
+ LOG.debug("Added optimizer {} from database", token);
+ }
+ }
+
+ for (String token : tokensToRemove) {
+ removeOptimizerFromLocal(token);
+ LOG.debug("Removed optimizer {} (not in database)", token);
+ }
+
+ LOG.debug(
+ "Synced optimizers from database: total={}, added={}, removed={},
current={}",
+ dbOptimizersByToken.size(),
+ tokensToAdd.size(), // attempted adds, including ones skipped for an unknown group
+ tokensToRemove.size(),
+ authOptimizers.size());
+ } catch (Exception e) {
+ LOG.error("Failed to load optimizers from database", e);
+ }
+ }
+
+ private void registerOptimizerWithoutPersist(OptimizerInstance optimizer) { // no direct DB write here
+ OptimizingQueue optimizingQueue =
optimizingQueueByGroup.get(optimizer.getGroupName());
+ if (optimizingQueue == null) {
+ LOG.warn(
+ "Cannot register optimizer {}: optimizing queue for group {} not
found",
+ optimizer.getToken(),
+ optimizer.getGroupName());
+ return; // Not added to authOptimizers, so the next DB sync will treat this token as new again.
+ }
+ optimizingQueue.addOptimizer(optimizer);
+ authOptimizers.put(optimizer.getToken(), optimizer);
+ optimizingQueueByToken.put(optimizer.getToken(), optimizingQueue);
+ }
+
+ private void removeOptimizerFromLocal(String token) { // Local maps and queue only; no direct DB write.
+ OptimizingQueue optimizingQueue = optimizingQueueByToken.remove(token);
+ OptimizerInstance optimizer = authOptimizers.remove(token);
+ if (optimizingQueue != null && optimizer != null) { // detach only when both local entries existed
+ optimizingQueue.removeOptimizer(optimizer);
+ }
+ }
+
private void retryTask(TaskRuntime<?> task, OptimizingQueue queue) {
if (isTaskExecTimeout(task)) {
LOG.warn(
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
index 390c93446..27ad5e29a 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java
@@ -52,7 +52,7 @@ public abstract class AMSServiceTestBase extends
AMSManagerTestBase {
new DefaultTableService(new Configurations(), CATALOG_MANAGER,
runtimeFactory);
OPTIMIZING_SERVICE =
new DefaultOptimizingService(
- configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER,
TABLE_SERVICE);
+ configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER,
TABLE_SERVICE, null, null);
PROCESS_SERVICE = new ProcessService(TABLE_SERVICE);
TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler());