This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 6c72f2680 [CELEBORN-2088] Fix NPE if 
`celeborn.client.spark.fetch.cleanFailedShuffle` enabled
6c72f2680 is described below

commit 6c72f26804be069001868b73a8e8db8700f7d109
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Jul 31 21:15:51 2025 -0700

    [CELEBORN-2088] Fix NPE if `celeborn.client.spark.fetch.cleanFailedShuffle` 
enabled
    
    ### What changes were proposed in this pull request?
    
    Fix NPE if `celeborn.client.spark.fetch.cleanFailedShuffle` is true.
    
    This PR also refines the code for `FailedShuffleCleaner`.
    ### Why are the changes needed?
    
    `failedShuffleCleaner` is null on the executor side.
    ```
    25/07/29 17:58:40 ERROR SparkUncaughtExceptionHandler: Uncaught exception 
in thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
    java.lang.NullPointerException: Cannot invoke 
"org.apache.celeborn.spark.FailedShuffleCleaner.reset()" because 
"this.failedShuffleCleaner" is null
            at 
org.apache.spark.shuffle.celeborn.SparkShuffleManager.stop(SparkShuffleManager.java:272)
 ~[celeborn-client-spark-3-shaded_2.12-0.6.0-rc3.jar:?]
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UT.
    
    Closes #3401 from turboFei/fix_npe_cleaner.
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 20a629a432d94d98c7d21dd34347565249922563)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../celeborn/spark/FailedShuffleCleaner.scala      | 41 ++++++++++------------
 .../shuffle/celeborn/SparkShuffleManager.java      |  5 +--
 2 files changed, 22 insertions(+), 24 deletions(-)

diff --git 
a/client-spark/common/src/main/scala/org/apache/celeborn/spark/FailedShuffleCleaner.scala
 
b/client-spark/common/src/main/scala/org/apache/celeborn/spark/FailedShuffleCleaner.scala
index e88f6f640..a2e88ba53 100644
--- 
a/client-spark/common/src/main/scala/org/apache/celeborn/spark/FailedShuffleCleaner.scala
+++ 
b/client-spark/common/src/main/scala/org/apache/celeborn/spark/FailedShuffleCleaner.scala
@@ -34,28 +34,10 @@ private[celeborn] class 
FailedShuffleCleaner(lifecycleManager: LifecycleManager)
   private val shufflesToBeCleaned = new LinkedBlockingQueue[Int]()
   private val cleanedShuffleIds = new mutable.HashSet[Int]
 
-  private lazy val cleanInterval =
-    lifecycleManager.conf.clientFetchCleanFailedShuffleIntervalMS
-
-  // for test
-  def reset(): Unit = {
-    shufflesToBeCleaned.clear()
-    cleanedShuffleIds.clear()
-    if (cleanerThreadPool != null) {
-      cleanerThreadPool.shutdownNow()
-      cleanerThreadPool = null
-    }
-  }
-
-  def addShuffleIdToBeCleaned(appShuffleIdentifier: String): Unit = {
-    val Array(appShuffleId, _, _) = 
SparkCommonUtils.decodeAppShuffleIdentifier(
-      appShuffleIdentifier)
-    lifecycleManager.getShuffleIdMapping.get(appShuffleId.toInt).foreach {
-      case (_, (celebornShuffleId, _)) => 
shufflesToBeCleaned.put(celebornShuffleId)
-    }
-  }
+  private val cleanInterval = 
lifecycleManager.conf.clientFetchCleanFailedShuffleIntervalMS
+  private var cleanerThreadPool: ScheduledExecutorService = _
 
-  def init(): Unit = {
+  private def init(): Unit = {
     cleanerThreadPool = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
       "failedShuffleCleanerThreadPool")
     cleanerThreadPool.scheduleWithFixedDelay(
@@ -85,9 +67,24 @@ private[celeborn] class 
FailedShuffleCleaner(lifecycleManager: LifecycleManager)
 
   init()
 
+  def addShuffleIdToBeCleaned(appShuffleIdentifier: String): Unit = {
+    val Array(appShuffleId, _, _) = 
SparkCommonUtils.decodeAppShuffleIdentifier(
+      appShuffleIdentifier)
+    lifecycleManager.getShuffleIdMapping.get(appShuffleId.toInt).foreach {
+      case (_, (celebornShuffleId, _)) => 
shufflesToBeCleaned.put(celebornShuffleId)
+    }
+  }
+
   def removeCleanedShuffleId(celebornShuffleId: Int): Unit = {
     cleanedShuffleIds.remove(celebornShuffleId)
   }
 
-  private var cleanerThreadPool: ScheduledExecutorService = _
+  def stop(): Unit = {
+    shufflesToBeCleaned.clear()
+    cleanedShuffleIds.clear()
+    if (cleanerThreadPool != null) {
+      ThreadUtils.shutdown(cleanerThreadPool)
+      cleanerThreadPool = null
+    }
+  }
 }
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 8a4318faf..ed7865e19 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -268,8 +268,9 @@ public class SparkShuffleManager implements ShuffleManager {
       _sortShuffleManager.stop();
       _sortShuffleManager = null;
     }
-    if (celebornConf.clientFetchCleanFailedShuffle()) {
-      failedShuffleCleaner.reset();
+    if (failedShuffleCleaner != null) {
+      failedShuffleCleaner.stop();
+      failedShuffleCleaner = null;
     }
   }
 

Reply via email to