xxubai commented on code in PR #3950:
URL: https://github.com/apache/amoro/pull/3950#discussion_r2995426257
##########
amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java:
##########
@@ -86,25 +157,79 @@ private boolean shouldReturnNull(Throwable t) {
return false;
}
- protected <T> T callAuthenticatedAms(AmsAuthenticatedCallOperation<T>
operation)
+ /**
+ * Call authenticated AMS with a specific AMS URL.
+ *
+ * @param amsUrl The AMS node URL to call
+ * @param operation The operation to execute
+ * @return The result of the operation
+ * @throws TException If the operation fails
+ */
+ protected <T> T callAuthenticatedAms(String amsUrl,
AmsAuthenticatedCallOperation<T> operation)
throws TException {
+ // Maximum retry time window for auth errors in master-slave mode (30
seconds)
+ long maxAuthRetryTimeWindow = TimeUnit.SECONDS.toMillis(90);
+ Long firstAuthErrorTime = null;
+
while (isStarted()) {
if (tokenIsReady()) {
String token = getToken();
try {
- return
operation.call(OptimizingClientPools.getClient(config.getAmsUrl()), token);
+ // Reset retry time on successful call
+ firstAuthErrorTime = null;
+ return operation.call(OptimizingClientPools.getClient(amsUrl),
token);
Review Comment:
`firstAuthErrorTime` is always null at this point. I think it should be reset
after the call returns.
```suggestion
T res = operation.call(OptimizingClientPools.getClient(amsUrl),
token);
// Reset retry time on successful call
firstAuthErrorTime = null;
return res;
```
##########
amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java:
##########
@@ -49,6 +53,112 @@ public OptimizerExecutor(OptimizerConfig config, int
threadId) {
}
public void start() {
+ // Check if in master-slave mode (node manager initialised for either ZK
or DB HA)
+ boolean isMasterSlaveMode = getConfig().isMasterSlaveMode() &&
hasAmsNodeManager();
+
+ if (isMasterSlaveMode) {
+ // Master-slave mode: get node list and process tasks from each node
+ startMasterSlaveMode();
Review Comment:
If I understand correctly, single-node mode is a special case of multi-node
mode. Could we refactor startMasterSlaveMode and startSingleNodeMode into a
more generic implementation that supports multiple modes?
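A rough sketch of the shape I have in mind, not a drop-in change. All names here (`NodePollingSketch`, `pollOnce`, `singleNode`) are hypothetical and do not exist in the PR; tasks are plain strings only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch; the names are illustrative and not taken from the PR. */
final class NodePollingSketch {

  /**
   * Runs one polling round over whatever URLs the supplier returns.
   * Single-node mode is simply a supplier that yields exactly one URL.
   */
  static List<String> pollOnce(
      Supplier<List<String>> nodeUrls, Function<String, String> pollTask) {
    List<String> tasks = new ArrayList<>();
    for (String url : nodeUrls.get()) {
      String task = pollTask.apply(url); // null stands for "no task on this node"
      if (task != null) {
        tasks.add(task);
      }
    }
    return tasks;
  }

  /** Single-node mode expressed as a one-element node list. */
  static Supplier<List<String>> singleNode(String amsUrl) {
    return () -> Collections.singletonList(amsUrl);
  }
}
```

With this shape, `start()` would only choose the URL supplier, and the polling loop itself would be shared by both modes.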
##########
amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java:
##########
@@ -48,4 +48,5 @@ public class OptimizerProperties {
public static final String OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT = "128mb";
public static final String OPTIMIZER_CACHE_TIMEOUT = "cache-timeout";
public static final String OPTIMIZER_CACHE_TIMEOUT_DEFAULT = "10min";
+ public static final String OPTIMIZER_MASTER_SLAVE_MODE = "master-slave-mode";
Review Comment:
It is a boolean property. Can you rename it to
`OPTIMIZER_MASTER_SLAVE_MODE_ENABLED`, or just remove it?
##########
amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java:
##########
@@ -86,25 +157,79 @@ private boolean shouldReturnNull(Throwable t) {
return false;
}
- protected <T> T callAuthenticatedAms(AmsAuthenticatedCallOperation<T>
operation)
+ /**
+ * Call authenticated AMS with a specific AMS URL.
+ *
+ * @param amsUrl The AMS node URL to call
+ * @param operation The operation to execute
+ * @return The result of the operation
+ * @throws TException If the operation fails
+ */
+ protected <T> T callAuthenticatedAms(String amsUrl,
AmsAuthenticatedCallOperation<T> operation)
throws TException {
+ // Maximum retry time window for auth errors in master-slave mode (30
seconds)
+ long maxAuthRetryTimeWindow = TimeUnit.SECONDS.toMillis(90);
Review Comment:
Is the window 30 seconds or 90 seconds? The comment says 30, but the code uses 90.
##########
amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java:
##########
@@ -32,7 +32,7 @@
public class OptimizerToucher extends AbstractOptimizerOperator {
private static final Logger LOG =
LoggerFactory.getLogger(OptimizerToucher.class);
- private TokenChangeListener tokenChangeListener;
+ private transient TokenChangeListener tokenChangeListener;
Review Comment:
Why set it to `transient`?
##########
amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java:
##########
@@ -49,6 +53,112 @@ public OptimizerExecutor(OptimizerConfig config, int
threadId) {
}
public void start() {
+ // Check if in master-slave mode (node manager initialised for either ZK
or DB HA)
+ boolean isMasterSlaveMode = getConfig().isMasterSlaveMode() &&
hasAmsNodeManager();
+
+ if (isMasterSlaveMode) {
+ // Master-slave mode: get node list and process tasks from each node
+ startMasterSlaveMode();
+ } else {
+ // Active-standby mode: use original logic
+ startSingleNodeMode();
+ }
+ }
+
+ /** Start in master-slave mode: get node list and process tasks from each
AMS node. */
+ private void startMasterSlaveMode() {
+ // Dynamic polling interval control
+ long basePollInterval = TimeUnit.SECONDS.toMillis(1); // Base interval: 1
second
+ long maxPollInterval = TimeUnit.SECONDS.toMillis(30); // Max interval: 10
seconds
Review Comment:
Is the max interval 10 or 30 seconds? The comment and the value disagree.
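One way to avoid this kind of drift is to let the constant carry its own unit, so no trailing comment has to repeat the number. A minimal sketch, assuming 30 seconds is the intended cap and that the interval doubles after an empty poll; both are my assumptions, and `PollIntervalSketch` is a hypothetical name:

```java
import java.time.Duration;

/** Hypothetical sketch; the doubling policy and the 30 s cap are assumptions. */
final class PollIntervalSketch {
  static final Duration BASE_POLL_INTERVAL = Duration.ofSeconds(1);
  static final Duration MAX_POLL_INTERVAL = Duration.ofSeconds(30);

  /** Doubles the current interval and caps the result at MAX_POLL_INTERVAL. */
  static Duration next(Duration current) {
    Duration doubled = current.multipliedBy(2);
    return doubled.compareTo(MAX_POLL_INTERVAL) > 0 ? MAX_POLL_INTERVAL : doubled;
  }
}
```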
##########
amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java:
##########
@@ -49,6 +53,112 @@ public OptimizerExecutor(OptimizerConfig config, int
threadId) {
}
public void start() {
+ // Check if in master-slave mode (node manager initialised for either ZK
or DB HA)
+ boolean isMasterSlaveMode = getConfig().isMasterSlaveMode() &&
hasAmsNodeManager();
Review Comment:
If the existence of `hasAmsNodeManager` already determines whether we use
the multi-node task fetching strategy, do we still need the `master-slave-mode`
configuration?
If it is not needed, I recommend removing this config property.
##########
amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java:
##########
@@ -41,21 +43,90 @@ public class AbstractOptimizerOperator implements
Serializable {
private final OptimizerConfig config;
private final AtomicReference<String> token = new AtomicReference<>();
private volatile boolean stopped = false;
+ private transient volatile AmsNodeManager amsNodeManager;
+ private transient volatile ThriftAmsNodeManager thriftAmsNodeManager;
public AbstractOptimizerOperator(OptimizerConfig config) {
Preconditions.checkNotNull(config);
this.config = config;
+ if (config.isMasterSlaveMode()) {
+ String amsUrl = config.getAmsUrl();
+ if (amsUrl.startsWith("zookeeper://")) {
+ // ZK mode: discover nodes from ZooKeeper ephemeral nodes
+ try {
+ this.amsNodeManager = new AmsNodeManager(amsUrl);
+ LOG.info("Initialized ZK AmsNodeManager for master-slave mode");
+ } catch (Exception e) {
+ LOG.warn("Failed to initialize AmsNodeManager, will use single AMS
URL", e);
+ }
+ } else {
+ // DB mode (or direct thrift://): discover nodes via
getOptimizingNodeUrls() Thrift RPC
+ try {
+ this.thriftAmsNodeManager = new ThriftAmsNodeManager(amsUrl);
+ LOG.info("Initialized ThriftAmsNodeManager for master-slave mode (DB
HA)");
+ } catch (Exception e) {
+ LOG.warn("Failed to initialize ThriftAmsNodeManager, will use single
AMS URL", e);
+ }
+ }
+ }
+ }
+
+ /** Get the AmsNodeManager instance if available (ZK mode). */
+ protected AmsNodeManager getAmsNodeManager() {
+ return amsNodeManager;
+ }
+
+ /** Get the ThriftAmsNodeManager instance if available (DB mode). */
+ protected ThriftAmsNodeManager getThriftAmsNodeManager() {
+ return thriftAmsNodeManager;
+ }
+
+ /**
+ * Returns true if any node manager (ZK or Thrift) is active. Used by
OptimizerExecutor to decide
+ * whether to use master-slave polling logic.
+ */
+ protected boolean hasAmsNodeManager() {
+ return amsNodeManager != null || thriftAmsNodeManager != null;
+ }
+
+ /**
+ * Get all AMS URLs from the active node manager. Falls back to the single
configured URL when no
+ * node manager is available or the node list is empty.
+ */
+ protected List<String> getAmsNodeUrls() {
+ if (amsNodeManager != null) {
+ List<String> urls = amsNodeManager.getAllAmsUrls();
+ if (!urls.isEmpty()) {
+ return urls;
+ }
+ }
+ if (thriftAmsNodeManager != null) {
+ return thriftAmsNodeManager.getAllAmsUrls();
+ }
+ return Collections.singletonList(getConfig().getAmsUrl());
}
protected <T> T callAms(AmsCallOperation<T> operation) throws TException {
+ return callAms(getConfig().getAmsUrl(), operation);
Review Comment:
I have one concern:
This breaks authentication in multi-node mode: the optimizer still registers
and heartbeats against only one AMS endpoint, but OptimizerExecutor now
polls/acks/completes against every AMS node.
The flow becomes:
- the optimizer authenticates with AMS-A and gets a token
- it uses that same token to call AMS-B
- AMS-B does not know this token because it validates against its own local
authOptimizers
- the request fails with `PluginRetryAuthException` and keeps retrying
Unless we add per-node registration/token propagation, cross-node task
fetching will not work reliably.
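To make the per-node registration idea concrete, here is a minimal sketch that keeps one token per AMS URL instead of a single shared token. Everything in it is hypothetical: `PerNodeTokenSketch` is not a class in the PR, and the `register` function is a stand-in for whatever authenticate RPC would be issued against a single node.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch of per-node registration; not the PR's or Amoro's actual API. */
final class PerNodeTokenSketch {
  private final Map<String, String> tokenByAmsUrl = new ConcurrentHashMap<>();
  // Stand-in for an authenticate call against one AMS node; returns that node's token.
  private final Function<String, String> register;

  PerNodeTokenSketch(Function<String, String> register) {
    this.register = register;
  }

  /** Returns the token issued by this specific node, registering on first use. */
  String tokenFor(String amsUrl) {
    return tokenByAmsUrl.computeIfAbsent(amsUrl, register);
  }

  /** Drops the cached token so that the next call re-registers with that node. */
  void invalidate(String amsUrl) {
    tokenByAmsUrl.remove(amsUrl);
  }
}
```

An auth failure from AMS-B would then invalidate only AMS-B's entry, rather than retrying with a token that only AMS-A recognizes. Heartbeats would also need to go to every node the optimizer registered with, which this sketch does not cover.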