This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 5e77b851c [CELEBORN-1081] Client support
`celeborn.storage.activeTypes` config
5e77b851c is described below
commit 5e77b851c90264a0fa09b6e1deecdc24c33ece6b
Author: mingji <[email protected]>
AuthorDate: Fri Nov 3 20:03:11 2023 +0800
[CELEBORN-1081] Client support `celeborn.storage.activeTypes` config
### What changes were proposed in this pull request?
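1. Support `celeborn.storage.availableTypes` (renamed from `celeborn.storage.activeTypes`, which is still accepted as an alternative key) in the client.
2. The master ignores slots assigned to `UNKNOWN_DISK` when counting slots per disk.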
### Why are the changes needed?
To let each client application choose which storage types its shuffle data may use.
### Does this PR introduce _any_ user-facing change?
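Yes. `celeborn.storage.activeTypes` is renamed to `celeborn.storage.availableTypes` (the old key is still accepted). Its default changes from `HDD,SSD` to `HDD`, with HDD and SSD treated as identical, and `MEMORY` is documented as an option. Clients can now set it to restrict which storage types workers may use for the application's shuffle data.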
### How was this patch tested?
GA and cluster.
Closes #2045 from FMX/B1081.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../apache/celeborn/client/LifecycleManager.scala | 4 +-
.../common/protocol/PartitionLocation.java | 1 -
.../celeborn/common/protocol/StorageInfo.java | 94 ++++++++++++++++++++--
common/src/main/proto/TransportMessages.proto | 5 +-
.../org/apache/celeborn/common/CelebornConf.scala | 16 ++--
.../common/protocol/message/ControlMessages.scala | 4 +
.../apache/celeborn/common/CelebornConfSuite.scala | 22 ++++-
.../celeborn/common/util/PbSerDeUtilsTest.scala | 40 ++++++++-
docs/configuration/client.md | 1 +
docs/configuration/master.md | 2 +-
docs/configuration/worker.md | 2 +-
docs/migration.md | 2 +
.../service/deploy/master/SlotsAllocator.java | 72 ++++++++++++-----
.../celeborn/service/deploy/master/Master.scala | 8 +-
.../master/SlotsAllocatorRackAwareSuiteJ.java | 7 +-
.../deploy/master/SlotsAllocatorSuiteJ.java | 12 ++-
.../deploy/worker/storage/StorageManager.scala | 6 +-
17 files changed, 245 insertions(+), 53 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index ebc892815..4c30b92a5 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -75,6 +75,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
val latestPartitionLocation =
JavaUtils.newConcurrentHashMap[Int, ConcurrentHashMap[Int,
PartitionLocation]]()
private val userIdentifier: UserIdentifier =
IdentityProvider.instantiate(conf).provide()
+ private val availableStorageTypes = conf.availableStorageTypes
@VisibleForTesting
def workerSnapshots(shuffleId: Int): util.Map[WorkerInfo,
ShufflePartitionLocationInfo] =
@@ -1025,7 +1026,8 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
pushReplicateEnabled,
pushRackAwareEnabled,
userIdentifier,
- slotsAssignMaxWorkers)
+ slotsAssignMaxWorkers,
+ availableStorageTypes)
val res = requestMasterRequestSlots(req)
if (res.status != StatusCode.SUCCESS) {
requestMasterRequestSlots(req)
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
index 08e97c0ff..8e76f3488 100644
---
a/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
+++
b/common/src/main/java/org/apache/celeborn/common/protocol/PartitionLocation.java
@@ -64,7 +64,6 @@ public class PartitionLocation implements Serializable {
private StorageInfo storageInfo;
private RoaringBitmap mapIdBitMap;
private transient String _hostPushPort;
-
private transient String _hostFetchPort;
public PartitionLocation(PartitionLocation loc) {
diff --git
a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
index b7bc0e878..d3e1bf95b 100644
--- a/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
+++ b/common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java
@@ -38,7 +38,7 @@ public class StorageInfo implements Serializable {
}
}
- public static String UNKNOWN_DISK = "UNKNOWN_DISK";
+ @Deprecated public static String UNKNOWN_DISK = "UNKNOWN_DISK";
public static Map<Integer, Type> typesMap = new HashMap<>();
public static Set<String> typeNames = new HashSet<>();
@@ -49,6 +49,12 @@ public class StorageInfo implements Serializable {
}
}
+ public static final int MEMORY_MASK = 0b1;
+ public static final int LOCAL_DISK_MASK = 0b10;
+ public static final int HDFS_MASK = 0b100;
+ public static final int OSS_MASK = 0b1000;
+ public static final int ALL_TYPES_AVAILABLE_MASK = 0;
+
// Default storage Type is MEMORY.
private Type type = Type.MEMORY;
private String mountPoint = UNKNOWN_DISK;
@@ -56,6 +62,8 @@ public class StorageInfo implements Serializable {
private boolean finalResult = false;
private String filePath;
+ public int availableStorageTypes = 0;
+
public StorageInfo() {}
public StorageInfo(Type type, boolean isFinal, String filePath) {
@@ -64,8 +72,9 @@ public class StorageInfo implements Serializable {
this.filePath = filePath;
}
- public StorageInfo(String mountPoint) {
+ public StorageInfo(String mountPoint, int availableStorageTypes) {
this.mountPoint = mountPoint;
+ this.availableStorageTypes = availableStorageTypes;
}
public StorageInfo(Type type, String mountPoint) {
@@ -86,6 +95,19 @@ public class StorageInfo implements Serializable {
this.filePath = filePath;
}
+ public StorageInfo(
+ Type type,
+ String mountPoint,
+ boolean finalResult,
+ String filePath,
+ int availableStorageTypes) {
+ this.type = type;
+ this.mountPoint = mountPoint;
+ this.finalResult = finalResult;
+ this.filePath = filePath;
+ this.availableStorageTypes = availableStorageTypes;
+ }
+
public boolean isFinalResult() {
return finalResult;
}
@@ -125,13 +147,50 @@ public class StorageInfo implements Serializable {
+ '}';
}
+ public boolean localDiskAvailable() {
+ return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
+ || (availableStorageTypes & LOCAL_DISK_MASK) > 0;
+ }
+
+ public boolean HDFSAvailable() {
+ return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
+ || (availableStorageTypes & HDFS_MASK) > 0;
+ }
+
+ public boolean OSSAvailable() {
+ return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
+ || (availableStorageTypes & OSS_MASK) > 0;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ StorageInfo that = (StorageInfo) o;
+ return finalResult == that.finalResult
+ && availableStorageTypes == that.availableStorageTypes
+ && type == that.type
+ && Objects.equals(mountPoint, that.mountPoint)
+ && Objects.equals(filePath, that.filePath);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, mountPoint, finalResult, filePath,
availableStorageTypes);
+ }
+
+ public static final boolean validate(String typeStr) {
+ return typeNames.contains(typeStr);
+ }
+
public static PbStorageInfo toPb(StorageInfo storageInfo) {
String filePath = storageInfo.getFilePath();
PbStorageInfo.Builder builder = PbStorageInfo.newBuilder();
builder
.setType(storageInfo.type.value)
.setFinalResult(storageInfo.finalResult)
- .setMountPoint(storageInfo.mountPoint);
+ .setMountPoint(storageInfo.mountPoint)
+ .setAvailableStorageTypes(storageInfo.availableStorageTypes);
if (filePath != null) {
builder.setFilePath(filePath);
}
@@ -143,10 +202,29 @@ public class StorageInfo implements Serializable {
typesMap.get(pbStorageInfo.getType()),
pbStorageInfo.getMountPoint(),
pbStorageInfo.getFinalResult(),
- pbStorageInfo.getFilePath());
- }
-
- public static boolean validateStorageType(String str) {
- return typeNames.contains(str);
+ pbStorageInfo.getFilePath(),
+ pbStorageInfo.getAvailableStorageTypes());
+ }
+
+ public static int getAvailableTypes(List<Type> types) {
+ int ava = 0;
+ for (Type type : types) {
+ switch (type) {
+ case MEMORY:
+ ava = ava | MEMORY_MASK;
+ break;
+ case HDD:
+ case SSD:
+ ava = ava | LOCAL_DISK_MASK;
+ break;
+ case HDFS:
+ ava = ava | HDFS_MASK;
+ break;
+ case OSS:
+ ava = ava | OSS_MASK;
+ break;
+ }
+ }
+ return ava;
}
}
diff --git a/common/src/main/proto/TransportMessages.proto
b/common/src/main/proto/TransportMessages.proto
index 3e2e7a54c..e05615e5b 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -97,6 +97,7 @@ message PbStorageInfo {
string mountPoint = 2;
bool finalResult = 3;
string filePath = 4;
+ int32 availableStorageTypes = 5;
}
message PbPartitionLocation {
@@ -206,6 +207,7 @@ message PbRequestSlots {
PbUserIdentifier userIdentifier = 8;
bool shouldRackAware = 9;
int32 maxWorkers = 10;
+ int32 availableStorageTypes = 11;
}
message PbSlotInfo {
@@ -365,6 +367,7 @@ message PbReserveSlots {
PbUserIdentifier userIdentifier = 9;
int64 pushDataTimeout = 10;
bool partitionSplitEnabled = 11;
+ int32 availableStorageTypes = 12;
}
message PbReserveSlotsResponse {
@@ -494,7 +497,7 @@ message PbSnapshotMetaInfo {
int64 partitionTotalWritten = 8;
int64 partitionTotalFileCount = 9;
repeated PbAppDiskUsageSnapshot appDiskUsageMetricSnapshots = 10;
- optional PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11;
+ PbAppDiskUsageSnapshot currentAppDiskUsageMetricsSnapshot = 11;
map<string, int64> lostWorkers = 12;
repeated PbWorkerInfo shutdownWorkers = 13;
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 194c7e039..e024f6f1e 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -514,7 +514,10 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
// //////////////////////////////////////////////////////
def masterSlotAssignPolicy: SlotsAssignPolicy =
SlotsAssignPolicy.valueOf(get(MASTER_SLOT_ASSIGN_POLICY))
-
+ def availableStorageTypes: Int = {
+ val types =
get(ACTIVE_STORAGE_TYPES).split(",").map(StorageInfo.Type.valueOf(_)).toList
+ StorageInfo.getAvailableTypes(types.asJava)
+ }
def hasHDFSStorage: Boolean =
get(ACTIVE_STORAGE_TYPES).contains(StorageInfo.Type.HDFS.name()) &&
get(HDFS_DIR).isDefined
def masterSlotAssignLoadAwareDiskGroupNum: Int =
get(MASTER_SLOT_ASSIGN_LOADAWARE_DISKGROUP_NUM)
@@ -3955,13 +3958,16 @@ object CelebornConf extends Logging {
.createWithDefaultString("32m")
val ACTIVE_STORAGE_TYPES: ConfigEntry[String] =
- buildConf("celeborn.storage.activeTypes")
- .categories("master", "worker")
+ buildConf("celeborn.storage.availableTypes")
+ .withAlternative("celeborn.storage.activeTypes")
+ .categories("master", "worker", "client")
.version("0.3.0")
- .doc("Enabled storage levels. Available options: HDD,SSD,HDFS. ")
+ .doc(
+ "Enabled storages. Available options: MEMORY,HDD,SSD,HDFS. Note: HDD
and SSD would be treated as identical.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
- .createWithDefault("HDD,SSD")
+ .checkValue(p => p.split(",").map(StorageInfo.validate(_)).reduce(_ &&
_), "")
+ .createWithDefault("HDD")
val READ_LOCAL_SHUFFLE_FILE: ConfigEntry[Boolean] =
buildConf("celeborn.client.readLocalShuffleFile.enabled")
diff --git
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
index 749170a26..578824cc1 100644
---
a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
+++
b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala
@@ -170,6 +170,7 @@ object ControlMessages extends Logging {
shouldRackAware: Boolean,
userIdentifier: UserIdentifier,
maxWorkers: Int,
+ availableStorageTypes: Int,
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage
@@ -519,6 +520,7 @@ object ControlMessages extends Logging {
shouldRackAware,
userIdentifier,
maxWorkers,
+ availableStorageTypes,
requestId) =>
val payload = PbRequestSlots.newBuilder()
.setApplicationId(applicationId)
@@ -529,6 +531,7 @@ object ControlMessages extends Logging {
.setShouldRackAware(shouldRackAware)
.setMaxWorkers(maxWorkers)
.setRequestId(requestId)
+ .setAvailableStorageTypes(availableStorageTypes)
.setUserIdentifier(PbSerDeUtils.toPbUserIdentifier(userIdentifier))
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)
@@ -896,6 +899,7 @@ object ControlMessages extends Logging {
pbRequestSlots.getShouldRackAware,
userIdentifier,
pbRequestSlots.getMaxWorkers,
+ pbRequestSlots.getAvailableStorageTypes,
pbRequestSlots.getRequestId)
case REQUEST_SLOTS_RESPONSE_VALUE =>
diff --git
a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
index f3e01c5ab..736a44619 100644
--- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
+++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala
@@ -19,6 +19,7 @@ package org.apache.celeborn.common
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf._
+import org.apache.celeborn.common.protocol.StorageInfo
import org.apache.celeborn.common.util.Utils
class CelebornConfSuite extends CelebornFunSuite {
@@ -209,11 +210,11 @@ class CelebornConfSuite extends CelebornFunSuite {
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerBaseDirs.isEmpty)
- conf.set("celeborn.storage.activeTypes", "SDD,HDD,HDFS")
+ conf.set("celeborn.storage.activeTypes", "SSD,HDD,HDFS")
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerBaseDirs.isEmpty)
- conf.set("celeborn.storage.activeTypes", "SDD,HDD")
+ conf.set("celeborn.storage.activeTypes", "SSD,HDD")
assert(!conf.workerBaseDirs.isEmpty)
}
@@ -223,10 +224,25 @@ class CelebornConfSuite extends CelebornFunSuite {
conf.set("celeborn.storage.hdfs.dir", "hdfs:///xxx")
assert(conf.workerCommitThreads === 128)
- conf.set("celeborn.storage.activeTypes", "SDD,HDD")
+ conf.set("celeborn.storage.activeTypes", "SSD,HDD")
assert(conf.workerCommitThreads === 32)
}
+ test("Test available storage types") {
+ val conf = new CelebornConf()
+
+ assert(conf.availableStorageTypes == StorageInfo.LOCAL_DISK_MASK)
+
+ conf.set("celeborn.storage.availableTypes", "HDD,MEMORY")
+ assert(conf.availableStorageTypes == Integer.parseInt("11", 2))
+
+ conf.set("celeborn.storage.availableTypes", "HDD,HDFS")
+ assert(conf.availableStorageTypes == (StorageInfo.HDFS_MASK |
StorageInfo.LOCAL_DISK_MASK))
+
+ conf.set("celeborn.storage.availableTypes", "HDFS")
+ assert(conf.availableStorageTypes == StorageInfo.HDFS_MASK)
+ }
+
test("Test role rpcDispatcherNumThreads") {
val availableCores = 5
val conf = new CelebornConf()
diff --git
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
index 903af02c9..64b6f7e86 100644
---
a/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
+++
b/common/src/test/scala/org/apache/celeborn/common/util/PbSerDeUtilsTest.scala
@@ -23,7 +23,7 @@ import java.util
import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.identity.UserIdentifier
import org.apache.celeborn.common.meta.{DeviceInfo, DiskInfo, FileInfo,
WorkerInfo}
-import org.apache.celeborn.common.protocol.PartitionLocation
+import org.apache.celeborn.common.protocol.{PartitionLocation, StorageInfo}
import
org.apache.celeborn.common.protocol.message.ControlMessages.WorkerResource
import org.apache.celeborn.common.quota.ResourceConsumption
@@ -51,8 +51,8 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
val userIdentifier1 = UserIdentifier("tenant-a", "user-a")
val userIdentifier2 = UserIdentifier("tenant-b", "user-b")
- val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000, 2000, 3000)
- val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000, 4000, 6000)
+ val chunkOffsets1 = util.Arrays.asList[java.lang.Long](1000L, 2000L, 3000L)
+ val chunkOffsets2 = util.Arrays.asList[java.lang.Long](2000L, 4000L, 6000L)
val fileInfo1 = new FileInfo("/tmp/1", chunkOffsets1, userIdentifier1)
val fileInfo2 = new FileInfo("/tmp/2", chunkOffsets2, userIdentifier2)
@@ -77,6 +77,27 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
val partitionLocation2 =
new PartitionLocation(1, 1, "host2", 20, 19, 18, 24,
PartitionLocation.Mode.REPLICA)
+ val partitionLocation3 =
+ new PartitionLocation(2, 2, "host3", 30, 29, 28, 27,
PartitionLocation.Mode.PRIMARY)
+ val partitionLocation4 =
+ new PartitionLocation(
+ 3,
+ 3,
+ "host4",
+ 40,
+ 39,
+ 38,
+ 37,
+ PartitionLocation.Mode.REPLICA,
+ partitionLocation3,
+ new StorageInfo(
+ StorageInfo.Type.HDD,
+ "mountPoint",
+ false,
+ "filePath",
+ StorageInfo.LOCAL_DISK_MASK),
+ null)
+
val workerResource = new WorkerResource()
workerResource.put(
workerInfo1,
@@ -187,4 +208,17 @@ class PbSerDeUtilsTest extends CelebornFunSuite {
assert(restoredWorkerResource.equals(workerResource))
}
+
+ test("testPbStorageInfo") {
+ val pbPartitionLocation3 =
PbSerDeUtils.toPbPartitionLocation(partitionLocation3)
+ val pbPartitionLocation4 =
PbSerDeUtils.toPbPartitionLocation(partitionLocation4)
+
+ val restoredPartitionLocation3 =
PbSerDeUtils.fromPbPartitionLocation(pbPartitionLocation3)
+ val restoredPartitionLocation4 =
PbSerDeUtils.fromPbPartitionLocation(pbPartitionLocation4)
+
+ assert(restoredPartitionLocation3.equals(partitionLocation3))
+ assert(restoredPartitionLocation4.equals(partitionLocation4))
+
assert(restoredPartitionLocation4.getStorageInfo.equals(partitionLocation4.getStorageInfo))
+ }
+
}
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 1ff362660..f16e43193 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -104,5 +104,6 @@ license: |
| celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold |
2147483647 | Celeborn will only accept shuffle of partition number lower than
this configuration value. | 0.3.0 |
| celeborn.client.spark.shuffle.writer | HASH | Celeborn supports the
following kind of shuffle writers. 1. hash: hash-based shuffle writer works
fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer
works fine when memory pressure is high or shuffle partition count is huge. |
0.3.0 |
| celeborn.master.endpoints | <localhost>:9097 | Endpoints of master
nodes for celeborn client to connect, allowed pattern is:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used. | 0.2.0 |
+| celeborn.storage.availableTypes | HDD | Enabled storages. Available options:
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 |
| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for
Celeborn to store shuffle data. | 0.2.0 |
<!--end-include-->
diff --git a/docs/configuration/master.md b/docs/configuration/master.md
index 30741d85f..b83b4b20c 100644
--- a/docs/configuration/master.md
+++ b/docs/configuration/master.md
@@ -38,6 +38,6 @@ license: |
| celeborn.master.slot.assign.policy | ROUNDROBIN | Policy for master to
assign slots, Celeborn supports two types of policy: roundrobin and loadaware.
Loadaware policy will be ignored when `HDFS` is enabled in
`celeborn.storage.activeTypes` | 0.3.0 |
| celeborn.master.userResourceConsumption.update.interval | 30s | Time length
for a window about compute user resource consumption. | 0.3.0 |
| celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | Worker
unavailable info would be cleared when the retention period is expired | 0.3.1
|
-| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available
options: HDD,SSD,HDFS. | 0.3.0 |
+| celeborn.storage.availableTypes | HDD | Enabled storages. Available options:
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 |
| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for
Celeborn to store shuffle data. | 0.2.0 |
<!--end-include-->
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 177a25833..a356844cb 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -22,7 +22,7 @@ license: |
| celeborn.master.endpoints | <localhost>:9097 | Endpoints of master
nodes for celeborn client to connect, allowed pattern is:
`<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If
the port is omitted, 9097 will be used. | 0.2.0 |
| celeborn.master.estimatedPartitionSize.minSize | 8mb | Ignore partition size
smaller than this configuration of partition size for estimation. | 0.3.0 |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged
shuffle data. For example, if a reducer's shuffle data is 128M and the data
will need 16 fetch chunk requests to fetch. | 0.2.0 |
-| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available
options: HDD,SSD,HDFS. | 0.3.0 |
+| celeborn.storage.availableTypes | HDD | Enabled storages. Available options:
MEMORY,HDD,SSD,HDFS. Note: HDD and SSD would be treated as identical. | 0.3.0 |
| celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for
Celeborn to store shuffle data. | 0.2.0 |
| celeborn.worker.activeConnection.max | <undefined> | If the number of
active connections on a worker exceeds this configuration value, the worker
will be marked as high-load in the heartbeat report, and the master will not
include that node in the response of RequestSlots. | 0.3.1 |
| celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for
read buffer per mount point. | 0.3.0 |
diff --git a/docs/migration.md b/docs/migration.md
index 62320b997..15937d3e9 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -42,6 +42,8 @@ license: |
- Since 0.4.0, Celeborn deprecate `celeborn.worker.storage.baseDir.prefix` and
`celeborn.worker.storage.baseDir.number`.
Please use `celeborn.worker.storage.dirs` instead.
+- Since 0.4.0, Celeborn deprecates `celeborn.storage.activeTypes`. Please use
`celeborn.storage.availableTypes` instead.
+
## Upgrading from 0.3.1 to 0.3.2
- Since 0.3.1, Celeborn changed the default value of
`raft.client.rpc.request.timeout` from `3s` to `10s`.
diff --git
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
index ce2e4db9b..acfc22610 100644
---
a/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
+++
b/master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
@@ -53,7 +53,8 @@ public class SlotsAllocator {
List<WorkerInfo> workers,
List<Integer> partitionIds,
boolean shouldReplicate,
- boolean shouldRackAware) {
+ boolean shouldRackAware,
+ int availableStorageTypes) {
if (partitionIds.isEmpty()) {
return new HashMap<>();
}
@@ -72,7 +73,13 @@ public class SlotsAllocator {
}
}
}
- return locateSlots(partitionIds, workers, restrictions, shouldReplicate,
shouldRackAware);
+ return locateSlots(
+ partitionIds,
+ workers,
+ restrictions,
+ shouldReplicate,
+ shouldRackAware,
+ availableStorageTypes);
}
/**
@@ -90,7 +97,8 @@ public class SlotsAllocator {
int diskGroupCount,
double diskGroupGradient,
double flushTimeWeight,
- double fetchTimeWeight) {
+ double fetchTimeWeight,
+ int availableStorageTypes) {
if (partitionIds.isEmpty()) {
return new HashMap<>();
}
@@ -123,7 +131,8 @@ public class SlotsAllocator {
logger.warn(
"offer slots for {} fallback to roundrobin because there is no
usable disks",
StringUtils.join(partitionIds, ','));
- return offerSlotsRoundRobin(workers, partitionIds, shouldReplicate,
shouldRackAware);
+ return offerSlotsRoundRobin(
+ workers, partitionIds, shouldReplicate, shouldRackAware,
availableStorageTypes);
}
if (!initialized) {
@@ -135,14 +144,21 @@ public class SlotsAllocator {
placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight,
fetchTimeWeight),
diskToWorkerMap,
shouldReplicate ? partitionIds.size() * 2 : partitionIds.size());
- return locateSlots(partitionIds, workers, restrictions, shouldReplicate,
shouldRackAware);
+ return locateSlots(
+ partitionIds,
+ workers,
+ restrictions,
+ shouldReplicate,
+ shouldRackAware,
+ availableStorageTypes);
}
private static StorageInfo getStorageInfo(
List<WorkerInfo> workers,
int workerIndex,
Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
- Map<WorkerInfo, Integer> workerDiskIndex) {
+ Map<WorkerInfo, Integer> workerDiskIndex,
+ int availableStorageTypes) {
WorkerInfo selectedWorker = workers.get(workerIndex);
List<UsableDiskInfo> usableDiskInfos = restrictions.get(selectedWorker);
int diskIndex = workerDiskIndex.computeIfAbsent(selectedWorker, v -> 0);
@@ -150,7 +166,9 @@ public class SlotsAllocator {
diskIndex = (diskIndex + 1) % usableDiskInfos.size();
}
usableDiskInfos.get(diskIndex).usableSlots--;
- StorageInfo storageInfo = new
StorageInfo(usableDiskInfos.get(diskIndex).diskInfo.mountPoint());
+ StorageInfo storageInfo =
+ new StorageInfo(
+ usableDiskInfos.get(diskIndex).diskInfo.mountPoint(),
availableStorageTypes);
workerDiskIndex.put(selectedWorker, (diskIndex + 1) %
usableDiskInfos.size());
return storageInfo;
}
@@ -167,7 +185,8 @@ public class SlotsAllocator {
List<WorkerInfo> workers,
Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
boolean shouldReplicate,
- boolean shouldRackAware) {
+ boolean shouldRackAware,
+ int activeStorageTypes) {
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
new HashMap<>();
@@ -178,12 +197,15 @@ public class SlotsAllocator {
new LinkedList<>(restrictions.keySet()),
restrictions,
shouldReplicate,
- shouldRackAware);
+ shouldRackAware,
+ activeStorageTypes);
if (!remain.isEmpty()) {
- remain = roundRobin(slots, remain, workers, null, shouldReplicate,
shouldRackAware);
+ remain =
+ roundRobin(
+ slots, remain, workers, null, shouldReplicate, shouldRackAware,
activeStorageTypes);
}
if (!remain.isEmpty()) {
- roundRobin(slots, remain, workers, null, shouldReplicate, false);
+ roundRobin(slots, remain, workers, null, shouldReplicate, false,
activeStorageTypes);
}
return slots;
}
@@ -194,7 +216,8 @@ public class SlotsAllocator {
List<WorkerInfo> workers,
Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
boolean shouldReplicate,
- boolean shouldRackAware) {
+ boolean shouldRackAware,
+ int availableStorageTypes) {
// workerInfo -> (diskIndexForPrimary, diskIndexForReplica)
Map<WorkerInfo, Integer> workerDiskIndexForPrimary = new HashMap<>();
Map<WorkerInfo, Integer> workerDiskIndexForReplica = new HashMap<>();
@@ -215,7 +238,12 @@ public class SlotsAllocator {
}
}
storageInfo =
- getStorageInfo(workers, nextPrimaryInd, restrictions,
workerDiskIndexForPrimary);
+ getStorageInfo(
+ workers,
+ nextPrimaryInd,
+ restrictions,
+ workerDiskIndexForPrimary,
+ availableStorageTypes);
}
PartitionLocation primaryPartition =
createLocation(partitionId, workers.get(nextPrimaryInd), null,
storageInfo, true);
@@ -231,7 +259,12 @@ public class SlotsAllocator {
}
}
storageInfo =
- getStorageInfo(workers, nextReplicaInd, restrictions,
workerDiskIndexForReplica);
+ getStorageInfo(
+ workers,
+ nextReplicaInd,
+ restrictions,
+ workerDiskIndexForReplica,
+ availableStorageTypes);
} else if (shouldRackAware) {
while (!satisfyRackAware(true, workers, nextPrimaryInd,
nextReplicaInd)) {
nextReplicaInd = (nextReplicaInd + 1) % workers.size();
@@ -460,10 +493,13 @@ public class SlotsAllocator {
jointLocations.addAll(slots.get(worker)._2);
for (PartitionLocation location : jointLocations) {
String mountPoint = location.getStorageInfo().getMountPoint();
- if (slotsPerDisk.containsKey(mountPoint)) {
- slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1);
- } else {
- slotsPerDisk.put(mountPoint, 1);
+ // ignore slots for UNKNOWN_DISK
+ if (!mountPoint.equals(StorageInfo.UNKNOWN_DISK)) {
+ if (slotsPerDisk.containsKey(mountPoint)) {
+ slotsPerDisk.put(mountPoint, slotsPerDisk.get(mountPoint) + 1);
+ } else {
+ slotsPerDisk.put(mountPoint, 1);
+ }
}
}
}
diff --git
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
index ead05b375..c1102418a 100644
---
a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
+++
b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala
@@ -357,7 +357,7 @@ private[celeborn] class Master(
// keep it for compatible reason
context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))
- case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _) =>
+ case requestSlots @ RequestSlots(_, _, _, _, _, _, _, _, _, _) =>
logTrace(s"Received RequestSlots request $requestSlots.")
executeWithLeaderChecker(context, handleRequestSlots(context,
requestSlots))
@@ -668,13 +668,15 @@ private[celeborn] class Master(
slotsAssignLoadAwareDiskGroupNum,
slotsAssignLoadAwareDiskGroupGradient,
loadAwareFlushTimeWeight,
- loadAwareFetchTimeWeight)
+ loadAwareFetchTimeWeight,
+ requestSlots.availableStorageTypes)
} else {
SlotsAllocator.offerSlotsRoundRobin(
selectedWorkers,
requestSlots.partitionIdList,
requestSlots.shouldReplicate,
- requestSlots.shouldRackAware)
+ requestSlots.shouldRackAware,
+ requestSlots.availableStorageTypes)
}
}
}
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
index 8ca6d3100..60707ceb5 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorRackAwareSuiteJ.java
@@ -36,6 +36,7 @@ import org.junit.Test;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.protocol.StorageInfo;
import org.apache.celeborn.service.deploy.master.network.CelebornRackResolver;
public class SlotsAllocatorRackAwareSuiteJ {
@@ -68,7 +69,8 @@ public class SlotsAllocatorRackAwareSuiteJ {
List<WorkerInfo> workers = prepareWorkers(resolver);
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
- SlotsAllocator.offerSlotsRoundRobin(workers, partitionIds, true, true);
+ SlotsAllocator.offerSlotsRoundRobin(
+ workers, partitionIds, true, true,
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
Consumer<PartitionLocation> assertCustomer =
new Consumer<PartitionLocation>() {
@@ -103,7 +105,8 @@ public class SlotsAllocatorRackAwareSuiteJ {
List<WorkerInfo> workers = prepareWorkers(resolver);
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
- SlotsAllocator.offerSlotsRoundRobin(workers, partitionIds, true, true);
+ SlotsAllocator.offerSlotsRoundRobin(
+ workers, partitionIds, true, true,
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
Consumer<PartitionLocation> assertConsumer =
new Consumer<PartitionLocation>() {
diff --git
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
index ccdeb5b2d..e82742bab 100644
---
a/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
+++
b/master/src/test/java/org/apache/celeborn/service/deploy/master/SlotsAllocatorSuiteJ.java
@@ -36,6 +36,7 @@ import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.meta.DiskInfo;
import org.apache.celeborn.common.meta.WorkerInfo;
import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.common.protocol.StorageInfo;
public class SlotsAllocatorSuiteJ {
private List<WorkerInfo> prepareWorkers(boolean hasDisks) {
@@ -239,7 +240,8 @@ public class SlotsAllocatorSuiteJ {
conf.masterSlotAssignLoadAwareDiskGroupNum(),
conf.masterSlotAssignLoadAwareDiskGroupGradient(),
conf.masterSlotAssignLoadAwareFlushTimeWeight(),
- conf.masterSlotAssignLoadAwareFetchTimeWeight());
+ conf.masterSlotAssignLoadAwareFetchTimeWeight(),
+ StorageInfo.ALL_TYPES_AVAILABLE_MASK);
if (expectSuccess) {
if (shouldReplicate) {
slots.forEach(
@@ -275,10 +277,11 @@ public class SlotsAllocatorSuiteJ {
allocateToDiskSlots += worker.usedSlots();
}
if (shouldReplicate) {
- Assert.assertEquals(partitionIds.size() * 2, unknownDiskSlots +
allocateToDiskSlots);
+ Assert.assertTrue(partitionIds.size() * 2 >= unknownDiskSlots +
allocateToDiskSlots);
} else {
- Assert.assertEquals(partitionIds.size(), unknownDiskSlots +
allocateToDiskSlots);
+ Assert.assertTrue(partitionIds.size() >= unknownDiskSlots +
allocateToDiskSlots);
}
+ Assert.assertEquals(0, unknownDiskSlots);
} else {
assert slots.isEmpty()
: "Expect to fail to offer slots, but return " + slots.size() + "
slots.";
@@ -294,7 +297,8 @@ public class SlotsAllocatorSuiteJ {
CelebornConf conf = new CelebornConf();
conf.set("celeborn.active.storage.levels", "HDFS");
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
slots =
- SlotsAllocator.offerSlotsRoundRobin(workers, partitionIds,
shouldReplicate, false);
+ SlotsAllocator.offerSlotsRoundRobin(
+ workers, partitionIds, shouldReplicate, false,
StorageInfo.ALL_TYPES_AVAILABLE_MASK);
int allocatedPartitionCount = 0;
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index e059ae4af..b1239b497 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -362,7 +362,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
throw new IOException(s"No available disks! suggested mountPoint
$suggestedMountPoint")
}
val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
- if (dirs.isEmpty) {
+ if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
val shuffleDir =
new Path(new Path(hdfsDir, conf.workerWorkingDir),
s"$appId/$shuffleId")
val fileInfo =
@@ -400,7 +400,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
hdfsWriters.put(fileInfo.getFilePath, hdfsWriter)
return hdfsWriter
- } else {
+ } else if (dirs.nonEmpty &&
location.getStorageInfo.localDiskAvailable()) {
val dir = dirs(getNextIndex() % dirs.size)
val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath,
mountPoints)
val shuffleDir = new File(dir, s"$appId/$shuffleId")
@@ -471,6 +471,8 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
exception,
DiskStatus.READ_OR_WRITE_FAILURE)
}
+ } else {
+ exception = new IOException("No storage available for location:" +
location.toString)
}
retryCount += 1
}