This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 1157d6a8c [CELEBORN-2166] Fast fail reduce stage if shuffle data is
lost because of worker lost
1157d6a8c is described below
commit 1157d6a8c11966a2b02d0ab1a1f3501174421962
Author: Sanskar Modi <[email protected]>
AuthorDate: Wed Oct 29 10:04:03 2025 +0800
[CELEBORN-2166] Fast fail reduce stage if shuffle data is lost because of
worker lost
### What changes were proposed in this pull request?
- Fix the WorkerStatusTracker logic, so unknown workers are marked
correctly in excluded workers.
- Trigger shuffle data lost if the worker hosting the shuffle data is lost.
This can be extended to:
- fast fail mapper stages as well before the commit starts.
- with push replicate enabled with multiple workers loss.
### Why are the changes needed?
Currently, even if a worker crashes or becomes unavailable for some reason and
is marked as lost by the Master, the reduce stage still tries to read data from
it and fails after running for some time, which is inefficient. We can detect
this early and fail the reduce stage with SHUFFLE_DATA_LOST before starting the stage.
### Does this PR introduce _any_ user-facing change?
NA
### How was this patch tested?
WIP
Closes #3496 from s0nskar/CELEBORN-2166.
Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../org/apache/celeborn/client/LifecycleManager.scala | 4 ++--
.../apache/celeborn/client/WorkerStatusTracker.scala | 6 +++---
.../client/commit/ReducePartitionCommitHandler.scala | 16 +++++++++++++++-
.../celeborn/client/WorkerStatusTrackerSuite.scala | 17 ++++++++++++-----
.../scala/org/apache/celeborn/common/CelebornConf.scala | 9 +++++++++
docs/configuration/client.md | 1 +
6 files changed, 42 insertions(+), 11 deletions(-)
diff --git
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index bb008edab..231898535 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -593,7 +593,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
e)
connectFailedWorkers.put(
workerInfo,
- (StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
+ (StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
}
iter.remove()
}
@@ -611,7 +611,7 @@ class LifecycleManager(val appUniqueId: String, val conf:
CelebornConf) extends
logError(s"Init rpc client failed for $shuffleId on $workerInfo during
reserve slots, reason: Timeout.")
connectFailedWorkers.put(
workerInfo,
- (StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
+ (StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
}
}
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 698032014..2e94e6cb4 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -157,8 +157,7 @@ class WorkerStatusTracker(
excludedWorkers.asScala.foreach {
case (workerInfo: WorkerInfo, (statusCode, registerTime)) =>
statusCode match {
- case StatusCode.WORKER_UNKNOWN |
- StatusCode.WORKER_UNRESPONSIVE |
+ case StatusCode.WORKER_UNRESPONSIVE |
StatusCode.COMMIT_FILE_EXCEPTION |
StatusCode.NO_AVAILABLE_WORKING_DIR |
StatusCode.RESERVE_SLOTS_FAILED |
@@ -185,7 +184,8 @@ class WorkerStatusTracker(
}
}
for (worker <- res.unknownWorkers.asScala) {
- if (!excludedWorkers.containsKey(worker)) {
+ if (!excludedWorkers.containsKey(worker) || excludedWorkers.get(
+ worker)._1 != StatusCode.WORKER_UNKNOWN) {
excludedWorkers.put(worker, (StatusCode.WORKER_UNKNOWN, current))
statusChanged = true
}
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
index 9ba5405df..fb557ebbe 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -136,10 +136,24 @@ class ReducePartitionCommitHandler(
if (mockShuffleLost) {
mockShuffleLostShuffle == shuffleId
} else {
- dataLostShuffleSet.contains(shuffleId)
+ dataLostShuffleSet.contains(shuffleId) ||
isStageDataLostInUnknownWorker(shuffleId)
}
}
+ private def isStageDataLostInUnknownWorker(shuffleId: Int): Boolean = {
+ if (conf.clientShuffleDataLostOnUnknownWorkerEnabled &&
!conf.clientPushReplicateEnabled) {
+ val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
+ if (allocatedWorkers != null) {
+ return workerStatusTracker.excludedWorkers.asScala.collect {
+ case (workerId, (status, _))
+ if status == StatusCode.WORKER_UNKNOWN &&
allocatedWorkers.contains(workerId) =>
+ workerId
+ }.nonEmpty
+ }
+ }
+ false
+ }
+
override def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean
= {
isStageEndOrInProcess(shuffleId)
}
diff --git
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
index efaeb8439..0355b4d86 100644
---
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
+++
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
@@ -36,7 +36,7 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
val statusTracker = new WorkerStatusTracker(celebornConf, null)
val registerTime = System.currentTimeMillis()
- statusTracker.excludedWorkers.put(mock("host1"),
(StatusCode.WORKER_UNKNOWN, registerTime))
+ statusTracker.excludedWorkers.put(mock("host1"),
(StatusCode.WORKER_UNRESPONSIVE, registerTime))
statusTracker.excludedWorkers.put(mock("host2"),
(StatusCode.WORKER_SHUTDOWN, registerTime))
// test reserve (only statusCode list in handleHeartbeatResponse)
@@ -46,7 +46,7 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
// only reserve host1
Assert.assertEquals(
statusTracker.excludedWorkers.get(mock("host1")),
- (StatusCode.WORKER_UNKNOWN, registerTime))
+ (StatusCode.WORKER_UNRESPONSIVE, registerTime))
Assert.assertFalse(statusTracker.excludedWorkers.containsKey(mock("host2")))
// add shutdown/excluded worker
@@ -55,13 +55,20 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
statusTracker.handleHeartbeatResponse(response1)
// test keep Unknown register time
+ Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host1")))
Assert.assertEquals(
- statusTracker.excludedWorkers.get(mock("host1")),
- (StatusCode.WORKER_UNKNOWN, registerTime))
+ statusTracker.excludedWorkers.get(mock("host1"))._1,
+ StatusCode.WORKER_UNKNOWN)
+ Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3")))
+ Assert.assertEquals(
+ statusTracker.excludedWorkers.get(mock("host3"))._1,
+ StatusCode.WORKER_UNKNOWN)
// test new added shutdown/excluded workers
Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host0")))
- Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3")))
+ Assert.assertEquals(
+ statusTracker.excludedWorkers.get(mock("host0"))._1,
+ StatusCode.WORKER_EXCLUDED)
Assert.assertTrue(!statusTracker.excludedWorkers.containsKey(mock("host4")))
Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ff296a98a..2fa19f5e3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1102,6 +1102,8 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
get(CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL)
def clientAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean =
get(CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ_ENABLED)
+ def clientShuffleDataLostOnUnknownWorkerEnabled: Boolean =
+ get(CLIENT_SHUFFLE_DATA_LOST_ON_UNKNOWN_WORKER_ENABLED)
// //////////////////////////////////////////////////////
// Client Shuffle //
@@ -6742,4 +6744,11 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(2)
+ val CLIENT_SHUFFLE_DATA_LOST_ON_UNKNOWN_WORKER_ENABLED: ConfigEntry[Boolean]
=
+ buildConf("celeborn.client.shuffleDataLostOnUnknownWorker.enabled")
+ .categories("client")
+ .version("0.7.0")
+ .doc("Whether to mark shuffle data lost when unknown worker is
detected.")
+ .booleanConf
+ .createWithDefault(false)
}
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 8fd4ef30a..0b0facf44 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -120,6 +120,7 @@ license: |
| celeborn.client.shuffle.rangeReadFilter.enabled | false | false | If a spark
application have skewed partition, this value can set to true to improve
performance. | 0.2.0 | celeborn.shuffle.rangeReadFilter.enabled |
| celeborn.client.shuffle.register.filterExcludedWorker.enabled | false |
false | Whether to filter excluded worker when register shuffle. | 0.4.0 | |
| celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether
to revise lost shuffles. | 0.6.0 | |
+| celeborn.client.shuffleDataLostOnUnknownWorker.enabled | false | false |
Whether to mark shuffle data lost when unknown worker is detected. | 0.7.0 | |
| celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that
slots of one shuffle can be allocated on. Will choose the smaller positive one
from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`.
| 0.3.1 | |
| celeborn.client.spark.fetch.cleanFailedShuffle | false | false | whether to
clean those disk space occupied by shuffles which cannot be fetched | 0.6.0 |
|
| celeborn.client.spark.fetch.cleanFailedShuffleInterval | 1s | false | the
interval to clean the failed-to-fetch shuffle files, only valid when
celeborn.client.spark.fetch.cleanFailedShuffle is enabled | 0.6.0 | |