This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 3cf1802e7 [CELEBORN-1928][CIP-12] Support HARD_SPLIT in PushMergedData should handle success responses from older workers
3cf1802e7 is described below
commit 3cf1802e78f1e0d66e22255b5e1063c85b553332
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Mar 27 15:06:01 2025 +0800
[CELEBORN-1928][CIP-12] Support HARD_SPLIT in PushMergedData should handle success responses from older workers
### What changes were proposed in this pull request?
Make the HARD_SPLIT support in PushMergedData handle success responses from replica workers running an older version, which may reply with an empty response body.
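### Why are the changes needed?
During a rolling upgrade, the primary worker may already include the changes from CELEBORN-1721 while its replica still runs an older version. Such a replica can answer a successful PushMergedData with an empty response body. Calling `response.get()` unconditionally on that buffer would throw a `BufferUnderflowException`. The primary now treats an empty response as `StatusCode.SUCCESS`.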
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #3172 from AngersZhuuuu/CELEBORN-1928-.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../celeborn/service/deploy/worker/PushDataHandler.scala | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index c8686fda1..80da9ffbc 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -632,7 +632,18 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
// Handle the response from replica
val wrappedCallback = new RpcResponseCallback() {
override def onSuccess(response: ByteBuffer): Unit = {
- val replicaReason = response.get()
+ // During the rolling upgrade of the worker cluster, it is possible for
+ // the primary worker to be upgraded to a new version that includes
+ // the changes from [CELEBORN-1721], while the replica worker is still running
+ // on an older version that does not have these changes.
+ // In this scenario, the replica may return a response without any content
+ // when the status is SUCCESS.
+ val replicaReason =
+ if (response.remaining() > 0) {
+ response.get()
+ } else {
+ StatusCode.SUCCESS
+ }
if (replicaReason == StatusCode.HARD_SPLIT.getValue) {
if (response.remaining() > 0) {
try {