This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 1157d6a8c [CELEBORN-2166] Fast fail reduce stage if shuffle data is 
lost because of worker lost
1157d6a8c is described below

commit 1157d6a8c11966a2b02d0ab1a1f3501174421962
Author: Sanskar Modi <[email protected]>
AuthorDate: Wed Oct 29 10:04:03 2025 +0800

    [CELEBORN-2166] Fast fail reduce stage if shuffle data is lost because of 
worker lost
    
    ### What changes were proposed in this pull request?
    
    - Fix the WorkerStatusTracker logic, so unknown workers are marked 
correctly in excluded workers.
    - Trigger shuffle data lost if the worker hosting the shuffle data is lost.
    
    This can be extended to –
    - fast fail mapper stages as well before the commit starts.
    - handle push replication being enabled when multiple workers are lost.
    
    ### Why are the changes needed?
    
    Currently, even if a worker crashes or becomes unavailable for some reason 
and is marked as lost by the Master, the reduce stage still tries to read data 
from it and fails after running for some time, which is inefficient. We can 
detect this early and fail the reduce stage with SHUFFLE_DATA_LOST before 
starting the stage.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NA
    
    ### How was this patch tested?
    WIP
    
    Closes #3496 from s0nskar/CELEBORN-2166.
    
    Authored-by: Sanskar Modi <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../org/apache/celeborn/client/LifecycleManager.scala   |  4 ++--
 .../apache/celeborn/client/WorkerStatusTracker.scala    |  6 +++---
 .../client/commit/ReducePartitionCommitHandler.scala    | 16 +++++++++++++++-
 .../celeborn/client/WorkerStatusTrackerSuite.scala      | 17 ++++++++++++-----
 .../scala/org/apache/celeborn/common/CelebornConf.scala |  9 +++++++++
 docs/configuration/client.md                            |  1 +
 6 files changed, 42 insertions(+), 11 deletions(-)

diff --git 
a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala 
b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
index bb008edab..231898535 100644
--- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala
@@ -593,7 +593,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
                 e)
               connectFailedWorkers.put(
                 workerInfo,
-                (StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
+                (StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
           }
           iter.remove()
         }
@@ -611,7 +611,7 @@ class LifecycleManager(val appUniqueId: String, val conf: 
CelebornConf) extends
         logError(s"Init rpc client failed for $shuffleId on $workerInfo during 
reserve slots, reason: Timeout.")
         connectFailedWorkers.put(
           workerInfo,
-          (StatusCode.WORKER_UNKNOWN, System.currentTimeMillis()))
+          (StatusCode.WORKER_UNRESPONSIVE, System.currentTimeMillis()))
       }
     }
   }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala 
