This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 35d51571a80 [SPARK-42784] should still create subDir when the number 
of subDir in merge dir is less than conf
35d51571a80 is described below

commit 35d51571a803b8fa7d14542236276425b517d3af
Author: meifencheng <meifench...@meituan.com>
AuthorDate: Fri Jun 30 22:50:14 2023 -0500

    [SPARK-42784] should still create subDir when the number of subDir in merge 
dir is less than conf
    
    ### What changes were proposed in this pull request?
    Fixed a minor issue with DiskBlockManager after push-based shuffle is
enabled
    
    ### Why are the changes needed?
    This bug reduces the efficiency of push-based shuffle.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test
    
    Closes #40412 from Stove-hust/feature-42784.
    
    Authored-by: meifencheng <meifench...@meituan.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala   | 2 +-
 .../test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 971647be06e..0427fbd9b62 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -273,7 +273,7 @@ private[spark] class DiskBlockManager(
       Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
         try {
           val mergeDir = new File(rootDir, mergeDirName)
-          if (!mergeDir.exists()) {
+          if (!mergeDir.exists() || mergeDir.listFiles().length < 
subDirsPerLocalDir) {
             // This executor does not find merge_manager directory, it will 
try to create
             // the merge_manager directory and the sub directories.
             logDebug(s"Try to create $mergeDir and its sub dirs since the " +
diff --git 
a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index ac896c0b17a..48610cbc025 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -108,8 +108,8 @@ class DiskBlockManagerSuite extends SparkFunSuite {
     assert(Utils.getConfiguredLocalDirs(testConf).map(
       rootDir => new File(rootDir, DiskBlockManager.MERGE_DIRECTORY))
       .filter(mergeDir => mergeDir.exists()).length === 2)
-    // mergeDir0 will be skipped as it already exists
-    assert(mergeDir0.list().length === 0)
+    // mergeDir0 can not be skipped even if it already exists
+    assert(mergeDir0.list().length === 
testConf.get(config.DISKSTORE_SUB_DIRECTORIES))
     // Sub directories get created under mergeDir1
     assert(mergeDir1.list().length === 
testConf.get(config.DISKSTORE_SUB_DIRECTORIES))
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to