This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 937561f3c [CELEBORN-1919] Hardsplit batch tracking should be disabled 
when pushing only a single replica
937561f3c is described below

commit 937561f3cda2db90417b978bbe33cba35de0f10c
Author: Shuang <[email protected]>
AuthorDate: Mon Apr 28 15:44:39 2025 +0800

    [CELEBORN-1919] Hardsplit batch tracking should be disabled when pushing 
only a single replica
    
    ### What changes were proposed in this pull request?
    Remove unused batch data tracking
    
    ### Why are the changes needed?
    When the optimization to handle skewed partition reads is enabled, Celeborn 
typically tracks all failed batches to avoid potential data duplication. 
However, tracking of hardsplit batches can be safely disabled when pushing only 
a single replica, because these batches are definitively never written to the 
partition data file at their previous partition locations. Celeborn therefore 
does not need to track these batches, and doing so could overload the Driver.
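
The guard described above can be sketched as a small standalone model. This is only an illustration, not the ShuffleClientImpl code: the class name, the `onHardSplit` hook, and the string batch key are hypothetical, and only the two flag names are taken from the diff below.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the guard changed in this commit.
// Only the two boolean flag names come from the diff; everything else is illustrative.
final class HardSplitTrackingSketch {
  private final boolean dataPushFailureTrackingEnabled;
  private final boolean pushReplicateEnabled;
  private final Set<String> failedBatches = new HashSet<>();

  HardSplitTrackingSketch(boolean dataPushFailureTrackingEnabled, boolean pushReplicateEnabled) {
    this.dataPushFailureTrackingEnabled = dataPushFailureTrackingEnabled;
    this.pushReplicateEnabled = pushReplicateEnabled;
  }

  // Illustrative hook invoked when a pushed batch comes back as a hard split.
  void onHardSplit(int mapId, int attemptId, int batchId) {
    // Record the batch only when failure tracking is on AND replication is on.
    // With a single replica the batch is skipped, so nothing accumulates.
    if (dataPushFailureTrackingEnabled && pushReplicateEnabled) {
      failedBatches.add(mapId + "-" + attemptId + "-" + batchId);
    }
  }

  int trackedCount() {
    return failedBatches.size();
  }
}
```

In this sketch, an instance built with `pushReplicateEnabled` set to false records nothing on a hard split, which mirrors the intent of reducing the tracking state reported to the Driver.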
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual test & Pass GA
    
    Closes #3164 from RexXiong/CELEBORN-1919.
    
    Authored-by: Shuang <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index c642223ea..c6fc7f6bd 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1120,7 +1120,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                       attemptId,
                       partitionId,
                       nextBatchId);
-                  if (dataPushFailureTrackingEnabled) {
+                  if (dataPushFailureTrackingEnabled && pushReplicateEnabled) {
                     pushState.addFailedBatch(
                         latest.getUniqueId(), new PushFailedBatch(mapId, attemptId, nextBatchId));
                   }
@@ -1560,7 +1560,7 @@ public class ShuffleClientImpl extends ShuffleClient {
                 pushState.onSuccess(hostPort);
                 callback.onSuccess(ByteBuffer.wrap(new byte[] {StatusCode.SOFT_SPLIT.getValue()}));
               } else {
-                if (dataPushFailureTrackingEnabled) {
+                if (dataPushFailureTrackingEnabled && pushReplicateEnabled) {
                   for (DataBatches.DataBatch resubmitBatch : batchesNeedResubmit) {
                     pushState.addFailedBatch(
                         resubmitBatch.loc.getUniqueId(),
