This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new bce61f54c Fix CometShuffleManager hang by deferring SparkEnv access (#3002)
bce61f54c is described below
commit bce61f54ce2450f76223d45c3305802c3bb77601
Author: Shekhar Prasad Rajak <[email protected]>
AuthorDate: Mon Dec 29 20:58:29 2025 +0530
Fix CometShuffleManager hang by deferring SparkEnv access (#3002)
---
.github/workflows/spark_sql_test.yml | 4 --
.../execution/shuffle/CometShuffleManager.scala | 45 +++++++++++++++-------
2 files changed, 31 insertions(+), 18 deletions(-)
diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml
index 3d7aa2e2f..d143ef83a 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -59,10 +59,6 @@ jobs:
- {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l
org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
- {name: "sql_hive-2", args1: "", args2: "hive/testOnly * -- -n
org.apache.spark.tags.ExtendedHiveTest"}
- {name: "sql_hive-3", args1: "", args2: "hive/testOnly * -- -n
org.apache.spark.tags.SlowHiveTest"}
- # Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946
- exclude:
- - spark-version: {short: '4.0', full: '4.0.1', java: 17}
- module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
fail-fast: false
name: spark-sql-${{ matrix.module.name }}/${{ matrix.os }}/spark-${{
matrix.spark-version.full }}/java-${{ matrix.spark-version.java }}
runs-on: ${{ matrix.os }}
diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
index 927e30932..aa47dfa16 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleManager.scala
@@ -58,7 +58,37 @@ class CometShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
*/
private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int,
OpenHashSet[Long]]()
- private lazy val shuffleExecutorComponents =
loadShuffleExecutorComponents(conf)
+ // Lazy initialization to avoid accessing SparkEnv.get during ShuffleManager construction,
+ // which can cause hangs when SparkEnv is not fully initialized (e.g., during Hive metastore ops)
+ // This is only initialized when getWriter/getReader is called (during task execution),
+ // at which point SparkEnv should be fully available
+ @volatile private var _shuffleExecutorComponents: ShuffleExecutorComponents
= _
+
+ private def shuffleExecutorComponents: ShuffleExecutorComponents = {
+ if (_shuffleExecutorComponents == null) {
+ synchronized {
+ if (_shuffleExecutorComponents == null) {
+ val executorComponents =
ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
+ val extraConfigs =
+
conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap
+ // SparkEnv.get should be available when getWriter/getReader is
called
+ // (during task execution), but check for null to avoid hangs
+ val env = SparkEnv.get
+ if (env == null) {
+ throw new IllegalStateException(
+ "SparkEnv.get is null during shuffleExecutorComponents initialization. " +
+ "This may indicate a timing issue with SparkEnv initialization.")
+ }
+ executorComponents.initializeExecutor(
+ conf.getAppId,
+ env.executorId,
+ extraConfigs.asJava)
+ _shuffleExecutorComponents = executorComponents
+ }
+ }
+ }
+ _shuffleExecutorComponents
+ }
override val shuffleBlockResolver: IndexShuffleBlockResolver = {
// The patch versions of Spark 3.4 have different constructor signatures:
@@ -253,19 +283,6 @@ class CometShuffleManager(conf: SparkConf) extends
ShuffleManager with Logging {
object CometShuffleManager extends Logging {
- /**
- * Loads executor components for shuffle data IO.
- */
- private def loadShuffleExecutorComponents(conf: SparkConf):
ShuffleExecutorComponents = {
- val executorComponents =
ShuffleDataIOUtils.loadShuffleDataIO(conf).executor()
- val extraConfigs =
conf.getAllWithPrefix(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX).toMap
- executorComponents.initializeExecutor(
- conf.getAppId,
- SparkEnv.get.executorId,
- extraConfigs.asJava)
- executorComponents
- }
-
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]):
Boolean = {
// We cannot bypass sorting if we need to do map-side aggregation.
if (dep.mapSideCombine) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]