b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 698032014..2e94e6cb4 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -157,8 +157,7 @@ class WorkerStatusTracker(
       excludedWorkers.asScala.foreach {
         case (workerInfo: WorkerInfo, (statusCode, registerTime)) =>
           statusCode match {
-            case StatusCode.WORKER_UNKNOWN |
-                StatusCode.WORKER_UNRESPONSIVE |
+            case StatusCode.WORKER_UNRESPONSIVE |
                 StatusCode.COMMIT_FILE_EXCEPTION |
                 StatusCode.NO_AVAILABLE_WORKING_DIR |
                 StatusCode.RESERVE_SLOTS_FAILED |
@@ -185,7 +184,8 @@ class WorkerStatusTracker(
         }
       }
       for (worker <- res.unknownWorkers.asScala) {
-        if (!excludedWorkers.containsKey(worker)) {
+        if (!excludedWorkers.containsKey(worker) || excludedWorkers.get(
+            worker)._1 != StatusCode.WORKER_UNKNOWN) {
           excludedWorkers.put(worker, (StatusCode.WORKER_UNKNOWN, current))
           statusChanged = true
         }
diff --git 
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
 
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
index 9ba5405df..fb557ebbe 100644
--- 
a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
+++ 
b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala
@@ -136,10 +136,24 @@ class ReducePartitionCommitHandler(
     if (mockShuffleLost) {
       mockShuffleLostShuffle == shuffleId
     } else {
-      dataLostShuffleSet.contains(shuffleId)
+      dataLostShuffleSet.contains(shuffleId) || 
isStageDataLostInUnknownWorker(shuffleId)
     }
   }
 
+  private def isStageDataLostInUnknownWorker(shuffleId: Int): Boolean = {
+    if (conf.clientShuffleDataLostOnUnknownWorkerEnabled && 
!conf.clientPushReplicateEnabled) {
+      val allocatedWorkers = shuffleAllocatedWorkers.get(shuffleId)
+      if (allocatedWorkers != null) {
+        return workerStatusTracker.excludedWorkers.asScala.collect {
+          case (workerId, (status, _))
+              if status == StatusCode.WORKER_UNKNOWN && 
allocatedWorkers.contains(workerId) =>
+            workerId
+        }.nonEmpty
+      }
+    }
+    false
+  }
+
   override def isPartitionInProcess(shuffleId: Int, partitionId: Int): Boolean 
= {
     isStageEndOrInProcess(shuffleId)
   }
diff --git 
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
 
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
index efaeb8439..0355b4d86 100644
--- 
a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
+++ 
b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala
@@ -36,7 +36,7 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
     val statusTracker = new WorkerStatusTracker(celebornConf, null)
 
     val registerTime = System.currentTimeMillis()
-    statusTracker.excludedWorkers.put(mock("host1"), 
(StatusCode.WORKER_UNKNOWN, registerTime))
+    statusTracker.excludedWorkers.put(mock("host1"), 
(StatusCode.WORKER_UNRESPONSIVE, registerTime))
     statusTracker.excludedWorkers.put(mock("host2"), 
(StatusCode.WORKER_SHUTDOWN, registerTime))
 
     // test reserve (only statusCode list in handleHeartbeatResponse)
@@ -46,7 +46,7 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
     // only reserve host1
     Assert.assertEquals(
       statusTracker.excludedWorkers.get(mock("host1")),
-      (StatusCode.WORKER_UNKNOWN, registerTime))
+      (StatusCode.WORKER_UNRESPONSIVE, registerTime))
     
Assert.assertFalse(statusTracker.excludedWorkers.containsKey(mock("host2")))
 
     // add shutdown/excluded worker
@@ -55,13 +55,20 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
     statusTracker.handleHeartbeatResponse(response1)
 
     // test keep Unknown register time
+    Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host1")))
     Assert.assertEquals(
-      statusTracker.excludedWorkers.get(mock("host1")),
-      (StatusCode.WORKER_UNKNOWN, registerTime))
+      statusTracker.excludedWorkers.get(mock("host1"))._1,
+      StatusCode.WORKER_UNKNOWN)
+    Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3")))
+    Assert.assertEquals(
+      statusTracker.excludedWorkers.get(mock("host3"))._1,
+      StatusCode.WORKER_UNKNOWN)
 
     // test new added shutdown/excluded workers
     Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host0")))
-    Assert.assertTrue(statusTracker.excludedWorkers.containsKey(mock("host3")))
+    Assert.assertEquals(
+      statusTracker.excludedWorkers.get(mock("host0"))._1,
+      StatusCode.WORKER_EXCLUDED)
     
Assert.assertTrue(!statusTracker.excludedWorkers.containsKey(mock("host4")))
     Assert.assertTrue(statusTracker.shuttingWorkers.contains(mock("host4")))
 
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index ff296a98a..2fa19f5e3 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1102,6 +1102,8 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
     get(CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL)
   def clientAdaptiveOptimizeSkewedPartitionReadEnabled: Boolean =
     get(CLIENT_ADAPTIVE_OPTIMIZE_SKEWED_PARTITION_READ_ENABLED)
+  def clientShuffleDataLostOnUnknownWorkerEnabled: Boolean =
+    get(CLIENT_SHUFFLE_DATA_LOST_ON_UNKNOWN_WORKER_ENABLED)
 
   // //////////////////////////////////////////////////////
   //                   Client Shuffle                    //
@@ -6742,4 +6744,11 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(2)
 
+  val CLIENT_SHUFFLE_DATA_LOST_ON_UNKNOWN_WORKER_ENABLED: ConfigEntry[Boolean] 
=
+    buildConf("celeborn.client.shuffleDataLostOnUnknownWorker.enabled")
+      .categories("client")
+      .version("0.7.0")
+      .doc("Whether to mark shuffle data lost when unknown worker is 
detected.")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 8fd4ef30a..0b0facf44 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -120,6 +120,7 @@ license: |
 | celeborn.client.shuffle.rangeReadFilter.enabled | false | false | If a spark 
application have skewed partition, this value can set to true to improve 
performance. | 0.2.0 | celeborn.shuffle.rangeReadFilter.enabled | 
 | celeborn.client.shuffle.register.filterExcludedWorker.enabled | false | 
false | Whether to filter excluded worker when register shuffle. | 0.4.0 |  | 
 | celeborn.client.shuffle.reviseLostShuffles.enabled | false | false | Whether 
to revise lost shuffles. | 0.6.0 |  | 
+| celeborn.client.shuffleDataLostOnUnknownWorker.enabled | false | false | 
Whether to mark shuffle data lost when unknown worker is detected. | 0.7.0 |  | 
 | celeborn.client.slot.assign.maxWorkers | 10000 | false | Max workers that 
slots of one shuffle can be allocated on. Will choose the smaller positive one 
from Master side and Client side, see `celeborn.master.slot.assign.maxWorkers`. 
| 0.3.1 |  | 
 | celeborn.client.spark.fetch.cleanFailedShuffle | false | false | whether to 
clean those disk space occupied by shuffles which cannot be fetched | 0.6.0 |  
| 
 | celeborn.client.spark.fetch.cleanFailedShuffleInterval | 1s | false | the 
interval to clean the failed-to-fetch shuffle files, only valid when 
celeborn.client.spark.fetch.cleanFailedShuffle is enabled | 0.6.0 |  | 

Reply via email to