FMX commented on code in PR #3347:
URL: https://github.com/apache/celeborn/pull/3347#discussion_r2204090611
##########
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala:
##########
@@ -2932,6 +2936,24 @@ object CelebornConf extends Logging {
SlotsAssignPolicy.LOADAWARE.name))
.createWithDefault(SlotsAssignPolicy.ROUNDROBIN.name)
+ val MASTER_SLOT_ASSIGN_INTERRUPTION_AWARE: ConfigEntry[Boolean] =
+ buildConf("celeborn.master.slot.assign.interruptionAware")
+ .categories("master")
+ .version("0.7.0")
+ .doc("If this is set to true, Celeborn master will prioritize partition placement on workers that are not " +
+ "in scope for maintenance soon.")
+ .booleanConf
+ .createWithDefault(false)
+
+ val MASTER_SLOT_INTERRUPTION_AWARE_THRESHOLD: ConfigEntry[Int] =
+ buildConf("celeborn.master.slot.assign.interruptionAware.threshold")
+ .categories("master")
+ .doc("This controls what percentage of hosts would be selected for slot selection in the first iteration " +
+ "of creating partitions. Default is 50%.")
+ .version("0.7.0")
+ .intConf
+ .createWithDefault(50)
Review Comment:
Add a `checkValue` here to reject invalid config values, e.g. anything outside the 0-100 percentage range.
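To illustrate what the threshold describes and the bound a `checkValue` would enforce, here is a minimal, self-contained Java sketch. It is not the PR's implementation: the `Worker` record, its `nextInterruptionMillis` field, the "furthest interruption first" sort order, and the round-down-but-keep-at-least-one rule are all assumptions made for illustration, and `validateThreshold` only mirrors a 0-100 range check rather than Celeborn's own `checkValue` API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class InterruptionAwareSelectionSketch {

  // Hypothetical worker record: nextInterruptionMillis is an assumed field holding the start
  // of the worker's next scheduled maintenance (Long.MAX_VALUE if none is known).
  record Worker(String host, long nextInterruptionMillis) {}

  // The kind of range check a checkValue on the threshold entry could enforce.
  static int validateThreshold(int thresholdPercent) {
    if (thresholdPercent < 0 || thresholdPercent > 100) {
      throw new IllegalArgumentException(
          "interruptionAware.threshold must be in [0, 100], got " + thresholdPercent);
    }
    return thresholdPercent;
  }

  // Assumed selection rule: keep the thresholdPercent of workers whose next interruption is
  // furthest away. Rounds down, but keeps at least one worker when the threshold is positive.
  static List<Worker> firstIterationCandidates(List<Worker> workers, int thresholdPercent) {
    validateThreshold(thresholdPercent);
    List<Worker> sorted = new ArrayList<>(workers);
    sorted.sort(Comparator.comparingLong(Worker::nextInterruptionMillis).reversed());
    int count = (int) ((long) sorted.size() * thresholdPercent / 100);
    if (count == 0 && !sorted.isEmpty() && thresholdPercent > 0) {
      count = 1;
    }
    return new ArrayList<>(sorted.subList(0, count));
  }

  public static void main(String[] args) {
    List<Worker> workers = List.of(
        new Worker("w1", 1_000L),
        new Worker("w2", Long.MAX_VALUE),
        new Worker("w3", 5_000L),
        new Worker("w4", 3_000L));
    // With the default of 50, two of the four workers are preferred: w2 (no maintenance), then w3.
    System.out.println(firstIterationCandidates(workers, 50));
  }
}
```

Under these assumptions, a value such as 150 or -1 would fail fast at configuration time instead of silently selecting all or none of the hosts.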
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java:
##########
@@ -341,22 +481,53 @@ static List<WorkerInfo> generateRackAwareWorkers(List<WorkerInfo> workers) {
return Collections.unmodifiableList(result);
}
+ /**
+ * Assigns slots in a round-robin fashion given lists of primary and replica worker candidates and
+ * other restrictions.
+ *
+ * @param slots the slots that have been assigned for each partitionId
+ * @param partitionIds the partitionIds that still require slot selection
+ * @param primaryWorkers list of worker candidates that can be used for primary workers.
+ * @param replicaWorkers list of worker candidates that can be used for replica workers.
+ * @param slotsRestrictions restrictions for each available slot based on worker characteristics.
+ * @param shouldReplicate if replication is enabled within the cluster.
+ * @param shouldRackAware if rack-aware replication is enabled within the cluster.
+ * @param availableStorageTypes available storage types coming from the offer slots request.
+ * @param replicaSameAsPrimary if the worker candidates list for primaries and replicas is the
+ * same. This is to prevent index mismatch while assigning slots across both lists.
+ * @return the partitionIds that were not able to be assigned slots in this iteration with the
+ * current primary and replica worker candidates and slot restrictions.
+ */
private static List<Integer> roundRobin(
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots,
List<Integer> partitionIds,
- List<WorkerInfo> workers,
+ List<WorkerInfo> primaryWorkers,
+ List<WorkerInfo> replicaWorkers,
Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions,
boolean shouldReplicate,
boolean shouldRackAware,
- int availableStorageTypes) {
+ int availableStorageTypes,
+ boolean replicaSameAsPrimary) {
Review Comment:
It would be more intuitive if you renamed the last parameter to `skipLocationsOnSameWorkerCheck`.
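A simplified, hypothetical Java sketch of why the last parameter matters: when primaries and replicas come from the same list, taking the replica from the next index already guarantees a different worker, so a same-worker check can be skipped; with two independent lists the indices no longer line up and the check is required. This is not the code under review: it drops disk restrictions, storage types, and rack awareness, uses plain host strings instead of `WorkerInfo`, introduces a hypothetical `Assignment` record and `placed` output list, and assumes each candidate list holds distinct hosts.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinSketch {

  // One placed partition: the host of its primary and the host of its replica.
  record Assignment(int partitionId, String primary, String replica) {}

  // Hypothetical round robin: places each partition's primary and replica on different hosts
  // and returns the partitionIds that could not be placed, like the javadoc's @return.
  static List<Integer> roundRobin(
      List<Integer> partitionIds,
      List<String> primaryWorkers,
      List<String> replicaWorkers,
      boolean replicaSameAsPrimary,
      List<Assignment> placed) {
    if (primaryWorkers.isEmpty()) {
      return new ArrayList<>(partitionIds);
    }
    List<Integer> unplaced = new ArrayList<>();
    int primaryIndex = 0;
    int replicaIndex = 0;
    for (int partitionId : partitionIds) {
      String primary = primaryWorkers.get(primaryIndex);
      primaryIndex = (primaryIndex + 1) % primaryWorkers.size();
      String replica = null;
      if (replicaSameAsPrimary) {
        // Same list: the next index is always another host (given >= 2 distinct hosts),
        // so no same-worker check is needed.
        if (replicaWorkers.size() >= 2) {
          replica = replicaWorkers.get(primaryIndex);
        }
      } else {
        // Independent lists: indices do not line up, so skip candidates equal to the
        // primary, trying each replica candidate at most once.
        for (int tries = 0; tries < replicaWorkers.size(); tries++) {
          String candidate = replicaWorkers.get(replicaIndex);
          replicaIndex = (replicaIndex + 1) % replicaWorkers.size();
          if (!candidate.equals(primary)) {
            replica = candidate;
            break;
          }
        }
      }
      if (replica == null) {
        unplaced.add(partitionId);
      } else {
        placed.add(new Assignment(partitionId, primary, replica));
      }
    }
    return unplaced;
  }

  public static void main(String[] args) {
    List<String> primaries = List.of("w1", "w2");
    List<String> replicas = List.of("w1", "w3"); // overlaps with the primaries on w1
    List<Assignment> placed = new ArrayList<>();
    List<Integer> unplaced = roundRobin(List.of(0, 1, 2), primaries, replicas, false, placed);
    System.out.println(placed + " unplaced=" + unplaced);
  }
}
```

Under this reading, passing `true` effectively means "skip the same-worker check", which the suggested name `skipLocationsOnSameWorkerCheck` states directly instead of leaving the reader to infer it from `replicaSameAsPrimary`.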