wardlican commented on code in PR #3937:
URL: https://github.com/apache/amoro/pull/3937#discussion_r3031104113


##########
amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java:
##########
@@ -610,6 +634,96 @@ protected void processTask(OptimizerKeepingTask 
keepingTask) {
       }
     }
 
+    @Override
+    protected void onFollowerTick(long syncInterval) throws 
InterruptedException {
+      loadOptimizersFromDatabase();
+      Thread.sleep(syncInterval);
+    }
+
+    @Override
+    protected void onBecomeLeader() {
+      LOG.info(
+          "Became leader, starting heartbeat monitoring for {} inherited 
optimizers",
+          authOptimizers.size());
+      // All optimizers in authOptimizers were loaded from DB by the follower 
sync loop.
+      // Their touchTime reflects the latest DB-persisted heartbeat, which is 
the correct
+      // baseline for the new leader's expiry detection.
+      authOptimizers.values().forEach(this::keepInTouch);
+    }
+
+    /**
+     * Load optimizer information from database. This is used in master-slave 
mode for follower
+     * nodes to sync optimizer state from database. This method performs 
incremental updates by
+     * comparing database state with local authOptimizers, only adding new 
optimizers and removing
+     * missing ones.
+     */
+    private void loadOptimizersFromDatabase() {
+      try {
+        List<OptimizerInstance> dbOptimizers =
+            getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
+
+        Map<String, OptimizerInstance> dbOptimizersByToken = new HashMap<>();
+        for (OptimizerInstance optimizer : dbOptimizers) {
+          String token = optimizer.getToken();
+          if (token != null) {
+            dbOptimizersByToken.put(token, optimizer);
+          }
+        }
+
+        Set<String> localTokens = new HashSet<>(authOptimizers.keySet());
+        Set<String> dbTokens = new HashSet<>(dbOptimizersByToken.keySet());
+        Set<String> tokensToAdd = new HashSet<>(dbTokens);
+        tokensToAdd.removeAll(localTokens);
+
+        Set<String> tokensToRemove = new HashSet<>(localTokens);
+        tokensToRemove.removeAll(dbTokens);
+
+        for (String token : tokensToAdd) {
+          OptimizerInstance optimizer = dbOptimizersByToken.get(token);
+          if (optimizer != null) {
+            registerOptimizerWithoutPersist(optimizer);
+            LOG.debug("Added optimizer {} from database", token);
+          }
+        }
+
+        for (String token : tokensToRemove) {
+          removeOptimizerFromLocal(token);
+          LOG.debug("Removed optimizer {} (not in database)", token);
+        }
+
+        LOG.info(
+            "Synced optimizers from database: total={}, added={}, removed={}, 
current={}",

Review Comment:
   Okay, I'll adjust the log level here to debug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to