This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new ffdb41674 [CELEBORN-2270] Fix problem with eviction to tiered storage 
during partition split
ffdb41674 is described below

commit ffdb41674596970c9a3ad7a85a08f81b37ae622d
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon Mar 2 18:07:39 2026 +0800

    [CELEBORN-2270] Fix problem with eviction to tiered storage during 
partition split
    
    NOTE: this PR is stacked on top of 
https://github.com/apache/celeborn/pull/3608
    
    Please consider only 756d25e49ef5f0321b90002d319b72924b9f4196
    
    ### What changes were proposed in this pull request?
    
    Handle the eviction to a different location type.
    
    ### Why are the changes needed?
    
    Because it may happen that a MEMORY file is to be evicted to another 
storage type (e.g. S3). This does not work.
    
    Usually, as described in the tests in #3608, when you have tiered storage 
the primary partition type is not MEMORY, but it may happen that during a 
partition split the client decides to use MEMORY.
    
    This patch fixes the problem on the worker side.
    An alternative fix would be to change the behavior of the client, and 
simulate what the master does when offering slots.
    Such a change would be more involved and would not make the server side 
resilient to this scenario.
    
    ### Does this PR resolve a correctness bug?
    
    No.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    - New integration tests
    - Manual testing on real k8s cluster with S3
    
    Closes #3610 from eolivelli/CELEBORN-2270-fix-partition-split.
    
    Authored-by: Enrico Olivelli <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../spark/EvictMemoryToTieredStorageTest.scala     | 63 ++++++++++++++++++----
 .../deploy/worker/storage/StorageManager.scala     | 10 +++-
 .../deploy/worker/storage/StoragePolicy.scala      | 23 +++++---
 .../storage/storagePolicy/StoragePolicyCase1.scala |  1 +
 .../storage/storagePolicy/StoragePolicyCase2.scala |  1 +
 .../storage/storagePolicy/StoragePolicyCase3.scala |  1 +
 .../storage/storagePolicy/StoragePolicyCase4.scala |  1 +
 7 files changed, 82 insertions(+), 18 deletions(-)

diff --git 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala
 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala
index 567a5b283..fd6b6e4b1 100644
--- 
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala
+++ 
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala
@@ -71,9 +71,9 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite
 
     val s3url = container.getS3URL
     val augmentedConfiguration = Map(
-      CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,HDD,S3",
-      CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,HDD,S3",
-      // CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY,S3",
+      CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,S3",
+      CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,S3",
+      CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY,S3",
       // note that in S3 (and Minio) you cannot upload parts smaller than 5MB, 
so we trigger eviction only when there
       // is enough data
       CelebornConf.WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE.key -> "5MB",
@@ -199,6 +199,52 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite
     celebornSparkSession.stop()
   }
 
