This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 587c8b55f [CELEBORN-1195] Use batch rack resolve when restore meta
from file
587c8b55f is described below
commit 587c8b55f88b1bcdb37ca09020f1182abe0dd4ad
Author: sychen <[email protected]>
AuthorDate: Wed Dec 27 11:29:28 2023 +0800
[CELEBORN-1195] Use batch rack resolve when restore meta from file
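### What changes were proposed in this pull request?
Resolve the racks of all restored workers with a single batch call (the new `CelebornRackResolver#resolveToMap`) instead of one resolve call per worker in
```java
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager#restoreMetaFromFile
```
`CelebornRackResolver#isOnSameRack` likewise now resolves both hosts in one call.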
### Why are the changes needed?
When the number of workers is large, resolving each worker's rack one host at a time is slow; resolving all hosts in a single batch call avoids this overhead.
See YARN-9332 (RackResolver tool should accept multiple hosts):
https://issues.apache.org/jira/browse/YARN-9332
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #2185 from cxzl25/CELEBORN-1195.
Authored-by: sychen <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../master/clustermeta/AbstractMetaManager.java | 21 ++++++++++++---------
.../master/network/CelebornRackResolver.scala | 12 ++++++++++--
.../master/network/CelebornRackResolverSuite.scala | 15 +++++++++++++++
3 files changed, 37 insertions(+), 11 deletions(-)
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 3ddd39232..8f468d99e 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
import scala.Option;
+import org.apache.hadoop.net.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -281,25 +282,27 @@ public abstract class AbstractMetaManager implements
IMetadataHandler {
}
});
- workers.addAll(
+ Set<WorkerInfo> workerInfoSet =
snapshotMetaInfo.getWorkersList().stream()
.map(PbSerDeUtils::fromPbWorkerInfo)
- .collect(Collectors.toSet())
- .stream()
- .map(
+ .collect(Collectors.toSet());
+ List<String> workerHostList =
+
workerInfoSet.stream().map(WorkerInfo::host).collect(Collectors.toList());
+ scala.collection.immutable.Map<String, Node> resolveMap =
+ rackResolver.resolveToMap(workerHostList);
+ workers.addAll(
+ workerInfoSet.stream()
+ .peek(
workerInfo -> {
// Reset worker's network location with current master's
configuration.
workerInfo.networkLocation_$eq(
-
rackResolver.resolve(workerInfo.host()).getNetworkLocation());
- return workerInfo;
+
resolveMap.get(workerInfo.host()).get().getNetworkLocation());
})
.collect(Collectors.toSet()));
snapshotMetaInfo
.getLostWorkersMap()
- .entrySet()
- .forEach(
- entry ->
lostWorkers.put(WorkerInfo.fromUniqueId(entry.getKey()), entry.getValue()));
+ .forEach((key, value) ->
lostWorkers.put(WorkerInfo.fromUniqueId(key), value));
shutdownWorkers.addAll(
snapshotMetaInfo.getShutdownWorkersList().stream()
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolver.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolver.scala
index 1407c54c0..7e376f8a7 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolver.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolver.scala
@@ -54,6 +54,14 @@ class CelebornRackResolver(celebornConf: CelebornConf)
extends Logging {
coreResolve(hostNames)
}
+ def resolveToMap(hostNames: java.util.List[String]): Map[String, Node] = {
+ resolveToMap(hostNames.asScala.toSeq)
+ }
+
+ def resolveToMap(hostNames: Seq[String]): Map[String, Node] = {
+ hostNames.zip(resolve(hostNames)).toMap
+ }
+
private def coreResolve(hostNames: Seq[String]): Seq[Node] = {
if (hostNames.isEmpty) {
return Seq.empty
@@ -80,8 +88,8 @@ class CelebornRackResolver(celebornConf: CelebornConf)
extends Logging {
}
def isOnSameRack(primaryHost: String, replicaHost: String): Boolean = {
- val primaryNode = resolve(primaryHost)
- val replicaNode = resolve(replicaHost)
+ val nodes = resolve(Seq(primaryHost, replicaHost))
+ val (primaryNode, replicaNode) = (nodes.head, nodes.last)
if (primaryNode == null || replicaNode == null) {
false
} else {
diff --git
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala
index ead67aef2..71337e6ae 100644
---
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala
+++
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.service.deploy.master.network
import java.io.{File, FileWriter}
import java.nio.charset.StandardCharsets
+import java.util
import com.google.common.io.Files
import
org.apache.hadoop.fs.CommonConfigurationKeysPublic.{NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY}
@@ -53,6 +54,20 @@ class CelebornRackResolverSuite extends AnyFunSuite {
assertEquals(names.size, result.size)
assertEquals("/rack1", result(0).getNetworkLocation)
assertEquals("/rack2", result(1).getNetworkLocation)
+
+ val resultMap: Map[String, Node] = resolver.resolveToMap(names)
+ assertEquals(names.size, resultMap.size)
+ assertEquals("/rack1", resultMap(hostName1).getNetworkLocation)
+ assertEquals("/rack2", resultMap(hostName2).getNetworkLocation)
+
+ val hostNamesList = new util.ArrayList[String]()
+ hostNamesList.add(hostName1)
+ hostNamesList.add(hostName2)
+ val resultMap2: Map[String, Node] = resolver.resolveToMap(hostNamesList)
+ assertEquals(hostNamesList.size, resultMap2.size)
+ assertEquals("/rack1", resultMap2(hostName1).getNetworkLocation)
+ assertEquals("/rack2", resultMap2(hostName2).getNetworkLocation)
+
}
test("CELEBORN-446: RackResolver support getDistance") {