wardlican commented on code in PR #3937:
URL: https://github.com/apache/amoro/pull/3937#discussion_r3031104113
##########
amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java:
##########
@@ -610,6 +634,96 @@ protected void processTask(OptimizerKeepingTask keepingTask) {
}
}
+ @Override
+ protected void onFollowerTick(long syncInterval) throws InterruptedException {
+ loadOptimizersFromDatabase();
+ Thread.sleep(syncInterval);
+ }
+
+ @Override
+ protected void onBecomeLeader() {
+ LOG.info(
+ "Became leader, starting heartbeat monitoring for {} inherited optimizers",
+ authOptimizers.size());
+ // All optimizers in authOptimizers were loaded from DB by the follower sync loop.
+ // Their touchTime reflects the latest DB-persisted heartbeat, which is the correct
+ // baseline for the new leader's expiry detection.
+ authOptimizers.values().forEach(this::keepInTouch);
+ }
+
+ /**
+ * Load optimizer information from database. This is used in master-slave mode for follower
+ * nodes to sync optimizer state from database. This method performs incremental updates by
+ * comparing database state with local authOptimizers, only adding new optimizers and removing
+ * missing ones.
+ */
+ private void loadOptimizersFromDatabase() {
+ try {
+ List<OptimizerInstance> dbOptimizers =
+ getAs(OptimizerMapper.class, OptimizerMapper::selectAll);
+
+ Map<String, OptimizerInstance> dbOptimizersByToken = new HashMap<>();
+ for (OptimizerInstance optimizer : dbOptimizers) {
+ String token = optimizer.getToken();
+ if (token != null) {
+ dbOptimizersByToken.put(token, optimizer);
+ }
+ }
+
+ Set<String> localTokens = new HashSet<>(authOptimizers.keySet());
+ Set<String> dbTokens = new HashSet<>(dbOptimizersByToken.keySet());
+ Set<String> tokensToAdd = new HashSet<>(dbTokens);
+ tokensToAdd.removeAll(localTokens);
+
+ Set<String> tokensToRemove = new HashSet<>(localTokens);
+ tokensToRemove.removeAll(dbTokens);
+
+ for (String token : tokensToAdd) {
+ OptimizerInstance optimizer = dbOptimizersByToken.get(token);
+ if (optimizer != null) {
+ registerOptimizerWithoutPersist(optimizer);
+ LOG.debug("Added optimizer {} from database", token);
+ }
+ }
+
+ for (String token : tokensToRemove) {
+ removeOptimizerFromLocal(token);
+ LOG.debug("Removed optimizer {} (not in database)", token);
+ }
+
+ LOG.info(
+ "Synced optimizers from database: total={}, added={}, removed={}, current={}",
Review Comment:
Okay, I'll adjust the log level here to debug.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]