This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 113311df3 [CELEBORN-1081][FOLLOWUP] Remove UNKNOWN_DISK and allocate
all slots to disk
113311df3 is described below
commit 113311df3e99ce58715aaf6feedb62dd96f1e8b3
Author: mingji <[email protected]>
AuthorDate: Tue Nov 28 11:26:00 2023 +0800
[CELEBORN-1081][FOLLOWUP] Remove UNKNOWN_DISK and allocate all slots to disk
### What changes were proposed in this pull request?
1. Remove UNKNOWN_DISK from StorageInfo.
2. Enable load-aware slots allocation when there is HDFS.
### Why are the changes needed?
To support the application's config about available storage types.
### Does this PR introduce _any_ user-facing change?
no.
### How was this patch tested?
GA and Cluster.
Closes #2098 from FMX/B1081-1.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/common/protocol/StorageInfo.java | 50 +++----
common/src/main/proto/TransportMessages.proto | 1 +
.../apache/celeborn/common/meta/DeviceInfo.scala | 21 ++-
.../apache/celeborn/common/meta/WorkerInfo.scala | 10 +-
.../apache/celeborn/common/util/PbSerDeUtils.scala | 8 +-
.../common/protocol/PartitionLocationSuiteJ.java | 8 +-
.../celeborn/common/meta/WorkerInfoSuite.scala | 6 +-
docs/developers/slotsallocation.md | 75 +++++++++++
.../service/deploy/master/SlotsAllocator.java | 148 ++++++++++++++++-----
.../deploy/master/clustermeta/MetaUtil.java | 5 +-
master/src/main/proto/Resource.proto | 1 +
.../celeborn/service/deploy/master/Master.scala | 2 +-
.../deploy/master/SlotsAllocatorSuiteJ.java | 121 ++++++++++++++---
.../service/deploy/worker/storage/FileWriter.java | 3 +-
.../celeborn/service/deploy/worker/Worker.scala | 2 +-
.../deploy/worker/storage/StorageManager.scala | 8 +-
16 files changed, 375 insertions(+), 94 deletions(-)
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
index d3e1bf95b..5b59a10ed 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
@@ -38,7 +38,6 @@ public class StorageInfo implements Serializable {
}
}
- @Deprecated public static String UNKNOWN_DISK = "UNKNOWN_DISK";
public static Map<Integer, Type> typesMap = new HashMap<>();
public static Set<String> typeNames = new HashSet<>();
@@ -57,7 +56,7 @@ public class StorageInfo implements Serializable {
// Default storage Type is MEMORY.
private Type type = Type.MEMORY;
- private String mountPoint = UNKNOWN_DISK;
+ private String mountPoint = "";
// if a file is committed, field "finalResult" will be true
private boolean finalResult = false;
private String filePath;
@@ -72,27 +71,10 @@ public class StorageInfo implements Serializable {
this.filePath = filePath;
}
- public StorageInfo(String mountPoint, int availableStorageTypes) {
+ public StorageInfo(String mountPoint, StorageInfo.Type type, int
availableStorageTypes) {
this.mountPoint = mountPoint;
- this.availableStorageTypes = availableStorageTypes;
- }
-
- public StorageInfo(Type type, String mountPoint) {
- this.type = type;
- this.mountPoint = mountPoint;
- }
-
- public StorageInfo(Type type, String mountPoint, boolean finalResult) {
- this.type = type;
- this.mountPoint = mountPoint;
- this.finalResult = finalResult;
- }
-
- public StorageInfo(Type type, String mountPoint, boolean finalResult, String
filePath) {
this.type = type;
- this.mountPoint = mountPoint;
- this.finalResult = finalResult;
- this.filePath = filePath;
+ this.availableStorageTypes = availableStorageTypes;
}
public StorageInfo(
@@ -147,21 +129,41 @@ public class StorageInfo implements Serializable {
+ '}';
}
- public boolean localDiskAvailable() {
+ public static boolean localDiskAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & LOCAL_DISK_MASK) > 0;
}
- public boolean HDFSAvailable() {
+ public boolean localDiskAvailable() {
+ return StorageInfo.localDiskAvailable(availableStorageTypes);
+ }
+
+ public static boolean HDFSAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & HDFS_MASK) > 0;
}
- public boolean OSSAvailable() {
+ public boolean HDFSAvailable() {
+ return StorageInfo.HDFSAvailable(availableStorageTypes);
+ }
+
+ public static boolean HDFSOnly(int availableStorageTypes) {
+ return availableStorageTypes == HDFS_MASK;
+ }
+
+ public boolean HDFSOnly() {
+ return StorageInfo.HDFSOnly(availableStorageTypes);
+ }
+
+ public static boolean OSSAvailable(int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & OSS_MASK) > 0;
}
+ public boolean OSSAvailable() {
+ return StorageInfo.OSSAvailable(availableStorageTypes);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index a595a785d..b33271e0e 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -137,6 +137,7 @@ message PbDiskInfo {
int64 usedSlots = 4;
int32 status = 5;
int64 avgFetchTime = 6;
+ int32 storageType = 7;
}
message PbWorkerInfo {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
index 56933a2cf..990a2b127 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala
@@ -49,6 +49,17 @@ class DiskInfo(
this(mountPoint, usableSpace, avgFlushTime, avgFetchTime, activeSlots,
List.empty, null)
}
+ def this(
+ mountPoint: String,
+ usableSpace: Long,
+ avgFlushTime: Long,
+ avgFetchTime: Long,
+ activeSlots: Long,
+ storageType: StorageInfo.Type) = {
+ this(mountPoint, usableSpace, avgFlushTime, avgFetchTime, activeSlots,
List.empty, null)
+ this.storageType = storageType
+ }
+
def this(
mountPoint: String,
dirs: List[File],
@@ -70,10 +81,14 @@ class DiskInfo(
var status: DiskStatus = DiskStatus.HEALTHY
var threadCount = 1
var configuredUsableSpace = 0L
- var storageType: StorageInfo.Type = _
+ var storageType: StorageInfo.Type = StorageInfo.Type.SSD
var maxSlots: Long = 0
lazy val shuffleAllocations = new util.HashMap[String, Integer]()
+ def setStorageType(storageType: StorageInfo.Type) = {
+ this.storageType = storageType
+ }
+
def setStatus(status: DiskStatus): this.type = this.synchronized {
this.status = status
this
@@ -145,9 +160,11 @@ class DiskInfo(
s" usableSpace: ${Utils.bytesToString(actualUsableSpace)}," +
s" avgFlushTime: ${Utils.nanoDurationToString(avgFlushTime)}," +
s" avgFetchTime: ${Utils.nanoDurationToString(avgFetchTime)}," +
- s" activeSlots: $activeSlots)" +
+ s" activeSlots: $activeSlots," +
+ s" storageType: ${storageType})" +
s" status: $status" +
s" dirs ${dirs.mkString("\t")}"
+
}
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index 65dc22918..05ea35d63 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.internal.Logging
+import org.apache.celeborn.common.protocol.StorageInfo
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.rpc.RpcEndpointRef
import org.apache.celeborn.common.rpc.netty.NettyRpcEndpointRef
@@ -156,12 +157,12 @@ class WorkerInfo(
curDisk.activeSlots = newDisk.activeSlots
curDisk.avgFlushTime = newDisk.avgFlushTime
curDisk.avgFetchTime = newDisk.avgFetchTime
- if (estimatedPartitionSize.nonEmpty) {
+ if (estimatedPartitionSize.nonEmpty && curDisk.storageType !=
StorageInfo.Type.HDFS) {
curDisk.maxSlots = curDisk.actualUsableSpace /
estimatedPartitionSize.get
}
curDisk.setStatus(newDisk.status)
} else {
- if (estimatedPartitionSize.nonEmpty) {
+ if (estimatedPartitionSize.nonEmpty && newDisk.storageType !=
StorageInfo.Type.HDFS) {
newDisk.maxSlots = newDisk.actualUsableSpace /
estimatedPartitionSize.get
}
diskInfos.put(mountPoint, newDisk)
@@ -239,6 +240,11 @@ class WorkerInfo(
result = 31 * result + replicatePort.hashCode()
result
}
+
+ def haveDisk(): Boolean = {
+ diskInfos.values().asScala.exists(p =>
+ p.storageType == StorageInfo.Type.SSD || p.storageType ==
StorageInfo.Type.HDD)
+ }
}
object WorkerInfo {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
index 2748d6cf3..d681152f7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/util/PbSerDeUtils.scala
@@ -63,14 +63,17 @@ object PbSerDeUtils {
.setMinor(minor)
.build.toByteArray
- def fromPbDiskInfo(pbDiskInfo: PbDiskInfo): DiskInfo =
- new DiskInfo(
+ def fromPbDiskInfo(pbDiskInfo: PbDiskInfo): DiskInfo = {
+ val diskInfo = new DiskInfo(
pbDiskInfo.getMountPoint,
pbDiskInfo.getUsableSpace,
pbDiskInfo.getAvgFlushTime,
pbDiskInfo.getAvgFetchTime,
pbDiskInfo.getUsedSlots)
.setStatus(Utils.toDiskStatus(pbDiskInfo.getStatus))
+
diskInfo.setStorageType(StorageInfo.typesMap.get(pbDiskInfo.getStorageType))
+ diskInfo
+ }
def toPbDiskInfo(diskInfo: DiskInfo): PbDiskInfo =
PbDiskInfo.newBuilder
@@ -80,6 +83,7 @@ object PbSerDeUtils {
.setAvgFetchTime(diskInfo.avgFetchTime)
.setUsedSlots(diskInfo.activeSlots)
.setStatus(diskInfo.status.getValue)
+ .setStorageType(diskInfo.storageType.getValue)
.build
def fromPbFileInfo(pbFileInfo: PbFileInfo): FileInfo =
diff --git
a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
index 927b3e9c1..e4200c913 100644
---
a/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
+++
b/common/src/test/java/org/apache/celeborn/common/protocol/PartitionLocationSuiteJ.java
@@ -178,7 +178,9 @@ public class PartitionLocationSuiteJ {
PartitionLocation location2 =
new PartitionLocation(
partitionId, epoch, host, rpcPort, pushPort, fetchPort,
replicatePort, mode, peer);
- StorageInfo storageInfo = new StorageInfo(StorageInfo.Type.MEMORY,
"/mnt/disk/0");
+ StorageInfo storageInfo =
+ new StorageInfo(
+ "/mnt/disk/0", StorageInfo.Type.MEMORY,
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
RoaringBitmap bitmap = new RoaringBitmap();
bitmap.add(1);
bitmap.add(2);
@@ -205,7 +207,7 @@ public class PartitionLocationSuiteJ {
+ "
host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n"
+ " mode:PRIMARY\n"
+ " peer:(empty)\n"
- + " storage hint:StorageInfo{type=MEMORY,
mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n"
+ + " storage hint:StorageInfo{type=MEMORY, mountPoint='',
finalResult=false, filePath=null}\n"
+ " mapIdBitMap:{}]";
String exp2 =
"PartitionLocation[\n"
@@ -213,7 +215,7 @@ public class PartitionLocationSuiteJ {
+ "
host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4\n"
+ " mode:PRIMARY\n"
+ "
peer:(host-rpcPort-pushPort-fetchPort-replicatePort:localhost-3-1-2-4)\n"
- + " storage hint:StorageInfo{type=MEMORY,
mountPoint='UNKNOWN_DISK', finalResult=false, filePath=null}\n"
+ + " storage hint:StorageInfo{type=MEMORY, mountPoint='',
finalResult=false, filePath=null}\n"
+ " mapIdBitMap:{}]";
String exp3 =
"PartitionLocation[\n"
diff --git
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
index b42862924..ebd83682e 100644
---
a/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/meta/WorkerInfoSuite.scala
@@ -284,9 +284,9 @@ class WorkerInfoSuite extends CelebornFunSuite {
|SlotsUsed: 60
|LastHeartbeat: 0
|Disks: $placeholder
- | DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB,
avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30) status: HEALTHY dirs
$placeholder
- | DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB,
avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10) status: HEALTHY dirs
$placeholder
- | DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB,
avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20) status: HEALTHY dirs
$placeholder
+ | DiskInfo0: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk3, usableSpace: 2048.0 MiB,
avgFlushTime: 3 ns, avgFetchTime: 3 ns, activeSlots: 30, storageType: SSD)
status: HEALTHY dirs $placeholder
+ | DiskInfo1: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk1, usableSpace: 2048.0 MiB,
avgFlushTime: 1 ns, avgFetchTime: 1 ns, activeSlots: 10, storageType: SSD)
status: HEALTHY dirs $placeholder
+ | DiskInfo2: DiskInfo(maxSlots: 0, committed shuffles 0
shuffleAllocations: Map(), mountPoint: disk2, usableSpace: 2048.0 MiB,
avgFlushTime: 2 ns, avgFetchTime: 2 ns, activeSlots: 20, storageType: SSD)
status: HEALTHY dirs $placeholder
|UserResourceConsumption: $placeholder
| UserIdentifier: `tenant1`.`name1`, ResourceConsumption:
ResourceConsumption(diskBytesWritten: 20.0 MiB, diskFileCount: 1,
hdfsBytesWritten: 50.0 MiB, hdfsFileCount: 1)
|WorkerRef: null
diff --git a/docs/developers/slotsallocation.md
b/docs/developers/slotsallocation.md
index e69de29bb..78644b16c 100644
--- a/docs/developers/slotsallocation.md
+++ b/docs/developers/slotsallocation.md
@@ -0,0 +1,75 @@
+---
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+# Slots allocation
+
+This article describes the detailed design of Celeborn workers' slots
allocation.
+Slots allocation is the core component that determines how Celeborn distributes workload
among workers.
+We have implemented two approaches to slots allocation.
+
+## Principle
+Allocate slots to local disks unless explicitly assigned to HDFS.
+
+## LoadAware
+### Related configs
+```properties
+celeborn.master.slot.assign.policy LOADAWARE
+celeborn.master.slot.assign.loadAware.numDiskGroups 5
+celeborn.master.slot.assign.loadAware.diskGroupGradient 0.1
+celeborn.master.slot.assign.loadAware.flushTimeWeight 0
+celeborn.master.slot.assign.loadAware.fetchTimeWeight 0
+[spark.client.]celeborn.storage.availableTypes HDD,SSD
+```
+### Detail
+Load-aware slots allocation takes the following factors into consideration:
+- disk's fetch time
+- disk's flush time
+- disk's usable space
+- disk's used slot
+
+The slots allocator will find all workers involved in this allocation and sort
their disks by
+`disk's average flush time * flush time weight + disk's average fetch time *
fetch time weight`.
+After getting the sorted disks list, Celeborn will split the disks into
+`celeborn.master.slot.assign.loadAware.numDiskGroups` groups. The number of slots
to be placed into a disk group
+is controlled by `celeborn.master.slot.assign.loadAware.diskGroupGradient`,
which means that the number of slots
+allocated to a group will be
(1+`celeborn.master.slot.assign.loadAware.diskGroupGradient`)
+times that of the next slower group.
+For example, suppose there are 5 groups: G1, G2, G3, G4 and G5. If G5 is allocated
100 slots,
+the other groups will be G4:110, G3:121, G2:133, G1:146.
+
+After Celeborn has decided the number of slots for a disk group, the slots will be
distributed among the disks of that disk group.
+Each disk has a usableSlots which is calculated by `(disk's usable
space)/(average partition size)-usedSlots`.
+The slots number to allocate in a disk is calculated by ` slots of this disk
group * ( current disk's usableSlots / the sum of all disks' usableSlots in
this group)`.
+For example, G5 needs to allocate 100 slots and has 3 disks: D1 with usable
slots 100, D2 with usable slots 50, D3 with usable slots 20.
+The distribution will be D1:59, D2: 29, D3: 12.
+
+If all slots can be placed in disk groups, the slots allocation process is
done.
+
+If the requested slots are more than all usable slots, some slots cannot be placed into
disks.
+The slots allocator will then allocate these remaining slots to workers with local disks one by
one.
+
+## RoundRobin
+### Detail
+Round-robin slots allocation will distribute all slots across all registered
workers with disks. Celeborn will treat
+all workers as an array and place 1 slot in a worker at a time until all slots are
allocated.
+If a worker has multiple disks, the chosen disk index is `(monotone increasing
disk index +1) % disk count`.
+
+## Celeborn Worker's Behavior
+1. When reserving slots, a Celeborn worker will decide whether a slot is placed on local
disks or HDFS.
+2. If a partition is evicted from memory, the partition might be placed in
HDFS.
+3. If a slot is explicitly assigned to HDFS, the worker will put the slot in HDFS.
\ No newline at end of file
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index e964e57e5..e54ba8763 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -18,6 +18,7 @@
package org.apache.celeborn.service.deploy.master;
import java.util.*;
+import java.util.stream.Collectors;
import scala.Double;
import scala.Option;
@@ -64,22 +65,30 @@ public class SlotsAllocator {
if (workers.size() < 2 && shouldReplicate) {
return new HashMap<>();
}
- Map<WorkerInfo, List<UsableDiskInfo>> restrictions = new HashMap<>();
+ Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions = new HashMap<>();
for (WorkerInfo worker : workers) {
List<UsableDiskInfo> usableDisks =
- restrictions.computeIfAbsent(worker, v -> new ArrayList<>());
+ slotsRestrictions.computeIfAbsent(worker, v -> new ArrayList<>());
for (Map.Entry<String, DiskInfo> diskInfoEntry :
worker.diskInfos().entrySet()) {
if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
- usableDisks.add(
- new UsableDiskInfo(
- diskInfoEntry.getValue(),
diskInfoEntry.getValue().availableSlots()));
+ if (StorageInfo.localDiskAvailable(availableStorageTypes)
+ && diskInfoEntry.getValue().storageType() !=
StorageInfo.Type.HDFS) {
+ usableDisks.add(
+ new UsableDiskInfo(
+ diskInfoEntry.getValue(),
diskInfoEntry.getValue().availableSlots()));
+ } else if (StorageInfo.HDFSAvailable(availableStorageTypes)
+ && diskInfoEntry.getValue().storageType() ==
StorageInfo.Type.HDFS) {
+ usableDisks.add(
+ new UsableDiskInfo(
+ diskInfoEntry.getValue(),
diskInfoEntry.getValue().availableSlots()));
+ }
}
}
}
return locateSlots(
partitionIds,
workers,
- restrictions,
+ slotsRestrictions,
shouldReplicate,
shouldRackAware,
availableStorageTypes);
@@ -109,6 +118,10 @@ public class SlotsAllocator {
if (workers.size() < 2 && shouldReplicate) {
return new HashMap<>();
}
+ if (StorageInfo.HDFSOnly(availableStorageTypes)) {
+ return offerSlotsRoundRobin(
+ workers, partitionIds, shouldReplicate, shouldRackAware,
availableStorageTypes);
+ }
List<DiskInfo> usableDisks = new ArrayList<>();
Map<DiskInfo, WorkerInfo> diskToWorkerMap = new HashMap<>();
@@ -126,7 +139,8 @@ public class SlotsAllocator {
diskReserveRatio.isEmpty()
? Option.empty()
: Option.apply(diskReserveRatio.get()))
- && diskInfo.status().equals(DiskStatus.HEALTHY)) {
+ && diskInfo.status().equals(DiskStatus.HEALTHY)
+ && diskInfo.storageType() != StorageInfo.Type.HDFS) {
usableDisks.add(diskInfo);
}
}));
@@ -151,15 +165,15 @@ public class SlotsAllocator {
initLoadAwareAlgorithm(diskGroupCount, diskGroupGradient);
}
- Map<WorkerInfo, List<UsableDiskInfo>> restrictions =
- getRestriction(
+ Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions =
+ getSlotsRestrictionsByLoadAwareAlgorithm(
placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight,
fetchTimeWeight),
diskToWorkerMap,
shouldReplicate ? partitionIds.size() * 2 : partitionIds.size());
return locateSlots(
partitionIds,
workers,
- restrictions,
+ slotsRestrictions,
shouldReplicate,
shouldRackAware,
availableStorageTypes);
@@ -172,16 +186,43 @@ public class SlotsAllocator {
Map<WorkerInfo, Integer> workerDiskIndex,
int availableStorageTypes) {
WorkerInfo selectedWorker = workers.get(workerIndex);
- List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
+ StorageInfo storageInfo;
int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0);
- while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
- diskIndex = (diskIndex + 1) % usableDiskInfos.size();
+ if (restrictions != null) {
+ List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
+ while (usableDiskInfos.get(diskIndex).usableSlots <= 0) {
+ diskIndex = (diskIndex + 1) % usableDiskInfos.size();
+ }
+ usableDiskInfos.get(diskIndex).usableSlots--;
+ DiskInfo selectedDiskInfo = usableDiskInfos.get(diskIndex).diskInfo;
+ if (selectedDiskInfo.storageType() == StorageInfo.Type.HDFS) {
+ storageInfo = new StorageInfo("", StorageInfo.Type.HDFS,
availableStorageTypes);
+ } else {
+ storageInfo =
+ new StorageInfo(
+ selectedDiskInfo.mountPoint(),
+ selectedDiskInfo.storageType(),
+ availableStorageTypes);
+ workerDiskIndex.put(selectedWorker, (diskIndex + 1) %
usableDiskInfos.size());
+ }
+ } else {
+ if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
+ DiskInfo[] diskInfos =
+ selectedWorker.diskInfos().values().stream()
+ .filter(p -> p.storageType() != StorageInfo.Type.HDFS)
+ .collect(Collectors.toList())
+ .toArray(new DiskInfo[0]);
+ storageInfo =
+ new StorageInfo(
+ diskInfos[diskIndex].mountPoint(),
+ diskInfos[diskIndex].storageType(),
+ availableStorageTypes);
+ diskIndex = (diskIndex + 1) % diskInfos.length;
+ workerDiskIndex.put(selectedWorker, (diskIndex + 1) %
diskInfos.length);
+ } else {
+ storageInfo = new StorageInfo("", StorageInfo.Type.HDFS,
availableStorageTypes);
+ }
}
- usableDiskInfos.get(diskIndex).usableSlots--;
- StorageInfo storageInfo =
- new StorageInfo(
- usableDiskInfos.get(diskIndex).diskInfo.mountPoint(),
availableStorageTypes);
- workerDiskIndex.put(selectedWorker, (diskIndex + 1) %
usableDiskInfos.size());
return storageInfo;
}
@@ -195,10 +236,10 @@ public class SlotsAllocator {
locateSlots(
List<Integer> partitionIds,
List<WorkerInfo> workers,
- Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
+ Map<WorkerInfo, List<UsableDiskInfo>> slotRestrictions,
boolean shouldReplicate,
boolean shouldRackAware,
- int activeStorageTypes) {
+ int availableStorageTypes) {
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
new HashMap<>();
@@ -206,18 +247,24 @@ public class SlotsAllocator {
roundRobin(
slots,
partitionIds,
- new LinkedList<>(restrictions.keySet()),
- restrictions,
+ new LinkedList<>(slotRestrictions.keySet()),
+ slotRestrictions,
shouldReplicate,
shouldRackAware,
- activeStorageTypes);
+ availableStorageTypes);
if (!remain.isEmpty()) {
remain =
roundRobin(
- slots, remain, workers, null, shouldReplicate, shouldRackAware,
activeStorageTypes);
+ slots,
+ remain,
+ workers,
+ null,
+ shouldReplicate,
+ shouldRackAware,
+ availableStorageTypes);
}
if (!remain.isEmpty()) {
- roundRobin(slots, remain, workers, null, shouldReplicate, false,
activeStorageTypes);
+ roundRobin(slots, remain, workers, null, shouldReplicate, false,
availableStorageTypes);
}
return slots;
}
@@ -226,7 +273,7 @@ public class SlotsAllocator {
Map<WorkerInfo, Tuple2<List<PartitionLocation>,
List<PartitionLocation>>> slots,
List<Integer> partitionIds,
List<WorkerInfo> workers,
- Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
+ Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions,
boolean shouldReplicate,
boolean shouldRackAware,
int availableStorageTypes) {
@@ -241,9 +288,10 @@ public class SlotsAllocator {
int nextPrimaryInd = primaryIndex;
int partitionId = iter.next();
- StorageInfo storageInfo = new StorageInfo();
- if (restrictions != null) {
- while (!haveUsableSlots(restrictions, workers, nextPrimaryInd)) {
+ StorageInfo storageInfo;
+ if (slotsRestrictions != null && !slotsRestrictions.isEmpty()) {
+ // this means that we'll select a mount point
+ while (!haveUsableSlots(slotsRestrictions, workers, nextPrimaryInd)) {
nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
if (nextPrimaryInd == primaryIndex) {
break outer;
@@ -253,17 +301,29 @@ public class SlotsAllocator {
getStorageInfo(
workers,
nextPrimaryInd,
- restrictions,
+ slotsRestrictions,
workerDiskIndexForPrimary,
availableStorageTypes);
+ } else {
+ if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
+ while (!workers.get(nextPrimaryInd).haveDisk()) {
+ nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
+ if (nextPrimaryInd == primaryIndex) {
+ break outer;
+ }
+ }
+ }
+ storageInfo =
+ getStorageInfo(
+ workers, nextPrimaryInd, null, workerDiskIndexForPrimary,
availableStorageTypes);
}
PartitionLocation primaryPartition =
createLocation(partitionId, workers.get(nextPrimaryInd), null,
storageInfo, true);
if (shouldReplicate) {
int nextReplicaInd = (nextPrimaryInd + 1) % workers.size();
- if (restrictions != null) {
- while (!haveUsableSlots(restrictions, workers, nextReplicaInd)
+ if (slotsRestrictions != null) {
+ while (!haveUsableSlots(slotsRestrictions, workers, nextReplicaInd)
|| !satisfyRackAware(shouldRackAware, workers, nextPrimaryInd,
nextReplicaInd)) {
nextReplicaInd = (nextReplicaInd + 1) % workers.size();
if (nextReplicaInd == nextPrimaryInd) {
@@ -274,7 +334,7 @@ public class SlotsAllocator {
getStorageInfo(
workers,
nextReplicaInd,
- restrictions,
+ slotsRestrictions,
workerDiskIndexForReplica,
availableStorageTypes);
} else if (shouldRackAware) {
@@ -284,6 +344,18 @@ public class SlotsAllocator {
break outer;
}
}
+ } else {
+ if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
+ while (!workers.get(nextPrimaryInd).haveDisk()) {
+ nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
+ if (nextPrimaryInd == primaryIndex) {
+ break outer;
+ }
+ }
+ }
+ storageInfo =
+ getStorageInfo(
+ workers, nextReplicaInd, null, workerDiskIndexForReplica,
availableStorageTypes);
}
PartitionLocation replicaPartition =
createLocation(
@@ -369,7 +441,11 @@ public class SlotsAllocator {
return diskGroups;
}
- private static Map<WorkerInfo, List<UsableDiskInfo>> getRestriction(
+ /**
+ * This method implements the load-aware slots allocation algorithm. See
details at
+ * /docs/developers/slotsallocation.md
+ */
+ private static Map<WorkerInfo, List<UsableDiskInfo>>
getSlotsRestrictionsByLoadAwareAlgorithm(
List<List<DiskInfo>> groups, Map<DiskInfo, WorkerInfo> diskWorkerMap,
int partitionCnt) {
int groupSize = groups.size();
long[] groupAllocations = new long[groupSize];
@@ -505,8 +581,8 @@ public class SlotsAllocator {
jointLocations.addAll(slots.get(worker)._2);
for (PartitionLocation location : jointLocations) {
String mountPoint = location.getStorageInfo().getMountPoint();
- // ignore slots for UNKNOWN_DISK
- if (!mountPoint.equals(StorageInfo.UNKNOWN_DISK)) {
+ // skip non local disks slots
+ if (!mountPoint.isEmpty()) {
if (slotsPerDisk.containsKey(mountPoint)) {
slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1);
} else {
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
index ec3094fc2..03e1d21e3 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/MetaUtil.java
@@ -24,6 +24,7 @@ import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.identity.UserIdentifier$;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
+import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.common.quota.ResourceConsumption;
import org.apache.celeborn.common.util.Utils;
@@ -61,7 +62,8 @@ public class MetaUtil {
v.getUsableSpace(),
v.getAvgFlushTime(),
v.getAvgFetchTime(),
- v.getUsedSlots());
+ v.getUsedSlots(),
+ StorageInfo.typesMap.get(v.getStorageType()));
diskInfo.setStatus(Utils.toDiskStatus(v.getStatus()));
map.put(k, diskInfo);
});
@@ -81,6 +83,7 @@ public class MetaUtil {
.setAvgFlushTime(v.avgFlushTime())
.setAvgFetchTime(v.avgFetchTime())
.setUsedSlots(v.activeSlots())
+ .setStorageType(v.storageType().getValue())
.setStatus(v.status().getValue())
.build()));
return map;
diff --git a/master/src/main/proto/Resource.proto
b/master/src/main/proto/Resource.proto
index c7cde94dc..78dc477bd 100644
--- a/master/src/main/proto/Resource.proto
+++ b/master/src/main/proto/Resource.proto
@@ -66,6 +66,7 @@ message DiskInfo {
required int64 usedSlots = 4;
required int32 status = 5;
required int64 avgFetchTime = 6;
+ required int32 storageType =7;
}
message RequestSlotsRequest {
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index 1c03903a7..e17bb46bf 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -687,7 +687,7 @@ private[celeborn] class Master(
val slots =
masterSource.sample(MasterSource.OFFER_SLOTS_TIME,
s"offerSlots-${Random.nextInt()}") {
statusSystem.workers.synchronized {
- if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE &&
!hasHDFSStorage) {
+ if (slotsAssignPolicy == SlotsAssignPolicy.LOADAWARE) {
SlotsAllocator.offerSlotsLoadAware(
selectedWorkers,
requestSlots.partitionIdList,
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index 92f2ecfbd..fd724cd70 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -17,15 +17,7 @@
package org.apache.celeborn.service.deploy.master;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
import scala.Option;
import scala.Tuple2;
@@ -294,16 +286,34 @@ public class SlotsAllocatorSuiteJ {
List<WorkerInfo> workers,
List<Integer> partitionIds,
boolean shouldReplicate,
- boolean expectSuccess) {
+ boolean expectSuccess,
+ boolean roundrobin) {
String shuffleKey = "appId-1";
CelebornConf conf = new CelebornConf();
conf.set("celeborn.active.storage.levels", "HDFS");
- Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
- SlotsAllocator.offerSlotsRoundRobin(
- workers, partitionIds, shouldReplicate, false,
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
-
+ Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots;
+ if (roundrobin) {
+ slots =
+ SlotsAllocator.offerSlotsRoundRobin(
+ workers, partitionIds, shouldReplicate, false,
StorageInfo.HDFS_MASK);
+ } else {
+ slots =
+ SlotsAllocator.offerSlotsLoadAware(
+ workers,
+ partitionIds,
+ shouldReplicate,
+ false,
+ 1000_000_000,
+ Option.empty(),
+ 3,
+ 0.1,
+ 0,
+ 1,
+ StorageInfo.LOCAL_DISK_MASK | StorageInfo.HDFS_MASK);
+ }
int allocatedPartitionCount = 0;
-
+ Map<WorkerInfo, Map<String, Integer>> slotsDistribution =
+ SlotsAllocator.slotsToDiskAllocations(slots);
for (Map.Entry<WorkerInfo, Tuple2<List<PartitionLocation>,
List<PartitionLocation>>>
workerToPartitions : slots.entrySet()) {
WorkerInfo workerInfo = workerToPartitions.getKey();
@@ -332,7 +342,86 @@ public class SlotsAllocatorSuiteJ {
partitionIds.add(i);
}
final boolean shouldReplicate = true;
- checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true);
+ checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, true);
+ }
+
+ @Test
+ public void testLocalDisksAndHDFSOnRoundRobin() {
+ final List<WorkerInfo> workers = prepareWorkers(true);
+ DiskInfo hdfs1 =
+ new DiskInfo(
+ "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE,
StorageInfo.Type.HDFS);
+ DiskInfo hdfs2 =
+ new DiskInfo(
+ "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE,
StorageInfo.Type.HDFS);
+ DiskInfo hdfs3 =
+ new DiskInfo(
+ "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE,
StorageInfo.Type.HDFS);
+ hdfs1.maxSlots_$eq(Long.MAX_VALUE);
+ hdfs2.maxSlots_$eq(Long.MAX_VALUE);
+ hdfs3.maxSlots_$eq(Long.MAX_VALUE);
+ workers.get(0).diskInfos().put("HDFS", hdfs1);
+ workers.get(1).diskInfos().put("HDFS", hdfs2);
+ workers.get(2).diskInfos().put("HDFS", hdfs3);
+ final List<Integer> partitionIds = new ArrayList<>();
+ for (int i = 0; i < 3000; i++) {
+ partitionIds.add(i);
+ }
+ final boolean shouldReplicate = true;
+ checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, true);
+ }
+
+ @Test
+ public void testLocalDisksAndHDFSOnLoadAware() {
+ final List<WorkerInfo> workers = prepareWorkers(true);
+ DiskInfo hdfs1 =
+ new DiskInfo(
+ "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE,
StorageInfo.Type.HDFS);
+ DiskInfo hdfs2 =
+ new DiskInfo(
+ "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE,
StorageInfo.Type.HDFS);
+ // DiskInfo hdfs3 = new DiskInfo("HDFS", Long.MAX_VALUE, 999999,
999999, Integer.MAX_VALUE,
+ // StorageInfo.Type.HDFS);
+ hdfs1.maxSlots_$eq(Long.MAX_VALUE);
+ hdfs2.maxSlots_$eq(Long.MAX_VALUE);
+ // hdfs3.maxSlots_$eq(Long.MAX_VALUE);
+ workers.get(0).diskInfos().put("HDFS", hdfs1);
+ workers.get(1).diskInfos().put("HDFS", hdfs2);
+ // workers.get(2).diskInfos().put("HDFS", hdfs3);
+ final List<Integer> partitionIds = new ArrayList<>();
+ for (int i = 0; i < 3000; i++) {
+ partitionIds.add(i);
+ }
+ final boolean shouldReplicate = true;
+ checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, false);
+ }
+
+ @Test
+ public void testLocalDisksAndHDFSOnLoadAwareWithInsufficientSlots() {
+ final List<WorkerInfo> workers = prepareWorkers(true);
+ DiskInfo hdfs1 =
+ new DiskInfo(
+ "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE,
StorageInfo.Type.HDFS);
+ DiskInfo hdfs2 =
+ new DiskInfo(
+ "HDFS", Long.MAX_VALUE, 999999, 999999, Integer.MAX_VALUE,
StorageInfo.Type.HDFS);
+ // DiskInfo hdfs3 = new DiskInfo("HDFS", Long.MAX_VALUE, 999999,
999999, Integer.MAX_VALUE,
+ // StorageInfo.Type.HDFS);
+ hdfs1.maxSlots_$eq(Long.MAX_VALUE);
+ hdfs2.maxSlots_$eq(Long.MAX_VALUE);
+ // hdfs3.maxSlots_$eq(Long.MAX_VALUE);
+ workers.get(0).diskInfos().put("HDFS", hdfs1);
+ workers.get(1).diskInfos().put("HDFS", hdfs2);
+ for (Map.Entry<String, DiskInfo> diskEntry :
workers.get(2).diskInfos().entrySet()) {
+ diskEntry.getValue().maxSlots_$eq(100);
+ }
+ // workers.get(2).diskInfos().put("HDFS", hdfs3);
+ final List<Integer> partitionIds = new ArrayList<>();
+ for (int i = 0; i < 3000; i++) {
+ partitionIds.add(i);
+ }
+ final boolean shouldReplicate = true;
+ checkSlotsOnHDFS(workers, partitionIds, shouldReplicate, true, false);
}
@Test
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 8ec6497b4..ca38a49c4 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -234,7 +234,8 @@ public abstract class FileWriter implements DeviceObserver {
public StorageInfo getStorageInfo() {
if (flusher instanceof LocalFlusher) {
LocalFlusher localFlusher = (LocalFlusher) flusher;
- return new StorageInfo(localFlusher.diskType(),
localFlusher.mountPoint(), true);
+ // do not write file path to reduce rpc size
+ return new StorageInfo(localFlusher.diskType(), true, "");
} else {
if (deleted) {
return null;
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 80e207309..d8c7049e5 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -343,7 +343,7 @@ private[celeborn] class Worker(
val diskInfos =
workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map {
disk =>
disk.mountPoint -> disk
- }.toMap.asJava).values().asScala.toSeq
+ }.toMap.asJava).values().asScala.toSeq ++ storageManager.hdfsDiskInfo
val response = masterClient.askSync[HeartbeatFromWorkerResponse](
HeartbeatFromWorker(
host,
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index b1239b497..f3c4e99e0 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -40,7 +40,7 @@ import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, DiskStatus,
FileInfo, TimeWindow}
import org.apache.celeborn.common.metrics.source.AbstractSource
import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf}
-import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType}
+import org.apache.celeborn.common.protocol.{PartitionLocation,
PartitionSplitMode, PartitionType, StorageInfo}
import org.apache.celeborn.common.quota.ResourceConsumption
import org.apache.celeborn.common.util.{CelebornExitKind, CelebornHadoopUtils,
JavaUtils, PbSerDeUtils, ThreadUtils, Utils}
import org.apache.celeborn.service.deploy.worker._
@@ -72,6 +72,10 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
DeviceInfo.getDeviceAndDiskInfos(workingDirInfos, conf)
}
val mountPoints = new util.HashSet[String](diskInfos.keySet())
+ val hdfsDiskInfo =
+ if (conf.hasHDFSStorage)
+ Option(new DiskInfo("HDFS", Long.MaxValue, 999999, 999999, 0,
StorageInfo.Type.HDFS))
+ else None
def disksSnapshot(): List[DiskInfo] = {
diskInfos.synchronized {
@@ -362,7 +366,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
throw new IOException(s"No available disks! suggested mountPoint
$suggestedMountPoint")
}
val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
- if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
+ if ((dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) ||
location.getStorageInfo.HDFSOnly()) {
val shuffleDir =
new Path(new Path(hdfsDir, conf.workerWorkingDir),
s"$appId/$shuffleId")
val fileInfo =