+  test("celeborn spark integration test - memory evict to s3 after partition 
split") {
+    assumeS3LibraryIsLoaded()
+
+    val sparkConf = new 
SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
+    val celebornSparkSession = SparkSession.builder()
+      .config(updateSparkConfWithStorageTypes(sparkConf, ShuffleMode.HASH, 
"MEMORY,S3"))
+      // Set split threshold equal to WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE 
(5MB) to trigger
+      // the following sequence that reproduces the production failure:
+      // 1. MemoryTierWriter accumulates ~5MB → eviction → DfsTierWriter (S3) 
created.
+      //    getDiskFileInfo() is now non-null, enabling the regular 
split-threshold check.
+      //    (Without prior eviction to disk, getDiskFileInfo() == null and no 
split fires.)
+      // 2. The S3 file grows past 5MB → SOFT_SPLIT response sent to the Spark 
client.
+      // 3. ChangePartitionManager calls allocateFromCandidates, which builds 
the new location
+      //    with type=MEMORY (storageTypes.head for "MEMORY,S3") and 
availableTypes=MEMORY|S3.
+      // 4. The new MemoryTierWriter fills again → eviction triggered on the 
MEMORY-typed
+      //    location → StorageManager.createDiskFile must handle type=MEMORY 
as a valid S3
+      //    target (using availableStorageTypes) rather than rejecting it.
+      .config(s"spark.${CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD.key}", 
"5MB")
+      .getOrCreate()
+
+    // 20MB covers all three phases:
+    //   ~5MB to fill the first MemoryTierWriter and trigger eviction to S3,
+    //   ~5MB of additional S3 writes to exceed the split threshold and fire 
SOFT_SPLIT,
+    //   ~10MB remaining for the second MemoryTierWriter to fill and trigger 
the second eviction.
+    val sampleSeq: immutable.Seq[(String, Int)] = buildDataSet(20 * 1024 * 
1024)
+
+    repartition(celebornSparkSession, sequence = sampleSeq, partitions = 1)
+
+    // After splits there are more than 2 committed locations (one per epoch), 
so we assert
+    // type and path for each rather than an exact count.
+    assert(seenPartitionLocationsOpenReader.size >= 2)
+    seenPartitionLocationsOpenReader.asScala.foreach(location => {
+      assert(
+        location.getStorageInfo.getType == Type.MEMORY || 
location.getStorageInfo.getType == Type.S3)
+      assert(location.getStorageInfo.getFilePath == "")
+    })
+    assert(seenPartitionLocationsUpdateFileGroups.size >= 2)
+    seenPartitionLocationsUpdateFileGroups.asScala.foreach { location =>
+      if (location.getStorageInfo.getType == Type.MEMORY)
+        assert(location.getStorageInfo.getFilePath == "")
+      else if (location.getStorageInfo.getType == Type.S3)
+        assert(location.getStorageInfo.getFilePath.startsWith("s3://"))
+    }
+    celebornSparkSession.stop()
+  }
+
   test("celeborn spark integration test - push fails no way of evicting") {
     assumeS3LibraryIsLoaded()
 
@@ -216,16 +262,15 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite
     celebornSparkSession.stop()
   }
 
-  private def buildBigDataSet = {
+  private def buildBigDataSet: immutable.Seq[(String, Int)] = buildDataSet(10 
* 1024 * 1024)
+
+  private def buildDataSet(sizeBytes: Int): immutable.Seq[(String, Int)] = {
     val big1KBString: String = StringUtils.repeat(' ', 1024)
-    val partitionSize = 10 * 1024 * 1024
-    val numValues = partitionSize / big1KBString.length
-    // we need to write enough to trigger eviction from MEMORY to S3
-    val sampleSeq: immutable.Seq[(String, Int)] = (1 to numValues)
+    val numValues = sizeBytes / big1KBString.length
+    (1 to numValues)
       .map(i => big1KBString + i) // all different keys
       .toList
       .map(v => (v.toUpperCase, Random.nextInt(12) + 1))
-    sampleSeq
   }
 
   def interceptLocationsSeenByClient(): Unit = {
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 116f431b8..6d375932b 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -1124,9 +1124,15 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       fileName: String,
       userIdentifier: UserIdentifier,
       partitionType: PartitionType,
-      partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = {
+      partitionSplitEnabled: Boolean,
+      overrideStorageType: StorageInfo.Type = null): (Flusher, DiskFileInfo, 
File) = {
     val suggestedMountPoint = location.getStorageInfo.getMountPoint
-    val storageType = location.getStorageInfo.getType
+
+    val storageType =
+      if (overrideStorageType != null)
+        overrideStorageType
+      else location.getStorageInfo.getType
+
     var retryCount = 0
     var exception: IOException = null
     val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 028e54bd7..7168a809d 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -49,7 +49,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
           true)
       }
     }
-    logError(s"Create evict file failed for 
${partitionDataWriterContext.getPartitionLocation}")
+    logError(s"Create evict file failed for 
${partitionDataWriterContext.getPartitionLocation} - no policy for 
${celebornFile.storageType.name()}")
     null
   }
 
@@ -94,6 +94,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
     }
 
     def tryCreateFileByType(storageInfoType: StorageInfo.Type): TierWriterBase 
