This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new af21f0d5 [MINOR] improvement: Reduce the size of Spark patch (#699)
af21f0d5 is described below

commit af21f0d58936dd6625e6ffbd3197afb776c18319
Author: roryqi <[email protected]>
AuthorDate: Thu Mar 9 18:48:32 2023 +0800

    [MINOR] improvement: Reduce the size of Spark patch (#699)
    
    ### What changes were proposed in this pull request?
    As suggested in https://github.com/apache/spark/pull/40307#issuecomment-1457619866, we 
can use `spark.shuffle.reduceLocality.enabled` to reduce the modifications needed to 
Apache Spark.
    
    ### Why are the changes needed?
    
    Reduce the size of the Spark patch.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    No tests are needed; this change only sets an existing Spark configuration option.
    
    Co-authored-by: roryqi <[email protected]>
---
 .../org/apache/spark/shuffle/RssShuffleManager.java |  4 ++++
 .../org/apache/spark/shuffle/RssShuffleManager.java |  4 ++++
 .../spark-2.4.6_dynamic_allocation_support.patch    | 21 ---------------------
 3 files changed, 8 insertions(+), 21 deletions(-)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 5778f341..9b14f52c 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -188,6 +188,10 @@ public class RssShuffleManager implements ShuffleManager {
     // External shuffle service is not supported when using remote shuffle 
service
     sparkConf.set("spark.shuffle.service.enabled", "false");
     LOG.info("Disable external shuffle service in RssShuffleManager.");
+    // If we store shuffle data in distributed filesystem or in a disaggregated
+    // shuffle cluster, we don't need shuffle data locality
+    sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
+    LOG.info("Disable shuffle data locality in RssShuffleManager.");
     if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false)) {
       // for non-driver executor, start a thread for sending shuffle data to 
shuffle server
       LOG.info("RSS data send thread is starting");
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index e08607ec..5b21ffda 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -205,6 +205,10 @@ public class RssShuffleManager implements ShuffleManager {
     LOG.info("Disable external shuffle service in RssShuffleManager.");
     sparkConf.set("spark.sql.adaptive.localShuffleReader.enabled", "false");
     LOG.info("Disable local shuffle reader in RssShuffleManager.");
+    // If we store shuffle data in distributed filesystem or in a disaggregated
+    // shuffle cluster, we don't need shuffle data locality
+    sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
+    LOG.info("Disable shuffle data locality in RssShuffleManager.");
     taskToSuccessBlockIds = Maps.newConcurrentMap();
     taskToFailedBlockIds = Maps.newConcurrentMap();
     // for non-driver executor, start a thread for sending shuffle data to 
shuffle server
diff --git a/spark-patches/spark-2.4.6_dynamic_allocation_support.patch 
b/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
index 22581d46..63572b96 100644
--- a/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
+++ b/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
@@ -69,24 +69,3 @@ index 459f575ba7..f563368fa5 100644
        for ((tid, info) <- taskInfos if info.executorId == execId) {
          val index = taskInfos(tid).index
          // We may have a running task whose partition has been marked as 
successful,
-diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
-index 021ce2eac0..50c52b0091 100644
---- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
-+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
-@@ -149,9 +149,13 @@ class ShuffledRowRDD(
-   }
- 
-   override def getPreferredLocations(partition: Partition): Seq[String] = {
--    val tracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
--    val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
--    tracker.getPreferredLocationsForShuffle(dep, partition.index)
-+    if (!conf.isRssEnable()) {
-+      val tracker = 
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-+      val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
-+      tracker.getPreferredLocationsForShuffle(dep, partition.index)
-+    } else {
-+      Nil
-+    }
-   }
- 
-   override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {

Reply via email to