This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new ffdb41674 [CELEBORN-2270] Fix problem with eviction to tiered storage
during partition split
ffdb41674 is described below
commit ffdb41674596970c9a3ad7a85a08f81b37ae622d
Author: Enrico Olivelli <[email protected]>
AuthorDate: Mon Mar 2 18:07:39 2026 +0800
[CELEBORN-2270] Fix problem with eviction to tiered storage during
partition split
NOTE: this PR is stacked on top of
https://github.com/apache/celeborn/pull/3608
Please consider only 756d25e49ef5f0321b90002d319b72924b9f4196
### What changes were proposed in this pull request?
Handle the eviction to a different location type.
### Why are the changes needed?
Because it may happen that a MEMORY file needs to be evicted to another
storage type (e.g., S3). Currently, this does not work.
As described in the tests in #3608, when you have tiered storage the
primary partition type is usually not MEMORY, but it may happen that during a
partition split the client decides to use MEMORY.
This patch fixes the problem on the worker side.
An alternative fix would be to change the behavior of the client and
simulate what the master does when offering slots.
Such a change would be more involved, and it would not make the server side
resilient to this scenario.
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
- New integration tests
- Manual testing on real k8s cluster with S3
Closes #3610 from eolivelli/CELEBORN-2270-fix-partition-split.
Authored-by: Enrico Olivelli <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../spark/EvictMemoryToTieredStorageTest.scala | 63 ++++++++++++++++++----
.../deploy/worker/storage/StorageManager.scala | 10 +++-
.../deploy/worker/storage/StoragePolicy.scala | 23 +++++---
.../storage/storagePolicy/StoragePolicyCase1.scala | 1 +
.../storage/storagePolicy/StoragePolicyCase2.scala | 1 +
.../storage/storagePolicy/StoragePolicyCase3.scala | 1 +
.../storage/storagePolicy/StoragePolicyCase4.scala | 1 +
7 files changed, 82 insertions(+), 18 deletions(-)
diff --git
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala
index 567a5b283..fd6b6e4b1 100644
---
a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala
+++
b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala
@@ -71,9 +71,9 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite
val s3url = container.getS3URL
val augmentedConfiguration = Map(
- CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,HDD,S3",
- CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,HDD,S3",
- // CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY,S3",
+ CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,S3",
+ CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,S3",
+ CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY,S3",
// note that in S3 (and Minio) you cannot upload parts smaller than 5MB,
so we trigger eviction only when there
// is enough data
CelebornConf.WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE.key -> "5MB",
@@ -199,6 +199,52 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite
celebornSparkSession.stop()
}
+ test("celeborn spark integration test - memory evict to s3 after partition
split") {
+ assumeS3LibraryIsLoaded()
+
+ val sparkConf = new
SparkConf().setAppName("celeborn-demo").setMaster("local[2]")
+ val celebornSparkSession = SparkSession.builder()
+ .config(updateSparkConfWithStorageTypes(sparkConf, ShuffleMode.HASH,
"MEMORY,S3"))
+ // Set split threshold equal to WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE
(5MB) to trigger
+ // the following sequence that reproduces the production failure:
+ // 1. MemoryTierWriter accumulates ~5MB → eviction → DfsTierWriter (S3)
created.
+ // getDiskFileInfo() is now non-null, enabling the regular
split-threshold check.
+ // (Without prior eviction to disk, getDiskFileInfo() == null and no
split fires.)
+ // 2. The S3 file grows past 5MB → SOFT_SPLIT response sent to the Spark
client.
+ // 3. ChangePartitionManager calls allocateFromCandidates, which builds
the new location
+ // with type=MEMORY (storageTypes.head for "MEMORY,S3") and
availableTypes=MEMORY|S3.
+ // 4. The new MemoryTierWriter fills again → eviction triggered on the
MEMORY-typed
+ // location → StorageManager.createDiskFile must handle type=MEMORY
as a valid S3
+ // target (using availableStorageTypes) rather than rejecting it.
+ .config(s"spark.${CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD.key}",
"5MB")
+ .getOrCreate()
+
+ // 20MB covers all three phases:
+ // ~5MB to fill the first MemoryTierWriter and trigger eviction to S3,
+ // ~5MB of additional S3 writes to exceed the split threshold and fire
SOFT_SPLIT,
+ // ~10MB remaining for the second MemoryTierWriter to fill and trigger
the second eviction.
+ val sampleSeq: immutable.Seq[(String, Int)] = buildDataSet(20 * 1024 *
1024)
+
+ repartition(celebornSparkSession, sequence = sampleSeq, partitions = 1)
+
+ // After splits there are more than 2 committed locations (one per epoch),
so we assert
+ // type and path for each rather than an exact count.
+ assert(seenPartitionLocationsOpenReader.size >= 2)
+ seenPartitionLocationsOpenReader.asScala.foreach(location => {
+ assert(
+ location.getStorageInfo.getType == Type.MEMORY ||
location.getStorageInfo.getType == Type.S3)
+ assert(location.getStorageInfo.getFilePath == "")
+ })
+ assert(seenPartitionLocationsUpdateFileGroups.size >= 2)
+ seenPartitionLocationsUpdateFileGroups.asScala.foreach { location =>
+ if (location.getStorageInfo.getType == Type.MEMORY)
+ assert(location.getStorageInfo.getFilePath == "")
+ else if (location.getStorageInfo.getType == Type.S3)
+ assert(location.getStorageInfo.getFilePath.startsWith("s3://"))
+ }
+ celebornSparkSession.stop()
+ }
+
test("celeborn spark integration test - push fails no way of evicting") {
assumeS3LibraryIsLoaded()
@@ -216,16 +262,15 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite
celebornSparkSession.stop()
}
- private def buildBigDataSet = {
+ private def buildBigDataSet: immutable.Seq[(String, Int)] = buildDataSet(10
* 1024 * 1024)
+
+ private def buildDataSet(sizeBytes: Int): immutable.Seq[(String, Int)] = {
val big1KBString: String = StringUtils.repeat(' ', 1024)
- val partitionSize = 10 * 1024 * 1024
- val numValues = partitionSize / big1KBString.length
- // we need to write enough to trigger eviction from MEMORY to S3
- val sampleSeq: immutable.Seq[(String, Int)] = (1 to numValues)
+ val numValues = sizeBytes / big1KBString.length
+ (1 to numValues)
.map(i => big1KBString + i) // all different keys
.toList
.map(v => (v.toUpperCase, Random.nextInt(12) + 1))
- sampleSeq
}
def interceptLocationsSeenByClient(): Unit = {
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 116f431b8..6d375932b 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -1124,9 +1124,15 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
fileName: String,
userIdentifier: UserIdentifier,
partitionType: PartitionType,
- partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = {
+ partitionSplitEnabled: Boolean,
+ overrideStorageType: StorageInfo.Type = null): (Flusher, DiskFileInfo,
File) = {
val suggestedMountPoint = location.getStorageInfo.getMountPoint
- val storageType = location.getStorageInfo.getType
+
+ val storageType =
+ if (overrideStorageType != null)
+ overrideStorageType
+ else location.getStorageInfo.getType
+
var retryCount = 0
var exception: IOException = null
val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index 028e54bd7..7168a809d 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -49,7 +49,7 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
true)
}
}
- logError(s"Create evict file failed for
${partitionDataWriterContext.getPartitionLocation}")
+ logError(s"Create evict file failed for
${partitionDataWriterContext.getPartitionLocation} - no policy for
${celebornFile.storageType.name()}")
null
}
@@ -94,6 +94,7 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
}
def tryCreateFileByType(storageInfoType: StorageInfo.Type): TierWriterBase
= {
+ val overrideType = if (evict) storageInfoType else
location.getStorageInfo.getType
try {
storageInfoType match {
case StorageInfo.Type.MEMORY =>
@@ -118,11 +119,14 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
partitionDataWriterContext,
storageManager)
} else {
+ logWarning(
+ s"Not creating ${storageInfoType} file from
${location.getStorageInfo.getType} for
${partitionDataWriterContext.getShuffleKey}
${partitionDataWriterContext.getPartitionLocation.getFileName}")
null
}
case StorageInfo.Type.HDD | StorageInfo.Type.SSD |
StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 =>
if (storageManager.localOrDfsStorageAvailable) {
- logDebug(s"create non-memory file for
${partitionDataWriterContext.getShuffleKey}
${partitionDataWriterContext.getPartitionLocation.getFileName}")
+ logDebug(
+ s"create non-memory file type $storageInfoType (evict=$evict,
override=$overrideType) for ${partitionDataWriterContext.getShuffleKey}
${partitionDataWriterContext.getPartitionLocation.getFileName}")
val (flusher, diskFileInfo, workingDir) =
storageManager.createDiskFile(
location,
partitionDataWriterContext.getAppId,
@@ -130,7 +134,9 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
location.getFileName,
partitionDataWriterContext.getUserIdentifier,
partitionDataWriterContext.getPartitionType,
- partitionDataWriterContext.isPartitionSplitEnabled)
+ partitionDataWriterContext.isPartitionSplitEnabled,
+ overrideType // this is different from location type, in case
of eviction
+ )
partitionDataWriterContext.setWorkingDir(workingDir)
val metaHandler = getPartitionMetaHandler(diskFileInfo)
if (flusher.isInstanceOf[LocalFlusher]
@@ -167,7 +173,9 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
}
} catch {
case e: Exception =>
- logError(s"create celeborn file for storage $storageInfoType
failed", e)
+ logError(
+ s"create celeborn file for storage $storageInfoType failed for
${partitionDataWriterContext.getShuffleKey}
${partitionDataWriterContext.getPartitionLocation.getFileName}",
+ e)
null
}
}
@@ -193,7 +201,8 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
}
val maxSize = order.get.length
- for (i <- tryCreateFileTypeIndex until maxSize) {
+ val firstIndex = tryCreateFileTypeIndex
+ for (i <- firstIndex until maxSize) {
val storageStr = order.get(i)
val storageInfoType = StorageInfo.fromStrToType(storageStr)
val file = tryCreateFileByType(storageInfoType)
@@ -203,9 +212,9 @@ class StoragePolicy(conf: CelebornConf, storageManager:
StorageManager, source:
}
logError(
- s"Could not create file for storage type
${location.getStorageInfo.getType}")
+ s"Could not create file for storage type
${location.getStorageInfo.getType}, tried ${order.get} firstIndex $firstIndex
for ${partitionDataWriterContext.getShuffleKey}
${partitionDataWriterContext.getPartitionLocation.getFileName}")
throw new CelebornIOException(
- s"Create file failed for context ${partitionDataWriterContext.toString}")
+ s"Create file failed for context ${partitionDataWriterContext.toString},
tried ${order.get} firstIndex $firstIndex")
}
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
index 640a67113..bdf9ce7cc 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala
@@ -66,6 +66,7 @@ class StoragePolicyCase1 extends CelebornFunSuite {
any(),
any(),
any(),
+ any(),
any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))
val memoryHintPartitionLocation =
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
index 7a7ab956d..9dcec7e52 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala
@@ -66,6 +66,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
any(),
any(),
any(),
+ any(),
any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))
val memoryHintPartitionLocation =
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
index 909703700..8f21a7f4c 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala
@@ -66,6 +66,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
any(),
any(),
any(),
+ any(),
any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))
val memoryHintPartitionLocation =
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
index e25837d2b..dc321738e 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala
@@ -66,6 +66,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
any(),
any(),
any(),
+ any(),
any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile))
val memoryHintPartitionLocation =