This is an automated email from the ASF dual-hosted git repository. nicholasjiang pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push: new 4758e195f [CELEBORN-2081] PushDataHandler onFailure log shuffle key 4758e195f is described below commit 4758e195f40aa42ddcf80eac3cdeb0d26a408d37 Author: sychen <syc...@ctrip.com> AuthorDate: Fri Jul 25 20:53:46 2025 +0800 [CELEBORN-2081] PushDataHandler onFailure log shuffle key ### What changes were proposed in this pull request? ### Why are the changes needed? The `id-epoch` in the log alone cannot identify which application the failed partition location belongs to. Adding the shuffle key to the log makes that possible. ``` 25/07/25 04:23:06,439 ERROR [celeborn-push-timeout-checker-0] PushDataHandler: PushMergedData replicate failed for partitionLocation: PartitionLocation[ id-epoch:1623-0 ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? GA Closes #3390 from cxzl25/CELEBORN-2081. Authored-by: sychen <syc...@ctrip.com> Signed-off-by: SteNicholas <programg...@163.com> (cherry picked from commit abd6233a50b3ff915b29e01a2d37d9974dfbfe20) Signed-off-by: SteNicholas <programg...@163.com> --- .../apache/celeborn/service/deploy/worker/PushDataHandler.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index 85eb45dbd..3e526a71e 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -337,7 +337,9 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler } override def onFailure(e: Throwable): Unit = { - logError(s"PushData replication failed for partitionLocation: $location", e) + logError( + s"PushData replication failed for shuffle: $shuffleKey, partitionLocation: $location", + e) // 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker // 2. 
Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler // 3. Throw IOException by channel, convert to PUSH_DATA_CONNECTION_EXCEPTION_REPLICA @@ -717,7 +719,9 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler } override def onFailure(e: Throwable): Unit = { - logError(s"PushMergedData replicate failed for partitionLocation: $location", e) + logError( + s"PushMergedData replicate failed for shuffle: $shuffleKey, partitionLocation: $location", + e) // 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker // 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler // 3. Throw IOException by channel, convert to PUSH_DATA_CONNECTION_EXCEPTION_REPLICA