This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b39f2d6acf2 [SPARK-40636][CORE] Fix wrong remained shuffles log in BlockManagerDecommissioner
b39f2d6acf2 is described below

commit b39f2d6acf25726d99bf2c2fa84ba6a227d0d909
Author: Warren Zhu <warren.zh...@gmail.com>
AuthorDate: Tue Oct 4 13:38:17 2022 -0700

    [SPARK-40636][CORE] Fix wrong remained shuffles log in BlockManagerDecommissioner
    
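    ### What changes were proposed in this pull request?
    Fix the count of remaining shuffles logged by `BlockManagerDecommissioner`: log `migratingShuffles.size - numMigratedShuffles.get()` instead of `migratingShuffles.size`.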
    
    ### Why are the changes needed?
    `BlockManagerDecommissioner` should log the correct number of remaining shuffles. The current log reports the total number of shuffles as the remaining count, so the value never decreases:
    
    ```
    4 of 24 local shuffles are added. In total, 24 shuffles are remained.
    2022-09-30 17:42:15.035 PDT
    0 of 24 local shuffles are added. In total, 24 shuffles are remained.
    2022-09-30 17:42:45.069 PDT
    0 of 24 local shuffles are added. In total, 24 shuffles are remained.
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manually tested
    
    Closes #38078 from warrenzhu25/deco-log.
    
    Authored-by: Warren Zhu <warren.zh...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../scala/org/apache/spark/storage/BlockManagerDecommissioner.scala    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 6e3cf9c9b41..3a698ce4f70 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -280,8 +280,9 @@ private[storage] class BlockManagerDecommissioner(
       .sortBy(b => (b.shuffleId, b.mapId))
     shufflesToMigrate.addAll(newShufflesToMigrate.map(x => (x, 0)).asJava)
     migratingShuffles ++= newShufflesToMigrate
+    val remainedShuffles = migratingShuffles.size - numMigratedShuffles.get()
     logInfo(s"${newShufflesToMigrate.size} of ${localShuffles.size} local shuffles " +
-      s"are added. In total, ${migratingShuffles.size} shuffles are remained.")
+      s"are added. In total, $remainedShuffles shuffles are remained.")
 
     // Update the threads doing migrations
     val livePeerSet = bm.getPeers(false).toSet
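
The arithmetic behind the one-line change can be sketched outside Spark. The snippet below is a minimal illustration, not the Spark implementation: `RemainedShufflesSketch`, `loggedBefore`, and `loggedAfter` are hypothetical names, the `(shuffleId, mapId)` tuple stands in for Spark's shuffle block type, and it assumes `migratingShuffles` keeps every shuffle queued so far, including those already migrated. The figure of 24 queued shuffles comes from the log excerpt in the commit message; the 20 migrated shuffles are made up for the example.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-in for the decommissioner's bookkeeping; this is not Spark code.
object RemainedShufflesSketch {
  // Assumption: holds every shuffle block queued so far, migrated or not,
  // keyed here by (shuffleId, mapId).
  val migratingShuffles: mutable.HashSet[(Int, Long)] = mutable.HashSet.empty

  // Number of queued shuffle blocks whose migration has completed.
  val numMigratedShuffles = new AtomicInteger(0)

  // Value logged before the patch: the size of the whole set.
  def loggedBefore: Int = migratingShuffles.size

  // Value logged after the patch: queued minus already migrated.
  def loggedAfter: Int = migratingShuffles.size - numMigratedShuffles.get()

  def main(args: Array[String]): Unit = {
    // Queue 24 shuffle blocks, then pretend 20 of them finished migrating.
    (0 until 24).foreach(i => migratingShuffles += ((0, i.toLong)))
    numMigratedShuffles.set(20)
    println(s"before: $loggedBefore shuffles are remained")
    println(s"after: $loggedAfter shuffles are remained")
  }
}
```

With these made-up inputs the pre-patch expression stays at 24 regardless of progress, while the patched expression yields 4.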


