This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b3e14d39854 [SPARK-44345][CORE] Lower `unknown shuffle map output` log level to WARN if shuffle migration is enabled
b3e14d39854 is described below

commit b3e14d398542bcfebdad2a0b7b52af8c52e1d62f
Author: Warren Zhu <warren.zh...@gmail.com>
AuthorDate: Sat Sep 23 19:03:45 2023 -0700

    [SPARK-44345][CORE] Lower `unknown shuffle map output` log level to WARN if shuffle migration is enabled
    
    ### What changes were proposed in this pull request?
    Only log an unknown shuffle map output as an error when shuffle migration is disabled; when it is enabled, log it as a warning.
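    "Shuffle migration enabled" means that all three decommission settings checked in the diff are true. The following is a minimal sketch that assumes configuration arrives as a plain `Map[String, String]`. The object `MigrationFlags` is hypothetical. The keys are, to our understanding, the Spark property names behind `DECOMMISSION_ENABLED`, `STORAGE_DECOMMISSION_ENABLED` and `STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED`. For simplicity, unset keys are treated as `false`, which may not match Spark's actual defaults.

```scala
// Hypothetical helper mirroring the conjunction computed for shuffleMigrationEnabled.
object MigrationFlags {
  // Spark property names assumed to back the three config entries in the diff.
  val Keys: Seq[String] = Seq(
    "spark.decommission.enabled",
    "spark.storage.decommission.enabled",
    "spark.storage.decommission.shuffleBlocks.enabled")

  // Simplification: a missing key counts as "false"; Spark's real defaults may differ.
  def shuffleMigrationEnabled(conf: Map[String, String]): Boolean =
    Keys.forall(key => conf.getOrElse(key, "false").trim.toBoolean)
}
```

    All three flags must be set. Enabling decommissioning alone does not switch the log level to WARN.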
    
    ### Why are the changes needed?
    When decommissioning and shuffle migration are enabled, the logs fill with error messages like `Asked to update map output for unknown shuffle`.
    
    `ContextCleaner` cleans up and unregisters shuffles asynchronously. When a target block manager receives a shuffle block from a decommissioned block manager, it updates the shuffle's location in the map output tracker. By that time the shuffle may already have been unregistered. This situation should not be treated as an error.
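    The following is a minimal Scala sketch of this race, built on a toy in-memory tracker. `ToyMapOutputTracker`, `Outcome` and its cases are hypothetical illustration names, not Spark APIs. `unregisterShuffle` stands in for the asynchronous cleanup driven by `ContextCleaner`. The three-way match mirrors the shape of the patched `updateMapOutput`, except that it returns an outcome instead of logging.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical outcome type: the sketch returns what would be logged instead of logging.
sealed trait Outcome
case object Updated extends Outcome
case object WarnUnknown extends Outcome
case object ErrorUnknown extends Outcome

// Hypothetical toy tracker; not Spark's MapOutputTrackerMaster.
final class ToyMapOutputTracker(shuffleMigrationEnabled: Boolean) {
  // shuffleId -> (mapId -> host of the block manager holding that map output)
  private val shuffleStatuses = TrieMap.empty[Int, TrieMap[Long, String]]

  def registerShuffle(shuffleId: Int): Unit = {
    shuffleStatuses.putIfAbsent(shuffleId, TrieMap.empty[Long, String])
    ()
  }

  // Stands in for the asynchronous unregistration driven by ContextCleaner.
  def unregisterShuffle(shuffleId: Int): Unit = {
    shuffleStatuses.remove(shuffleId)
    ()
  }

  // Same three-way match as the patched updateMapOutput.
  def updateMapOutput(shuffleId: Int, mapId: Long, host: String): Outcome =
    shuffleStatuses.get(shuffleId) match {
      case Some(status) =>
        status.update(mapId, host) // record the migrated block's new location
        Updated
      case None if shuffleMigrationEnabled => WarnUnknown // expected race during migration
      case None => ErrorUnknown // unexpected when migration is off
    }

  def locationOf(shuffleId: Int, mapId: Long): Option[String] =
    shuffleStatuses.get(shuffleId).flatMap(_.get(mapId))
}
```

    Calling `unregisterShuffle` before a migrated block's location update arrives mimics the interleaving described above. With migration enabled, the sketch reports `WarnUnknown` rather than `ErrorUnknown`.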
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manually tested.
    
    Closes #41906 from warrenzhu25/unknown-shuffle.
    
    Authored-by: Warren Zhu <warren.zh...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 3495536a350..4e6c2213e1e 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -680,6 +680,9 @@ private[spark] class MapOutputTrackerMaster(
   /** Whether to compute locality preferences for reduce tasks */
   private val shuffleLocalityEnabled = conf.get(SHUFFLE_REDUCE_LOCALITY_ENABLE)
 
+  private val shuffleMigrationEnabled = conf.get(DECOMMISSION_ENABLED) &&
+    conf.get(STORAGE_DECOMMISSION_ENABLED) && conf.get(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
+
   // Number of map and reduce tasks above which we do not assign preferred locations based on map
   // output sizes. We limit the size of jobs for which assign preferred locations as computing the
   // top locations by size becomes expensive.
@@ -808,6 +811,8 @@ private[spark] class MapOutputTrackerMaster(
     shuffleStatuses.get(shuffleId) match {
       case Some(shuffleStatus) =>
         shuffleStatus.updateMapOutput(mapId, bmAddress)
+      case None if shuffleMigrationEnabled =>
+        logWarning(s"Asked to update map output for unknown shuffle ${shuffleId}")
       case None =>
         logError(s"Asked to update map output for unknown shuffle ${shuffleId}")
     }

