This is an automated email from the ASF dual-hosted git repository.
czy006 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new f6b1e683f [AMORO-3928] Modify the optimizer to support obtaining tasks
from each AMS node for processing. (#3950)
f6b1e683f is described below
commit f6b1e683f4bf650b860e39a09273f52ba703a128
Author: can <[email protected]>
AuthorDate: Thu Apr 2 11:30:25 2026 +0800
[AMORO-3928] Modify the optimizer to support obtaining tasks from each AMS
node for processing. (#3950)
* [Subtask]: Use a new configuration item to control whether master-slave
mode is enabled. #3845
* [Subtask]: add AmsAssignService to implement balanced bucket allocation
in master-slave mode. #3921
* [Subtask]: add AmsAssignService to implement balanced bucket allocation
in master-slave mode. #3921
* [Subtask]: Modify DefaultTableService to be compatible with master-slave
mode #3923
* [Subtask]: In master-slave mode, each AMS should automatically sense the
optimizer. #3929
* [Subtask]: Modify the optimizer to support obtaining tasks from each AMS
node for processing. #3928
* [Subtask]: Modify the optimizer to support obtaining tasks from each AMS
node for processing. #3928
* [Subtask]: Optimize the logic for retrieving the AMS list from ZooKeeper
in master-slave mode. #3928
* This resolves conflicts with the latest main branch and
introduces a new database-backed solution for storing allocation
information.
* [Subtask]: Modify the optimizer to support obtaining tasks from each AMS
node for processing. #3928
* [Subtask]: Modify the optimizer to support obtaining tasks from each AMS
node for processing. #3928
* [Subtask]: Modify the optimizer to support obtaining tasks from each AMS
node for processing. #3928
* [Subtask]: Fixed a legacy bug that could cause unit tests to fail during
compilation. #3928
* [Subtask]: Optimized based on CR feedback. #3928
* [Subtask]: Fixed a legacy bug that could cause unit tests to fail during
compilation. #3928
* Revert "[Subtask]: Fixed a legacy bug that could cause unit tests to fail
during compilation. #3928"
This reverts commit d54c126fdc32a77b4d1a5f8cf648cb3bbbca4a65.
* [Subtask]: Optimized based on CR feedback. #3928
* [Subtask]: Optimized based on CR feedback. #3928
* [Subtask]: Optimized based on CR feedback. #3928
---------
Co-authored-by: wardli <[email protected]>
---
.../apache/amoro/server/AmoroServiceContainer.java | 3 +-
.../org/apache/amoro/server/BucketAssignStore.java | 9 +
.../amoro/server/BucketAssignStoreFactory.java | 3 +-
.../apache/amoro/server/DBBucketAssignStore.java | 30 +-
.../amoro/server/DefaultOptimizingService.java | 35 ++
.../apache/amoro/server/ZkBucketAssignStore.java | 29 ++
.../ha/DataBaseHighAvailabilityContainer.java | 132 ++++--
.../amoro/server/ha/HighAvailabilityContainer.java | 11 +-
.../server/ha/NoopHighAvailabilityContainer.java | 5 +
.../server/ha/ZkHighAvailabilityContainer.java | 11 +
.../server/manager/AbstractOptimizerContainer.java | 7 +
.../server/persistence/BucketAssignmentMeta.java | 26 ++
.../persistence/mapper/BucketAssignMapper.java | 23 +-
.../amoro/server/table/DefaultTableService.java | 4 +-
.../src/main/resources/derby/ams-derby-init.sql | 1 +
.../src/main/resources/mysql/ams-mysql-init.sql | 1 +
amoro-ams/src/main/resources/mysql/upgrade.sql | 6 +-
.../main/resources/postgres/ams-postgres-init.sql | 1 +
amoro-ams/src/main/resources/postgres/upgrade.sql | 7 +-
.../apache/amoro/server/TestAmsAssignService.java | 9 +
.../server/TestInternalIcebergCatalogService.java | 6 +-
.../org/apache/amoro/api/OptimizingService.java | 472 +++++++++++++++++++++
.../java/org/apache/amoro/OptimizerProperties.java | 1 +
.../java/org/apache/amoro/client/AmsThriftUrl.java | 91 ++++
.../org/apache/amoro/client/ZookeeperService.java | 5 +
.../main/thrift/amoro_optimizing_service.thrift | 3 +
.../apache/amoro/MockAmoroManagementServer.java | 5 +
.../common/AbstractOptimizerOperator.java | 174 +++++++-
.../amoro/optimizer/common/AmsNodeManager.java | 116 +++++
.../amoro/optimizer/common/OptimizerConfig.java | 15 +
.../amoro/optimizer/common/OptimizerExecutor.java | 143 +++++--
.../amoro/optimizer/common/OptimizerToucher.java | 2 +-
.../optimizer/common/ThriftAmsNodeManager.java | 110 +++++
33 files changed, 1390 insertions(+), 106 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
index c8144f714..99108c5ff 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java
@@ -277,7 +277,8 @@ public class AmoroServiceContainer {
serviceConfig, catalogManager, defaultRuntimeFactory, haContainer,
bucketAssignStore);
processService = new ProcessService(tableService, actionCoordinators,
executeEngineManager);
optimizingService =
- new DefaultOptimizingService(serviceConfig, catalogManager,
optimizerManager, tableService);
+ new DefaultOptimizingService(
+ serviceConfig, catalogManager, optimizerManager, tableService,
bucketAssignStore);
LOG.info("Setting up AMS table executors...");
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
index 8db61265b..b85751bf8 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStore.java
@@ -65,6 +65,15 @@ public interface BucketAssignStore {
*/
Map<AmsServerInfo, List<String>> getAllAssignments() throws
BucketAssignStoreException;
+ /**
+ * Get all alive AMS nodes that have bucket assignments. Used by optimizers
in master-slave mode
+ * to discover all AMS optimizing endpoints.
+ *
+ * @return List of AmsServerInfo for all nodes with bucket assignments,
empty list if none
+ * @throws BucketAssignStoreException If retrieval operation fails
+ */
+ List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException;
+
/**
* Get the last update time for a node's assignments.
*
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
index 12a494450..18f6d00e3 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/BucketAssignStoreFactory.java
@@ -63,8 +63,9 @@ public final class BucketAssignStoreFactory {
"Cannot create ZkBucketAssignStore: ZK client not available or
invalid container type");
case AmoroManagementConf.HA_TYPE_DATABASE:
+ long nodeHeartbeatTtlMs =
conf.get(AmoroManagementConf.HA_LEASE_TTL).toMillis();
LOG.info("Creating DBBucketAssignStore for cluster: {}", clusterName);
- return new DBBucketAssignStore(clusterName);
+ return new DBBucketAssignStore(clusterName, nodeHeartbeatTtlMs);
default:
throw new IllegalArgumentException(
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
index b044f7b6c..0727e850c 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/DBBucketAssignStore.java
@@ -45,9 +45,11 @@ public class DBBucketAssignStore extends PersistentBase
implements BucketAssignS
new TypeReference<List<String>>() {};
private final String clusterName;
+ private final long nodeHeartbeatTtlMs;
- public DBBucketAssignStore(String clusterName) {
+ public DBBucketAssignStore(String clusterName, long nodeHeartbeatTtlMs) {
this.clusterName = clusterName;
+ this.nodeHeartbeatTtlMs = nodeHeartbeatTtlMs;
}
@Override
@@ -180,6 +182,32 @@ public class DBBucketAssignStore extends PersistentBase
implements BucketAssignS
}
}
+ @Override
+ public List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException
{
+ try {
+ long cutoff = System.currentTimeMillis() - nodeHeartbeatTtlMs;
+ List<BucketAssignmentMeta> rows =
+ getAs(BucketAssignMapper.class, mapper ->
mapper.selectAllByCluster(clusterName));
+ List<AmsServerInfo> nodes = new ArrayList<>();
+ for (BucketAssignmentMeta meta : rows) {
+ Long heartbeatTs = meta.getNodeHeartbeatTs();
+ if (heartbeatTs == null || heartbeatTs < cutoff) {
+ LOG.debug(
+ "Skipping stale node key={}, node_heartbeat_ts={}",
meta.getNodeKey(), heartbeatTs);
+ continue;
+ }
+ AmsServerInfo nodeInfo = parseNodeInfo(meta);
+ if (nodeInfo.getThriftBindPort() != null &&
nodeInfo.getThriftBindPort() > 0) {
+ nodes.add(nodeInfo);
+ }
+ }
+ return nodes;
+ } catch (Exception e) {
+ LOG.error("Failed to get alive nodes", e);
+ throw new BucketAssignStoreException("Failed to get alive nodes", e);
+ }
+ }
+
private static String getNodeKey(AmsServerInfo nodeInfo) {
return nodeInfo.getHost() + ":" + nodeInfo.getThriftBindPort();
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
index 73ee0dda8..66734682e 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java
@@ -26,6 +26,7 @@ import org.apache.amoro.api.OptimizingService;
import org.apache.amoro.api.OptimizingTask;
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.api.OptimizingTaskResult;
+import org.apache.amoro.client.AmsServerInfo;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.exception.ForbiddenException;
@@ -66,6 +67,7 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -116,12 +118,22 @@ public class DefaultOptimizingService extends
StatedPersistentBase
private final TableService tableService;
private final RuntimeHandlerChain tableHandlerChain;
private final ExecutorService planExecutor;
+ private final BucketAssignStore bucketAssignStore;
public DefaultOptimizingService(
Configurations serviceConfig,
CatalogManager catalogManager,
OptimizerManager optimizerManager,
TableService tableService) {
+ this(serviceConfig, catalogManager, optimizerManager, tableService, null);
+ }
+
+ public DefaultOptimizingService(
+ Configurations serviceConfig,
+ CatalogManager catalogManager,
+ OptimizerManager optimizerManager,
+ TableService tableService,
+ BucketAssignStore bucketAssignStore) {
this.optimizerTouchTimeout =
serviceConfig.getDurationInMillis(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
this.taskAckTimeout =
@@ -144,6 +156,7 @@ public class DefaultOptimizingService extends
StatedPersistentBase
this.tableService = tableService;
this.catalogManager = catalogManager;
this.optimizerManager = optimizerManager;
+ this.bucketAssignStore = bucketAssignStore;
this.tableHandlerChain = new TableRuntimeHandlerImpl();
this.planExecutor =
Executors.newCachedThreadPool(
@@ -322,6 +335,28 @@ public class DefaultOptimizingService extends
StatedPersistentBase
return true;
}
+ @Override
+ public List<String> getOptimizingNodeUrls() {
+ if (bucketAssignStore == null) {
+ return Collections.emptyList();
+ }
+ try {
+ List<AmsServerInfo> nodes = bucketAssignStore.getAliveNodes();
+ List<String> urls = new ArrayList<>(nodes.size());
+ for (AmsServerInfo node : nodes) {
+ if (node.getHost() != null
+ && node.getThriftBindPort() != null
+ && node.getThriftBindPort() > 0) {
+ urls.add(String.format("thrift://%s:%d", node.getHost(),
node.getThriftBindPort()));
+ }
+ }
+ return urls;
+ } catch (Exception e) {
+ LOG.warn("Failed to get optimizing node URLs from bucket assign store",
e);
+ return Collections.emptyList();
+ }
+ }
+
/**
* Get optimizing queue.
*
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
index 9ece14aa4..ed498c3e3 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/ZkBucketAssignStore.java
@@ -170,6 +170,35 @@ public class ZkBucketAssignStore implements
BucketAssignStore {
return allAssignments;
}
+ @Override
+ public List<AmsServerInfo> getAliveNodes() throws BucketAssignStoreException
{
+ List<AmsServerInfo> nodes = new ArrayList<>();
+ try {
+ if (zkClient.checkExists().forPath(assignmentsBasePath) == null) {
+ return nodes;
+ }
+ List<String> nodeKeys =
zkClient.getChildren().forPath(assignmentsBasePath);
+ for (String nodeKey : nodeKeys) {
+ try {
+ AmsServerInfo nodeInfo = parseNodeKey(nodeKey);
+ if (nodeInfo.getThriftBindPort() != null &&
nodeInfo.getThriftBindPort() > 0) {
+ nodes.add(nodeInfo);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to parse node key: {}", nodeKey, e);
+ }
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // path doesn't exist
+ } catch (BucketAssignStoreException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Failed to get alive nodes", e);
+ throw new BucketAssignStoreException("Failed to get alive nodes", e);
+ }
+ return nodes;
+ }
+
@Override
public long getLastUpdateTime(AmsServerInfo nodeInfo) throws
BucketAssignStoreException {
String nodeKey = getNodeKey(nodeInfo);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
index 4edcd74fa..7d6b6b37e 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java
@@ -21,8 +21,10 @@ package org.apache.amoro.server.ha;
import org.apache.amoro.client.AmsServerInfo;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.persistence.BucketAssignmentMeta;
import org.apache.amoro.server.persistence.HaLeaseMeta;
import org.apache.amoro.server.persistence.PersistentBase;
+import org.apache.amoro.server.persistence.mapper.BucketAssignMapper;
import org.apache.amoro.server.persistence.mapper.HaLeaseMapper;
import org.apache.amoro.utils.JacksonUtil;
import org.slf4j.Logger;
@@ -144,29 +146,50 @@ public class DataBaseHighAvailabilityContainer extends
PersistentBase
LOG.debug("Master-slave mode is not enabled, skip node registration");
return;
}
- // In master-slave mode, register node to database by writing
OPTIMIZING_SERVICE info
- // This is similar to ZK mode registering ephemeral nodes
+ // Register this node in bucket_assignments so that all nodes can be
discovered via
+ // getAliveNodes(). ha_lease has PK (cluster_name, service_name) and
cannot store multiple
+ // nodes for the same service, so we use the per-node bucket_assignments
table instead.
+ upsertNodeHeartbeat();
+ LOG.info(
+ "Registered AMS node to bucket_assignments: nodeKey={},
optimizingService={}",
+ getNodeKey(),
+ optimizingServiceServerInfo);
+ }
+
+ /** Returns nodeKey used as the bucket_assignments row identifier:
host:optimizingPort. */
+ private String getNodeKey() {
+ return optimizingServiceServerInfo.getHost()
+ + ":"
+ + optimizingServiceServerInfo.getThriftBindPort();
+ }
+
+ /**
+ * Upsert this node's heartbeat row into bucket_assignments. Only updates
node_heartbeat_ts (and
+ * server_info_json on first insert); never touches assignments_json so the
leader's assignments
+ * are preserved.
+ */
+ private void upsertNodeHeartbeat() {
long now = System.currentTimeMillis();
- String optimizingInfoJson =
JacksonUtil.toJSONString(optimizingServiceServerInfo);
+ String nodeKey = getNodeKey();
+ String serverInfoJson =
JacksonUtil.toJSONString(optimizingServiceServerInfo);
try {
- doAsIgnoreError(
- HaLeaseMapper.class,
- mapper -> {
- int updated =
- mapper.updateServerInfo(
- clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp,
optimizingInfoJson, now);
- if (updated == 0) {
- mapper.insertServerInfoIfAbsent(
- clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp,
optimizingInfoJson, now);
- }
- });
- LOG.info(
- "Registered AMS node to database: nodeId={}, optimizingService={}",
- nodeId,
- optimizingServiceServerInfo);
+ int updated =
+ updateAs(
+ BucketAssignMapper.class,
+ mapper -> mapper.updateNodeHeartbeat(clusterName, nodeKey,
now))
+ .intValue();
+ if (updated == 0) {
+ // First registration: insert a new row with empty assignments
+ doAsIgnoreError(
+ BucketAssignMapper.class,
+ mapper ->
+ mapper.insert(
+ new BucketAssignmentMeta(
+ clusterName, nodeKey, serverInfoJson, null, now,
now)));
+ }
} catch (Exception e) {
- LOG.error("Failed to register node to database", e);
- throw e;
+ LOG.error("Failed to upsert node heartbeat for nodeKey={}", nodeKey, e);
+ throw new RuntimeException("Failed to register node in
bucket_assignments", e);
}
}
@@ -180,7 +203,12 @@ public class DataBaseHighAvailabilityContainer extends
PersistentBase
return tableServiceServerInfo;
}
- /** Closes the heartbeat executor safely. */
+ @Override
+ public AmsServerInfo getOptimizingServiceServerInfo() {
+ return optimizingServiceServerInfo;
+ }
+
+ /** Closes the heartbeat executor safely and removes this node's
registration row. */
@Override
public void close() {
try {
@@ -190,6 +218,18 @@ public class DataBaseHighAvailabilityContainer extends
PersistentBase
} catch (Exception e) {
LOG.error("Close Database HighAvailabilityContainer failed", e);
}
+ // Remove this node from bucket_assignments so the leader immediately
stops seeing it
+ // as alive without waiting for the heartbeat TTL to expire.
+ boolean isMasterSlaveMode =
serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
+ if (isMasterSlaveMode) {
+ try {
+ String nodeKey = getNodeKey();
+ doAs(BucketAssignMapper.class, mapper ->
mapper.deleteByNode(clusterName, nodeKey));
+ LOG.info("Unregistered AMS node from bucket_assignments: nodeKey={}",
nodeKey);
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister node from bucket_assignments on close",
e);
+ }
+ }
}
private class HeartbeatRunnable implements Runnable {
@@ -204,6 +244,14 @@ public class DataBaseHighAvailabilityContainer extends
PersistentBase
return;
}
+ // Each node independently refreshes its own heartbeat in
bucket_assignments so that
+ // getAliveNodes() can correctly detect liveness without relying on
ha_lease.
+ try {
+ upsertNodeHeartbeat();
+ } catch (Exception e) {
+ LOG.warn("Failed to refresh node heartbeat in bucket_assignments",
e);
+ }
+
if (!isLeader.get()) {
// First attempt to acquire the lease (similar to candidate/await)
boolean success = tryAcquireLease(newExpireTs, now);
@@ -353,29 +401,35 @@ public class DataBaseHighAvailabilityContainer extends
PersistentBase
LOG.warn("Only leader node can get alive nodes list");
return aliveNodes;
}
+ // Read alive nodes from bucket_assignments keyed by node_heartbeat_ts.
ha_lease has
+ // PK (cluster_name, service_name) which only allows one row per service
and cannot
+ // represent multiple AMS nodes. bucket_assignments has PK (cluster_name,
node_key) and
+ // stores one row per node; node_heartbeat_ts is updated exclusively by
the owning node
+ // so the leader's refreshLastUpdateTime calls cannot mask a dead node's
staleness.
try {
- long currentTime = System.currentTimeMillis();
- List<HaLeaseMeta> leases =
- getAs(
- HaLeaseMapper.class,
- mapper -> mapper.selectLeasesByService(clusterName,
OPTIMIZING_SERVICE));
- for (HaLeaseMeta lease : leases) {
- // Only include nodes with valid (non-expired) leases
- if (lease.getLeaseExpireTs() != null && lease.getLeaseExpireTs() >
currentTime) {
- if (lease.getServerInfoJson() != null &&
!lease.getServerInfoJson().isEmpty()) {
- try {
- AmsServerInfo nodeInfo =
- JacksonUtil.parseObject(lease.getServerInfoJson(),
AmsServerInfo.class);
- aliveNodes.add(nodeInfo);
- } catch (Exception e) {
- LOG.warn("Failed to parse server info for node {}",
lease.getNodeId(), e);
- }
+ long cutoff = System.currentTimeMillis() -
TimeUnit.SECONDS.toMillis(ttlSeconds);
+ List<BucketAssignmentMeta> rows =
+ getAs(BucketAssignMapper.class, mapper ->
mapper.selectAllByCluster(clusterName));
+ for (BucketAssignmentMeta meta : rows) {
+ Long heartbeatTs = meta.getNodeHeartbeatTs();
+ if (heartbeatTs == null || heartbeatTs < cutoff) {
+ LOG.debug(
+ "Skipping stale node key={}, node_heartbeat_ts={}",
meta.getNodeKey(), heartbeatTs);
+ continue;
+ }
+ if (meta.getServerInfoJson() != null &&
!meta.getServerInfoJson().isEmpty()) {
+ try {
+ AmsServerInfo nodeInfo =
+ JacksonUtil.parseObject(meta.getServerInfoJson(),
AmsServerInfo.class);
+ aliveNodes.add(nodeInfo);
+ } catch (Exception e) {
+ LOG.warn("Failed to parse server_info_json for node {}",
meta.getNodeKey(), e);
}
}
}
} catch (Exception e) {
- LOG.error("Failed to get alive nodes from database", e);
- throw e;
+ LOG.error("Failed to get alive nodes from bucket_assignments", e);
+ throw new RuntimeException("Failed to get alive nodes", e);
}
return aliveNodes;
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
index 1afeb73f5..30d01a606 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java
@@ -70,9 +70,18 @@ public interface HighAvailabilityContainer {
boolean hasLeadership();
/**
- * Get current AMS node information.
+ * Get current AMS node's table service server info (host + table-service
Thrift port).
*
* @return {@link AmsServerInfo}
*/
AmsServerInfo getTableServiceServerInfo();
+
+ /**
+ * Get current AMS node's optimizing service server info (host + optimizing
Thrift port). This
+ * must be consistent with the {@link AmsServerInfo} written into {@link
+ * org.apache.amoro.server.BucketAssignStore} so that bucket lookups can
match the stored nodeKey.
+ *
+ * @return {@link AmsServerInfo}
+ */
+ AmsServerInfo getOptimizingServiceServerInfo();
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
index 4638e5a17..48282f2a9 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java
@@ -66,4 +66,9 @@ public class NoopHighAvailabilityContainer implements
HighAvailabilityContainer
public AmsServerInfo getTableServiceServerInfo() {
return null;
}
+
+ @Override
+ public AmsServerInfo getOptimizingServiceServerInfo() {
+ return null;
+ }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
index 0aa7b6e35..5c93a6777 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/ha/ZkHighAvailabilityContainer.java
@@ -294,6 +294,17 @@ public class ZkHighAvailabilityContainer implements
HighAvailabilityContainer, L
return tableServiceServerInfo;
}
+ /**
+ * Get the current node's optimizing service server info. ZK-registered node
data uses this info,
+ * so bucket-assignment lookups must use the same port.
+ *
+ * @return The current node's optimizing service server info, null if HA is
not enabled
+ */
+ @Override
+ public AmsServerInfo getOptimizingServiceServerInfo() {
+ return optimizingServiceServerInfo;
+ }
+
/**
* Get the ZooKeeper client. This is used for creating BucketAssignStore.
*
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
index 16628406d..05745c79b 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractOptimizerContainer.java
@@ -136,6 +136,13 @@ public abstract class AbstractOptimizerContainer
implements InternalResourceCont
if (StringUtils.isNotEmpty(resource.getResourceId())) {
stringBuilder.append(" -id ").append(resource.getResourceId());
}
+ // Add master-slave mode flag if enabled
+ if (containerProperties != null
+ && "true"
+ .equalsIgnoreCase(
+
containerProperties.get(OptimizerProperties.OPTIMIZER_MASTER_SLAVE_MODE_ENABLED)))
{
+ stringBuilder.append(" -msm");
+ }
return stringBuilder.toString();
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
index 4f6ab7fb4..5cdb59b90 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/BucketAssignmentMeta.java
@@ -28,6 +28,8 @@ public class BucketAssignmentMeta {
private String serverInfoJson;
private String assignmentsJson;
private Long lastUpdateTime;
+ /** Per-node heartbeat timestamp. Updated only by the owning node, never by
the leader. */
+ private Long nodeHeartbeatTs;
public BucketAssignmentMeta() {}
@@ -42,6 +44,22 @@ public class BucketAssignmentMeta {
this.serverInfoJson = serverInfoJson;
this.assignmentsJson = assignmentsJson;
this.lastUpdateTime = lastUpdateTime;
+ this.nodeHeartbeatTs = lastUpdateTime;
+ }
+
+ public BucketAssignmentMeta(
+ String clusterName,
+ String nodeKey,
+ String serverInfoJson,
+ String assignmentsJson,
+ Long lastUpdateTime,
+ Long nodeHeartbeatTs) {
+ this.clusterName = clusterName;
+ this.nodeKey = nodeKey;
+ this.serverInfoJson = serverInfoJson;
+ this.assignmentsJson = assignmentsJson;
+ this.lastUpdateTime = lastUpdateTime;
+ this.nodeHeartbeatTs = nodeHeartbeatTs;
}
public String getClusterName() {
@@ -83,4 +101,12 @@ public class BucketAssignmentMeta {
public void setLastUpdateTime(Long lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}
+
+ public Long getNodeHeartbeatTs() {
+ return nodeHeartbeatTs;
+ }
+
+ public void setNodeHeartbeatTs(Long nodeHeartbeatTs) {
+ this.nodeHeartbeatTs = nodeHeartbeatTs;
+ }
}
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
index 55fab0175..e53a3a4ed 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/BucketAssignMapper.java
@@ -34,8 +34,8 @@ import java.util.List;
public interface BucketAssignMapper {
@Insert(
- "INSERT INTO bucket_assignments (cluster_name, node_key,
server_info_json, assignments_json, last_update_time) "
- + "VALUES (#{meta.clusterName}, #{meta.nodeKey},
#{meta.serverInfoJson}, #{meta.assignmentsJson}, #{meta.lastUpdateTime})")
+ "INSERT INTO bucket_assignments (cluster_name, node_key,
server_info_json, assignments_json, last_update_time, node_heartbeat_ts) "
+ + "VALUES (#{meta.clusterName}, #{meta.nodeKey},
#{meta.serverInfoJson}, #{meta.assignmentsJson}, #{meta.lastUpdateTime},
#{meta.nodeHeartbeatTs})")
int insert(@Param("meta") BucketAssignmentMeta meta);
@Update(
@@ -56,8 +56,20 @@ public interface BucketAssignMapper {
@Param("nodeKey") String nodeKey,
@Param("lastUpdateTime") Long lastUpdateTime);
+ /**
+ * Update only the per-node heartbeat timestamp. Must be called only by the
owning node, never by
+ * the leader on behalf of other nodes.
+ */
+ @Update(
+ "UPDATE bucket_assignments SET node_heartbeat_ts = #{nodeHeartbeatTs} "
+ + "WHERE cluster_name = #{clusterName} AND node_key = #{nodeKey}")
+ int updateNodeHeartbeat(
+ @Param("clusterName") String clusterName,
+ @Param("nodeKey") String nodeKey,
+ @Param("nodeHeartbeatTs") Long nodeHeartbeatTs);
+
@Select(
- "SELECT cluster_name, node_key, server_info_json, assignments_json,
last_update_time "
+ "SELECT cluster_name, node_key, server_info_json, assignments_json,
last_update_time, node_heartbeat_ts "
+ "FROM bucket_assignments WHERE cluster_name = #{clusterName} AND
node_key = #{nodeKey}")
@Results(
id = "BucketAssignmentMetaMap",
@@ -66,13 +78,14 @@ public interface BucketAssignMapper {
@Result(column = "node_key", property = "nodeKey"),
@Result(column = "server_info_json", property = "serverInfoJson"),
@Result(column = "assignments_json", property = "assignmentsJson"),
- @Result(column = "last_update_time", property = "lastUpdateTime")
+ @Result(column = "last_update_time", property = "lastUpdateTime"),
+ @Result(column = "node_heartbeat_ts", property = "nodeHeartbeatTs")
})
BucketAssignmentMeta selectByNode(
@Param("clusterName") String clusterName, @Param("nodeKey") String
nodeKey);
@Select(
- "SELECT cluster_name, node_key, server_info_json, assignments_json,
last_update_time "
+ "SELECT cluster_name, node_key, server_info_json, assignments_json,
last_update_time, node_heartbeat_ts "
+ "FROM bucket_assignments WHERE cluster_name = #{clusterName}")
@ResultMap("BucketAssignmentMetaMap")
List<BucketAssignmentMeta> selectAllByCluster(@Param("clusterName") String
clusterName);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index 27acb89d1..c64b7d923 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -349,7 +349,9 @@ public class DefaultTableService extends PersistentBase
implements TableService
return;
}
try {
- AmsServerInfo currentServerInfo =
haContainer.getTableServiceServerInfo();
+ // Must use optimizingServiceServerInfo because AmsAssignService stores
bucket assignments
+ // keyed by host:optimizingPort (from haContainer.getAliveNodes()), not
host:tableServicePort.
+ AmsServerInfo currentServerInfo =
haContainer.getOptimizingServiceServerInfo();
if (currentServerInfo == null) {
LOG.warn("Cannot get current server info, skip updating assigned
bucketIds");
return;
diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
index 6f4be2ae5..32d663288 100644
--- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql
+++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql
@@ -272,5 +272,6 @@ CREATE TABLE bucket_assignments (
server_info_json VARCHAR(32672),
assignments_json VARCHAR(32672),
last_update_time BIGINT NOT NULL DEFAULT 0,
+ node_heartbeat_ts BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (cluster_name, node_key)
);
\ No newline at end of file
diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
index e9fd1a244..f202695c8 100644
--- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
+++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql
@@ -288,5 +288,6 @@ CREATE TABLE IF NOT EXISTS bucket_assignments (
server_info_json TEXT NULL COMMENT 'JSON encoded AmsServerInfo',
assignments_json TEXT NULL COMMENT 'JSON array of bucket IDs',
last_update_time BIGINT NOT NULL DEFAULT 0 COMMENT 'Last update
timestamp (ms since epoch)',
+ node_heartbeat_ts BIGINT NOT NULL DEFAULT 0 COMMENT 'Per-node
heartbeat timestamp updated only by the owning node (ms since epoch)',
PRIMARY KEY (cluster_name, node_key)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Bucket ID assignments per AMS
node for master-slave mode';
diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql
b/amoro-ams/src/main/resources/mysql/upgrade.sql
index dd3c85cb2..c67807751 100644
--- a/amoro-ams/src/main/resources/mysql/upgrade.sql
+++ b/amoro-ams/src/main/resources/mysql/upgrade.sql
@@ -164,12 +164,14 @@ ADD COLUMN `process_parameters` mediumtext COMMENT 'Table
process parameters';
-- ADD table bucket_assignments for storing assigned info
CREATE TABLE IF NOT EXISTS bucket_assignments (
- cluster_name
VARCHAR(64) NOT NULL COMMENT 'AMS cluster name',
+ cluster_name VARCHAR(64) NOT NULL COMMENT 'AMS cluster name',
node_key VARCHAR(256) NOT NULL COMMENT 'Node key
(host:thriftBindPort)',
server_info_json TEXT NULL COMMENT 'JSON encoded AmsServerInfo',
assignments_json TEXT NULL COMMENT 'JSON array of bucket IDs',
last_update_time BIGINT NOT NULL DEFAULT 0 COMMENT 'Last update
timestamp (ms since epoch)',
PRIMARY KEY (cluster_name, node_key)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Bucket ID assignments per
AMS node for master-slave mode';
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Bucket ID assignments per AMS
node for master-slave mode';
+-- ADD node_heartbeat_ts to table bucket_assignments
+ALTER TABLE `bucket_assignments` ADD COLUMN `node_heartbeat_ts` BIGINT NOT
NULL DEFAULT 0 COMMENT 'Per-node heartbeat timestamp updated only by the owning
node (ms since epoch)';
diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
index eeb1348bb..245e37744 100644
--- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
+++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql
@@ -462,6 +462,7 @@ CREATE TABLE IF NOT EXISTS bucket_assignments (
server_info_json TEXT NULL,
assignments_json TEXT NULL,
last_update_time BIGINT NOT NULL DEFAULT 0,
+ node_heartbeat_ts BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (cluster_name, node_key)
);
diff --git a/amoro-ams/src/main/resources/postgres/upgrade.sql
b/amoro-ams/src/main/resources/postgres/upgrade.sql
index 5797d704e..8f26023f0 100644
--- a/amoro-ams/src/main/resources/postgres/upgrade.sql
+++ b/amoro-ams/src/main/resources/postgres/upgrade.sql
@@ -224,12 +224,15 @@ ADD COLUMN process_parameters text;
-- ADD table bucket_assignments for storing assigned info
CREATE TABLE IF NOT EXISTS bucket_assignments (
- cluster_name
VARCHAR(64) NOT NULL,
+ cluster_name VARCHAR(64) NOT NULL,
node_key VARCHAR(256) NOT NULL,
server_info_json TEXT NULL,
assignments_json TEXT NULL,
last_update_time BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (cluster_name, node_key)
- );
+);
CREATE INDEX IF NOT EXISTS idx_bucket_assignments_cluster ON
bucket_assignments (cluster_name);
+
+-- ADD node_heartbeat_ts to table bucket_assignments
+ALTER TABLE bucket_assignments ADD COLUMN node_heartbeat_ts BIGINT NOT NULL
DEFAULT 0;
\ No newline at end of file
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
index 4b283c668..e441a6d1f 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestAmsAssignService.java
@@ -872,6 +872,15 @@ public class TestAmsAssignService {
lastUpdateTimes.put(nodeKey, System.currentTimeMillis());
}
+ @Override
+ public List<AmsServerInfo> getAliveNodes() throws
BucketAssignStoreException {
+ List<AmsServerInfo> nodes = new ArrayList<>();
+ for (String nodeKey : assignments.keySet()) {
+ nodes.add(nodeInfoMap.getOrDefault(nodeKey, parseNodeKey(nodeKey)));
+ }
+ return nodes;
+ }
+
private AmsServerInfo parseNodeKey(String nodeKey) {
String[] parts = nodeKey.split(":");
if (parts.length != 2) {
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
b/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
index 936b54c5b..19d4ace8a 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/TestInternalIcebergCatalogService.java
@@ -76,7 +76,11 @@ public class TestInternalIcebergCatalogService extends
RestCatalogServiceTestBas
meta.putToCatalogProperties("cache-enabled", "false");
meta.putToCatalogProperties("cache.expiration-interval-ms", "10000");
catalogManager.updateCatalog(meta);
- String warehouseInAMS =
meta.getCatalogProperties().get(CatalogMetaProperties.KEY_WAREHOUSE);
+ // Force a cache reload after invalidation to prevent the background
catalog-scan task from
+ // overwriting the cache with a stale DB snapshot it read before the
update completed.
+ CatalogMeta updatedMeta = catalogManager.getCatalogMeta(catalogName());
+ String warehouseInAMS =
+
updatedMeta.getCatalogProperties().get(CatalogMetaProperties.KEY_WAREHOUSE);
Map<String, String> clientSideConfiguration = Maps.newHashMap();
clientSideConfiguration.put("cache-enabled", "true");
diff --git
a/amoro-common/src/main/gen-java/org/apache/amoro/api/OptimizingService.java
b/amoro-common/src/main/gen-java/org/apache/amoro/api/OptimizingService.java
index 331e69388..a13600c51 100644
--- a/amoro-common/src/main/gen-java/org/apache/amoro/api/OptimizingService.java
+++ b/amoro-common/src/main/gen-java/org/apache/amoro/api/OptimizingService.java
@@ -26,6 +26,8 @@ public class OptimizingService {
public boolean cancelProcess(long processId) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException;
+ public java.util.List<java.lang.String> getOptimizingNodeUrls() throws
org.apache.amoro.api.AmoroException,
org.apache.amoro.shade.thrift.org.apache.thrift.TException;
+
}
public interface AsyncIface {
@@ -44,6 +46,8 @@ public class OptimizingService {
public void cancelProcess(long processId,
org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.lang.Boolean>
resultHandler) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException;
+ public void
getOptimizingNodeUrls(org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>
resultHandler) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException;
+
}
public static class Client extends
org.apache.amoro.shade.thrift.org.apache.thrift.TServiceClient implements Iface
{
@@ -242,6 +246,32 @@ public class OptimizingService {
throw new
org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException(org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException.MISSING_RESULT,
"cancelProcess failed: unknown result");
}
+ @Override
+ public java.util.List<java.lang.String> getOptimizingNodeUrls() throws
org.apache.amoro.api.AmoroException,
org.apache.amoro.shade.thrift.org.apache.thrift.TException
+ {
+ send_getOptimizingNodeUrls();
+ return recv_getOptimizingNodeUrls();
+ }
+
+ public void send_getOptimizingNodeUrls() throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException
+ {
+ getOptimizingNodeUrls_args args = new getOptimizingNodeUrls_args();
+ sendBase("getOptimizingNodeUrls", args);
+ }
+
+ public java.util.List<java.lang.String> recv_getOptimizingNodeUrls()
throws org.apache.amoro.api.AmoroException,
org.apache.amoro.shade.thrift.org.apache.thrift.TException
+ {
+ getOptimizingNodeUrls_result result = new getOptimizingNodeUrls_result();
+ receiveBase(result, "getOptimizingNodeUrls");
+ if (result.isSetSuccess()) {
+ return result.success;
+ }
+ if (result.e1 != null) {
+ throw result.e1;
+ }
+ throw new
org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException(org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException.MISSING_RESULT,
"getOptimizingNodeUrls failed: unknown result");
+ }
+
}
public static class AsyncClient extends
org.apache.amoro.shade.thrift.org.apache.thrift.async.TAsyncClient implements
AsyncIface {
public static class Factory implements
org.apache.amoro.shade.thrift.org.apache.thrift.async.TAsyncClientFactory<AsyncClient>
{
@@ -519,6 +549,38 @@ public class OptimizingService {
}
}
+ @Override
+ public void
getOptimizingNodeUrls(org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>
resultHandler) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+ checkReady();
+ getOptimizingNodeUrls_call method_call = new
getOptimizingNodeUrls_call(resultHandler, this, ___protocolFactory,
___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
+ }
+
+ public static class getOptimizingNodeUrls_call extends
org.apache.amoro.shade.thrift.org.apache.thrift.async.TAsyncMethodCall<java.util.List<java.lang.String>>
{
+ public
getOptimizingNodeUrls_call(org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>
resultHandler,
org.apache.amoro.shade.thrift.org.apache.thrift.async.TAsyncClient client,
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolFactory
protocolFactory,
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TNonblockingTransport
transport) throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ }
+
+ @Override
+ public void
write_args(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol
prot) throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+ prot.writeMessageBegin(new
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessage("getOptimizingNodeUrls",
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessageType.CALL,
0));
+ getOptimizingNodeUrls_args args = new getOptimizingNodeUrls_args();
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ @Override
+ public java.util.List<java.lang.String> getResult() throws
org.apache.amoro.api.AmoroException,
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+ if (getState() !=
org.apache.amoro.shade.thrift.org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ)
{
+ throw new java.lang.IllegalStateException("Method call not
finished!");
+ }
+
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TMemoryInputTransport
memoryTransport = new
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+ org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol
prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ return (new Client(prot)).recv_getOptimizingNodeUrls();
+ }
+ }
+
}
public static class Processor<I extends Iface> extends
org.apache.amoro.shade.thrift.org.apache.thrift.TBaseProcessor<I> implements
org.apache.amoro.shade.thrift.org.apache.thrift.TProcessor {
@@ -539,6 +601,7 @@ public class OptimizingService {
processMap.put("completeTask", new completeTask());
processMap.put("authenticate", new authenticate());
processMap.put("cancelProcess", new cancelProcess());
+ processMap.put("getOptimizingNodeUrls", new getOptimizingNodeUrls());
return processMap;
}
@@ -759,6 +822,38 @@ public class OptimizingService {
}
}
+ public static class getOptimizingNodeUrls<I extends Iface> extends
org.apache.amoro.shade.thrift.org.apache.thrift.ProcessFunction<I,
getOptimizingNodeUrls_args> {
+ public getOptimizingNodeUrls() {
+ super("getOptimizingNodeUrls");
+ }
+
+ @Override
+ public getOptimizingNodeUrls_args getEmptyArgsInstance() {
+ return new getOptimizingNodeUrls_args();
+ }
+
+ @Override
+ protected boolean isOneway() {
+ return false;
+ }
+
+ @Override
+ protected boolean rethrowUnhandledExceptions() {
+ return false;
+ }
+
+ @Override
+ public getOptimizingNodeUrls_result getResult(I iface,
getOptimizingNodeUrls_args args) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+ getOptimizingNodeUrls_result result = new
getOptimizingNodeUrls_result();
+ try {
+ result.success = iface.getOptimizingNodeUrls();
+ } catch (org.apache.amoro.api.AmoroException e1) {
+ result.e1 = e1;
+ }
+ return result;
+ }
+ }
+
}
public static class AsyncProcessor<I extends AsyncIface> extends
org.apache.amoro.shade.thrift.org.apache.thrift.TBaseAsyncProcessor<I> {
@@ -779,6 +874,7 @@ public class OptimizingService {
processMap.put("completeTask", new completeTask());
processMap.put("authenticate", new authenticate());
processMap.put("cancelProcess", new cancelProcess());
+ processMap.put("getOptimizingNodeUrls", new getOptimizingNodeUrls());
return processMap;
}
@@ -1268,6 +1364,77 @@ public class OptimizingService {
}
}
+ public static class getOptimizingNodeUrls<I extends AsyncIface> extends
org.apache.amoro.shade.thrift.org.apache.thrift.AsyncProcessFunction<I,
getOptimizingNodeUrls_args, java.util.List<java.lang.String>> {
+ public getOptimizingNodeUrls() {
+ super("getOptimizingNodeUrls");
+ }
+
+ @Override
+ public getOptimizingNodeUrls_args getEmptyArgsInstance() {
+ return new getOptimizingNodeUrls_args();
+ }
+
+ @Override
+ public
org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>
getResultHandler(final
org.apache.amoro.shade.thrift.org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer
fb, final int seqid) {
+ final
org.apache.amoro.shade.thrift.org.apache.thrift.AsyncProcessFunction fcall =
this;
+ return new
org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>()
{
+ @Override
+ public void onComplete(java.util.List<java.lang.String> o) {
+ getOptimizingNodeUrls_result result = new
getOptimizingNodeUrls_result();
+ result.success = o;
+ try {
+ fcall.sendResponse(fb, result,
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessageType.REPLY,
seqid);
+ } catch
(org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException
e) {
+ _LOGGER.error("TTransportException writing to internal frame
buffer", e);
+ fb.close();
+ } catch (java.lang.Exception e) {
+ _LOGGER.error("Exception writing to internal frame buffer", e);
+ onError(e);
+ }
+ }
+ @Override
+ public void onError(java.lang.Exception e) {
+ byte msgType =
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessageType.REPLY;
+ org.apache.amoro.shade.thrift.org.apache.thrift.TSerializable msg;
+ getOptimizingNodeUrls_result result = new
getOptimizingNodeUrls_result();
+ if (e instanceof org.apache.amoro.api.AmoroException) {
+ result.e1 = (org.apache.amoro.api.AmoroException) e;
+ result.setE1IsSet(true);
+ msg = result;
+ } else if (e instanceof
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException) {
+ _LOGGER.error("TTransportException inside handler", e);
+ fb.close();
+ return;
+ } else if (e instanceof
org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException) {
+ _LOGGER.error("TApplicationException inside handler", e);
+ msgType =
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessageType.EXCEPTION;
+ msg =
(org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException) e;
+ } else {
+ _LOGGER.error("Exception inside handler", e);
+ msgType =
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TMessageType.EXCEPTION;
+ msg = new
org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException(org.apache.amoro.shade.thrift.org.apache.thrift.TApplicationException.INTERNAL_ERROR,
e.getMessage());
+ }
+ try {
+ fcall.sendResponse(fb, msg, msgType, seqid);
+ } catch (java.lang.Exception ex) {
+ _LOGGER.error("Exception writing to internal frame buffer", ex);
+ fb.close();
+ }
+ }
+ };
+ }
+
+ @Override
+ protected boolean isOneway() {
+ return false;
+ }
+
+ @Override
+ public void start(I iface, getOptimizingNodeUrls_args args,
org.apache.amoro.shade.thrift.org.apache.thrift.async.AsyncMethodCallback<java.util.List<java.lang.String>>
resultHandler) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+ iface.getOptimizingNodeUrls(resultHandler);
+ }
+ }
+
}
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
@@ -6985,4 +7152,309 @@ public class OptimizingService {
}
}
+ @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+ public static class getOptimizingNodeUrls_args implements
org.apache.amoro.shade.thrift.org.apache.thrift.TBase<getOptimizingNodeUrls_args,
getOptimizingNodeUrls_args._Fields>, java.io.Serializable, Cloneable,
Comparable<getOptimizingNodeUrls_args> {
+ private static final
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TStruct STRUCT_DESC =
new
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TStruct("getOptimizingNodeUrls_args");
+
+ private static final
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory
STANDARD_SCHEME_FACTORY = new getOptimizingNodeUrls_argsStandardSchemeFactory();
+ private static final
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory
TUPLE_SCHEME_FACTORY = new getOptimizingNodeUrls_argsTupleSchemeFactory();
+
+ /** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
+ public enum _Fields implements
org.apache.amoro.shade.thrift.org.apache.thrift.TFieldIdEnum {
+;
+ private static final java.util.Map<java.lang.String, _Fields> byName =
new java.util.HashMap<java.lang.String, _Fields>();
+ static {
+ for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+ @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default: return null;
+ }
+ }
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new
java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+ @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+ public static _Fields findByName(java.lang.String name) { return
byName.get(name); }
+ private final short _thriftId;
+ private final java.lang.String _fieldName;
+ _Fields(short thriftId, java.lang.String fieldName) { _thriftId =
thriftId; _fieldName = fieldName; }
+ @Override public short getThriftFieldId() { return _thriftId; }
+ @Override public java.lang.String getFieldName() { return _fieldName; }
+ }
+ public static final java.util.Map<_Fields,
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData>
metaDataMap;
+ static {
+ java.util.Map<_Fields,
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData> tmpMap
= new java.util.EnumMap<_Fields,
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getOptimizingNodeUrls_args.class,
metaDataMap);
+ }
+ public getOptimizingNodeUrls_args() {}
+ public getOptimizingNodeUrls_args(getOptimizingNodeUrls_args other) {}
+ @Override public getOptimizingNodeUrls_args deepCopy() { return new
getOptimizingNodeUrls_args(this); }
+ @Override public void clear() {}
+ @Override public void setFieldValue(_Fields field,
@org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
java.lang.Object value) { switch (field) {} }
+ @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+ @Override public java.lang.Object getFieldValue(_Fields field) { switch
(field) {} throw new java.lang.IllegalStateException(); }
+ @Override public boolean isSet(_Fields field) { if (field == null) throw
new java.lang.IllegalArgumentException(); switch (field) {} throw new
java.lang.IllegalStateException(); }
+ @Override public boolean equals(java.lang.Object that) { if (that
instanceof getOptimizingNodeUrls_args) return
this.equals((getOptimizingNodeUrls_args)that); return false; }
+ public boolean equals(getOptimizingNodeUrls_args that) { if (that == null)
return false; if (this == that) return true; return true; }
+ @Override public int hashCode() { int hashCode = 1; return hashCode; }
+ @Override public int compareTo(getOptimizingNodeUrls_args other) { if
(!getClass().equals(other.getClass())) { return
getClass().getName().compareTo(other.getClass().getName()); } int
lastComparison = 0; return 0; }
+ @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+ @Override public _Fields fieldForId(int fieldId) { return
_Fields.findByThriftId(fieldId); }
+ @Override public void
read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol iprot)
throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
scheme(iprot).read(iprot, this); }
+ @Override public void
write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol oprot)
throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
scheme(oprot).write(oprot, this); }
+ @Override public java.lang.String toString() { java.lang.StringBuilder sb
= new java.lang.StringBuilder("getOptimizingNodeUrls_args("); sb.append(")");
return sb.toString(); }
+ public void validate() throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {}
+ private void writeObject(java.io.ObjectOutputStream out) throws
java.io.IOException { try { write(new
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TCompactProtocol(new
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TIOStreamTransport(out)));
} catch (org.apache.amoro.shade.thrift.org.apache.thrift.TException te) {
throw new java.io.IOException(te); } }
+ private void readObject(java.io.ObjectInputStream in) throws
java.io.IOException, java.lang.ClassNotFoundException { try { read(new
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TCompactProtocol(new
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.amoro.shade.thrift.org.apache.thrift.TException te) {
throw new java.io.IOException(te); } }
+ private static class getOptimizingNodeUrls_argsStandardSchemeFactory
implements org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory
{
+ @Override public getOptimizingNodeUrls_argsStandardScheme getScheme() {
return new getOptimizingNodeUrls_argsStandardScheme(); }
+ }
+ private static class getOptimizingNodeUrls_argsStandardScheme extends
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.StandardScheme<getOptimizingNodeUrls_args>
{
+ @Override public void
read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol iprot,
getOptimizingNodeUrls_args struct) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+ org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField
schemeField;
+ iprot.readStructBegin();
+ while (true) { schemeField = iprot.readFieldBegin(); if
(schemeField.type ==
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STOP) { break; }
switch (schemeField.id) { default:
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type); } iprot.readFieldEnd(); }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+ @Override public void
write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol oprot,
getOptimizingNodeUrls_args struct) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+ struct.validate();
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+ }
+ private static class getOptimizingNodeUrls_argsTupleSchemeFactory
implements org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory
{
+ @Override public getOptimizingNodeUrls_argsTupleScheme getScheme() {
return new getOptimizingNodeUrls_argsTupleScheme(); }
+ }
+ private static class getOptimizingNodeUrls_argsTupleScheme extends
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.TupleScheme<getOptimizingNodeUrls_args>
{
+ @Override public void
write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol prot,
getOptimizingNodeUrls_args struct) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol oprot =
(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol) prot;
}
+ @Override public void
read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol prot,
getOptimizingNodeUrls_args struct) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol iprot =
(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol) prot;
}
+ }
+ private static <S extends
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.IScheme> S
scheme(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol
proto) {
+ return
(org.apache.amoro.shade.thrift.org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme())
? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+ }
+ }
+
+ @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+ public static class getOptimizingNodeUrls_result implements
org.apache.amoro.shade.thrift.org.apache.thrift.TBase<getOptimizingNodeUrls_result,
getOptimizingNodeUrls_result._Fields>, java.io.Serializable, Cloneable,
Comparable<getOptimizingNodeUrls_result> {
+ private static final
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TStruct STRUCT_DESC =
new
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TStruct("getOptimizingNodeUrls_result");
+
+ private static final
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField
SUCCESS_FIELD_DESC = new
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField("success",
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.LIST, (short)0);
+ private static final
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField E1_FIELD_DESC =
new org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField("e1",
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STRUCT,
(short)1);
+
+ private static final
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory
STANDARD_SCHEME_FACTORY = new
getOptimizingNodeUrls_resultStandardSchemeFactory();
+ private static final
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory
TUPLE_SCHEME_FACTORY = new getOptimizingNodeUrls_resultTupleSchemeFactory();
+
+ public
@org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
java.util.List<java.lang.String> success;
+ public
@org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
org.apache.amoro.api.AmoroException e1;
+
+ public enum _Fields implements
org.apache.amoro.shade.thrift.org.apache.thrift.TFieldIdEnum {
+ SUCCESS((short)0, "success"),
+ E1((short)1, "e1");
+ private static final java.util.Map<java.lang.String, _Fields> byName =
new java.util.HashMap<java.lang.String, _Fields>();
+ static { for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field); } }
+ @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+ public static _Fields findByThriftId(int fieldId) { switch(fieldId) {
case 0: return SUCCESS; case 1: return E1; default: return null; } }
+ public static _Fields findByThriftIdOrThrow(int fieldId) { _Fields
fields = findByThriftId(fieldId); if (fields == null) throw new
java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields; }
+ @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+ public static _Fields findByName(java.lang.String name) { return
byName.get(name); }
+ private final short _thriftId;
+ private final java.lang.String _fieldName;
+ _Fields(short thriftId, java.lang.String fieldName) { _thriftId =
thriftId; _fieldName = fieldName; }
+ @Override public short getThriftFieldId() { return _thriftId; }
+ @Override public java.lang.String getFieldName() { return _fieldName; }
+ }
+
+ public static final java.util.Map<_Fields,
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData>
metaDataMap;
+ static {
+ java.util.Map<_Fields,
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData> tmpMap
= new java.util.EnumMap<_Fields,
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.SUCCESS, new
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData("success",
org.apache.amoro.shade.thrift.org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.ListMetaData(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.LIST,
+ new
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldValueMetaData(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STRING))));
+ tmpMap.put(_Fields.E1, new
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData("e1",
org.apache.amoro.shade.thrift.org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.StructMetaData(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STRUCT,
org.apache.amoro.api.AmoroException.class)));
+ metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+
org.apache.amoro.shade.thrift.org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getOptimizingNodeUrls_result.class,
metaDataMap);
+ }
+
+ public getOptimizingNodeUrls_result() {}
+ public getOptimizingNodeUrls_result(java.util.List<java.lang.String>
success, org.apache.amoro.api.AmoroException e1) { this(); this.success =
success; this.e1 = e1; }
+ public getOptimizingNodeUrls_result(getOptimizingNodeUrls_result other) {
+ if (other.isSetSuccess()) { this.success = new
java.util.ArrayList<java.lang.String>(other.success); }
+ if (other.isSetE1()) { this.e1 = new
org.apache.amoro.api.AmoroException(other.e1); }
+ }
+ @Override public getOptimizingNodeUrls_result deepCopy() { return new
getOptimizingNodeUrls_result(this); }
+ @Override public void clear() { this.success = null; this.e1 = null; }
+
+ @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+ public java.util.List<java.lang.String> getSuccess() { return
this.success; }
+ public getOptimizingNodeUrls_result
setSuccess(@org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
java.util.List<java.lang.String> success) { this.success = success; return
this; }
+ public void unsetSuccess() { this.success = null; }
+ public boolean isSetSuccess() { return this.success != null; }
+ public void setSuccessIsSet(boolean value) { if (!value) { this.success =
null; } }
+
+ @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+ public org.apache.amoro.api.AmoroException getE1() { return this.e1; }
+ public getOptimizingNodeUrls_result
setE1(@org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
org.apache.amoro.api.AmoroException e1) { this.e1 = e1; return this; }
+ public void unsetE1() { this.e1 = null; }
+ public boolean isSetE1() { return this.e1 != null; }
+ public void setE1IsSet(boolean value) { if (!value) { this.e1 = null; } }
+
+ // Generic setter keyed by _Fields: a null value unsets the field, otherwise the value is cast
+ // to the field's declared type.
+ @Override
+ public void setFieldValue(_Fields field,
@org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
java.lang.Object value) {
+ switch (field) {
+ case SUCCESS: if (value == null) { unsetSuccess(); } else {
setSuccess((java.util.List<java.lang.String>)value); } break;
+ case E1: if (value == null) { unsetE1(); } else {
setE1((org.apache.amoro.api.AmoroException)value); } break;
+ }
+ }
+ // Generic getter; the trailing throw is reached only for a field the switch does not handle.
+ @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+ @Override
+ public java.lang.Object getFieldValue(_Fields field) {
+ switch (field) { case SUCCESS: return getSuccess(); case E1: return
getE1(); }
+ throw new java.lang.IllegalStateException();
+ }
+ // Generic presence check; a null field is rejected with IllegalArgumentException.
+ @Override
+ public boolean isSet(_Fields field) {
+ if (field == null) { throw new java.lang.IllegalArgumentException(); }
+ switch (field) { case SUCCESS: return isSetSuccess(); case E1: return
isSetE1(); }
+ throw new java.lang.IllegalStateException();
+ }
+ // Value equality: per field, both sides unset, or both set with equal values.
+ @Override public boolean equals(java.lang.Object that) { if (that
instanceof getOptimizingNodeUrls_result) return
this.equals((getOptimizingNodeUrls_result)that); return false; }
+ public boolean equals(getOptimizingNodeUrls_result that) {
+ if (that == null) return false;
+ if (this == that) return true;
+ boolean this_present_success = true && this.isSetSuccess();
+ boolean that_present_success = true && that.isSetSuccess();
+ if (this_present_success || that_present_success) { if
(!(this_present_success && that_present_success)) return false; if
(!this.success.equals(that.success)) return false; }
+ boolean this_present_e1 = true && this.isSetE1();
+ boolean that_present_e1 = true && that.isSetE1();
+ if (this_present_e1 || that_present_e1) { if (!(this_present_e1 &&
that_present_e1)) return false; if (!this.e1.equals(that.e1)) return false; }
+ return true;
+ }
+ // Mixes a presence marker (131071 when set, 524287 when unset) and, if set, the value's hash
+ // for each field, which keeps it consistent with equals.
+ @Override public int hashCode() { int hashCode = 1; hashCode = hashCode *
8191 + ((isSetSuccess()) ? 131071 : 524287); if (isSetSuccess()) hashCode =
hashCode * 8191 + success.hashCode(); hashCode = hashCode * 8191 + ((isSetE1())
? 131071 : 524287); if (isSetE1()) hashCode = hashCode * 8191 + e1.hashCode();
return hashCode; }
+ // Orders by class name when classes differ; then, field by field (success, e1), unset sorts
+ // before set and set values are compared with TBaseHelper.compareTo.
+ @Override
+ public int compareTo(getOptimizingNodeUrls_result other) {
+ if (!getClass().equals(other.getClass())) { return
getClass().getName().compareTo(other.getClass().getName()); }
+ int lastComparison = 0;
+ lastComparison = java.lang.Boolean.compare(isSetSuccess(),
other.isSetSuccess()); if (lastComparison != 0) return lastComparison;
+ if (isSetSuccess()) { lastComparison =
org.apache.amoro.shade.thrift.org.apache.thrift.TBaseHelper.compareTo(this.success,
other.success); if (lastComparison != 0) return lastComparison; }
+ lastComparison = java.lang.Boolean.compare(isSetE1(), other.isSetE1());
if (lastComparison != 0) return lastComparison;
+ if (isSetE1()) { lastComparison =
org.apache.amoro.shade.thrift.org.apache.thrift.TBaseHelper.compareTo(this.e1,
other.e1); if (lastComparison != 0) return lastComparison; }
+ return 0;
+ }
+ // Maps a Thrift field id to its _Fields constant; may return null (annotated @Nullable).
+ @org.apache.amoro.shade.thrift.org.apache.thrift.annotation.Nullable
+ @Override public _Fields fieldForId(int fieldId) { return
_Fields.findByThriftId(fieldId); }
+ // read/write delegate to the standard or tuple scheme chosen from the protocol by scheme().
+ @Override public void
read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol iprot)
throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
scheme(iprot).read(iprot, this); }
+ public void
write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol oprot)
throws org.apache.amoro.shade.thrift.org.apache.thrift.TException {
scheme(oprot).write(oprot, this); }
+ // Renders unset fields as "null".
+ @Override
+ public java.lang.String toString() {
+ java.lang.StringBuilder sb = new
java.lang.StringBuilder("getOptimizingNodeUrls_result(");
+ boolean first = true;
+ sb.append("success:"); if (this.success == null) { sb.append("null"); }
else { sb.append(this.success); } first = false;
+ if (!first) sb.append(", "); sb.append("e1:"); if (this.e1 == null) {
sb.append("null"); } else { sb.append(this.e1); } first = false;
+ sb.append(")"); return sb.toString();
+ }
+ // Empty body: nothing is validated for this struct.
+ public void validate() throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {}
+ // Java serialization hooks: the struct is encoded/decoded with TCompactProtocol over the
+ // object stream, and TException is rethrown wrapped in IOException.
+ private void writeObject(java.io.ObjectOutputStream out) throws
java.io.IOException { try { write(new
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TCompactProtocol(new
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TIOStreamTransport(out)));
} catch (org.apache.amoro.shade.thrift.org.apache.thrift.TException te) {
throw new java.io.IOException(te); } }
+ private void readObject(java.io.ObjectInputStream in) throws
java.io.IOException, java.lang.ClassNotFoundException { try { read(new
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TCompactProtocol(new
org.apache.amoro.shade.thrift.org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.amoro.shade.thrift.org.apache.thrift.TException te) {
throw new java.io.IOException(te); } }
+
+ // Supplies a new StandardScheme instance per call.
+ private static class getOptimizingNodeUrls_resultStandardSchemeFactory
implements org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory
{
+ @Override public getOptimizingNodeUrls_resultStandardScheme getScheme()
{ return new getOptimizingNodeUrls_resultStandardScheme(); }
+ }
+ // Field-tagged encoding: field id 0 is success (list<string>) and field id 1 is e1 (struct).
+ private static class getOptimizingNodeUrls_resultStandardScheme extends
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.StandardScheme<getOptimizingNodeUrls_result>
{
+ // Reads fields until STOP; unknown ids and fields with an unexpected wire type are skipped.
+ @Override
+ public void
read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol iprot,
getOptimizingNodeUrls_result struct) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+ org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TField
schemeField;
+ iprot.readStructBegin();
+ while (true) {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type ==
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STOP) { break; }
+ switch (schemeField.id) {
+ case 0: // SUCCESS
+ if (schemeField.type ==
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.LIST) {
+ org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TList
_list0 = iprot.readListBegin();
+ struct.success = new
java.util.ArrayList<java.lang.String>(_list0.size);
+ java.lang.String _elem0;
+ for (int _i0 = 0; _i0 < _list0.size; ++_i0) { _elem0 =
iprot.readString(); struct.success.add(_elem0); }
+ iprot.readListEnd();
+ struct.setSuccessIsSet(true);
+ } else {
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type); }
+ break;
+ case 1: // E1
+ if (schemeField.type ==
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STRUCT) {
+ struct.e1 = new org.apache.amoro.api.AmoroException();
+ struct.e1.read(iprot);
+ struct.setE1IsSet(true);
+ } else {
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type); }
+ break;
+ default:
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ struct.validate();
+ }
+ // Writes only non-null fields, in field-id order, followed by a field STOP marker.
+ @Override
+ public void
write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol oprot,
getOptimizingNodeUrls_result struct) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+ struct.validate();
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.success != null) {
+ oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+ oprot.writeListBegin(new
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TList(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TType.STRING,
struct.success.size()));
+ for (java.lang.String _iter : struct.success) {
oprot.writeString(_iter); }
+ oprot.writeListEnd();
+ oprot.writeFieldEnd();
+ }
+ if (struct.e1 != null) {
+ oprot.writeFieldBegin(E1_FIELD_DESC);
+ struct.e1.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+ }
+ // Supplies a new TupleScheme instance per call.
+ private static class getOptimizingNodeUrls_resultTupleSchemeFactory
implements org.apache.amoro.shade.thrift.org.apache.thrift.scheme.SchemeFactory
{
+ @Override public getOptimizingNodeUrls_resultTupleScheme getScheme() {
return new getOptimizingNodeUrls_resultTupleScheme(); }
+ }
+ // Compact tuple encoding: a 2-bit presence BitSet (bit 0 = success, bit 1 = e1), then each
+ // present field in that order; the list is written as an i32 size followed by its strings.
+ // read and write must stay in lock-step.
+ private static class getOptimizingNodeUrls_resultTupleScheme extends
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.TupleScheme<getOptimizingNodeUrls_result>
{
+ @Override
+ public void
write(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol prot,
getOptimizingNodeUrls_result struct) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol oprot =
(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet optionals = new java.util.BitSet();
+ if (struct.isSetSuccess()) { optionals.set(0); }
+ if (struct.isSetE1()) { optionals.set(1); }
+ oprot.writeBitSet(optionals, 2);
+ if (struct.isSetSuccess()) {
+ oprot.writeI32(struct.success.size());
+ for (java.lang.String _iter : struct.success) {
oprot.writeString(_iter); }
+ }
+ if (struct.isSetE1()) { struct.e1.write(oprot); }
+ }
+ @Override
+ public void
read(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol prot,
getOptimizingNodeUrls_result struct) throws
org.apache.amoro.shade.thrift.org.apache.thrift.TException {
+
org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol iprot =
(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet incoming = iprot.readBitSet(2);
+ if (incoming.get(0)) {
+ int _size1 = iprot.readI32();
+ struct.success = new java.util.ArrayList<java.lang.String>(_size1);
+ for (int _i = 0; _i < _size1; ++_i) {
struct.success.add(iprot.readString()); }
+ struct.setSuccessIsSet(true);
+ }
+ if (incoming.get(1)) { struct.e1 = new
org.apache.amoro.api.AmoroException(); struct.e1.read(iprot);
struct.setE1IsSet(true); }
+ }
+ }
+ // Uses the standard scheme when the protocol reports StandardScheme, otherwise the tuple scheme.
+ private static <S extends
org.apache.amoro.shade.thrift.org.apache.thrift.scheme.IScheme> S
scheme(org.apache.amoro.shade.thrift.org.apache.thrift.protocol.TProtocol
proto) {
+ return
(org.apache.amoro.shade.thrift.org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme())
? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+ }
+ }
+
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
index a8d04a123..dd4a6f624 100644
--- a/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
+++ b/amoro-common/src/main/java/org/apache/amoro/OptimizerProperties.java
@@ -48,4 +48,5 @@ public class OptimizerProperties {
public static final String OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT = "128mb";
public static final String OPTIMIZER_CACHE_TIMEOUT = "cache-timeout";
public static final String OPTIMIZER_CACHE_TIMEOUT_DEFAULT = "10min";
+ /**
+ * Option key that enables master-slave mode in the optimizer; exposed on the optimizer command
+ * line as the long alias "--master-slave-mode-enabled".
+ */
+ public static final String OPTIMIZER_MASTER_SLAVE_MODE_ENABLED =
"master-slave-mode-enabled";
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
b/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
index 233fb6e69..860d5c6dc 100644
--- a/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
+++ b/amoro-common/src/main/java/org/apache/amoro/client/AmsThriftUrl.java
@@ -32,6 +32,8 @@ import javax.security.auth.login.LoginException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -207,6 +209,95 @@ public class AmsThriftUrl {
}
}
+  /**
+   * Lists the AMS nodes registered for a ZooKeeper HA url in master-slave mode.
+   *
+   * @param url ZooKeeper url such as {@code zookeeper://host:port/cluster}
+   * @return the node infos read from ZooKeeper; may be empty
+   * @throws IllegalArgumentException if {@code url} is null or is not a ZooKeeper url
+   */
+  public static List<AmsServerInfo> parseMasterSlaveAmsNodes(String url) {
+    if (url != null) {
+      return parserZookeeperUrlListForMasterSlaveMode(url);
+    }
+    throw new IllegalArgumentException("thrift url is null");
+  }
+
+  /**
+   * Reads the AMS node registry behind a ZooKeeper HA url for master-slave mode.
+   *
+   * <p>Each child of {@code AmsHAProperties.getNodesPath(cluster)} is read and parsed as a JSON
+   * {@link AmsServerInfo}. Empty children are skipped; children that cannot be read or parsed are
+   * logged and skipped. A query string on the url is ignored.
+   *
+   * @param url ZooKeeper url such as {@code zookeeper://host:port/cluster}; must not be null
+   * @return the parsed nodes; empty when the nodes path does not exist or holds no usable node
+   * @throws IllegalArgumentException if {@code url} does not start with the ZooKeeper scheme
+   * @throws RuntimeException if {@code url} does not match {@code PATTERN}, or ZooKeeper still
+   *     fails after {@code MAX_RETRIES} attempts (authentication failures included)
+   */
+  private static List<AmsServerInfo> parserZookeeperUrlListForMasterSlaveMode(String url) {
+    if (!url.startsWith(ZOOKEEPER_FLAG)) {
+      throw new IllegalArgumentException(
+          "parseMasterSlaveAmsNodes only supports ZooKeeper URL format: zookeeper://host:port/cluster");
+    }
+    int queryStart = url.indexOf('?');
+    String thriftUrl = queryStart >= 0 ? url.substring(0, queryStart) : url;
+    Matcher m = PATTERN.matcher(thriftUrl);
+    if (!m.matches()) {
+      throw new RuntimeException(String.format("invalid ams url %s", url));
+    }
+    // group(1) may still carry "/cluster"; otherwise the cluster name is in group(2).
+    String authority = m.group(1);
+    int slash = authority.indexOf('/');
+    String zkServerAddress = slash >= 0 ? authority.substring(0, slash) : authority;
+    String cluster = slash >= 0 ? authority.substring(slash + 1) : m.group(2);
+    String nodesPath = AmsHAProperties.getNodesPath(cluster);
+
+    int retryCount = 0;
+    while (retryCount < MAX_RETRIES) {
+      try {
+        ZookeeperService zkService = ZookeeperService.getInstance(zkServerAddress);
+        List<String> nodeNames = zkService.getChildren(nodesPath);
+        // A fresh list per attempt, so a retried attempt can never return leftovers.
+        List<AmsServerInfo> serverInfoList = new ArrayList<>(nodeNames.size());
+        for (String nodeName : nodeNames) {
+          try {
+            String nodeInfoJson = zkService.getData(nodesPath + "/" + nodeName);
+            if (nodeInfoJson != null && !nodeInfoJson.isEmpty()) {
+              serverInfoList.add(JacksonUtil.parseObject(nodeInfoJson, AmsServerInfo.class));
+            }
+          } catch (Exception e) {
+            logger.warn("Failed to get node info for path: {}", nodeName, e);
+          }
+        }
+        if (serverInfoList.isEmpty()) {
+          logger.warn("No AMS nodes found in ZooKeeper path: {}", nodesPath);
+        } else {
+          logger.info("Found {} AMS nodes from ZooKeeper", serverInfoList.size());
+        }
+        return serverInfoList;
+      } catch (KeeperException.AuthFailedException authFailedException) {
+        // If kerberos authentication is not enabled on the zk, an error occurs when the thread
+        // carrying kerberos authentication information accesses the zk. Therefore, clear the
+        // authentication information and try again.
+        retryCount++;
+        logger.error(
+            String.format("Caught exception, retrying... (retry count: %s)", retryCount),
+            authFailedException);
+        try {
+          Subject subject = Subject.getSubject(java.security.AccessController.getContext());
+          if (subject != null) {
+            LoginContext loginContext = new LoginContext("", subject);
+            loginContext.logout();
+          }
+        } catch (LoginException e) {
+          logger.error("Failed to logout", e);
+        }
+      } catch (KeeperException.NoNodeException e) {
+        logger.debug("Nodes path does not exist: {}", nodesPath);
+        return new ArrayList<>();
+      } catch (Exception e) {
+        retryCount++;
+        logger.error(
+            String.format("Caught exception, retrying... (retry count: %s)", retryCount), e);
+        if (retryCount >= MAX_RETRIES) {
+          throw new RuntimeException(
+              String.format("Failed to get AMS nodes from ZooKeeper for url %s", url), e);
+        }
+      }
+    }
+    // Reached when the retry budget ran out on an authentication failure (the generic branch
+    // throws on its own) or MAX_RETRIES is not positive. Fail loudly rather than return an empty
+    // list that callers cannot tell apart from "no AMS node registered".
+    throw new RuntimeException(
+        String.format(
+            "Failed to get AMS nodes from ZooKeeper for url %s after %s attempts",
+            url, MAX_RETRIES));
+  }
+
public String schema() {
return schema;
}
diff --git
a/amoro-common/src/main/java/org/apache/amoro/client/ZookeeperService.java
b/amoro-common/src/main/java/org/apache/amoro/client/ZookeeperService.java
index ee683335d..8d413606d 100644
--- a/amoro-common/src/main/java/org/apache/amoro/client/ZookeeperService.java
+++ b/amoro-common/src/main/java/org/apache/amoro/client/ZookeeperService.java
@@ -26,6 +26,7 @@ import
org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.client.ZKClientCon
import org.apache.amoro.shade.zookeeper3.org.apache.zookeeper.data.Stat;
import java.nio.charset.StandardCharsets;
+import java.util.List;
/** Provides ZooKeeper clients and operations. */
public class ZookeeperService {
@@ -92,6 +93,10 @@ public class ZookeeperService {
return new String(zkClient.getData().forPath(path),
StandardCharsets.UTF_8);
}
+  /**
+   * Lists the direct children of a znode.
+   *
+   * @param path znode path whose children are listed
+   * @return the child node names as reported by the ZooKeeper client
+   * @throws Exception if the ZooKeeper call fails, e.g. when the node does not exist
+   */
+  public List<String> getChildren(String path) throws Exception {
+    List<String> children = zkClient.getChildren().forPath(path);
+    return children;
+  }
+
public void delete(String path) throws Exception {
zkClient.delete().forPath(path);
}
diff --git a/amoro-common/src/main/thrift/amoro_optimizing_service.thrift
b/amoro-common/src/main/thrift/amoro_optimizing_service.thrift
index 77b6b7dce..3417fed31 100644
--- a/amoro-common/src/main/thrift/amoro_optimizing_service.thrift
+++ b/amoro-common/src/main/thrift/amoro_optimizing_service.thrift
@@ -67,4 +67,7 @@ service OptimizingService {
throws (1: amoro_commons.AmoroException e1)
bool cancelProcess(1:i64 processId)
+
+ // Returns the URLs of the AMS nodes an optimizer may poll for tasks in master-slave mode; the
+ // optimizer uses it for node discovery when AMS HA is not ZooKeeper-based.
+ // NOTE(review): URL format is not fixed by this IDL; confirm against the server implementation.
+ list<string> getOptimizingNodeUrls()
+ throws (1: amoro_commons.AmoroException e1)
}
diff --git
a/amoro-common/src/test/java/org/apache/amoro/MockAmoroManagementServer.java
b/amoro-common/src/test/java/org/apache/amoro/MockAmoroManagementServer.java
index c4775b548..ae8f46285 100644
--- a/amoro-common/src/test/java/org/apache/amoro/MockAmoroManagementServer.java
+++ b/amoro-common/src/test/java/org/apache/amoro/MockAmoroManagementServer.java
@@ -469,6 +469,11 @@ public class MockAmoroManagementServer implements Runnable
{
return false;
}
+  /** Mock behavior: this test server never advertises any optimizing node. */
+  @Override
+  public java.util.List<String> getOptimizingNodeUrls() throws TException {
+    final java.util.List<String> noNodes = java.util.Collections.emptyList();
+    return noNodes;
+  }
+
public Map<String, OptimizerRegisterInfo> getRegisteredOptimizers() {
return registeredOptimizers;
}
diff --git
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java
index 22faae012..bda30c27f 100644
---
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java
+++
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AbstractOptimizerOperator.java
@@ -29,6 +29,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -38,24 +40,100 @@ public class AbstractOptimizerOperator implements
Serializable {
// Call ams every 5 seconds by default
private static long callAmsInterval = TimeUnit.SECONDS.toMillis(5);
+ /**
+ * Per-node retry budget for master-slave mode: the maximum number of consecutive
+ * shouldRetryLater errors tolerated for one AMS node before callAuthenticatedAms(String, ...)
+ * fails fast, letting the caller move on to the next node. Only enforced when a node manager is
+ * active (hasAmsNodeManager()). Without it, the multi-node polling loop in OptimizerExecutor
+ * could get stuck indefinitely on a single unreachable node.
+ */
+ static final int MAX_RETRIES_PER_NODE_IN_MASTER_SLAVE = 5;
+
private final OptimizerConfig config;
private final AtomicReference<String> token = new AtomicReference<>();
private volatile boolean stopped = false;
+ // ZooKeeper-based node discovery. Assigned only by the constructor, in master-slave mode with a
+ // "zookeeper://" AMS url; null otherwise (also after Java deserialization, being transient).
+ private transient volatile AmsNodeManager amsNodeManager;
+ // Thrift-RPC-based node discovery (DB HA). Assigned only by the constructor, in master-slave
+ // mode with a non-ZooKeeper AMS url; null otherwise (also after Java deserialization).
+ private transient volatile ThriftAmsNodeManager thriftAmsNodeManager;
+  /**
+   * Creates an operator for {@code config}. In master-slave mode it also tries to set up AMS node
+   * discovery: ZooKeeper-based ({@link AmsNodeManager}) when the AMS url starts with
+   * {@code zookeeper://}, otherwise Thrift-based ({@link ThriftAmsNodeManager}). A failed setup is
+   * logged and leaves the operator on the single configured AMS url.
+   */
public AbstractOptimizerOperator(OptimizerConfig config) {
Preconditions.checkNotNull(config);
this.config = config;
+    if (!config.isMasterSlaveMode()) {
+      return;
+    }
+    String amsUrl = config.getAmsUrl();
+    boolean zookeeperHa = amsUrl.startsWith("zookeeper://");
+    try {
+      if (zookeeperHa) {
+        // ZK HA mode: discover nodes from ZooKeeper ephemeral nodes
+        this.amsNodeManager = new AmsNodeManager(amsUrl);
+        LOG.info("Initialized ZK AmsNodeManager for master-slave mode");
+      } else {
+        // DB HA mode: discover nodes via getOptimizingNodeUrls() Thrift RPC
+        this.thriftAmsNodeManager = new ThriftAmsNodeManager(amsUrl);
+        LOG.info("Initialized ThriftAmsNodeManager for master-slave mode (DB HA)");
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          zookeeperHa
+              ? "Failed to initialize AmsNodeManager, will use single AMS URL"
+              : "Failed to initialize ThriftAmsNodeManager, will use single AMS URL",
+          e);
+    }
+  }
+
+  /**
+   * Returns the ZooKeeper-backed node manager, or {@code null} when master-slave mode is off, the
+   * AMS url is not a ZooKeeper url, or the manager failed to initialize.
+   */
+  protected AmsNodeManager getAmsNodeManager() {
+    return this.amsNodeManager;
+  }
+
+  /**
+   * Returns the Thrift-backed (DB HA) node manager, or {@code null} when master-slave mode is off,
+   * the AMS url is a ZooKeeper url, or the manager failed to initialize.
+   */
+  protected ThriftAmsNodeManager getThriftAmsNodeManager() {
+    return this.thriftAmsNodeManager;
+  }
+
+  /**
+   * Tells whether either node manager (ZooKeeper or Thrift) was initialized. OptimizerExecutor
+   * relies on this to decide whether master-slave polling logic applies.
+   */
+  protected boolean hasAmsNodeManager() {
+    boolean zookeeperManagerPresent = amsNodeManager != null;
+    return zookeeperManagerPresent || thriftAmsNodeManager != null;
+  }
+
+  /**
+   * Returns the AMS URLs an executor should poll, always as a new mutable list.
+   *
+   * <p>Resolution order: the ZooKeeper-backed manager when it reports at least one node, then the
+   * Thrift-backed manager when it reports at least one node, and finally the single configured AMS
+   * url. The result is never empty. A fresh copy is returned because callers such as
+   * OptimizerExecutor shuffle it in place.
+   */
+  protected List<String> getAmsNodeUrls() {
+    AmsNodeManager zkManager = amsNodeManager;
+    ThriftAmsNodeManager thriftManager = thriftAmsNodeManager;
+    List<String> urls = null;
+    if (zkManager != null) {
+      urls = zkManager.getAllAmsUrls();
+    }
+    if ((urls == null || urls.isEmpty()) && thriftManager != null) {
+      urls = thriftManager.getAllAmsUrls();
+    }
+    if (urls == null || urls.isEmpty()) {
+      // No node discovered (or no manager at all): fall back to the configured url.
+      urls = Collections.singletonList(getConfig().getAmsUrl());
+    }
+    return new ArrayList<>(urls);
}
protected <T> T callAms(AmsCallOperation<T> operation) throws TException {
+ return callAms(getConfig().getAmsUrl(), operation);
+ }
+
+ /**
+ * Call AMS with a specific AMS URL.
+ *
+ * @param amsUrl The AMS node URL to call
+ * @param operation The operation to execute
+ * @return The result of the operation
+ * @throws TException If the operation fails
+ */
+ protected <T> T callAms(String amsUrl, AmsCallOperation<T> operation) throws
TException {
while (isStarted()) {
try {
- return
operation.call(OptimizingClientPools.getClient(config.getAmsUrl()));
+ return operation.call(OptimizingClientPools.getClient(amsUrl));
} catch (Throwable t) {
if (shouldReturnNull(t)) {
return null;
} else if (shouldRetryLater(t)) {
- LOG.error("Call ams got an error and will try again later", t);
+ LOG.error("Call ams {} got an error and will try again later",
amsUrl, t);
waitAShortTime();
} else {
throw t;
@@ -86,25 +164,95 @@ public class AbstractOptimizerOperator implements
Serializable {
return false;
}
- protected <T> T callAuthenticatedAms(AmsAuthenticatedCallOperation<T>
operation)
+ /**
+ * Call authenticated AMS with a specific AMS URL.
+ *
+ * @param amsUrl The AMS node URL to call
+ * @param operation The operation to execute
+ * @return The result of the operation
+ * @throws TException If the operation fails
+ */
+ protected <T> T callAuthenticatedAms(String amsUrl,
AmsAuthenticatedCallOperation<T> operation)
throws TException {
+ // Maximum retry time window for auth errors in master-slave mode (30
seconds)
+ long maxAuthRetryTimeWindow = TimeUnit.SECONDS.toMillis(30);
+ Long firstAuthErrorTime = null;
+
+ // Per-node retry budget: in master-slave mode, limit consecutive
shouldRetryLater retries so
+ // that a permanently unreachable node does not block the multi-node
polling loop indefinitely.
+ int consecutiveRetries = 0;
+
while (isStarted()) {
if (tokenIsReady()) {
String token = getToken();
try {
- return
operation.call(OptimizingClientPools.getClient(config.getAmsUrl()), token);
+ return operation.call(OptimizingClientPools.getClient(amsUrl),
token);
} catch (Throwable t) {
if (t instanceof AmoroException
&& ErrorCodes.PLUGIN_RETRY_AUTH_ERROR_CODE == ((AmoroException)
(t)).getErrorCode()) {
- // Reset the token when got a authorization error
- LOG.error(
- "Got a authorization error while calling ams, reset token and
wait for a new one",
- t);
- resetToken(token);
+ // In master-slave mode, the slave node may not have completed
optimizer
+ // auto-registration
+ // yet, so we should retry within a time window before resetting
the token
+ boolean isMasterSlaveMode = config.isMasterSlaveMode() &&
hasAmsNodeManager();
+ long currentTime = System.currentTimeMillis();
+
+ if (isMasterSlaveMode) {
+ if (firstAuthErrorTime == null) {
+ // First auth error, record the time
+ firstAuthErrorTime = currentTime;
+ }
+
+ long elapsedTime = currentTime - firstAuthErrorTime;
+ if (elapsedTime < maxAuthRetryTimeWindow) {
+ // Still within retry time window, retry after waiting
+ LOG.warn(
+ "Got an authorization error while calling ams {} in
master-slave mode (elapsed: {}ms/{}ms). "
+ + "This may be because the slave node hasn't completed
optimizer auto-registration yet. "
+ + "Will retry after waiting.",
+ amsUrl,
+ elapsedTime,
+ maxAuthRetryTimeWindow,
+ t);
+ waitAShortTime();
+ } else {
+ // Exceeded retry time window, reset token
+ LOG.error(
+ "Got authorization errors from ams {} in master-slave mode
for {}ms, exceeded retry time window ({}ms). "
+ + "Reset token and wait for a new one",
+ amsUrl,
+ elapsedTime,
+ maxAuthRetryTimeWindow,
+ t);
+ resetToken(token);
+ firstAuthErrorTime = null;
+ }
+ } else {
+ // Non-master-slave mode, reset token immediately
+ LOG.error(
+ "Got a authorization error while calling ams {}, reset token
and wait for a new one",
+ amsUrl,
+ t);
+ resetToken(token);
+ firstAuthErrorTime = null;
+ }
} else if (shouldReturnNull(t)) {
return null;
} else if (shouldRetryLater(t)) {
- LOG.error("Call ams got an error and will try again later", t);
+ LOG.error("Call ams {} got an error and will try again later",
amsUrl, t);
+ if (hasAmsNodeManager()) {
+ consecutiveRetries++;
+ if (consecutiveRetries > MAX_RETRIES_PER_NODE_IN_MASTER_SLAVE) {
+ LOG.warn(
+ "Per-node retry budget ({}) exhausted for AMS {}, failing
fast to try next node",
+ MAX_RETRIES_PER_NODE_IN_MASTER_SLAVE,
+ amsUrl);
+ if (t instanceof TException) {
+ throw (TException) t;
+ }
+ throw new TException(
+ "Per-node retry budget exhausted for " + amsUrl + ": " +
t.getMessage(), t);
+ }
+ }
waitAShortTime();
} else {
throw t;
@@ -118,6 +266,12 @@ public class AbstractOptimizerOperator implements
Serializable {
throw new IllegalStateException("Operator is stopped");
}
+  /**
+   * Calls the AMS at the configured url with the current token. Kept for backward compatibility;
+   * delegates to the url-specific overload.
+   */
+  protected <T> T callAuthenticatedAms(AmsAuthenticatedCallOperation<T> operation)
+      throws TException {
+    String configuredAmsUrl = getConfig().getAmsUrl();
+    return callAuthenticatedAms(configuredAmsUrl, operation);
+  }
+
+ /**
+ * Overrides the static callAmsInterval (milliseconds; 5 seconds by default), which is shared by
+ * every operator instance in this JVM.
+ */
public static void setCallAmsInterval(long callAmsInterval) {
AbstractOptimizerOperator.callAmsInterval = callAmsInterval;
}
diff --git
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AmsNodeManager.java
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AmsNodeManager.java
new file mode 100644
index 000000000..94af2ca13
--- /dev/null
+++
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/AmsNodeManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.common;
+
+import org.apache.amoro.client.AmsServerInfo;
+import org.apache.amoro.client.AmsThriftUrl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Manages multiple AMS nodes in master-slave mode. Fetches node list from
ZooKeeper and provides
+ * failover support.
+ */
+public class AmsNodeManager implements Serializable {
+ private static final Logger LOG =
LoggerFactory.getLogger(AmsNodeManager.class);
+ private static final Pattern ZOOKEEPER_PATTERN =
Pattern.compile("zookeeper://(\\S+)/([\\w-]+)");
+ private static final long REFRESH_INTERVAL_MS =
+ TimeUnit.SECONDS.toMillis(30); // Refresh every 30 seconds
+
+ private final String zkServerAddress;
+ private final AtomicReference<List<String>> amsUrls =
+ new AtomicReference<>(Collections.emptyList());
+ private volatile long lastRefreshTime = 0;
+ private transient Object refreshLock;
+
+ public AmsNodeManager(String amsUrl) {
+ Matcher m = ZOOKEEPER_PATTERN.matcher(amsUrl);
+ if (!m.matches()) {
+ throw new IllegalArgumentException(
+ "AmsNodeManager only supports ZooKeeper URL format:
zookeeper://host:port/cluster");
+ }
+ zkServerAddress = amsUrl;
+ refreshLock = new Object();
+ refreshNodes();
+ }
+
+ /** Get all available AMS URLs. */
+ public List<String> getAllAmsUrls() {
+ refreshNodesIfNeeded();
+ return new ArrayList<>(amsUrls.get());
+ }
+
+ /** Manually refresh node list from ZooKeeper. */
+ public void refreshNodes() {
+ synchronized (refreshLock) {
+ refreshNodesInternal();
+ lastRefreshTime = System.currentTimeMillis();
+ }
+ }
+
+ /** Refresh node list from ZooKeeper if needed. */
+ private void refreshNodesIfNeeded() {
+ long now = System.currentTimeMillis();
+ if (now - lastRefreshTime > REFRESH_INTERVAL_MS) {
+ synchronized (refreshLock) {
+ if (now - lastRefreshTime > REFRESH_INTERVAL_MS) {
+ refreshNodesInternal();
+ lastRefreshTime = now;
+ }
+ }
+ }
+ }
+
+ /** Refresh node list from ZooKeeper (internal implementation). */
+ private void refreshNodesInternal() {
+ try {
+ LOG.info("Refreshing nodes from {}", zkServerAddress);
+ List<String> nodeUrls = new ArrayList<>();
+ List<AmsServerInfo> amsServerInfos =
AmsThriftUrl.parseMasterSlaveAmsNodes(zkServerAddress);
+ LOG.info("Refreshing nodes from {}", amsServerInfos);
+ for (AmsServerInfo amsServerInfo : amsServerInfos) {
+ nodeUrls.add(buildAmsUrl(amsServerInfo));
+ }
+
+ if (!nodeUrls.isEmpty()) {
+ amsUrls.set(nodeUrls);
+ LOG.info("Refreshed AMS nodes, found {} nodes: {}", nodeUrls.size(),
nodeUrls);
+ } else {
+ LOG.warn("No AMS nodes found in ZooKeeper");
+ amsUrls.set(Collections.emptyList());
+ }
+ } catch (Exception e) {
+ LOG.error("Failed to refresh AMS nodes from ZooKeeper", e);
+ // Keep existing nodes on error
+ }
+ }
+
+ private String buildAmsUrl(AmsServerInfo serverInfo) {
+ return String.format("thrift://%s:%d", serverInfo.getHost(),
serverInfo.getThriftBindPort());
+ }
+}
diff --git
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
index 544d31f24..2c70a2652 100644
---
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
+++
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerConfig.java
@@ -103,6 +103,12 @@ public class OptimizerConfig implements Serializable {
usage = "Timeout in cache, default 10minutes")
private String cacheTimeout =
OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT_DEFAULT;
+  /**
+   * Whether the optimizer runs in master-slave mode, set by the {@code -msm} flag (long alias built
+   * from {@code OptimizerProperties.OPTIMIZER_MASTER_SLAVE_MODE_ENABLED}); {@code false} unless the
+   * flag is given.
+   */
+  @Option(
+      name = "-msm",
+      usage = "Enable master-slave mode",
+      aliases = "--" + OptimizerProperties.OPTIMIZER_MASTER_SLAVE_MODE_ENABLED)
+  private boolean masterSlaveMode;
+
public OptimizerConfig() {}
public OptimizerConfig(String[] args) throws CmdLineException {
@@ -214,6 +220,14 @@ public class OptimizerConfig implements Serializable {
this.cacheTimeout = cacheTimeout;
}
+  /** Returns {@code true} when master-slave mode is enabled for this optimizer. */
+  public boolean isMasterSlaveMode() {
+    return this.masterSlaveMode;
+  }
+
+  /** Enables or disables master-slave mode for this optimizer. */
+  public void setMasterSlaveMode(final boolean masterSlaveMode) {
+    this.masterSlaveMode = masterSlaveMode;
+  }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -230,6 +244,7 @@ public class OptimizerConfig implements Serializable {
.add("cacheMaxTotalSize", cacheMaxTotalSize)
.add("cacheMaxEntrySize", cacheMaxEntrySize)
.add("cacheTimeout", cacheTimeout)
+ .add("masterSlaveMode", masterSlaveMode)
.toString();
}
}
diff --git
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
index 6b1dc0a6c..f83da12da 100644
---
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
+++
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerExecutor.java
@@ -34,7 +34,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
public class OptimizerExecutor extends AbstractOptimizerOperator {
@@ -49,33 +53,81 @@ public class OptimizerExecutor extends
AbstractOptimizerOperator {
}
public void start() {
+ // getAmsNodeUrls() returns all nodes in master-slave mode, or the single configured URL in
+ // active-standby mode, so both modes are handled uniformly via the same loop.
+ long basePollInterval = TimeUnit.SECONDS.toMillis(1);
+ long maxPollInterval = TimeUnit.SECONDS.toMillis(30);
+ int consecutiveEmptyPolls = 0;
+ final int emptyPollThreshold = 5;
+ // Upper bound for the backoff exponent. With the defaults, basePollInterval << 5 already
+ // exceeds maxPollInterval, so the clamp never shortens the effective wait. It does keep
+ // `basePollInterval * (1L << backoffFactor)` far from long overflow: without it the product
+ // turns negative once the exponent reaches 54 (after a long idle period), and Java masks the
+ // shift distance to 6 bits, so larger exponents wrap around.
+ final int maxBackoffShift = 20;
+ final Random random = new Random();
+
while (isStarted()) {
- OptimizingTask ackTask = null;
- OptimizingTaskResult result = null;
- try {
- OptimizingTask task = pollTask();
- if (task != null && ackTask(task)) {
- ackTask = task;
- result = executeTask(task);
- }
- } catch (Throwable t) {
- if (ackTask != null) {
- LOG.error(
- "Optimizer executor[{}] handling task[{}] failed and got an
unknown error",
- threadId,
- ackTask.getTaskId(),
- t);
- String errorMessage = ExceptionUtil.getErrorMessage(t,
ERROR_MESSAGE_MAX_LENGTH);
- result = new OptimizingTaskResult(ackTask.getTaskId(), threadId);
- result.setErrorMessage(errorMessage);
- } else {
- LOG.error("Optimizer executor[{}] got an unexpected error",
threadId, t);
+ List<String> amsUrls = getAmsNodeUrls();
+
+ // Shuffle to prevent all optimizers from hitting the same AMS node simultaneously
+ if (amsUrls.size() > 1) {
+ Collections.shuffle(amsUrls, random);
+ }
+
+ boolean hasTask = false;
+ for (String amsUrl : amsUrls) {
+ if (!isStarted()) {
+ break;
}
- } finally {
- if (result != null) {
- completeTask(result);
+
+ OptimizingTask ackTask = null;
+ OptimizingTaskResult result = null;
+ try {
+ OptimizingTask task = pollTask(amsUrl);
+ if (task != null && ackTask(amsUrl, task)) {
+ ackTask = task;
+ hasTask = true;
+ result = executeTask(task);
+ }
+ } catch (Throwable t) {
+ if (ackTask != null) {
+ LOG.error(
+ "Optimizer executor[{}] handling task[{}] from AMS {} failed and got an unknown error",
+ threadId,
+ ackTask.getTaskId(),
+ amsUrl,
+ t);
+ String errorMessage = ExceptionUtil.getErrorMessage(t, ERROR_MESSAGE_MAX_LENGTH);
+ result = new OptimizingTaskResult(ackTask.getTaskId(), threadId);
+ result.setErrorMessage(errorMessage);
+ } else {
+ LOG.error(
+ "Optimizer executor[{}] got an unexpected error from AMS {}", threadId, amsUrl, t);
+ }
+ } finally {
+ if (result != null) {
+ completeTask(amsUrl, result);
+ }
}
}
+
+ if (hasTask) {
+ consecutiveEmptyPolls = 0;
+ // Work was found in this round: poll again immediately (as the former pollTask() loop did)
+ // instead of idling between back-to-back tasks.
+ continue;
+ }
+ consecutiveEmptyPolls++;
+
+ long waitTime;
+ if (amsUrls.isEmpty()) {
+ waitTime = basePollInterval;
+ } else if (consecutiveEmptyPolls >= emptyPollThreshold) {
+ int backoffFactor =
+ Math.min(consecutiveEmptyPolls - emptyPollThreshold + 1, maxBackoffShift);
+ waitTime = Math.min(maxPollInterval, basePollInterval * (1L << backoffFactor));
+ LOG.debug(
+ "Optimizer executor[{}] no tasks found for {} consecutive polls, using increased interval: {}ms",
+ threadId,
+ consecutiveEmptyPolls,
+ waitTime);
+ } else {
+ waitTime = basePollInterval;
+ }
+
+ waitAShortTime(waitTime);
}
}
@@ -83,38 +135,44 @@ public class OptimizerExecutor extends
AbstractOptimizerOperator {
return threadId;
}
- private OptimizingTask pollTask() {
+ /**
+ * Makes a single attempt to poll a task for this executor thread from {@code amsUrl}. Returns
+ * the polled task, or {@code null} when the node returned none or the call failed with a
+ * {@code TException} (which is logged). Retrying and waiting are left to the caller.
+ */
+ private OptimizingTask pollTask(String amsUrl) {
OptimizingTask task = null;
- while (isStarted()) {
- try {
- task = callAuthenticatedAms((client, token) -> client.pollTask(token,
threadId));
- } catch (TException exception) {
- LOG.error("Optimizer executor[{}] polled task failed", threadId,
exception);
- }
+ try {
+ task = callAuthenticatedAms(amsUrl, (client, token) ->
client.pollTask(token, threadId));
if (task != null) {
- LOG.info("Optimizer executor[{}] polled task[{}] from ams", threadId,
task.getTaskId());
- break;
- } else {
- waitAShortTime();
+ LOG.info(
+ "Optimizer executor[{}] polled task[{}] from AMS {}",
+ threadId,
+ task.getTaskId(),
+ amsUrl);
}
+ } catch (TException exception) {
+ LOG.error(
+ "Optimizer executor[{}] polled task from AMS {} failed", threadId,
amsUrl, exception);
}
return task;
}
- private boolean ackTask(OptimizingTask task) {
+ private boolean ackTask(String amsUrl, OptimizingTask task) {
try {
callAuthenticatedAms(
+ amsUrl,
(client, token) -> {
client.ackTask(token, threadId, task.getTaskId());
return null;
});
- LOG.info("Optimizer executor[{}] acknowledged task[{}] to ams",
threadId, task.getTaskId());
+ LOG.info(
+ "Optimizer executor[{}] acknowledged task[{}] to AMS {}",
+ threadId,
+ task.getTaskId(),
+ amsUrl);
return true;
} catch (TException exception) {
LOG.error(
- "Optimizer executor[{}] acknowledged task[{}] failed",
+ "Optimizer executor[{}] acknowledged task[{}] to AMS {} failed",
threadId,
task.getTaskId(),
+ amsUrl,
exception);
return false;
}
@@ -124,24 +182,27 @@ public class OptimizerExecutor extends
AbstractOptimizerOperator {
return executeTask(getConfig(), getThreadId(), task, LOG);
}
- protected void completeTask(OptimizingTaskResult optimizingTaskResult) {
+ /**
+ * Reports {@code optimizingTaskResult} to the AMS node at {@code amsUrl}. Any exception from the
+ * call is logged and not rethrown; the report is not retried here.
+ */
+ protected void completeTask(String amsUrl, OptimizingTaskResult
optimizingTaskResult) {
try {
callAuthenticatedAms(
+ amsUrl,
(client, token) -> {
client.completeTask(token, optimizingTaskResult);
return null;
});
LOG.info(
- "Optimizer executor[{}] completed task[{}](status: {}) to ams",
+ "Optimizer executor[{}] completed task[{}](status: {}) to AMS {}",
threadId,
optimizingTaskResult.getTaskId(),
- optimizingTaskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL");
+ optimizingTaskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL",
+ amsUrl);
} catch (Exception exception) {
LOG.error(
- "Optimizer executor[{}] completed task[{}](status: {}) failed",
+ "Optimizer executor[{}] completed task[{}](status: {}) to AMS {}
failed",
threadId,
optimizingTaskResult.getTaskId(),
optimizingTaskResult.getErrorMessage() == null ? "SUCCESS" : "FAIL",
+ amsUrl,
exception);
}
}
diff --git
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java
index ce91e5501..48b8a7e07 100644
---
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java
+++
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/OptimizerToucher.java
@@ -32,7 +32,7 @@ import java.util.Map;
public class OptimizerToucher extends AbstractOptimizerOperator {
private static final Logger LOG =
LoggerFactory.getLogger(OptimizerToucher.class);
- private TokenChangeListener tokenChangeListener;
+ private transient TokenChangeListener tokenChangeListener;
private final Map<String, String> registerProperties = Maps.newHashMap();
private final long startTime;
diff --git
a/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/ThriftAmsNodeManager.java
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/ThriftAmsNodeManager.java
new file mode 100644
index 000000000..f325d5c6e
--- /dev/null
+++
b/amoro-optimizer/amoro-optimizer-common/src/main/java/org/apache/amoro/optimizer/common/ThriftAmsNodeManager.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.optimizer.common;
+
+import org.apache.amoro.client.OptimizingClientPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Manages multiple AMS nodes in master-slave mode for DB-backed HA. Discovers the node list by
+ * calling {@code getOptimizingNodeUrls()} via the Thrift API on the initially configured AMS node,
+ * then caches the list and refreshes it when it is older than 30 seconds. This is the DB-mode
+ * counterpart of {@link AmsNodeManager} (which reads node addresses directly from ZooKeeper).
+ *
+ * <p>Thread-safe: refreshes are serialized on a private final monitor that is restored by
+ * deserialization, and readers see the latest list through an {@link AtomicReference}.
+ */
+public class ThriftAmsNodeManager implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftAmsNodeManager.class);
+ private static final long REFRESH_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
+
+ private final String initialAmsUrl;
+ /**
+ * Latest discovered node list; empty while no discovery call has returned any nodes. It also
+ * serves as the refresh monitor: unlike a lazily created transient lock, this final field is
+ * restored on deserialization, so all threads always synchronize on the same object.
+ */
+ private final AtomicReference<List<String>> amsUrls =
+ new AtomicReference<>(Collections.emptyList());
+ private volatile long lastRefreshTime = 0;
+
+ /**
+ * Creates the manager and performs an initial, synchronous discovery against {@code
+ * initialAmsUrl}. Discovery failures are logged rather than thrown.
+ */
+ public ThriftAmsNodeManager(String initialAmsUrl) {
+ this.initialAmsUrl = initialAmsUrl;
+ refreshNodes();
+ }
+
+ /**
+ * Get all available AMS URLs, refreshing the cache first if it is stale. Falls back to the
+ * initial URL when the list is empty. The result is always a fresh, mutable list, so callers
+ * may reorder it (e.g. shuffle) without affecting the cache.
+ */
+ public List<String> getAllAmsUrls() {
+ refreshNodesIfNeeded();
+ List<String> current = amsUrls.get();
+ if (current.isEmpty()) {
+ return new ArrayList<>(Collections.singletonList(initialAmsUrl));
+ }
+ return new ArrayList<>(current);
+ }
+
+ /** Manually refresh the node list from the AMS Thrift API, regardless of cache age. */
+ public void refreshNodes() {
+ synchronized (amsUrls) {
+ refreshNodesInternal();
+ lastRefreshTime = System.currentTimeMillis();
+ }
+ }
+
+ private void refreshNodesIfNeeded() {
+ if (isStale()) {
+ synchronized (amsUrls) {
+ // Re-check under the lock: another thread may have refreshed while we waited.
+ if (isStale()) {
+ refreshNodesInternal();
+ // Stamp after the remote call so a slow call does not shorten the next interval.
+ lastRefreshTime = System.currentTimeMillis();
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns true when the cache is older than the refresh interval, or when the wall clock moved
+ * backwards past the last refresh (otherwise refreshes would stall until the clock caught up).
+ */
+ private boolean isStale() {
+ long elapsed = System.currentTimeMillis() - lastRefreshTime;
+ return elapsed > REFRESH_INTERVAL_MS || elapsed < 0;
+ }
+
+ /**
+ * Queries the initial AMS node for the node list and replaces the cache with the answer. An
+ * empty or null answer clears the cache, so callers fall back to the initial URL. A failed call
+ * is logged and leaves the previous list in place.
+ */
+ private void refreshNodesInternal() {
+ try {
+ List<String> nodeUrls =
+ OptimizingClientPools.getClient(initialAmsUrl).getOptimizingNodeUrls();
+ if (nodeUrls != null && !nodeUrls.isEmpty()) {
+ amsUrls.set(new ArrayList<>(nodeUrls));
+ LOG.info(
+ "Refreshed AMS nodes via Thrift ({}), found {} nodes: {}",
+ initialAmsUrl,
+ nodeUrls.size(),
+ nodeUrls);
+ } else {
+ LOG.warn("No AMS nodes returned by getOptimizingNodeUrls() from {}", initialAmsUrl);
+ amsUrls.set(Collections.emptyList());
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to refresh AMS nodes from {} via Thrift", initialAmsUrl, e);
+ }
+ }
+}