This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 6aa7e020f [CELEBORN-619][CORE][SHUFFLE] Support enable DRA with Apache 
Celeborn
6aa7e020f is described below

commit 6aa7e020fcb16044337ed3ebb56bbc3eec34616f
Author: Ethan Feng <[email protected]>
AuthorDate: Mon Jun 5 09:50:05 2023 +0800

    [CELEBORN-619][CORE][SHUFFLE] Support enable DRA with Apache Celeborn
    
    ### What changes were proposed in this pull request?
    
    Adapt Spark DRA patch for spark 3.4
    
    ### Why are the changes needed?
    
    To support enabling DRA w/ Celeborn on Spark 3.4
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this PR provides a DRA patch for Spark 3.4
    
    ### How was this patch tested?
    
    Compiled with Spark 3.4
    
    Closes #1529 from FMX/CELEBORN-619.
    
    Authored-by: Ethan Feng <[email protected]>
    Signed-off-by: Ethan Feng <[email protected]>
    (cherry picked from commit 5600728149d86941def4f7db0aedd477267d0c15)
    Signed-off-by: Ethan Feng <[email protected]>
---
 README.md                                          |  5 +-
 .../{RSS_RDA_spark2.patch => RSS_DRA_spark2.patch} |  0
 .../{RSS_RDA_spark3.patch => RSS_DRA_spark3.patch} |  0
 ...RSS_RDA_spark3.patch => RSS_DRA_spark3_4.patch} | 95 +++++++++++-----------
 4 files changed, 51 insertions(+), 49 deletions(-)

diff --git a/README.md b/README.md
index d22388b73..c75724e31 100644
--- a/README.md
+++ b/README.md
@@ -257,8 +257,9 @@ See more detail in [CONFIGURATIONS](docs/configuration.md)
 
 ### Support Spark Dynamic Allocation
 We provide a patch to enable users to use Spark with both Dynamic Resource 
Allocation(DRA) and Celeborn.
-For Spark2.x check [Spark2 Patch](assets/spark-patch/RSS_RDA_spark2.patch).  
-For Spark3.x check [Spark3 Patch](assets/spark-patch/RSS_RDA_spark3.patch).
+For Spark2.x check [Spark2 Patch](assets/spark-patch/RSS_DRA_spark2.patch).  
+For Spark3.x check [Spark3 Patch](assets/spark-patch/RSS_DRA_spark3.patch).
+For Spark3.4 check [Spark3.4 Patch](assets/spark-patch/RSS_DRA_spark3_4.patch).
 
 ### Metrics
 Celeborn has various metrics. [METRICS](METRICS.md)
