This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 3cf1802e7 [CELEBORN-1928][CIP-12] Support HARD_SPLIT in PushMergedData should support handle older worker success response
3cf1802e7 is described below

commit 3cf1802e78f1e0d66e22255b5e1063c85b553332
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Mar 27 15:06:01 2025 +0800

    [CELEBORN-1928][CIP-12] Support HARD_SPLIT in PushMergedData should support handle older worker success response
    
    ### What changes were proposed in this pull request?
    When handling HARD_SPLIT in PushMergedData, the primary worker should also
    accept the success response returned by an older replica worker.
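    
    The fallback can be sketched in isolation as follows. This is a minimal
    sketch under stated assumptions, not the code in PushDataHandler:
    `ReplicaReasonSketch`, `replicaReason`, and the constants `Success` and
    `HardSplit` are hypothetical stand-ins for Celeborn's StatusCode values,
    and their numeric values are placeholders.
    
    ```scala
    import java.nio.ByteBuffer

    // Hypothetical stand-in for the status handling in PushDataHandler.
    object ReplicaReasonSketch {
      // Placeholder byte values; the real ones come from Celeborn's StatusCode.
      val Success: Byte = 0
      val HardSplit: Byte = 12

      // An older replica may answer SUCCESS with an empty body, so only read
      // the status byte when one is present; otherwise treat it as SUCCESS.
      def replicaReason(response: ByteBuffer): Byte =
        if (response.remaining() > 0) response.get() else Success
    }
    ```
    
    Reading with `response.get()` on an empty buffer would throw
    `BufferUnderflowException`, which is why the length check comes first.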
    
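    ### Why are the changes needed?
    During a rolling upgrade of the worker cluster, the primary worker may
    already include the changes from [CELEBORN-1721] while the replica worker
    still runs an older version. An older replica may return an empty response
    on SUCCESS, so the primary cannot read a status byte unconditionally.
    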
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #3172 from AngersZhuuuu/CELEBORN-1928-.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../celeborn/service/deploy/worker/PushDataHandler.scala    | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index c8686fda1..80da9ffbc 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -632,7 +632,18 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
           // Handle the response from replica
           val wrappedCallback = new RpcResponseCallback() {
             override def onSuccess(response: ByteBuffer): Unit = {
-              val replicaReason = response.get()
+              // During the rolling upgrade of the worker cluster, it is possible for
+              // the primary worker to be upgraded to a new version that includes
+              // the changes from [CELEBORN-1721], while the replica worker is still running
+              // on an older version that does not have these changes.
+              // In this scenario, the replica may return a response without any context
+              // when status of SUCCESS.
+              val replicaReason =
+                if (response.remaining() > 0) {
+                  response.get()
+                } else {
+                  StatusCode.SUCCESS
+                }
               if (replicaReason == StatusCode.HARD_SPLIT.getValue) {
                 if (response.remaining() > 0) {
                   try {
