This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 066e71e6c [#2563] improvement(spark): Add more logs of shuffle write 
on reassignment failure (#2564)
066e71e6c is described below

commit 066e71e6c50859e38457381a81393361fd400d3e
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jul 30 17:22:00 2025 +0800

    [#2563] improvement(spark): Add more logs of shuffle write on reassignment 
failure (#2564)
    
    ### What changes were proposed in this pull request?
    
    This PR adds more logging to the shuffle writer's reassignment-failure 
path, so the failure is easier to inspect the next time it occurs.
    
    ### Why are the changes needed?
    
    For #2563.
    
    ```
    25/07/29 08:13:25 ERROR TaskResources: Task 14257 failed by error:
    org.apache.uniffle.common.exception.RssException: No available replacement 
server for: 10.xxxxx-23100-23104
            at 
org.apache.spark.shuffle.writer.RssShuffleWriter.reassignAndResendBlocks(RssShuffleWriter.java:832)
            at 
org.apache.spark.shuffle.writer.RssShuffleWriter.collectFailedBlocksToResend(RssShuffleWriter.java:672)
            at 
org.apache.spark.shuffle.writer.RssShuffleWriter.checkDataIfAnyFailure(RssShuffleWriter.java:570)
            at 
org.apache.spark.shuffle.writer.RssShuffleWriter.checkBlockSendResult(RssShuffleWriter.java:532)
            at 
org.apache.spark.shuffle.writer.RssShuffleWriter.internalCheckBlockSendResult(RssShuffleWriter.java:518)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Not needed.
---
 .../spark/shuffle/writer/TaskAttemptAssignment.java   | 13 +++++++++++++
 .../apache/spark/shuffle/writer/RssShuffleWriter.java | 19 ++++++++++++++++---
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java
index f3d89c583..18eee16e2 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.shuffle.writer;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -66,4 +67,16 @@ public class TaskAttemptAssignment {
     PartitionSplitInfo splitInfo = 
this.handle.getPartitionSplitInfo(partitionId);
     return splitInfo.isSplit() && splitInfo.getMode() == 
PartitionSplitMode.LOAD_BALANCE;
   }
+
+  /**
+   * @param partitionId the partition id to look up in the handle's reader-side partition-to-servers map
+   * @return all assigned shuffle servers for one partition id; an empty list if that map is null, or null if the map has no entry for the id
+   */
+  public List<ShuffleServerInfo> list(int partitionId) {
+    Map<Integer, List<ShuffleServerInfo>> servers = 
this.handle.getAllPartitionServersForReader();
+    if (servers == null) {
+      return Collections.emptyList();
+    }
+    return servers.get(partitionId);
+  }
 }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index e2c29b324..b6776fa2f 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -766,9 +766,17 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       }
       MutableShuffleHandleInfo handle = 
MutableShuffleHandleInfo.fromProto(response.getHandle());
       taskAttemptAssignment.update(handle);
-      LOG.info(
-          "Success to reassign. The latest available assignment is {}",
-          handle.getAvailablePartitionServersForWriter());
+
+      // print the latest assignment for the reassigned partition ids
+      Map<Integer, List<String>> reassignments = new HashMap<>();
+      for (Map.Entry<Integer, List<ReceivingFailureServer>> entry :
+          failurePartitionToServers.entrySet()) {
+        int partitionId = entry.getKey();
+        List<ShuffleServerInfo> servers = 
taskAttemptAssignment.retrieve(partitionId);
+        reassignments.put(
+            partitionId, servers.stream().map(x -> 
x.getId()).collect(Collectors.toList()));
+      }
+      LOG.info("Succeed to reassign that the latest assignment is {}", 
reassignments);
     } catch (Exception e) {
       throw new RssException(
           "Errors on reassign on block send failure. failure 
partition->servers : "
@@ -827,6 +835,11 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       // It can not work if we want to use multiple replicas.
       ShuffleServerInfo replacement = servers.get(0);
       if 
(blockStatus.getShuffleServerInfo().getId().equals(replacement.getId())) {
+        LOG.warn(
+            "PartitionId:{} has the following assigned servers: {}. But 
currently the replacement server:{} is the same with previous one!",
+            block.getPartitionId(),
+            taskAttemptAssignment.list(block.getPartitionId()),
+            replacement);
         throw new RssException(
             "No available replacement server for: " + 
blockStatus.getShuffleServerInfo().getId());
       }

Reply via email to