AngersZhuuuu commented on code in PR #990:
URL:
https://github.com/apache/incubator-celeborn/pull/990#discussion_r1028933133
##########
client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala:
##########
@@ -919,104 +1072,83 @@ class LifecycleManager(appId: String, val conf:
CelebornConf) extends RpcEndpoin
// ask allLocations workers holding partitions to commit files
val masterPartMap = new ConcurrentHashMap[String, PartitionLocation]
val slavePartMap = new ConcurrentHashMap[String, PartitionLocation]
- val committedMasterIds = ConcurrentHashMap.newKeySet[String]()
- val committedSlaveIds = ConcurrentHashMap.newKeySet[String]()
- val committedMasterStorageInfos = new ConcurrentHashMap[String,
StorageInfo]()
- val committedSlaveStorageInfos = new ConcurrentHashMap[String,
StorageInfo]()
- val committedMapIdBitmap = new ConcurrentHashMap[String, RoaringBitmap]()
- val failedMasterPartitionIds = new ConcurrentHashMap[String, WorkerInfo]()
- val failedSlavePartitionIds = new ConcurrentHashMap[String, WorkerInfo]()
val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
val commitFilesFailedWorkers = new ConcurrentHashMap[WorkerInfo,
(StatusCode, Long)]()
-
- val currentShuffleFileCount = new LongAdder
val commitFileStartTime = System.nanoTime()
- val parallelism = Math.min(workerSnapshots(shuffleId).size(),
conf.rpcMaxParallelism)
- ThreadUtils.parmap(
- allocatedWorkers.asScala.to,
- "CommitFiles",
- parallelism) { case (worker, partitionLocationInfo) =>
- if (partitionLocationInfo.containsShuffle(shuffleId.toString)) {
- val masterParts =
partitionLocationInfo.getAllMasterLocations(shuffleId.toString)
- val slaveParts =
partitionLocationInfo.getAllSlaveLocations(shuffleId.toString)
- masterParts.asScala.foreach { p =>
- val partition = new PartitionLocation(p)
- partition.setFetchPort(worker.fetchPort)
- partition.setPeer(null)
- masterPartMap.put(partition.getUniqueId, partition)
- }
- slaveParts.asScala.foreach { p =>
- val partition = new PartitionLocation(p)
- partition.setFetchPort(worker.fetchPort)
- partition.setPeer(null)
- slavePartMap.put(partition.getUniqueId, partition)
- }
-
- val masterIds = masterParts.asScala.map(_.getUniqueId).asJava
- val slaveIds = slaveParts.asScala.map(_.getUniqueId).asJava
-
- val commitFiles = CommitFiles(
- applicationId,
- shuffleId,
- masterIds,
- slaveIds,
- shuffleMapperAttempts.get(shuffleId))
- val res = requestCommitFiles(worker.endpoint, commitFiles)
-
- res.status match {
- case StatusCode.SUCCESS => // do nothing
- case StatusCode.PARTIAL_SUCCESS | StatusCode.SHUFFLE_NOT_REGISTERED
| StatusCode.FAILED =>
- logDebug(s"Request $commitFiles return ${res.status} for " +
- s"${Utils.makeShuffleKey(applicationId, shuffleId)}")
- commitFilesFailedWorkers.put(worker, (res.status,
System.currentTimeMillis()))
- case _ => // won't happen
- }
-
- // record committed partitionIds
- committedMasterIds.addAll(res.committedMasterIds)
- committedSlaveIds.addAll(res.committedSlaveIds)
-
- // record committed partitions storage hint and disk hint
- committedMasterStorageInfos.putAll(res.committedMasterStorageInfos)
- committedSlaveStorageInfos.putAll(res.committedSlaveStorageInfos)
+ val shuffleCommitInfo = committedPartitionInfo.get(shuffleId)
Review Comment:
We store all commit-related results in shuffleCommitInfo.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]