= {
+      val overrideType = if (evict) storageInfoType else 
location.getStorageInfo.getType
       try {
         storageInfoType match {
           case StorageInfo.Type.MEMORY =>
@@ -118,11 +119,14 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
                 partitionDataWriterContext,
                 storageManager)
             } else {
+              logWarning(
+                s"Not creating ${storageInfoType} file from 
${location.getStorageInfo.getType} for 
${partitionDataWriterContext.getShuffleKey} 
${partitionDataWriterContext.getPartitionLocation.getFileName}")
               null
             }
           case StorageInfo.Type.HDD | StorageInfo.Type.SSD | 
StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 =>
             if (storageManager.localOrDfsStorageAvailable) {
-              logDebug(s"create non-memory file for 
${partitionDataWriterContext.getShuffleKey} 
${partitionDataWriterContext.getPartitionLocation.getFileName}")
+              logDebug(
+                s"create non-memory file type $storageInfoType (evict=$evict, 
override=$overrideType) for ${partitionDataWriterContext.getShuffleKey} 
${partitionDataWriterContext.getPartitionLocation.getFileName}")
               val (flusher, diskFileInfo, workingDir) = 
storageManager.createDiskFile(
                 location,
                 partitionDataWriterContext.getAppId,
@@ -130,7 +134,9 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
                 location.getFileName,
                 partitionDataWriterContext.getUserIdentifier,
                 partitionDataWriterContext.getPartitionType,
-                partitionDataWriterContext.isPartitionSplitEnabled)
+                partitionDataWriterContext.isPartitionSplitEnabled,
+                overrideType // this is different from location type, in case 
of eviction
+              )
               partitionDataWriterContext.setWorkingDir(workingDir)
               val metaHandler = getPartitionMetaHandler(diskFileInfo)
               if (flusher.isInstanceOf[LocalFlusher]
@@ -167,7 +173,9 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
         }
       } catch {
         case e: Exception =>
-          logError(s"create celeborn file for storage $storageInfoType 
failed", e)
+          logError(
+            s"create celeborn file for storage $storageInfoType failed for 
${partitionDataWriterContext.getShuffleKey} 
${partitionDataWriterContext.getPartitionLocation.getFileName}",
+            e)
           null
       }
     }
@@ -193,7 +201,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
       }
 
     val maxSize = order.get.length
-    for (i <- tryCreateFileTypeIndex until maxSize) {
+    val firstIndex = tryCreateFileTypeIndex
+    for (i <- firstIndex until maxSize) {
       val storageStr = order.get(i)
       val storageInfoType = StorageInfo.fromStrToType(storageStr)
       val file = tryCreateFileByType(storageInfoType)
@@ -203,9 +212,9 @@ class StoragePolicy(conf: CelebornConf, storageManager: 
StorageManager, source:
     }
 
     logError(
-      s"Could not create file for storage type 
${location.getStorageInfo.getType}")
+      s"Could not create file for storage type 
${location.getStorageInfo.getType}, tried ${order.get} firstIndex $firstIndex 
for ${partitionDataWriterContext.getShuffleKey} 
${partitionDataWriterContext.getPartitionLocation.getFileName}")
 
     throw new CelebornIOException(
-      s"Create file failed for context ${partitionDataWriterContext.toString}")
+      s"Create file failed for context ${partitionDataWriterContext.toString}, 
tried ${order.get} firstIndex $firstIndex")
   }
 }
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
index 640a67113..bdf9ce7cc 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
@@ -66,6 +66,7 @@ class StoragePolicyCase1 extends CelebornFunSuite {
       any(),
       any(),
       any(),
+      any(),
       any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))
 
   val memoryHintPartitionLocation =
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
index 7a7ab956d..9dcec7e52 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
@@ -66,6 +66,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
       any(),
       any(),
       any(),
+      any(),
       any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))
 
   val memoryHintPartitionLocation =
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
index 909703700..8f21a7f4c 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
@@ -66,6 +66,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
       any(),
       any(),
       any(),
+      any(),
       any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))
 
   val memoryHintPartitionLocation =
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
index e25837d2b..dc321738e 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
@@ -66,6 +66,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
       any(),
       any(),
       any(),
+      any(),
       any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))
 
   val memoryHintPartitionLocation =

Reply via email to