This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 60849b78204 [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled` 60849b78204 is described below commit 60849b78204e69392976420b9a813bed0790e4e9 Author: roryqi <ror...@tencent.com> AuthorDate: Thu Mar 9 11:50:05 2023 -0600 [SPARK-42719][CORE] `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled` ### What changes were proposed in this pull request? `MapOutputTracker#getMapLocation` should respect `spark.shuffle.reduceLocality.enabled` ### Why are the changes needed? As discussed in https://github.com/apache/spark/pull/40307, `getPreferredLocations` in `ShuffledRowRDD` should conceptually return `Nil` immediately when `spark.shuffle.reduceLocality.enabled` is `false`. However, this logic is delegated to `MapOutputTracker`: `getPreferredLocationsForShuffle` honors `spark.shuffle.reduceLocality.enabled`, but `getMapLocation` does not. The fix is therefore to make `getMapLocation` honor this setting as well. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New unit test. Closes #40339 from jerqi/new_feature. 
Authored-by: roryqi <ror...@tencent.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../scala/org/apache/spark/MapOutputTracker.scala | 2 ++ .../org/apache/spark/MapOutputTrackerSuite.scala | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index fade0b86dd8..5772285a63d 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -1112,6 +1112,8 @@ private[spark] class MapOutputTrackerMaster( startMapIndex: Int, endMapIndex: Int): Seq[String] = { + if (!shuffleLocalityEnabled) return Nil + val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull if (shuffleStatus != null) { shuffleStatus.withMapStatuses { statuses => diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index dfad4a924d7..6b702df25c1 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -1030,4 +1030,24 @@ class MapOutputTrackerSuite extends SparkFunSuite with LocalSparkContext { rpcEnv.shutdown() assert(npeCounter.intValue() == 0) } + + test("SPARK-42719: `MapOutputTracker#getMapLocation` should respect the config option") { + val rpcEnv = createRpcEnv("test") + val newConf = new SparkConf + newConf.set(SHUFFLE_REDUCE_LOCALITY_ENABLE, false) + val tracker = newTrackerMaster(newConf) + try { + tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, + new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, newConf)) + tracker.registerShuffle(10, 6, 1) + tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000), + Array(2L), 5)) + val mockShuffleDep = mock(classOf[ShuffleDependency[Int, Int, _]]) + 
when(mockShuffleDep.shuffleId).thenReturn(10) + assert(tracker.getMapLocation(mockShuffleDep, 0, 1) === Nil) + } finally { + tracker.stop() + rpcEnv.shutdown() + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org