This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 39d6e87  [SPARK-32920][FOLLOW-UP] Fix shuffleMergeFinalized directly calling rdd.getNumPartitions as RDD is not serialized to executor
39d6e87 is described below

commit 39d6e87bd9a027b4b4f5fc2604edb219ad68a185
Author: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com>
AuthorDate: Mon Jul 26 09:17:04 2021 -0500

    [SPARK-32920][FOLLOW-UP] Fix shuffleMergeFinalized directly calling rdd.getNumPartitions as RDD is not serialized to executor
    
    ### What changes were proposed in this pull request?
    
    `ShuffleMapTask` should not push blocks if a shuffle is already merge-finalized; currently, block push is only disabled for retry cases. Also fix `shuffleMergeFinalized` calling `rdd.getNumPartitions`, which fails because the RDD is not serialized to executors (see the sketch after the Dependency.scala hunk below).
    
    ### Why are the changes needed?
    
    Without this change, `shuffleMergeFinalized` can fail on executors by dereferencing the non-serialized RDD, and map tasks may push shuffle blocks even after the shuffle merge has been finalized.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests
    
    Closes #33426 from venkata91/SPARK-32920-follow-up.
    
    Authored-by: Venkata krishnan Sowrirajan <vsowrira...@linkedin.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit ba1a7ce5ec5bd15f81a70a3c5fcd4dc6893589ed)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 core/src/main/scala/org/apache/spark/Dependency.scala                 | 4 +++-
 .../main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala   | 3 +--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index 0a9acf4..4063d11 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -97,6 +97,8 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
     shuffleId, this)
 
+  private[this] val numPartitions = rdd.partitions.length
+
   // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle
   // is enabled
   private[this] var _shuffleMergeEnabled =
@@ -141,7 +143,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    */
   def shuffleMergeFinalized: Boolean = {
     // Empty RDD won't be computed therefore shuffle merge finalized should be true by default.
-    if (shuffleMergeEnabled && rdd.getNumPartitions > 0) {
+    if (shuffleMergeEnabled && numPartitions > 0) {
       _shuffleMergedFinalized
     } else {
       true
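
For context, here is a minimal, self-contained sketch of the pitfall the Dependency.scala change avoids. The `Dep` class and plain Java serialization below are hypothetical stand-ins for Spark's `ShuffleDependency` and its serializer: because `ShuffleDependency._rdd` is `@transient`, the RDD reference is dropped when the dependency is shipped to an executor, so dereferencing it there fails, whereas a value captured into a regular field at construction time travels with the object.

```scala
import java.io._

// Hypothetical stand-in for ShuffleDependency: `rdd` is @transient,
// so it is dropped during serialization, just like `_rdd` in Spark.
class Dep(@transient val rdd: Seq[Int]) extends Serializable {
  // Captured on the "driver" at construction; serialized with the object.
  private[this] val numPartitions = rdd.length
  // Safe after deserialization: reads the cached field, not `rdd`.
  def hasPartitions: Boolean = numPartitions > 0
}

object TransientDemo extends App {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(new Dep(Seq(1, 2, 3)))
  oos.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  val dep = in.readObject().asInstanceOf[Dep]
  assert(dep.rdd == null)      // the transient RDD did not survive the trip
  assert(dep.hasPartitions)    // the cached partition count did
}
```
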
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
index abff650..270d23e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
@@ -21,7 +21,6 @@ import org.apache.spark.{Partition, ShuffleDependency, SparkEnv, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.util.Utils
 
 /**
  * The interface for customizing shuffle write process. The driver create a 
ShuffleWriteProcessor
@@ -64,7 +63,7 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
         // The map task only takes care of converting the shuffle data file into multiple
         // block push requests. It delegates pushing the blocks to a different thread-pool -
         // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
-        if (Utils.isPushBasedShuffleEnabled(SparkEnv.get.conf) && dep.getMergerLocs.nonEmpty) {
+        if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
           manager.shuffleBlockResolver match {
             case resolver: IndexShuffleBlockResolver =>
               val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
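
A condensed, hypothetical rendering of the updated push-eligibility check in ShuffleWriteProcessor.scala (names simplified; not the actual Spark API): blocks are pushed only when shuffle merge is enabled for this dependency, merger locations were assigned, and the shuffle has not already been merge-finalized, which skips pushes for retried stages whose merge already completed.

```scala
object PushGuard {
  // Hypothetical condensed guard mirroring the new condition above:
  // push only if merge is enabled, mergers exist, and not yet finalized.
  def shouldPushBlocks(
      mergeEnabled: Boolean,    // dep.shuffleMergeEnabled
      mergerLocCount: Int,      // dep.getMergerLocs.size
      mergeFinalized: Boolean   // dep.shuffleMergeFinalized
  ): Boolean =
    mergeEnabled && mergerLocCount > 0 && !mergeFinalized
}
```
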

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
