This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 532cedbfd [CELEBORN-1844][FOLLOWUP] alway try to use memory storage if
available
532cedbfd is described below
commit 532cedbfd21f26a6de4c70bd6811d6da17f72ccc
Author: mingji <[email protected]>
AuthorDate: Thu Jul 10 15:55:29 2025 +0800
[CELEBORN-1844][FOLLOWUP] alway try to use memory storage if available
### What changes were proposed in this pull request?
Try to use memory storage first whenever it is available.
This improves performance when a cluster is configured to use both MEMORY and HDFS.
### Why are the changes needed?
To keep the behavior of release 0.5: always try memory storage first,
because the slots allocator does not allocate slots on memory storage.
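
The selection rule described above can be sketched in isolation as follows. This is a minimal illustration, not the code in `StoragePolicy.scala`: the object `StartIndexSketch`, the function `startIndex`, its parameters, and the plain-string storage type names are all hypothetical, and the sketch assumes the creation order is an `Option[List[String]]`. Returning `-1` for a missing order is also an assumption made so the sketch never throws.

```scala
object StartIndexSketch {
  // Hypothetical stand-in for StorageInfo.Type.MEMORY.name(); not the real Celeborn enum.
  val Memory: String = "MEMORY"

  /**
   * Index in `order` at which file creation should start.
   *
   * @param order        configured creation order, e.g. Some(List("MEMORY", "SSD", "HDFS"))
   * @param locationType storage type name carried by the partition location
   * @param evict        true when data is being evicted from an earlier tier
   */
  def startIndex(order: Option[List[String]], locationType: String, evict: Boolean): Int = {
    if (evict) {
      0
    } else {
      order match {
        // Memory is configured: start there, regardless of the allocated slot type.
        case Some(types) if types.contains(Memory) => types.indexOf(Memory)
        // Memory is not configured: fall back to the type of the allocated slot.
        case Some(types) => types.indexOf(locationType)
        // Assumption for this sketch only: no configured order yields -1.
        case None => -1
      }
    }
  }
}
```

For example, `StartIndexSketch.startIndex(Some(List("MEMORY", "SSD", "HDFS")), "HDFS", evict = false)` returns `0`, while the same call with `Some(List("SSD", "HDFS"))` returns `1`. The sketch tests membership on the inner list because `Option.contains` compares the wrapped value as a whole to its argument, so on an `Option[List[String]]` a call such as `order.contains("MEMORY")` would compare the entire list to a string.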
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA.
Closes #3352 from FMX/b1844-2.
Authored-by: mingji <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../service/deploy/worker/storage/StoragePolicy.scala | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
index b5166d009..db6123e0a 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala
@@ -185,8 +185,15 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
if (evict) {
0
} else {
- order.get.indexOf(
- partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
+ // keep the old behavior, always try to use memory if worker
+ // has configured to use memory storage, because slots allocator
+ // will not allocate slots on memory storage
+ if (order.contains(StorageInfo.Type.MEMORY.name())) {
+ order.get.indexOf(StorageInfo.Type.MEMORY.name())
+ } else {
+ order.get.indexOf(
+ partitionDataWriterContext.getPartitionLocation.getStorageInfo.getType.name())
+ }
}
val maxSize = order.get.length