This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 7542adf70 [CELEBORN-1948] Fix the issue where replica may lose data
when HARD_SPLIT occurs during handlePushMergeData
7542adf70 is described below
commit 7542adf708a89f8f15429faacb7e9aac5842e6d1
Author: Xianming Lei <[email protected]>
AuthorDate: Fri May 9 10:39:52 2025 +0800
[CELEBORN-1948] Fix the issue where replica may lose data when HARD_SPLIT
occurs during handlePushMergeData
### What changes were proposed in this pull request?
Fix data loss on the replica when merged data is pushed to it while a HARD_SPLIT occurs.
### Why are the changes needed?
There is a problem with the replicate RPC callback. When the statuses returned
by the primary and the replica are combined, the final status returned to the
client must follow the precedence FAILURE > HARD_SPLIT > CONGESTION > SUCCESS:
a status further to the right must never override a status further to the left.
The current code has two problems:
1. HARD_SPLIT can override FAILURE, which breaks the logic that excludes
failing workers.
2. When a pushMergedData request contains some partition locations that have
already been committed, PushDataHandler must keep pushing to the replica as long
as any partition in the request is still uncommitted; otherwise data is lost.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes #3185 from leixm/CELEBORN-1948.
Lead-authored-by: Xianming Lei <[email protected]>
Co-authored-by: Xianming Lei <[email protected]>
Co-authored-by: Shuang <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../service/deploy/worker/PushDataHandler.scala | 92 +++++++++++-----------
1 file changed, 46 insertions(+), 46 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index d74e7a380..9edf91741 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -490,8 +490,7 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
}
// Fetch real batchId from body will add more cost and no meaning for
replicate.
- val doReplicate =
- partitionIdToLocations.head._2 != null &&
partitionIdToLocations.head._2.hasPeer && isPrimary
+ val doReplicate = isPrimary && partitionIdToLocations.exists(p => p._2 !=
null && p._2.hasPeer)
// find FileWriters responsible for the data
var index = 0
@@ -602,7 +601,7 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
val writePromise = Promise[Array[StatusCode]]()
// for primary, send data to replica
if (doReplicate) {
- val location = partitionIdToLocations.head._2
+ val location = partitionIdToLocations.find(p => p._2 != null &&
p._2.hasPeer).get._2
val peer = location.getPeer
val peerWorker = new WorkerInfo(
peer.getHost,
@@ -631,49 +630,6 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
// Handle the response from replica
val wrappedCallback = new RpcResponseCallback() {
override def onSuccess(response: ByteBuffer): Unit = {
- // During the rolling upgrade of the worker cluster, it is
possible for
- // the primary worker to be upgraded to a new version that
includes
- // the changes from [CELEBORN-1721], while the replica worker is
still running
- // on an older version that does not have these changes.
- // In this scenario, the replica may return a response without
any context
- // when status of SUCCESS.
- val replicaReason =
- if (response.remaining() > 0) {
- response.get()
- } else {
- StatusCode.SUCCESS
- }
- if (replicaReason == StatusCode.HARD_SPLIT.getValue) {
- if (response.remaining() > 0) {
- try {
- val pushMergedDataResponse:
PbPushMergedDataSplitPartitionInfo =
- TransportMessage.fromByteBuffer(
-
response).getParsedPayload[PbPushMergedDataSplitPartitionInfo]()
- pushMergedDataCallback.unionReplicaSplitPartitions(
- pushMergedDataResponse.getSplitPartitionIndexesList,
- pushMergedDataResponse.getStatusCodesList)
- } catch {
- case e: CelebornIOException =>
- pushMergedDataCallback.onFailure(e)
- return
- case e: IllegalArgumentException =>
- pushMergedDataCallback.onFailure(new
CelebornIOException(e))
- return
- }
- } else {
- // During the rolling upgrade of the worker cluster, it is
possible for the primary worker
- // to be upgraded to a new version that includes the changes
from [CELEBORN-1721], while
- // the replica worker is still running on an older version
that does not have these changes.
- // In this scenario, the replica may return a response with
a status of HARD_SPLIT, but
- // will not provide a PbPushMergedDataSplitPartitionInfo.
- logWarning(
- s"The response status from the replica (shuffle
$shuffleKey map $mapId attempt $attemptId) is HARD_SPLIT, but no
PbPushMergedDataSplitPartitionInfo is present.")
- partitionIdToLocations.indices.foreach(index =>
- pushMergedDataCallback.addSplitPartition(index,
StatusCode.HARD_SPLIT))
- pushMergedDataCallback.onSuccess(StatusCode.HARD_SPLIT)
- return
- }
- }
Try(Await.result(writePromise.future, Duration.Inf)) match {
case Success(result) =>
var index = 0
@@ -683,6 +639,50 @@ class PushDataHandler(val workerSource: WorkerSource)
extends BaseMessageHandler
}
index += 1
}
+ // During the rolling upgrade of the worker cluster, it is
possible for
+ // the primary worker to be upgraded to a new version that
includes
+ // the changes from [CELEBORN-1721], while the replica
worker is still running
+ // on an older version that does not have these changes.
+ // In this scenario, the replica may return a response
without any context
+ // when status of SUCCESS.
+ val replicaReason =
+ if (response.remaining() > 0) {
+ response.get()
+ } else {
+ StatusCode.SUCCESS
+ }
+ if (replicaReason == StatusCode.HARD_SPLIT.getValue) {
+ if (response.remaining() > 0) {
+ try {
+ val pushMergedDataResponse:
PbPushMergedDataSplitPartitionInfo =
+ TransportMessage.fromByteBuffer(
+
response).getParsedPayload[PbPushMergedDataSplitPartitionInfo]()
+ pushMergedDataCallback.unionReplicaSplitPartitions(
+ pushMergedDataResponse.getSplitPartitionIndexesList,
+ pushMergedDataResponse.getStatusCodesList)
+ } catch {
+ case e: CelebornIOException =>
+ pushMergedDataCallback.onFailure(e)
+ return
+ case e: IllegalArgumentException =>
+ pushMergedDataCallback.onFailure(new
CelebornIOException(e))
+ return
+ }
+ } else {
+ // During the rolling upgrade of the worker cluster, it
is possible for the primary worker
+ // to be upgraded to a new version that includes the
changes from [CELEBORN-1721], while
+ // the replica worker is still running on an older
version that does not have these changes.
+ // In this scenario, the replica may return a response
with a status of HARD_SPLIT, but
+ // will not provide a PbPushMergedDataSplitPartitionInfo.
+ logWarning(
+ s"The response status from the replica (shuffle
$shuffleKey map $mapId attempt $attemptId) is HARD_SPLIT, but no
PbPushMergedDataSplitPartitionInfo is present.")
+ partitionIdToLocations.indices.foreach(index =>
+ pushMergedDataCallback.addSplitPartition(index,
StatusCode.HARD_SPLIT))
+ }
+ pushMergedDataCallback.onSuccess(StatusCode.HARD_SPLIT)
+ return
+ }
+
// Only primary data enable replication will push data to
replica
Option(CongestionController.instance()) match {
case Some(congestionController) =>