This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c146c925b3 [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order
7c146c925b3 is described below

commit 7c146c925b363fc67eedc7411068f24dd780b583
Author: Alessandro Bellina <abell...@nvidia.com>
AuthorDate: Thu Nov 16 21:06:06 2023 -0600

    [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order
    
    ### What changes were proposed in this pull request?
    As reported in https://issues.apache.org/jira/browse/SPARK-45762,
    `ShuffleManager` implementations defined in a user jar cannot be used in all
    cases unless they are also placed on the `extraClassPath`. We would like to
    avoid requiring that extra configuration when the implementation is already
    included in a jar passed via `--jars`.
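
    For example, an invocation like the following (class and jar names here are
    hypothetical, for illustration only) currently fails to load the shuffle
    manager unless the jar is also added to `extraClassPath`, and is exactly
    what this change is meant to support directly:

        spark-submit \
          --conf spark.shuffle.manager=com.example.MyShuffleManager \
          --jars my-shuffle-plugin.jar \
          my-app.jar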
    
    Proposed changes:
    
    Refactor the code so that the `ShuffleManager` is initialized later, after
    user jars have been localized. This is especially necessary in the executor,
    where the initialization has to be delayed until the `replClassLoader` has
    been updated with the jars passed via `--jars`.
    
    Before this change, the `ShuffleManager` was instantiated at `SparkEnv`
    creation. Instantiating it that early does not work, because user jars have
    not been localized in all scenarios, so loading a `ShuffleManager` defined
    in `--jars` fails. We propose moving the `ShuffleManager` instantiation to
    `SparkContext` on the driver and to `Executor` on the executors.
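
    As a condensed sketch of the new order (paraphrasing the diff below, not the
    exact code): on the driver, `SparkContext` calls
    `_env.initializeShuffleManager()` right after the plugins are created and
    before the task scheduler is started; on the executor, the call happens
    after the `--jars` have been added to the `replClassLoader`, with that
    loader set as the context class loader:

        // Executor.scala (abridged): skipped in local mode, where the driver
        // side has already initialized the ShuffleManager.
        if (!isLocal) {
          Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
            env.initializeShuffleManager()
          }
        }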
    
    ### Why are the changes needed?
    This is not a new API but a change in startup order. The changes are needed
    to improve the user experience by reducing the extra configuration required,
    depending on how a Spark application is launched.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, but it is backwards compatible. Users no longer need to put a
    `ShuffleManager` jar on `extraClassPath`, though they still can if they wish.
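
    Concretely (paths and class names below are illustrative only), the
    previously required workaround was something like:

        --conf spark.shuffle.manager=com.example.MyShuffleManager
        --conf spark.driver.extraClassPath=/path/to/my-shuffle-plugin.jar
        --conf spark.executor.extraClassPath=/path/to/my-shuffle-plugin.jar

    whereas after this change passing the jar with `--jars` is sufficient:

        --conf spark.shuffle.manager=com.example.MyShuffleManager
        --jars my-shuffle-plugin.jar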
    
    This change is not binary compatible with Spark 3.5.0 (see the MiMa comments
    below). I have added a rule to MimaExcludes to handle it:
    https://github.com/apache/spark/pull/43627/commits/970bff4edc6ba14d8de78aa175415e204d6a627b
    
    ### How was this patch tested?
    Added a unit test showing that a test `ShuffleManager` is available when its
    jar is passed via `--jars`, but not without it (using local-cluster mode).
    
    Tested manually with standalone mode, local-cluster mode, YARN client and
    cluster modes, and Kubernetes.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43627 from abellina/shuffle_manager_initialization_order.
    
    Authored-by: Alessandro Bellina <abell...@nvidia.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../main/scala/org/apache/spark/SparkContext.scala |  1 +
 .../src/main/scala/org/apache/spark/SparkEnv.scala | 38 ++++++-----
 .../scala/org/apache/spark/executor/Executor.scala | 13 +++-
 .../org/apache/spark/shuffle/ShuffleManager.scala  | 26 +++++++-
 .../org/apache/spark/storage/BlockManager.scala    | 14 +++-
 .../spark/storage/BlockManagerMasterEndpoint.scala |  9 ++-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 77 ++++++++++++++++++++++
 .../apache/spark/deploy/SparkSubmitTestUtils.scala |  6 +-
 project/MimaExcludes.scala                         |  4 +-
 9 files changed, 160 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 73dcaffa6ce..ed00baa01d6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -577,6 +577,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Initialize any plugins before the task scheduler is initialized.
     _plugins = PluginContainer(this, _resources.asJava)
+    _env.initializeShuffleManager()
 
     // Create and start the scheduler
     val (sched, ts) = SparkContext.createTaskScheduler(this, master)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 3277f86e367..94a4debd026 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -18,13 +18,13 @@
 package org.apache.spark
 
 import java.io.File
-import java.util.Locale
 
 import scala.collection.concurrent
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 import scala.util.Properties
 
+import com.google.common.base.Preconditions
 import com.google.common.cache.CacheBuilder
 import org.apache.hadoop.conf.Configuration
 
@@ -63,7 +63,6 @@ class SparkEnv (
     val closureSerializer: Serializer,
     val serializerManager: SerializerManager,
     val mapOutputTracker: MapOutputTracker,
-    val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
     val blockManager: BlockManager,
     val securityManager: SecurityManager,
@@ -72,6 +71,12 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
 
+  // We initialize the ShuffleManager later in SparkContext and Executor to allow
+  // user jars to define custom ShuffleManagers.
+  private var _shuffleManager: ShuffleManager = _
+
+  def shuffleManager: ShuffleManager = _shuffleManager
+
   @volatile private[spark] var isStopped = false
 
   /**
@@ -100,7 +105,9 @@ class SparkEnv (
       isStopped = true
       pythonWorkers.values.foreach(_.stop())
       mapOutputTracker.stop()
-      shuffleManager.stop()
+      if (shuffleManager != null) {
+        shuffleManager.stop()
+      }
       broadcastManager.stop()
       blockManager.stop()
       blockManager.master.stop()
@@ -186,6 +193,12 @@ class SparkEnv (
     releasePythonWorker(
       pythonExec, workerModule, PythonWorkerFactory.defaultDaemonModule, envVars, worker)
   }
+
+  private[spark] def initializeShuffleManager(): Unit = {
+    Preconditions.checkState(null == _shuffleManager,
+      "Shuffle manager already initialized to %s", _shuffleManager)
+    _shuffleManager = ShuffleManager.create(conf, executorId == SparkContext.DRIVER_IDENTIFIER)
+  }
 }
 
 object SparkEnv extends Logging {
@@ -356,16 +369,6 @@ object SparkEnv extends Logging {
       new MapOutputTrackerMasterEndpoint(
         rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
 
-    // Let the user specify short names for shuffle managers
-    val shortShuffleMgrNames = Map(
-      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
-      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
-    val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
-    val shuffleMgrClass =
-      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
-    val shuffleManager = Utils.instantiateSerializerOrShuffleManager[ShuffleManager](
-      shuffleMgrClass, conf, isDriver)
-
     val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)
 
     val blockManagerPort = if (isDriver) {
@@ -403,7 +406,7 @@ object SparkEnv extends Logging {
             None
           }, blockManagerInfo,
           mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
-          shuffleManager,
+          _shuffleManager = null,
           isDriver)),
       registerOrLookupEndpoint(
         BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
@@ -416,6 +419,10 @@ object SparkEnv extends Logging {
         advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint)
 
     // NB: blockManager is not valid until initialize() is called later.
+    //     SPARK-45762 introduces a change where the ShuffleManager is initialized later
+    //     in the SparkContext and Executor, to allow for custom ShuffleManagers defined
+    //     in user jars. The BlockManager uses a lazy val to obtain the
+    //     shuffleManager from the SparkEnv.
     val blockManager = new BlockManager(
       executorId,
       rpcEnv,
@@ -424,7 +431,7 @@ object SparkEnv extends Logging {
       conf,
       memoryManager,
       mapOutputTracker,
-      shuffleManager,
+      _shuffleManager = null,
       blockTransferService,
       securityManager,
       externalShuffleClient)
@@ -463,7 +470,6 @@ object SparkEnv extends Logging {
       closureSerializer,
       serializerManager,
       mapOutputTracker,
-      shuffleManager,
       broadcastManager,
       blockManager,
       securityManager,
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e340667173b..f2a65aab1ba 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -330,14 +330,21 @@ private[spark] class Executor(
     }
   updateDependencies(initialUserFiles, initialUserJars, initialUserArchives, defaultSessionState)
 
-  // Plugins need to load using a class loader that includes the executor's user classpath.
-  // Plugins also needs to be initialized after the heartbeater started
-  // to avoid blocking to send heartbeat (see SPARK-32175).
+  // Plugins and shuffle managers need to load using a class loader that includes the executor's
+  // user classpath. Plugins also needs to be initialized after the heartbeater started
+  // to avoid blocking to send heartbeat (see SPARK-32175 and SPARK-45762).
   private val plugins: Option[PluginContainer] =
     Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
       PluginContainer(env, resources.asJava)
     }
 
+  // Skip local mode because the ShuffleManager is already initialized
+  if (!isLocal) {
+    Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
+      env.initializeShuffleManager()
+    }
+  }
+
   metricsPoller.start()
 
   private[executor] def numRunningTasks: Int = runningTasks.size()
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index 4e2183451c2..8e4636cfefb 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -17,7 +17,11 @@
 
 package org.apache.spark.shuffle
 
-import org.apache.spark.{ShuffleDependency, TaskContext}
+import java.util.Locale
+
+import org.apache.spark.{ShuffleDependency, SparkConf, TaskContext}
+import org.apache.spark.internal.config
+import org.apache.spark.util.Utils
 
 /**
  * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver
@@ -94,3 +98,23 @@ private[spark] trait ShuffleManager {
   /** Shut down this ShuffleManager. */
   def stop(): Unit
 }
+
+/**
+ * Utility companion object to create a ShuffleManager given a spark configuration.
+ */
+private[spark] object ShuffleManager {
+  def create(conf: SparkConf, isDriver: Boolean): ShuffleManager = {
+    Utils.instantiateSerializerOrShuffleManager[ShuffleManager](
+      getShuffleManagerClassName(conf), conf, isDriver)
+  }
+
+  def getShuffleManagerClassName(conf: SparkConf): String = {
+    val shortShuffleMgrNames = Map(
+      "sort" -> 
classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
+      "tungsten-sort" -> 
classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
+
+    val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
+    shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), 
shuffleMgrName)
+  }
+}
+
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index da48af90a9c..e64c33382dc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -187,12 +187,17 @@ private[spark] class BlockManager(
     val conf: SparkConf,
     memoryManager: MemoryManager,
     mapOutputTracker: MapOutputTracker,
-    shuffleManager: ShuffleManager,
+    private val _shuffleManager: ShuffleManager,
     val blockTransferService: BlockTransferService,
     securityManager: SecurityManager,
     externalBlockStoreClient: Option[ExternalBlockStoreClient])
   extends BlockDataManager with BlockEvictionHandler with Logging {
 
+  // We initialize the ShuffleManager later in SparkContext and Executor, to allow
+  // user jars to define custom ShuffleManagers, as such `_shuffleManager` will be null here
+  // (except for tests) and we ask for the instance from the SparkEnv.
+  private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)
+
   // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)`
   private[spark] val externalShuffleServiceEnabled: Boolean = externalBlockStoreClient.isDefined
   private val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
@@ -587,12 +592,15 @@ private[spark] class BlockManager(
 
   private def registerWithExternalShuffleServer(): Unit = {
     logInfo("Registering executor with local external shuffle service.")
+    // we obtain the class name from the configuration, instead of the ShuffleManager
+    // instance because the ShuffleManager has not been created at this point.
+    val shuffleMgrClass = ShuffleManager.getShuffleManagerClassName(conf)
     val shuffleManagerMeta =
       if (Utils.isPushBasedShuffleEnabled(conf, isDriver = isDriver, checkSerializer = false)) {
-        s"${shuffleManager.getClass.getName}:" +
+        s"${shuffleMgrClass}:" +
           s"${diskBlockManager.getMergeDirectoryAndAttemptIDJsonString()}}}"
       } else {
-        shuffleManager.getClass.getName
+        shuffleMgrClass
       }
     val shuffleConfig = new ExecutorShuffleInfo(
       diskBlockManager.localDirsString,
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 0d52b66a400..17cd0891532 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 
 import com.google.common.cache.CacheBuilder
 
-import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext}
+import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED
@@ -55,10 +55,15 @@ class BlockManagerMasterEndpoint(
     externalBlockStoreClient: Option[ExternalBlockStoreClient],
     blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
     mapOutputTracker: MapOutputTrackerMaster,
-    shuffleManager: ShuffleManager,
+    private val _shuffleManager: ShuffleManager,
     isDriver: Boolean)
   extends IsolatedThreadSafeRpcEndpoint with Logging {
 
+  // We initialize the ShuffleManager later in SparkContext and Executor, to allow
+  // user jars to define custom ShuffleManagers, as such `_shuffleManager` will be null here
+  // (except for tests) and we ask for the instance from the SparkEnv.
+  private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)
+
   // Mapping from executor id to the block manager's local disk directories.
   private val executorIdToLocalDirs =
     CacheBuilder
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 7ebb0165e62..a032e9aa16b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1414,6 +1414,83 @@ class SparkSubmitSuite
     runSparkSubmit(args)
   }
 
+  test("SPARK-45762: The ShuffleManager plugin to use can be defined in a user 
jar") {
+    val shuffleManagerBody = """
+      |@Override
+      |public <K, V, C> org.apache.spark.shuffle.ShuffleHandle registerShuffle(
+      |    int shuffleId,
+      |    org.apache.spark.ShuffleDependency<K, V, C> dependency) {
+      |  throw new java.lang.UnsupportedOperationException("This is a test ShuffleManager!");
+      |}
+      |
+      |@Override
+      |public <K, V> org.apache.spark.shuffle.ShuffleWriter<K, V> getWriter(
+      |    org.apache.spark.shuffle.ShuffleHandle handle,
+      |    long mapId,
+      |    org.apache.spark.TaskContext context,
+      |    org.apache.spark.shuffle.ShuffleWriteMetricsReporter metrics) {
+      |  throw new java.lang.UnsupportedOperationException("This is a test ShuffleManager!");
+      |}
+      |
+      |@Override
+      |public <K, C> org.apache.spark.shuffle.ShuffleReader<K, C> getReader(
+      |    org.apache.spark.shuffle.ShuffleHandle handle,
+      |    int startMapIndex,
+      |    int endMapIndex,
+      |    int startPartition,
+      |    int endPartition,
+      |    org.apache.spark.TaskContext context,
+      |    org.apache.spark.shuffle.ShuffleReadMetricsReporter metrics) {
+      |  throw new java.lang.UnsupportedOperationException("This is a test ShuffleManager!");
+      |}
+      |
+      |@Override
+      |public boolean unregisterShuffle(int shuffleId) {
+      |  throw new java.lang.UnsupportedOperationException("This is a test ShuffleManager!");
+      |}
+      |
+      |@Override
+      |public org.apache.spark.shuffle.ShuffleBlockResolver shuffleBlockResolver() {
+      |  throw new java.lang.UnsupportedOperationException("This is a test ShuffleManager!");
+      |}
+      |
+      |@Override
+      |public void stop() {
+      |}
+  """.stripMargin
+
+    val tempDir = Utils.createTempDir()
+    val compiledShuffleManager = TestUtils.createCompiledClass(
+      "TestShuffleManager",
+      tempDir,
+      "",
+      null,
+      Seq.empty,
+      Seq("org.apache.spark.shuffle.ShuffleManager"),
+      shuffleManagerBody)
+
+    val jarUrl = TestUtils.createJar(
+      Seq(compiledShuffleManager),
+      new File(tempDir, "testplugin.jar"))
+
+    val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+    val argsBase = Seq(
+      "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+      "--name", "testApp",
+      "--master", "local-cluster[1,1,1024]",
+      "--conf", "spark.shuffle.manager=TestShuffleManager",
+      "--conf", "spark.ui.enabled=false")
+
+    val argsError = argsBase :+ unusedJar.toString
+    // check process error exit code
+    assertResult(1)(runSparkSubmit(argsError, expectFailure = true))
+
+    val argsSuccess = (argsBase ++ Seq("--jars", jarUrl.toString)) :+ unusedJar.toString
+    // check process success exit code
+    assertResult(0)(
+      runSparkSubmit(argsSuccess, expectFailure = false))
+  }
+
   private def testRemoteResources(
       enableHttpFs: Boolean,
       forceDownloadSchemes: Seq[String] = Nil): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
index 932e972374c..e38be4cefc0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
@@ -42,7 +42,8 @@ trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
       args: Seq[String],
       sparkHomeOpt: Option[String] = None,
       timeout: Span = defaultSparkSubmitTimeout,
-      isSparkTesting: Boolean = true): Unit = {
+      isSparkTesting: Boolean = true,
+      expectFailure: Boolean = false): Int = {
     val sparkHome = sparkHomeOpt.getOrElse(
       sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")))
     val history = ArrayBuffer.empty[String]
@@ -77,7 +78,7 @@ trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
 
     try {
       val exitCode = failAfter(timeout) { process.waitFor() }
-      if (exitCode != 0) {
+      if (exitCode != 0 && !expectFailure) {
         // include logs in output. Note that logging is async and may not have completed
         // at the time this exception is raised
         Thread.sleep(1000)
@@ -90,6 +91,7 @@ trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
            """.stripMargin
         }
       }
+      exitCode
     } catch {
       case to: TestFailedDueToTimeoutException =>
         val historyLog = history.mkString("\n")
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c0275e16272..d080b16fdc5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -52,7 +52,9 @@ object MimaExcludes {
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.QueryContext.callSite"),
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.QueryContext.summary"),
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI$default$3"),
-    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI")
+    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.types.Decimal.fromStringANSI"),
+    // [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.this")
   )
 
   // Default exclude rules


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
