This is an automated email from the ASF dual-hosted git repository.

yl09099 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 69b2d5178 [#2503] fix(spark): Skip reassignment number check on partition split (#2504)
69b2d5178 is described below

commit 69b2d517873ec8493213b58731bb9e43c53511a8
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jun 26 10:24:46 2025 +0800

    [#2503] fix(spark): Skip reassignment number check on partition split (#2504)
    
    Merge pull request #2504 from zuston/reassinBug
    
    [Bug] Incorrect reassignment server number for partition split (Closes #2503)
---
 .../apache/spark/shuffle/handle/MutableShuffleHandleInfo.java    | 9 +++++++++
 .../apache/uniffle/shuffle/manager/RssShuffleManagerBase.java    | 2 +-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
index 9a47fe7a3..8e0fe9d35 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/handle/MutableShuffleHandleInfo.java
@@ -280,6 +280,11 @@ public class MutableShuffleHandleInfo extends ShuffleHandleInfoBase {
   public void checkPartitionReassignServerNum(
       Set<Integer> partitionIds, int legalReassignServerNum) {
     for (int partitionId : partitionIds) {
+      // skip the split partition id. Some tasks may reassign without partition split tag,
+      // but this partition may have been split.
+      if (isPartitionSplit(partitionId)) {
+        continue;
+      }
       Map<Integer, List<ShuffleServerInfo>> replicas =
           partitionReplicaAssignedServers.get(partitionId);
       for (List<ShuffleServerInfo> servers : replicas.values()) {
@@ -403,4 +408,8 @@ public class MutableShuffleHandleInfo extends ShuffleHandleInfoBase {
         .getOrDefault(partitionId, Collections.emptyMap())
         .keySet();
   }
+
+  public boolean isPartitionSplit(int partitionId) {
+    return excludedServerForPartitionToReplacements.containsKey(partitionId);
+  }
 }
diff --git a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index cfa364bef..e9085cfa2 100644
--- a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -1050,7 +1050,7 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac
           boolean serverHasReplaced = false;
 
           Set<ShuffleServerInfo> updatedReassignServers;
-          if (!partitionSplit) {
+          if (!partitionSplit && !internalHandle.isPartitionSplit(partitionId)) {
             Set<ShuffleServerInfo> replacements = internalHandle.getReplacements(serverId);
             if (CollectionUtils.isEmpty(replacements)) {
               replacements =

Reply via email to