This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 4758e195f [CELEBORN-2081] PushDataHandler onFailure log shuffle key
4758e195f is described below

commit 4758e195f40aa42ddcf80eac3cdeb0d26a408d37
Author: sychen <syc...@ctrip.com>
AuthorDate: Fri Jul 25 20:53:46 2025 +0800

    [CELEBORN-2081] PushDataHandler onFailure log shuffle key
    
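    ### What changes were proposed in this pull request?
    
    Include the shuffle key in the `onFailure` error logs for PushData and 
PushMergedData replication in `PushDataHandler`.
    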
    ### Why are the changes needed?
    
    The `id-epoch` in the log alone cannot identify which application a 
failed partition location belongs to. Adding the shuffle key to the log 
message makes the application identifiable.
    
    ```
    25/07/25 04:23:06,439 ERROR [celeborn-push-timeout-checker-0] 
PushDataHandler: PushMergedData replicate failed for partitionLocation: 
PartitionLocation[
      id-epoch:1623-0
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    GA
    
    Closes #3390 from cxzl25/CELEBORN-2081.
    
    Authored-by: sychen <syc...@ctrip.com>
    Signed-off-by: SteNicholas <programg...@163.com>
    (cherry picked from commit abd6233a50b3ff915b29e01a2d37d9974dfbfe20)
    Signed-off-by: SteNicholas <programg...@163.com>
---
 .../apache/celeborn/service/deploy/worker/PushDataHandler.scala   | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 85eb45dbd..3e526a71e 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -337,7 +337,9 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
             }
 
             override def onFailure(e: Throwable): Unit = {
-              logError(s"PushData replication failed for partitionLocation: 
$location", e)
+              logError(
+                s"PushData replication failed for shuffle: $shuffleKey, 
partitionLocation: $location",
+                e)
               // 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker
               // 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
               // 3. Throw IOException by channel, convert to 
PUSH_DATA_CONNECTION_EXCEPTION_REPLICA
@@ -717,7 +719,9 @@ class PushDataHandler(val workerSource: WorkerSource) 
extends BaseMessageHandler
             }
 
             override def onFailure(e: Throwable): Unit = {
-              logError(s"PushMergedData replicate failed for 
partitionLocation: $location", e)
+              logError(
+                s"PushMergedData replicate failed for shuffle: $shuffleKey, 
partitionLocation: $location",
+                e)
               // 1. Throw PUSH_DATA_WRITE_FAIL_REPLICA by replica peer worker
               // 2. Throw PUSH_DATA_TIMEOUT_REPLICA by TransportResponseHandler
               // 3. Throw IOException by channel, convert to 
PUSH_DATA_CONNECTION_EXCEPTION_REPLICA

Reply via email to