This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 587c8b55f [CELEBORN-1195] Use batch rack resolve when restore meta 
from file
587c8b55f is described below

commit 587c8b55f88b1bcdb37ca09020f1182abe0dd4ad
Author: sychen <[email protected]>
AuthorDate: Wed Dec 27 11:29:28 2023 +0800

    [CELEBORN-1195] Use batch rack resolve when restore meta from file
    
    ### What changes were proposed in this pull request?
    ```java
    
org.apache.celeborn.service.deploy.master.clustermeta.AbstractMetaManager#restoreMetaFromFile
    ```
    
    ### Why are the changes needed?
    When the number of workers is large, resolving each worker's rack one by one
    is slow; resolving all worker hosts in a single batch call avoids that overhead.
    
    YARN-9332. RackResolver tool should accept multiple hosts
    https://issues.apache.org/jira/browse/YARN-9332
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #2185 from cxzl25/CELEBORN-1195.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../master/clustermeta/AbstractMetaManager.java     | 21 ++++++++++++---------
 .../master/network/CelebornRackResolver.scala       | 12 ++++++++++--
 .../master/network/CelebornRackResolverSuite.scala  | 15 +++++++++++++++
 3 files changed, 37 insertions(+), 11 deletions(-)

diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
index 3ddd39232..8f468d99e 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
 
 import scala.Option;
 
+import org.apache.hadoop.net.Node;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -281,25 +282,27 @@ public abstract class AbstractMetaManager implements 
IMetadataHandler {
             }
           });
 
-      workers.addAll(
+      Set<WorkerInfo> workerInfoSet =
           snapshotMetaInfo.getWorkersList().stream()
               .map(PbSerDeUtils::fromPbWorkerInfo)
-              .collect(Collectors.toSet())
-              .stream()
-              .map(
+              .collect(Collectors.toSet());
+      List<String> workerHostList =
+          
workerInfoSet.stream().map(WorkerInfo::host).collect(Collectors.toList());
+      scala.collection.immutable.Map<String, Node> resolveMap =
+          rackResolver.resolveToMap(workerHostList);
+      workers.addAll(
+          workerInfoSet.stream()
+              .peek(
                   workerInfo -> {
                     // Reset worker's network location with current master's 
configuration.
                     workerInfo.networkLocation_$eq(
-                        
rackResolver.resolve(workerInfo.host()).getNetworkLocation());
-                    return workerInfo;
+                        
resolveMap.get(workerInfo.host()).get().getNetworkLocation());
                   })
               .collect(Collectors.toSet()));
 
       snapshotMetaInfo
           .getLostWorkersMap()
-          .entrySet()
-          .forEach(
-              entry -> 
lostWorkers.put(WorkerInfo.fromUniqueId(entry.getKey()), entry.getValue()));
+          .forEach((key, value) -> 
lostWorkers.put(WorkerInfo.fromUniqueId(key), value));
 
       shutdownWorkers.addAll(
           snapshotMetaInfo.getShutdownWorkersList().stream()
diff --git 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolver.scala
 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolver.scala
index 1407c54c0..7e376f8a7 100644
--- 
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolver.scala
+++ 
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolver.scala
@@ -54,6 +54,14 @@ class CelebornRackResolver(celebornConf: CelebornConf) 
extends Logging {
     coreResolve(hostNames)
   }
 
+  def resolveToMap(hostNames: java.util.List[String]): Map[String, Node] = {
+    resolveToMap(hostNames.asScala.toSeq)
+  }
+
+  def resolveToMap(hostNames: Seq[String]): Map[String, Node] = {
+    hostNames.zip(resolve(hostNames)).toMap
+  }
+
   private def coreResolve(hostNames: Seq[String]): Seq[Node] = {
     if (hostNames.isEmpty) {
       return Seq.empty
@@ -80,8 +88,8 @@ class CelebornRackResolver(celebornConf: CelebornConf) 
extends Logging {
   }
 
   def isOnSameRack(primaryHost: String, replicaHost: String): Boolean = {
-    val primaryNode = resolve(primaryHost)
-    val replicaNode = resolve(replicaHost)
+    val nodes = resolve(Seq(primaryHost, replicaHost))
+    val (primaryNode, replicaNode) = (nodes.head, nodes.last)
     if (primaryNode == null || replicaNode == null) {
       false
     } else {
diff --git 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala
 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala
index ead67aef2..71337e6ae 100644
--- 
a/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala
+++ 
b/master/src/test/scala/org/apache/celeborn/service/deploy/master/network/CelebornRackResolverSuite.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.service.deploy.master.network
 
 import java.io.{File, FileWriter}
 import java.nio.charset.StandardCharsets
+import java.util
 
 import com.google.common.io.Files
 import 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.{NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
 NET_TOPOLOGY_TABLE_MAPPING_FILE_KEY}
@@ -53,6 +54,20 @@ class CelebornRackResolverSuite extends AnyFunSuite {
     assertEquals(names.size, result.size)
     assertEquals("/rack1", result(0).getNetworkLocation)
     assertEquals("/rack2", result(1).getNetworkLocation)
+
+    val resultMap: Map[String, Node] = resolver.resolveToMap(names)
+    assertEquals(names.size, resultMap.size)
+    assertEquals("/rack1", resultMap(hostName1).getNetworkLocation)
+    assertEquals("/rack2", resultMap(hostName2).getNetworkLocation)
+
+    val hostNamesList = new util.ArrayList[String]()
+    hostNamesList.add(hostName1)
+    hostNamesList.add(hostName2)
+    val resultMap2: Map[String, Node] = resolver.resolveToMap(hostNamesList)
+    assertEquals(hostNamesList.size, resultMap2.size)
+    assertEquals("/rack1", resultMap2(hostName1).getNetworkLocation)
+    assertEquals("/rack2", resultMap2(hostName2).getNetworkLocation)
+
   }
 
   test("CELEBORN-446: RackResolver support getDistance") {

Reply via email to