wardlican commented on code in PR #3950:
URL: https://github.com/apache/amoro/pull/3950#discussion_r3007320307
##########
amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java:
##########
@@ -41,21 +43,90 @@ public class AbstractOptimizerOperator implements
Serializable {
private final OptimizerConfig config;
private final AtomicReference<String> token = new AtomicReference<>();
private volatile boolean stopped = false;
+ private transient volatile AmsNodeManager amsNodeManager;
+ private transient volatile ThriftAmsNodeManager thriftAmsNodeManager;
public AbstractOptimizerOperator(OptimizerConfig config) {
Preconditions.checkNotNull(config);
this.config = config;
+ if (config.isMasterSlaveMode()) {
+ String amsUrl = config.getAmsUrl();
+ if (amsUrl.startsWith("zookeeper://")) {
+ // ZK mode: discover nodes from ZooKeeper ephemeral nodes
+ try {
+ this.amsNodeManager = new AmsNodeManager(amsUrl);
+ LOG.info("Initialized ZK AmsNodeManager for master-slave mode");
+ } catch (Exception e) {
+ LOG.warn("Failed to initialize AmsNodeManager, will use single AMS
URL", e);
+ }
+ } else {
+ // DB mode (or direct thrift://): discover nodes via
getOptimizingNodeUrls() Thrift RPC
+ try {
+ this.thriftAmsNodeManager = new ThriftAmsNodeManager(amsUrl);
+ LOG.info("Initialized ThriftAmsNodeManager for master-slave mode (DB
HA)");
+ } catch (Exception e) {
+ LOG.warn("Failed to initialize ThriftAmsNodeManager, will use single
AMS URL", e);
+ }
+ }
+ }
+ }
+
+ /** Get the AmsNodeManager instance if available (ZK mode). */
+ protected AmsNodeManager getAmsNodeManager() {
+ return amsNodeManager;
+ }
+
+ /** Get the ThriftAmsNodeManager instance if available (DB mode). */
+ protected ThriftAmsNodeManager getThriftAmsNodeManager() {
+ return thriftAmsNodeManager;
+ }
+
+ /**
+ * Returns true if any node manager (ZK or Thrift) is active. Used by
OptimizerExecutor to decide
+ * whether to use master-slave polling logic.
+ */
+ protected boolean hasAmsNodeManager() {
+ return amsNodeManager != null || thriftAmsNodeManager != null;
+ }
+
+ /**
+ * Get all AMS URLs from the active node manager. Falls back to the single
configured URL when no
+ * node manager is available or the node list is empty.
+ */
+ protected List<String> getAmsNodeUrls() {
+ if (amsNodeManager != null) {
+ List<String> urls = amsNodeManager.getAllAmsUrls();
+ if (!urls.isEmpty()) {
+ return urls;
+ }
+ }
+ if (thriftAmsNodeManager != null) {
+ return thriftAmsNodeManager.getAllAmsUrls();
+ }
+ return Collections.singletonList(getConfig().getAmsUrl());
}
protected <T> T callAms(AmsCallOperation<T> operation) throws TException {
+ return callAms(getConfig().getAmsUrl(), operation);
Review Comment:
Under the current design, the optimizer registers solely with the primary
node to obtain a token. Each AMS node then automatically synchronizes the list
of currently registered optimizers from the database; once this synchronization
is complete, the optimizer is able to pull tasks from any of the other AMS
nodes. Shifting to a model where each AMS node independently maintains tokens
would introduce excessive complexity.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]