diff --git a/assets/spark-patch/RSS_RDA_spark2.patch 
b/assets/spark-patch/RSS_DRA_spark2.patch
similarity index 100%
rename from assets/spark-patch/RSS_RDA_spark2.patch
rename to assets/spark-patch/RSS_DRA_spark2.patch
diff --git a/assets/spark-patch/RSS_RDA_spark3.patch 
b/assets/spark-patch/RSS_DRA_spark3.patch
similarity index 100%
copy from assets/spark-patch/RSS_RDA_spark3.patch
copy to assets/spark-patch/RSS_DRA_spark3.patch
diff --git a/assets/spark-patch/RSS_RDA_spark3.patch 
b/assets/spark-patch/RSS_DRA_spark3_4.patch
similarity index 72%
rename from assets/spark-patch/RSS_RDA_spark3.patch
rename to assets/spark-patch/RSS_DRA_spark3_4.patch
index d07bcb46c..3b0bad14a 100644
--- a/assets/spark-patch/RSS_RDA_spark3.patch
+++ b/assets/spark-patch/RSS_DRA_spark3_4.patch
@@ -13,74 +13,75 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-Index: core/src/main/scala/org/apache/spark/util/Utils.scala
-IDEA additional info:
-Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
-<+>UTF-8
-===================================================================
-diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
---- a/core/src/main/scala/org/apache/spark/util/Utils.scala    (revision 
d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala    (date 
1652322540231)
-@@ -3222,6 +3222,9 @@
-     }
-     files.toSeq
-   }
-+
-+  def isRssEnabled(conf: SparkConf): Boolean =
-+    conf.get("spark.shuffle.manager", "sort").contains("celeborn")
- }
- 
- private[util] object CallerContext extends Logging {
+Subject: [PATCH] RSS support DRA.
+---
 Index: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
 IDEA additional info:
 Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
 <+>UTF-8
 ===================================================================
 diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
---- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     
(date 1652322692344)
-@@ -211,7 +211,7 @@
-           (decommissionEnabled &&
-             conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
-         logWarning("Dynamic allocation without a shuffle service is an 
experimental feature.")
+--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     
(revision efae5362cbeaf1594b18edf594e83b2cf72afce6)
++++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala     
(date 1685515818323)
+@@ -209,7 +209,7 @@
+       } else if (decommissionEnabled &&
+           conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
+         logInfo("Shuffle data decommission is enabled without a shuffle 
service.")
 -      } else if (!testing) {
 +      } else if (!testing && !Utils.isRssEnabled(conf)) {
          throw new SparkException("Dynamic allocation of executors requires 
the external " +
            "shuffle service. You may enable this through 
spark.shuffle.service.enabled.")
        }
-Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
-IDEA additional info:
-Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
-<+>UTF-8
-===================================================================
-diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      
(date 1652323382069)
-@@ -1015,7 +1015,8 @@
-     // and we are not using an external shuffle server which could serve the 
shuffle outputs.
-     // The reason is the next stage wouldn't be able to fetch the data from 
this dead executor
-     // so we would need to rerun these tasks on other executors.
--    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled 
&& !isZombie) {
-+    if (isShuffleMapTasks && !env.blockManager.externalShuffleServiceEnabled 
&& !isZombie &&
-+      !Utils.isRssEnabled(conf)) {
-       for ((tid, info) <- taskInfos if info.executorId == execId) {
-         val index = taskInfos(tid).index
-         // We may have a running task whose partition has been marked as 
successful,
 Index: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
 IDEA additional info:
 Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
 <+>UTF-8
 ===================================================================
 diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
---- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        
(revision d1cae9c5ac5393243d2f9661dc7957d0ebccb1d6)
-+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        
(date 1652322695806)
-@@ -2231,7 +2231,8 @@
+--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        
(revision efae5362cbeaf1594b18edf594e83b2cf72afce6)
++++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala        
(date 1685516413475)
+@@ -2515,7 +2515,7 @@
      // if the cluster manager explicitly tells us that the entire worker was 
lost, then
      // we know to unregister shuffle output.  (Note that "worker" 
specifically refers to the process
      // from a Standalone cluster, where the shuffle service lives in the 
Worker.)
 -    val fileLost = workerHost.isDefined || 
!env.blockManager.externalShuffleServiceEnabled
-+    val fileLost = !Utils.isRssEnabled(sc.getConf) &&
-+      (workerHost.isDefined || 
!env.blockManager.externalShuffleServiceEnabled)
++    val fileLost = !Utils.isRssEnabled(sc.getConf) && (workerHost.isDefined 
|| !env.blockManager.externalShuffleServiceEnabled)
      removeExecutorAndUnregisterOutputs(
        execId = execId,
        fileLost = fileLost,
+Index: core/src/main/scala/org/apache/spark/util/Utils.scala
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
+--- a/core/src/main/scala/org/apache/spark/util/Utils.scala    (revision 
efae5362cbeaf1594b18edf594e83b2cf72afce6)
++++ b/core/src/main/scala/org/apache/spark/util/Utils.scala    (date 
1685515818317)
+@@ -3271,6 +3271,10 @@
+     files.toSeq
+   }
+
++  def isRssEnabled(conf: SparkConf): Boolean = {
++    conf.get("spark.shuffle.manager", "sort").contains("celeborn")
++  }
++
+   /**
+    * Return the median number of a long array
+    *
+Index: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+IDEA additional info:
+Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
+<+>UTF-8
+===================================================================
+diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      
(revision efae5362cbeaf1594b18edf594e83b2cf72afce6)
++++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala      
(date 1685516047047)
+@@ -1055,7 +1055,7 @@
+     // data from this dead executor so we would need to rerun these tasks on 
other executors.
+     val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
+       (reason.isInstanceOf[ExecutorDecommission] || 
!env.blockManager.externalShuffleServiceEnabled)
+-    if (maybeShuffleMapOutputLoss && !isZombie) {
++    if (maybeShuffleMapOutputLoss && !isZombie && !Utils.isRssEnabled(conf)) {
+       for ((tid, info) <- taskInfos if info.executorId == execId) {
+         val index = info.index
+         lazy val isShuffleMapOutputAvailable = reason match {

Reply via email to