Repository: spark
Updated Branches:
  refs/heads/master 0f820e2b6 -> 15526653a


[SPARK-19956][CORE] Optimize a location order of blocks with topology 
information

## What changes were proposed in this pull request?

When call the method getLocations of BlockManager, we only compare the data 
block host. Random selection for non-local data blocks, this may cause the 
selected data block to be in a different rack. So in this patch to increase the 
sort of the rack.

## How was this patch tested?

New test case.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Xianyang Liu <xianyang....@intel.com>

Closes #17300 from ConeyLiu/blockmanager.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15526653
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15526653
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15526653

Branch: refs/heads/master
Commit: 15526653a93a32cde3c9ea0c0e68e35622b0a590
Parents: 0f820e2
Author: Xianyang Liu <xianyang....@intel.com>
Authored: Mon May 8 17:33:47 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon May 8 17:33:47 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 11 +++++--
 .../spark/storage/BlockManagerSuite.scala       | 31 ++++++++++++++++++--
 2 files changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/15526653/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3219969..33ce30c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -612,12 +612,19 @@ private[spark] class BlockManager(
 
   /**
    * Return a list of locations for the given block, prioritizing the local 
machine since
-   * multiple block managers can share the same host.
+   * multiple block managers can share the same host, followed by hosts on the 
same rack.
    */
   private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
     val locs = Random.shuffle(master.getLocations(blockId))
     val (preferredLocs, otherLocs) = locs.partition { loc => 
blockManagerId.host == loc.host }
-    preferredLocs ++ otherLocs
+    blockManagerId.topologyInfo match {
+      case None => preferredLocs ++ otherLocs
+      case Some(_) =>
+        val (sameRackLocs, differentRackLocs) = otherLocs.partition {
+          loc => blockManagerId.topologyInfo == loc.topologyInfo
+        }
+        preferredLocs ++ sameRackLocs ++ differentRackLocs
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/15526653/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a8b9604..1e7bcdb 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -496,8 +496,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
     assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }
 
-  test("optimize a location order of blocks") {
-    val localHost = Utils.localHostName()
+  test("optimize a location order of blocks without topology information") {
+    val localHost = "localhost"
     val otherHost = "otherHost"
     val bmMaster = mock(classOf[BlockManagerMaster])
     val bmId1 = BlockManagerId("id1", localHost, 1)
@@ -508,7 +508,32 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     val blockManager = makeBlockManager(128, "exec", bmMaster)
     val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
     val locations = blockManager invokePrivate 
getLocations(BroadcastBlockId(0))
-    assert(locations.map(_.host).toSet === Set(localHost, localHost, 
otherHost))
+    assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
+  }
+
+  test("optimize a location order of blocks with topology information") {
+    val localHost = "localhost"
+    val otherHost = "otherHost"
+    val localRack = "localRack"
+    val otherRack = "otherRack"
+
+    val bmMaster = mock(classOf[BlockManagerMaster])
+    val bmId1 = BlockManagerId("id1", localHost, 1, Some(localRack))
+    val bmId2 = BlockManagerId("id2", localHost, 2, Some(localRack))
+    val bmId3 = BlockManagerId("id3", otherHost, 3, Some(otherRack))
+    val bmId4 = BlockManagerId("id4", otherHost, 4, Some(otherRack))
+    val bmId5 = BlockManagerId("id5", otherHost, 5, Some(localRack))
+    when(bmMaster.getLocations(mc.any[BlockId]))
+      .thenReturn(Seq(bmId1, bmId2, bmId5, bmId3, bmId4))
+
+    val blockManager = makeBlockManager(128, "exec", bmMaster)
+    blockManager.blockManagerId =
+      BlockManagerId(SparkContext.DRIVER_IDENTIFIER, localHost, 1, 
Some(localRack))
+    val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
+    val locations = blockManager invokePrivate 
getLocations(BroadcastBlockId(0))
+    assert(locations.map(_.host) === Seq(localHost, localHost, otherHost, 
otherHost, otherHost))
+    assert(locations.flatMap(_.topologyInfo)
+      === Seq(localRack, localRack, localRack, otherRack, otherRack))
   }
 
   test("SPARK-9591: getRemoteBytes from another location when Exception 
throw") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to