This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f6ee3d0  [SPARK-31082][CORE] MapOutputTrackerMaster.getMapLocation 
should handle last mapIndex correctly
f6ee3d0 is described below

commit f6ee3d061409128c301e5f0d9ae9733132173cc8
Author: yi.wu <yi...@databricks.com>
AuthorDate: Mon Mar 9 15:53:34 2020 +0800

    [SPARK-31082][CORE] MapOutputTrackerMaster.getMapLocation should handle 
last mapIndex correctly
    
    ### What changes were proposed in this pull request?
    
    In `getMapLocation`, change the condition from `...endMapIndex < 
statuses.length` to `...endMapIndex <= statuses.length`.
    
    ### Why are the changes needed?
    
    Because `endMapIndex` is exclusive, it may legitimately equal 
`statuses.length`, so the comparison must use `<=`. Otherwise, we can't get the location for the last mapIndex.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Updated an existing test.
    
    Closes #27850 from Ngone51/fix_getmaploction.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit ef51ff9dc8c220fcbed76cdd1783f58f400df48c)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala    |  5 +++--
 .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala  | 10 +++++++---
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index f229061..ec8621b 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -696,7 +696,7 @@ private[spark] class MapOutputTrackerMaster(
    *
    * @param dep shuffle dependency object
    * @param startMapIndex the start map index
-   * @param endMapIndex the end map index
+   * @param endMapIndex the end map index (exclusive)
    * @return a sequence of locations where task runs.
    */
   def getMapLocation(
@@ -707,7 +707,8 @@ private[spark] class MapOutputTrackerMaster(
     val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull
     if (shuffleStatus != null) {
       shuffleStatus.withMapStatuses { statuses =>
-        if (startMapIndex < endMapIndex && (startMapIndex >= 0 && endMapIndex 
< statuses.length)) {
+        if (startMapIndex < endMapIndex &&
+          (startMapIndex >= 0 && endMapIndex <= statuses.length)) {
           val statusesPicked = statuses.slice(startMapIndex, 
endMapIndex).filter(_ != null)
           statusesPicked.map(_.location.host).toSeq
         } else {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 25b1f89..94947a8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -114,9 +114,13 @@ class AdaptiveQueryExecSuite
 
     val numLocalReaders = collect(plan) {
       case reader @ CustomShuffleReaderExec(_, _, 
LOCAL_SHUFFLE_READER_DESCRIPTION) => reader
-    }.length
-
-    assert(numShuffles === (numLocalReaders + numShufflesWithoutLocalReader))
+    }
+    numLocalReaders.foreach { r =>
+      val rdd = r.execute()
+      val parts = rdd.partitions
+      assert(parts.forall(rdd.preferredLocations(_).nonEmpty))
+    }
+    assert(numShuffles === (numLocalReaders.length + 
numShufflesWithoutLocalReader))
   }
 
   test("Change merge join to broadcast join") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to