This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 60849b78204 [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`
60849b78204 is described below

commit 60849b78204e69392976420b9a813bed0790e4e9
Author: roryqi <ror...@tencent.com>
AuthorDate: Thu Mar 9 11:50:05 2023 -0600

    [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`
    
    ### What changes were proposed in this pull request?
    `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled`.
    
    ### Why are the changes needed?
    
    Discussed in https://github.com/apache/spark/pull/40307.
    
    Conceptually, `getPreferredLocations` in `ShuffledRowRDD` should return `Nil` immediately when `spark.shuffle.reduceLocality.enabled` is `false`.
    
    In practice, however, this logic lives in `MapOutputTracker`: `getPreferredLocationsForShuffle` honors `spark.shuffle.reduceLocality.enabled`, but `getMapLocation` does not.
    
    The fix is therefore to make `getMapLocation` honor the same setting, returning `Nil` when reduce locality is disabled.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New unit test in `MapOutputTrackerSuite`.
    
    Closes #40339 from jerqi/new_feature.
    
    Authored-by: roryqi <ror...@tencent.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/MapOutputTracker.scala    |  2 ++
 .../org/apache/spark/MapOutputTrackerSuite.scala     | 20 ++++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index fade0b86dd8..5772285a63d 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -1112,6 +1112,8 @@ private[spark] class MapOutputTrackerMaster(
       startMapIndex: Int,
       endMapIndex: Int): Seq[String] =
   {
+    if (!shuffleLocalityEnabled) return Nil
+
     val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
     if (shuffleStatus != null) {
       shuffleStatus.withMapStatuses { statuses =>
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index dfad4a924d7..6b702df25c1 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1030,4 +1030,24 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext {
     rpcEnv.shutdown()
     assert(npeCounter.intValue() == 0)
   }
+
+  test("SPARK-42719: `MapOutputTracker#getMapLocation` should respect the 
config option") {
+    val rpcEnv = createRpcEnv("test")
+    val newConf = new SparkConf
+    newConf.set(SHUFFLE_REDUCE_LOCALITY_ENABLE, false)
+    val tracker = newTrackerMaster(newConf)
+    try {
+      tracker.trackerEndpoint = 
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+        new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, newConf))
+      tracker.registerShuffle(10, 6, 1)
+      tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 
1000),
+        Array(2L), 5))
+      val mockShuffleDep = mock(classOf[ShuffleDependency[Int, Int, _]])
+      when(mockShuffleDep.shuffleId).thenReturn(10)
+      assert(tracker.getMapLocation(mockShuffleDep, 0, 1) === Nil)
+    } finally {
+      tracker.stop()
+      rpcEnv.shutdown()
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to