This is an automated email from the ASF dual-hosted git repository.
yl09099 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 69b2d5178 [#2503] fix(spark): Skip reassignment number check on
partition split (#2504)
69b2d5178 is described below
commit 69b2d517873ec8493213b58731bb9e43c53511a8
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jun 26 10:24:46 2025 +0800
[#2503] fix(spark): Skip reassignment number check on partition split
(#2504)
Merge pull request #2504 from zuston/reassinBug
[Bug] Incorrect reassignment server number for partition split (Closes
#2503)
---
.../apache/spark/shuffle/handle/MutableShuffleHandleInfo.java | 9 +++++++++
.../apache/uniffle/shuffle/manager/RssShuffleManagerBase.java | 2 +-
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
index 9a47fe7a3..8e0fe9d35 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
@@ -280,6 +280,11 @@ public class MutableShuffleHandleInfo extends
ShuffleHandleInfoBase {
public void checkPartitionReassignServerNum(
Set<Integer> partitionIds, int legalReassignServerNum) {
for (int partitionId : partitionIds) {
+ // skip the split partition id. Some tasks may reassign without
partition split tag,
+ // but this partition may have been split.
+ if (isPartitionSplit(partitionId)) {
+ continue;
+ }
Map<Integer, List<ShuffleServerInfo>> replicas =
partitionReplicaAssignedServers.get(partitionId);
for (List<ShuffleServerInfo> servers : replicas.values()) {
@@ -403,4 +408,8 @@ public class MutableShuffleHandleInfo extends
ShuffleHandleInfoBase {
.getOrDefault(partitionId, Collections.emptyMap())
.keySet();
}
+
+ public boolean isPartitionSplit(int partitionId) {
+ return excludedServerForPartitionToReplacements.containsKey(partitionId);
+ }
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index cfa364bef..e9085cfa2 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -1050,7 +1050,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
boolean serverHasReplaced = false;
Set<ShuffleServerInfo> updatedReassignServers;
- if (!partitionSplit) {
+ if (!partitionSplit &&
!internalHandle.isPartitionSplit(partitionId)) {
Set<ShuffleServerInfo> replacements =
internalHandle.getReplacements(serverId);
if (CollectionUtils.isEmpty(replacements)) {
replacements =