This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 6c72f2680 [CELEBORN-2088] Fix NPE if
`celeborn.client.spark.fetch.cleanFailedShuffle` enabled
6c72f2680 is described below
commit 6c72f26804be069001868b73a8e8db8700f7d109
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Jul 31 21:15:51 2025 -0700
[CELEBORN-2088] Fix NPE if `celeborn.client.spark.fetch.cleanFailedShuffle`
enabled
### What changes were proposed in this pull request?
Fix an NPE that occurs when `celeborn.client.spark.fetch.cleanFailedShuffle` is true.
This PR also refines the code for `FailedShuffleCleaner`.
### Why are the changes needed?
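`failedShuffleCleaner` is null on the executor side, so `SparkShuffleManager.stop()` throws a `NullPointerException` when it calls `failedShuffleCleaner.reset()`: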
```
25/07/29 17:58:40 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[CoarseGrainedExecutorBackend-stop-executor,5,main]
java.lang.NullPointerException: Cannot invoke "org.apache.celeborn.spark.FailedShuffleCleaner.reset()" because "this.failedShuffleCleaner" is null
    at org.apache.spark.shuffle.celeborn.SparkShuffleManager.stop(SparkShuffleManager.java:272) ~[celeborn-client-spark-3-shaded_2.12-0.6.0-rc3.jar:?]
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests (UT).
Closes #3401 from turboFei/fix_npe_cleaner.
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 20a629a432d94d98c7d21dd34347565249922563)
Signed-off-by: Wang, Fei <[email protected]>
---
.../celeborn/spark/FailedShuffleCleaner.scala | 41 ++++++++++------------
.../shuffle/celeborn/SparkShuffleManager.java | 5 +--
2 files changed, 22 insertions(+), 24 deletions(-)
diff --git
a/client-spark/common/src/main/scala/org/apache/celeborn/spark/FailedShuffleCleaner.scala
b/client-spark/common/src/main/scala/org/apache/celeborn/spark/FailedShuffleCleaner.scala
index e88f6f640..a2e88ba53 100644
---
a/client-spark/common/src/main/scala/org/apache/celeborn/spark/FailedShuffleCleaner.scala
+++
b/client-spark/common/src/main/scala/org/apache/celeborn/spark/FailedShuffleCleaner.scala
@@ -34,28 +34,10 @@ private[celeborn] class
FailedShuffleCleaner(lifecycleManager: LifecycleManager)
private val shufflesToBeCleaned = new LinkedBlockingQueue[Int]()
private val cleanedShuffleIds = new mutable.HashSet[Int]
- private lazy val cleanInterval =
- lifecycleManager.conf.clientFetchCleanFailedShuffleIntervalMS
-
- // for test
- def reset(): Unit = {
- shufflesToBeCleaned.clear()
- cleanedShuffleIds.clear()
- if (cleanerThreadPool != null) {
- cleanerThreadPool.shutdownNow()
- cleanerThreadPool = null
- }
- }
-
- def addShuffleIdToBeCleaned(appShuffleIdentifier: String): Unit = {
- val Array(appShuffleId, _, _) =
SparkCommonUtils.decodeAppShuffleIdentifier(
- appShuffleIdentifier)
- lifecycleManager.getShuffleIdMapping.get(appShuffleId.toInt).foreach {
- case (_, (celebornShuffleId, _)) =>
shufflesToBeCleaned.put(celebornShuffleId)
- }
- }
+ private val cleanInterval =
lifecycleManager.conf.clientFetchCleanFailedShuffleIntervalMS
+ private var cleanerThreadPool: ScheduledExecutorService = _
- def init(): Unit = {
+ private def init(): Unit = {
cleanerThreadPool = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"failedShuffleCleanerThreadPool")
cleanerThreadPool.scheduleWithFixedDelay(
@@ -85,9 +67,24 @@ private[celeborn] class
FailedShuffleCleaner(lifecycleManager: LifecycleManager)
init()
+ def addShuffleIdToBeCleaned(appShuffleIdentifier: String): Unit = {
+ val Array(appShuffleId, _, _) =
SparkCommonUtils.decodeAppShuffleIdentifier(
+ appShuffleIdentifier)
+ lifecycleManager.getShuffleIdMapping.get(appShuffleId.toInt).foreach {
+ case (_, (celebornShuffleId, _)) =>
shufflesToBeCleaned.put(celebornShuffleId)
+ }
+ }
+
def removeCleanedShuffleId(celebornShuffleId: Int): Unit = {
cleanedShuffleIds.remove(celebornShuffleId)
}
- private var cleanerThreadPool: ScheduledExecutorService = _
+ def stop(): Unit = {
+ shufflesToBeCleaned.clear()
+ cleanedShuffleIds.clear()
+ if (cleanerThreadPool != null) {
+ ThreadUtils.shutdown(cleanerThreadPool)
+ cleanerThreadPool = null
+ }
+ }
}
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
index 8a4318faf..ed7865e19 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java
@@ -268,8 +268,9 @@ public class SparkShuffleManager implements ShuffleManager {
_sortShuffleManager.stop();
_sortShuffleManager = null;
}
- if (celebornConf.clientFetchCleanFailedShuffle()) {
- failedShuffleCleaner.reset();
+ if (failedShuffleCleaner != null) {
+ failedShuffleCleaner.stop();
+ failedShuffleCleaner = null;
}
}