This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new af21f0d5 [MINOR] improvement: Reduce the size of Spark patch (#699)
af21f0d5 is described below
commit af21f0d58936dd6625e6ffbd3197afb776c18319
Author: roryqi <[email protected]>
AuthorDate: Thu Mar 9 18:48:32 2023 +0800
[MINOR] improvement: Reduce the size of Spark patch (#699)
### What changes were proposed in this pull request?
As discussed in https://github.com/apache/spark/pull/40307#issuecomment-1457619866, we
could use `spark.shuffle.reduceLocality.enabled` to reduce the modifications needed to
Apache Spark.
### Why are the changes needed?
Reduce the Spark patch size.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
No tests are needed; this is a configuration-only change.
Co-authored-by: roryqi <[email protected]>
---
.../org/apache/spark/shuffle/RssShuffleManager.java | 4 ++++
.../org/apache/spark/shuffle/RssShuffleManager.java | 4 ++++
.../spark-2.4.6_dynamic_allocation_support.patch | 21 ---------------------
3 files changed, 8 insertions(+), 21 deletions(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 5778f341..9b14f52c 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -188,6 +188,10 @@ public class RssShuffleManager implements ShuffleManager {
// External shuffle service is not supported when using remote shuffle
service
sparkConf.set("spark.shuffle.service.enabled", "false");
LOG.info("Disable external shuffle service in RssShuffleManager.");
+ // If we store shuffle data in distributed filesystem or in a disaggregated
+ // shuffle cluster, we don't need shuffle data locality
+ sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
+ LOG.info("Disable shuffle data locality in RssShuffleManager.");
if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false)) {
// for non-driver executor, start a thread for sending shuffle data to
shuffle server
LOG.info("RSS data send thread is starting");
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index e08607ec..5b21ffda 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -205,6 +205,10 @@ public class RssShuffleManager implements ShuffleManager {
LOG.info("Disable external shuffle service in RssShuffleManager.");
sparkConf.set("spark.sql.adaptive.localShuffleReader.enabled", "false");
LOG.info("Disable local shuffle reader in RssShuffleManager.");
+ // If we store shuffle data in distributed filesystem or in a disaggregated
+ // shuffle cluster, we don't need shuffle data locality
+ sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
+ LOG.info("Disable shuffle data locality in RssShuffleManager.");
taskToSuccessBlockIds = Maps.newConcurrentMap();
taskToFailedBlockIds = Maps.newConcurrentMap();
// for non-driver executor, start a thread for sending shuffle data to
shuffle server
diff --git a/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
b/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
index 22581d46..63572b96 100644
--- a/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
+++ b/spark-patches/spark-2.4.6_dynamic_allocation_support.patch
@@ -69,24 +69,3 @@ index 459f575ba7..f563368fa5 100644
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
// We may have a running task whose partition has been marked as
successful,
-diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
-index 021ce2eac0..50c52b0091 100644
----
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
-+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
-@@ -149,9 +149,13 @@ class ShuffledRowRDD(
- }
-
- override def getPreferredLocations(partition: Partition): Seq[String] = {
-- val tracker =
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-- val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
-- tracker.getPreferredLocationsForShuffle(dep, partition.index)
-+ if (!conf.isRssEnable()) {
-+ val tracker =
SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-+ val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
-+ tracker.getPreferredLocationsForShuffle(dep, partition.index)
-+ } else {
-+ Nil
-+ }
- }
-
- override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {