This is an automated email from the ASF dual-hosted git repository. zhouky pushed a commit to branch test in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 4fd6e56da5e6006a8e9ed400f6e6a1f0e35425d3
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Wed Jun 7 22:42:26 2023 +0800

    fast fail for reserve slots
---
 .../apache/celeborn/client/LifecycleManager.scala | 30 +++++++++++++++++-------------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index 1f766fed6..02a5e9fce 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -1164,18 +1164,22 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
     val parallelism = Math.min(Math.max(1, slots.size()), conf.rpcMaxParallelism)
     ThreadUtils.parmap(slots.asScala.to, "ReserveSlot", parallelism) {
       case (workerInfo, (masterLocations, slaveLocations)) =>
-        val res = requestReserveSlots(
-          workerInfo.endpoint,
-          ReserveSlots(
-            applicationId,
-            shuffleId,
-            masterLocations,
-            slaveLocations,
-            partitionSplitThreshold,
-            partitionSplitMode,
-            getPartitionType(shuffleId),
-            rangeReadFilter,
-            userIdentifier))
+        val res = if (blacklist.contains(workerInfo)) {
+          ReserveSlotsResponse(StatusCode.RESERVE_SLOTS_FAILED, "")
+        } else {
+          requestReserveSlots(
+            workerInfo.endpoint,
+            ReserveSlots(
+              applicationId,
+              shuffleId,
+              masterLocations,
+              slaveLocations,
+              partitionSplitThreshold,
+              partitionSplitMode,
+              getPartitionType(shuffleId),
+              rangeReadFilter,
+              userIdentifier))
+        }
         if (res.status.equals(StatusCode.SUCCESS)) {
           logDebug(s"Successfully allocated " +
             s"partitions buffer for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +
@@ -1185,13 +1189,13 @@ class LifecycleManager(appId: String, val conf: CelebornConf) extends RpcEndpoin
             s" reserve buffers for ${Utils.makeShuffleKey(applicationId, shuffleId)}" +
             s" from worker ${workerInfo.readableAddress()}. Reason: ${res.reason}")
           reserveSlotFailedWorkers.put(workerInfo, (res.status, System.currentTimeMillis()))
+          recordWorkerFailure(reserveSlotFailedWorkers)
         }
     }
     if (failureInfos.asScala.nonEmpty) {
       logError(s"Aggregated error of reserveSlots failure:${failureInfos.asScala.foldLeft("")(
         (x, y) => s"$x \n $y")}")
     }
-    recordWorkerFailure(reserveSlotFailedWorkers)
     new util.ArrayList[WorkerInfo](reserveSlotFailedWorkers.asScala.keys.toList.asJava)
   }
