This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 066e71e6c [#2563] improvement(spark): Add more logs of shuffle write
on reassignment failure (#2564)
066e71e6c is described below
commit 066e71e6c50859e38457381a81393361fd400d3e
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jul 30 17:22:00 2025 +0800
[#2563] improvement(spark): Add more logs of shuffle write on reassignment
failure (#2564)
### What changes were proposed in this pull request?
This PR adds more logging to the shuffle write path on reassignment failure, to aid
inspection the next time it occurs.
### Why are the changes needed?
For #2563:
```
25/07/29 08:13:25 ERROR TaskResources: Task 14257 failed by error:
org.apache.uniffle.common.exception.RssException: No available replacement
server for: 10.xxxxx-23100-23104
at
org.apache.spark.shuffle.writer.RssShuffleWriter.reassignAndResendBlocks(RssShuffleWriter.java:832)
at
org.apache.spark.shuffle.writer.RssShuffleWriter.collectFailedBlocksToResend(RssShuffleWriter.java:672)
at
org.apache.spark.shuffle.writer.RssShuffleWriter.checkDataIfAnyFailure(RssShuffleWriter.java:570)
at
org.apache.spark.shuffle.writer.RssShuffleWriter.checkBlockSendResult(RssShuffleWriter.java:532)
at
org.apache.spark.shuffle.writer.RssShuffleWriter.internalCheckBlockSendResult(RssShuffleWriter.java:518)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
---
.../spark/shuffle/writer/TaskAttemptAssignment.java | 13 +++++++++++++
.../apache/spark/shuffle/writer/RssShuffleWriter.java | 19 ++++++++++++++++---
2 files changed, 29 insertions(+), 3 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java
index f3d89c583..18eee16e2 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/TaskAttemptAssignment.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle.writer;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -66,4 +67,16 @@ public class TaskAttemptAssignment {
PartitionSplitInfo splitInfo =
this.handle.getPartitionSplitInfo(partitionId);
return splitInfo.isSplit() && splitInfo.getMode() ==
PartitionSplitMode.LOAD_BALANCE;
}
+
+ /**
+ * @param partitionId the partition whose assigned shuffle servers are requested
+ * @return all assigned shuffle servers for the partition, or an empty list if none are recorded
+ */
+ public List<ShuffleServerInfo> list(int partitionId) {
+ Map<Integer, List<ShuffleServerInfo>> servers =
this.handle.getAllPartitionServersForReader();
+ if (servers == null) {
+ return Collections.emptyList();
+ }
+ return servers.getOrDefault(partitionId, Collections.emptyList());
+ }
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index e2c29b324..b6776fa2f 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -766,9 +766,17 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
MutableShuffleHandleInfo handle =
MutableShuffleHandleInfo.fromProto(response.getHandle());
taskAttemptAssignment.update(handle);
- LOG.info(
- "Success to reassign. The latest available assignment is {}",
- handle.getAvailablePartitionServersForWriter());
+
+ // print the lastest assignment for those reassignment partition ids
+ Map<Integer, List<String>> reassignments = new HashMap<>();
+ for (Map.Entry<Integer, List<ReceivingFailureServer>> entry :
+ failurePartitionToServers.entrySet()) {
+ int partitionId = entry.getKey();
+ List<ShuffleServerInfo> servers =
taskAttemptAssignment.retrieve(partitionId);
+ reassignments.put(
+ partitionId, servers.stream().map(x ->
x.getId()).collect(Collectors.toList()));
+ }
+ LOG.info("Succeed to reassign that the latest assignment is {}",
reassignments);
} catch (Exception e) {
throw new RssException(
"Errors on reassign on block send failure. failure
partition->servers : "
@@ -827,6 +835,11 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
// It can not work if we want to use multiple replicas.
ShuffleServerInfo replacement = servers.get(0);
if
(blockStatus.getShuffleServerInfo().getId().equals(replacement.getId())) {
+ LOG.warn(
+ "PartitionId:{} has the following assigned servers: {}. But
currently the replacement server:{} is the same with previous one!",
+ block.getPartitionId(),
+ taskAttemptAssignment.list(block.getPartitionId()),
+ replacement);
throw new RssException(
"No available replacement server for: " +
blockStatus.getShuffleServerInfo().getId());
}