This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new a1bb38c88 [CELEBORN-2106] CommitFile/Reserved location shows detail
primary location UniqueId
a1bb38c88 is described below
commit a1bb38c888f9689ff3e9242cb6ac3dcedfed3190
Author: dz <[email protected]>
AuthorDate: Thu Aug 21 10:45:30 2025 +0800
[CELEBORN-2106] CommitFile/Reserved location shows detail primary location
UniqueId
…ation UniqueId info
### What changes were proposed in this pull request?
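The CommitFiles and Reserved-location log messages in the worker Controller now include the unique IDs of the primary and replica partition locations, in addition to their counts.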
### Why are the changes needed?
The CommitFiles and Reserved-location log messages should include the partition
location unique IDs to facilitate troubleshooting.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes #3420 from xy2953396112/controller_log.
Authored-by: dz <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
(cherry picked from commit 8a37a7ca17e279190c1ae5877b6a40019155779d)
Signed-off-by: SteNicholas <[email protected]>
---
.../service/deploy/worker/Controller.scala | 29 ++++++++++++----------
1 file changed, 16 insertions(+), 13 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
index 96a18a76a..6d8802f36 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala
@@ -290,8 +290,9 @@ private[deploy] class Controller(
Utils.getSlotsPerDisk(requestPrimaryLocs, requestReplicaLocs))
workerSource.incCounter(WorkerSource.SLOTS_ALLOCATED, primaryLocs.size() +
replicaLocs.size())
- logInfo(s"Reserved ${primaryLocs.size()} primary location" +
- s" and ${replicaLocs.size()} replica location for $shuffleKey ")
+ logInfo(s"Reserved ${primaryLocs.size()} primary location " +
+ s"${primaryLocs.asScala.map(_.getUniqueId).mkString(",")} and
${replicaLocs.size()} replica location " +
+ s"${replicaLocs.asScala.map(_.getUniqueId).mkString(",")} for
$shuffleKey ")
if (log.isDebugEnabled()) {
logDebug(s"primary: $primaryLocs\nreplica: $replicaLocs.")
}
@@ -467,7 +468,9 @@ private[deploy] class Controller(
epochWaitTimeMap.put(epoch, (commitStartWaitTime, context))
return
} else {
- logInfo(s"Start commitFiles for $shuffleKey")
+ logInfo(
+ s"Start commitFiles for ${shuffleKey}, primaryIds :
${primaryIds.asScala.mkString(",")}, "
+ + s"replicaIds : ${replicaIds.asScala.mkString(",")}")
commitInfo.status = CommitInfo.COMMIT_INPROCESS
workerSource.startTimer(WorkerSource.COMMIT_FILES_TIME, shuffleKey)
}
@@ -552,12 +555,12 @@ private[deploy] class Controller(
if (failedPrimaryIds.isEmpty && failedReplicaIds.isEmpty) {
logInfo(
s"CommitFiles for $shuffleKey success with " +
- s"${committedPrimaryIds.size()} committed primary partitions, " +
+ s"${committedPrimaryIds.size()} committed primary partitions
${committedPrimaryIds.asScala.mkString(",")}, " +
s"${emptyFilePrimaryIds.size()} empty primary partitions
${emptyFilePrimaryIds.asScala.mkString(",")}, " +
- s"${failedPrimaryIds.size()} failed primary partitions, " +
- s"${committedReplicaIds.size()} committed replica partitions, " +
+ s"${failedPrimaryIds.size()} failed primary partitions
${failedPrimaryIds.asScala.mkString(",")}, " +
+ s"${committedReplicaIds.size()} committed replica partitions
${committedReplicaIds.asScala.mkString(",")}, " +
s"${emptyFileReplicaIds.size()} empty replica partitions
${emptyFileReplicaIds.asScala.mkString(",")}, " +
- s"${failedReplicaIds.size()} failed replica partitions.")
+ s"${failedReplicaIds.size()} failed replica partitions
${failedReplicaIds.asScala.mkString(",")}.")
CommitFilesResponse(
StatusCode.SUCCESS,
committedPrimaryIdList,
@@ -572,12 +575,12 @@ private[deploy] class Controller(
} else {
logWarning(
s"CommitFiles for $shuffleKey failed with " +
- s"${committedPrimaryIds.size()} committed primary partitions, " +
- s"${emptyFilePrimaryIds.size()} empty primary partitions, " +
- s"${failedPrimaryIds.size()} failed primary partitions, " +
- s"${committedReplicaIds.size()} committed replica partitions, " +
- s"${emptyFileReplicaIds.size()} empty replica partitions, " +
- s"${failedReplicaIds.size()} failed replica partitions.")
+ s"${committedPrimaryIds.size()} committed primary partitions
${committedPrimaryIds.asScala.mkString(",")}, " +
+ s"${emptyFilePrimaryIds.size()} empty primary partitions
${emptyFilePrimaryIds.asScala.mkString(",")}, " +
+ s"${failedPrimaryIds.size()} failed primary partitions,
${failedPrimaryIds.asScala.mkString(",")}, " +
+ s"${committedReplicaIds.size()} committed replica partitions
${committedReplicaIds.asScala.mkString(",")}, " +
+ s"${emptyFileReplicaIds.size()} empty replica partitions
${emptyFileReplicaIds.asScala.mkString(",")}, " +
+ s"${failedReplicaIds.size()} failed replica partitions
${failedReplicaIds.asScala.mkString(",")}.")
CommitFilesResponse(
StatusCode.PARTIAL_SUCCESS,
committedPrimaryIdList,