This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 560072814 [CELEBORN-619][CORE][SHUFFLE] Support enable DRA with Apache
Celeborn
560072814 is described below
commit 5600728149d86941def4f7db0aedd477267d0c15
Author: Ethan Feng <[email protected]>
AuthorDate: Mon Jun 5 09:50:05 2023 +0800
[CELEBORN-619][CORE][SHUFFLE] Support enable DRA with Apache Celeborn
### What changes were proposed in this pull request?
Adapt Spark DRA patch for spark 3.4
### Why are the changes needed?
To support enabling DRA w/ Celeborn on Spark 3.4
### Does this PR introduce _any_ user-facing change?
Yes, this PR provides a DRA patch for Spark 3.4
### How was this patch tested?
Compiled with Spark 3.4
Closes #1529 from FMX/CELEBORN-619.
Authored-by: Ethan Feng <[email protected]>
Signed-off-by: Ethan Feng <[email protected]>
---
README.md | 5 +-
.../{RSS_RDA_spark2.patch => RSS_DRA_spark2.patch} | 0
.../{RSS_RDA_spark3.patch => RSS_DRA_spark3.patch} | 0
...RSS_RDA_spark3.patch => RSS_DRA_spark3_4.patch} | 95 +++++++++++-----------
4 files changed, 51 insertions(+), 49 deletions(-)
diff --git a/README.md b/README.md
index d22388b73..c75724e31 100644
--- a/README.md
+++ b/README.md
@@ -257,8 +257,9 @@ See more detail in [CONFIGURATIONS](docs/configuration.md)
### Support Spark Dynamic Allocation
We provide a patch to enable users to use Spark with both Dynamic Resource
Allocation(DRA) and Celeborn.
-For Spark2.x check [Spark2 Patch](assets/spark-patch/RSS_RDA_spark2.patch).
-For Spark3.x check [Spark3 Patch](assets/spark-patch/RSS_RDA_spark3.patch).
+For Spark2.x check [Spark2 Patch](assets/spark-patch/RSS_DRA_spark2.patch).
+For Spark3.x check [Spark3 Patch](assets/spark-patch/RSS_DRA_spark3.patch).
+For Spark3.4 check [Spark3.4 Patch](assets/spark-patch/RSS_DRA_spark3_4.patch).
### Metrics
Celeborn has various metrics. [METRICS](METRICS.md)
diff --git a/assets/spark-patch/RSS_RDA_spark2.patch
b/assets/spark-patch/RSS_DRA_spark2.patch
similarity index 100%
rename from assets/spark-patch/RSS_RDA_spark2.patch
rename to assets/spark-patch/RSS_DRA_spark2.patch
diff --git a/assets/spark-patch/RSS_RDA_spark3.patch
b/assets/spark-patch/RSS_DRA_spark3.patch
similarity index 100%
copy from assets/spark-patch/RSS_RDA_spark3.patch
copy to assets/spark-patch/RSS_DRA_spark3.patch
diff --git a/assets/spark-patch/RSS_RDA_spark3.patch
b/assets/spark-patch/RSS_DRA_spark3_4.patch
similarity index 72%
rename from assets/spark-patch/RSS_RDA_spark3.patch
rename to assets/spark-patch/RSS_DRA_spark3_4.patch
index d07bcb46c..3b0bad14a 100644
--- a/assets/spark-patch/RSS_RDA_spark3.patch
+++ b/assets/spark-patch/RSS_DRA_spark3_4.patch
@@ -13,74 +13,75 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-Index: core/src/main/scala/org/apache/spark/util/Utils.scala
-IDEA additional info:
-Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
-<+>UTF-8
-===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
---- a/core/src/main/scala/org/apache/spark/util/Utils.scala (revision
d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala (date
1652322540231)
-@@ -3222,6 +3222,9 @@
- }
- files.toSeq
- }
-+
-+ def isRssEnabled(conf: SparkConf): Boolean =
-+ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
- }
-
- private[util] object CallerContext extends Logging {
+Subject: [PATCH] rss support DRA.
+---
Index: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
---- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(date 1652322692344)
-@@ -211,7 +211,7 @@
- (decommissionEnabled &&
- conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
- logWarning("Dynamic allocation without a shuffle service is an
experimental feature.")
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(revision efae5362cbeaf1594b18edf594e83b2cf72afce6)
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
(date 1685515818323)
+@@ -209,7 +209,7 @@
+ } else if (decommissionEnabled &&
+ conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
+ logInfo("Shuffle data decommission is enabled without a shuffle
service.")
- } else if (!testing) {
+ } else if (!testing && !Utils.isRssEnabled(conf)) {
throw new SparkException("Dynamic allocation of executors requires
the external " +
"shuffle service. You may enable this through
spark.shuffle.service.enabled.")
}
-Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
-IDEA additional info:
-Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
-<+>UTF-8
-===================================================================
-diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(date 1652323382069)
-@@ -1015,7 +1015,8 @@
- // and we are not using an external shuffle server which could serve the
shuffle outputs.
- // The reason is the next stage wouldn't be able to fetch the data from
this dead executor
- // so we would need to rerun these tasks on other executors.
-- if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled
&& !isZombie) {
-+ if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled
&& !isZombie &&
-+ !Utils.isRssEnabled(conf)) {
- for ((tid, info) <- taskInfos if info.executorId == execId) {
- val index = taskInfos(tid).index
- // We may have a running task whose partition has been marked as
successful,
Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(date 1652322695806)
-@@ -2231,7 +2231,8 @@
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(revision efae5362cbeaf1594b18edf594e83b2cf72afce6)
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
(date 1685516413475)
+@@ -2515,7 +2515,7 @@
// if the cluster manager explicitly tells us that the entire worker was
lost, then
// we know to unregister shuffle output. (Note that "worker"
specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the
Worker.)
- val fileLost = workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled
-+ val fileLost = !Utils.isRssEnabled(sc.getConf) &&
-+ (workerHost.isDefined ||
!env.blockManager.externalShuffleServiceEnabled)
++ val fileLost = !Utils.isRssEnabled(sc.getConf) && (workerHost.isDefined
|| !env.blockManager.externalShuffleServiceEnabled)
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = fileLost,
+Index: core/src/main/scala/org/apache/spark/util/Utils.scala
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
+--- a/core/src/main/scala/org/apache/spark/util/Utils.scala (revision
efae5362cbeaf1594b18edf594e83b2cf72afce6)
++++ b/core/src/main/scala/org/apache/spark/util/Utils.scala (date
1685515818317)
+@@ -3271,6 +3271,10 @@
+ files.toSeq
+ }
+
++ def isRssEnabled(conf: SparkConf): Boolean = {
++ conf.get("spark.shuffle.manager", "sort").contains("celeborn")
++ }
++
+ /**
+ * Return the median number of a long array
+ *
+Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(revision efae5362cbeaf1594b18edf594e83b2cf72afce6)
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
(date 1685516047047)
+@@ -1055,7 +1055,7 @@
+ // data from this dead executor so we would need to rerun these tasks on
other executors.
+ val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
+ (reason.isInstanceOf[ExecutorDecommission] ||
!env.blockManager.externalShuffleServiceEnabled)
+- if (maybeShuffleMapOutputLoss && !isZombie) {
++ if (maybeShuffleMapOutputLoss && !isZombie && !Utils.isRssEnabled(conf)) {
+ for ((tid, info) <- taskInfos if info.executorId == execId) {
+ val index = info.index
+ lazy val isShuffleMapOutputAvailable = reason match {