Repository: spark
Updated Branches:
  refs/heads/master a28900956 -> 6fe32869c


[SPARK-24678][SPARK-STREAMING] Give priority in use of 'PROCESS_LOCAL' for spark-streaming

## What changes were proposed in this pull request?

Currently, `BlockRDD.getPreferredLocations` only returns the hosts of the blocks, so the locality level of the tasks scheduled on those blocks can be no better than `NODE_LOCAL`. With a small change that also exposes the executor ID of each block location, the locality level can be improved to `PROCESS_LOCAL`, as illustrated by the sketch below.
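
For context, here is a minimal standalone sketch (not part of the patch; the object name and values are illustrative, not Spark API) of the two preferred-location string forms the scheduler understands:

```scala
// Minimal sketch: the two preferred-location string forms handed to the
// scheduler. A bare hostname can at best yield NODE_LOCAL, while the
// "executor_<host>_<executorId>" form (ExecutorCacheTaskLocation.toString)
// identifies the exact executor holding the block, enabling PROCESS_LOCAL.
object LocalityFormats {
  def main(args: Array[String]): Unit = {
    val host = "host1"
    val executorId = "1"

    // Before this patch: blockIdsToHosts returned only hostnames.
    val hostOnly = host                                      // "host1"

    // After this patch: blockIdsToLocations returns executor-qualified
    // locations, matching ExecutorCacheTaskLocation's string format.
    val executorQualified = s"executor_${host}_$executorId"  // "executor_host1_1"

    println(s"host-only (<= NODE_LOCAL):          $hostOnly")
    println(s"executor-qualified (PROCESS_LOCAL): $executorQualified")
  }
}
```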

## How was this patch tested?

Manual test, plus a new unit test for `BlockManager.blockIdsToLocations` in `BlockManagerSuite`.

Author: sharkdtu <shark...@tencent.com>

Closes #21658 from sharkdtu/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6fe32869
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6fe32869
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6fe32869

Branch: refs/heads/master
Commit: 6fe32869ccb17933e77a4dbe883e36d382fbeeec
Parents: a289009
Author: sharkdtu <shark...@tencent.com>
Authored: Tue Jul 10 20:18:34 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Tue Jul 10 20:18:34 2018 +0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/rdd/BlockRDD.scala |  2 +-
 .../scala/org/apache/spark/storage/BlockManager.scala  |  7 +++++--
 .../org/apache/spark/storage/BlockManagerSuite.scala   | 13 +++++++++++++
 3 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6fe32869/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 4e036c2..23cf19d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -30,7 +30,7 @@ private[spark]
 class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
   extends RDD[T](sc, Nil) {
 
-  @transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
+  @transient lazy val _locations = BlockManager.blockIdsToLocations(blockIds, SparkEnv.get)
   @volatile private var _isValid = true
 
   override def getPartitions: Array[Partition] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/6fe32869/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index df1a4be..0e1c7d5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -45,6 +45,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.memory._
@@ -1554,7 +1555,7 @@ private[spark] class BlockManager(
 private[spark] object BlockManager {
   private val ID_GENERATOR = new IdGenerator
 
-  def blockIdsToHosts(
+  def blockIdsToLocations(
       blockIds: Array[BlockId],
       env: SparkEnv,
       blockManagerMaster: BlockManagerMaster = null): Map[BlockId, Seq[String]] = {
@@ -1569,7 +1570,9 @@ private[spark] object BlockManager {
 
     val blockManagers = new HashMap[BlockId, Seq[String]]
     for (i <- 0 until blockIds.length) {
-      blockManagers(blockIds(i)) = blockLocations(i).map(_.host)
+      blockManagers(blockIds(i)) = blockLocations(i).map { loc =>
+        ExecutorCacheTaskLocation(loc.host, loc.executorId).toString
+      }
     }
     blockManagers.toMap
   }
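
The string built here matches `ExecutorCacheTaskLocation.toString`, which the scheduler recognizes by its `executor_` prefix. A standalone sketch of the round trip (it mirrors, but does not import, Spark's `TaskLocation` parsing, and is simplified for illustration):

```scala
// Standalone sketch: format and parse the executor-qualified location
// string emitted by blockIdsToLocations. Simplified; not Spark's actual
// TaskLocation implementation.
object ExecutorLocationRoundTrip {
  private val ExecutorTag = "executor_"

  // Build the string form, matching ExecutorCacheTaskLocation.toString.
  def format(host: String, executorId: String): String =
    s"$ExecutorTag${host}_$executorId"

  // Parse it back into (host, executorId); the scheduler does the
  // equivalent when deciding whether a task can run PROCESS_LOCAL.
  def parse(loc: String): Option[(String, String)] =
    if (loc.startsWith(ExecutorTag)) {
      loc.stripPrefix(ExecutorTag).split("_", 2) match {
        case Array(host, executorId) => Some((host, executorId))
        case _                       => None
      }
    } else {
      None
    }

  def main(args: Array[String]): Unit = {
    val loc = format("host1", "1") // "executor_host1_1", as in the test below
    assert(parse(loc).contains(("host1", "1")))
    println(loc)
  }
}
```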

http://git-wip-us.apache.org/repos/asf/spark/blob/6fe32869/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index b19d8eb..08172f0 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1422,6 +1422,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(mockBlockTransferService.tempFileManager === store.remoteBlockTempFileManager)
   }
 
+  test("query locations of blockIds") {
+    val mockBlockManagerMaster = mock(classOf[BlockManagerMaster])
+    val blockLocations = Seq(BlockManagerId("1", "host1", 100), BlockManagerId("2", "host2", 200))
+    when(mockBlockManagerMaster.getLocations(mc.any[Array[BlockId]]))
+      .thenReturn(Array(blockLocations))
+    val env = mock(classOf[SparkEnv])
+
+    val blockIds: Array[BlockId] = Array(StreamBlockId(1, 2))
+    val locs = BlockManager.blockIdsToLocations(blockIds, env, mockBlockManagerMaster)
+    val expectedLocs = Seq("executor_host1_1", "executor_host2_2")
+    assert(locs(blockIds(0)) == expectedLocs)
+  }
+
   class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
     var numCalls = 0
     var tempFileManager: TempFileManager = null

