Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1936933162 Thanks for understanding @dongjoon-hyun ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
dongjoon-hyun commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1931550906 @mridulm . Of course, it's legit if it's not easy or there is no other way. Also, we have a similar breaking proposal, #45052 , too. While reviewing that PR, I double-checked this PR briefly. I'm totally fine if this is inevitable here and there. :)
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1931474172 @dongjoon-hyun, it is `@DeveloperApi` from the point of view of usage: `SparkEnv` is not expected to be created by users, as some of the constructor parameters are not externally visible (`RpcEnv`, for example, cannot be created as it is `private[spark]`). There have been changes to its constructor in the past as well, after it was marked `@DeveloperApi`, though to be fair, these were a while back. In general, I am conflicted about trying to preserve compatibility for things which are clearly private to Spark: it inhibits the ability of the project to evolve, especially around major version boundaries (though we do have a lot of instances where we try to maintain compatibility). Given how long `SparkEnv` has been around, I can see a valid case being made for adding a constructor which preserves the earlier signature. Thoughts, @tgravescs?
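In Scala, "a constructor which preserves the earlier signature" would usually be an auxiliary constructor that delegates to the new primary one. The sketch below is a minimal illustration that uses hypothetical stand-ins (`Env`, `Shuffle`, `initializeShuffle`) rather than Spark's real `SparkEnv`, whose parameter list is much longer.

```scala
object CompatConstructorSketch {
  // Hypothetical stand-in for a pluggable component; not a Spark API.
  trait Shuffle { def name: String }

  // New primary constructor: the shuffle component is no longer a parameter,
  // because it is now created later, once user jars are on the classpath.
  class Env(val executorId: String) {
    private var _shuffle: Shuffle = null

    // Auxiliary constructor that keeps the *old* signature compiling for
    // existing callers. It runs the primary constructor first, then installs
    // the eagerly supplied component.
    def this(executorId: String, shuffle: Shuffle) = {
      this(executorId)
      _shuffle = shuffle
    }

    def shuffle: Shuffle = _shuffle

    // Late initialization path used by the new startup order.
    def initializeShuffle(s: Shuffle): Unit = {
      if (_shuffle != null) throw new IllegalStateException("shuffle already initialized")
      _shuffle = s
    }
  }
}
```

With this pattern, old call sites that pass the component at construction time still compile, and new code uses the late initializer. The cost is that the class must tolerate both construction paths, which is part of the maintenance burden mridulm describes.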
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1816533283 Thanks @mridulm @tgravescs and @beliefer for the reviews and rework!
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
tgravescs commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1816518927 Thanks @mridulm @abellina
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1815678388 Merged to master. Thanks for working on this @abellina ! Thanks for the reviews @tgravescs, @beliefer :-)
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm closed pull request #43627: [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order URL: https://github.com/apache/spark/pull/43627
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1814730910 There were some CI failures around missing dependencies in the documentation build (all tests are passing otherwise). So I have upmerged. I also tweaked a couple of comments here: https://github.com/apache/spark/pull/43627/commits/5480faa160bb704f9eb85d51e3fa64d15305268e
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1394870890 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -415,6 +418,11 @@ object SparkEnv extends Logging { advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint) // NB: blockManager is not valid until initialize() is called later. +// SPARK-45762 introduces a change where the ShuffleManager is initialized later +// in the SparkContext and Executor, to allow for custom ShuffleManagers defined +// in user jars. In the executor, the BlockManager uses a lazy val to obtain the +// shuffleManager from the SparkEnv. In the driver, the SparkEnv's shuffleManager Review Comment: Thanks @tgravescs. Handled both comments here: https://github.com/apache/spark/pull/43627/commits/6d002a361ac2c1dfad48ee530766c9b0a605696f
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
tgravescs commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1394837773 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -415,6 +418,11 @@ object SparkEnv extends Logging { advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint) // NB: blockManager is not valid until initialize() is called later. +// SPARK-45762 introduces a change where the ShuffleManager is initialized later +// in the SparkContext and Executor, to allow for custom ShuffleManagers defined +// in user jars. In the executor, the BlockManager uses a lazy val to obtain the +// shuffleManager from the SparkEnv. In the driver, the SparkEnv's shuffleManager Review Comment: I think this comment is no longer true. The driver SparkEnv's ShuffleManager is created after the plugin is initialized. ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -71,6 +70,12 @@ class SparkEnv ( val outputCommitCoordinator: OutputCommitCoordinator, val conf: SparkConf) extends Logging { + // We initialize the ShuffleManager later in SparkContext, and Executor, to allow Review Comment:
```suggestion
  // We initialize the ShuffleManager later in SparkContext and Executor to allow
```
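The code comment under review says that on the executor the `BlockManager` "uses a lazy val to obtain the shuffleManager from the SparkEnv". The sketch below illustrates why a `lazy val` tolerates late initialization. `Env` and `BlockMgr` are hypothetical stand-ins, not Spark's classes.

```scala
object LazyLookupSketch {
  // Hypothetical stand-in for an environment whose shuffle manager is
  // installed after construction.
  final class Env { @volatile var shuffleManager: String = null }

  // Hypothetical stand-in for a block manager built before the env is ready.
  final class BlockMgr(env: Env) {
    // Evaluated on first access rather than at construction, so the env's
    // shuffle manager may be installed after this object is created.
    // If the initializer throws, the lazy val stays unset and is retried on
    // the next access.
    lazy val shuffleManager: String = {
      val sm = env.shuffleManager
      require(sm != null, "shuffle manager accessed before initialization")
      sm
    }
  }
}
```

Once it succeeds, the value is cached, so later changes to the env are not observed. This matches the "set once, then read" lifecycle discussed in the thread.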
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1813246792 Thanks @mridulm, yes, the commits make sense; they bring back the late initialization in the driver. I tested the change. The main difference from your patch, @mridulm, is that I still had to get the shuffle manager class names using the method we added to the `ShuffleManager` object here https://github.com/apache/spark/pull/43627/files#diff-42a673b8fa5f2b999371dc97a5de7ebd2c2ec19447353d39efb7e8ebc012fe32R592, because the `shuffleManager` is not set yet at this point. @tgravescs fyi
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1806707560 @mridulm updated. Thanks for your comments and patch. I took most of it verbatim and cleaned up a couple of things.
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1803110054 > On the SPIP, I did send it to dev as a DISCUSS thread on November 4. I would like to get input on that as well. You are right, I do [see it here](https://lists.apache.org/thread/m5pv7vzg3109ts706b13t1c9p5zys2rq) - looks like I missed it :-(
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1803024221 @mridulm thanks for the comments. I think they mostly work and I’m testing it at the moment. I’ll push something either tonight or tomorrow am. On the SPIP, I did send it to dev as a DISCUSS thread on November 4. I would like to get input on that as well.
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1386166263 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -277,6 +287,11 @@ object SparkEnv extends Logging { hostname, numCores, ioEncryptionKey, isLocal) } + private def shuffleBlockGetterFn(shuffleId: Int, mapId: Long): Seq[BlockId] = { +val env = SparkEnv.get +env.shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapId) + } + Review Comment: Looking at this later, preserving this is mainly to minimize test code changes, and allow for a way to override it.
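`shuffleBlockGetterFn` passes a function instead of the shuffle manager itself, so the consumer no longer needs the manager at construction time. The sketch below shows the idea under stated assumptions: `BlockId`, `MasterEndpoint`, `blocksToRemove`, and `GlobalResolver` are hypothetical stand-ins, not Spark's actual classes. `GlobalResolver` plays the role that `SparkEnv.get` plays in the diff.

```scala
object BlockGetterSketch {
  // Hypothetical block id; Spark's real BlockId hierarchy is richer.
  final case class BlockId(name: String)

  // Stand-in for a master endpoint that needs shuffle block ids but does not
  // hold a reference to the shuffle manager itself.
  final class MasterEndpoint(getShuffleBlocks: (Int, Long) => Seq[BlockId]) {
    def blocksToRemove(shuffleId: Int, mapIds: Seq[Long]): Seq[BlockId] =
      mapIds.flatMap(mapId => getShuffleBlocks(shuffleId, mapId))
  }

  // Production-style wiring: resolve the collaborator at call time through a
  // global holder (analogous to SparkEnv.get), not at construction time.
  object GlobalResolver {
    @volatile var current: Option[(Int, Long) => Seq[BlockId]] = None

    def lookup(shuffleId: Int, mapId: Long): Seq[BlockId] =
      current.getOrElse(throw new IllegalStateException("not initialized"))(shuffleId, mapId)
  }
}
```

The production path can defer to the global lookup, while a test can pass a stub function without setting up and tearing down any global environment. That is the test-setup concern abellina raises about relying on `SparkEnv` being set.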
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1386016225 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -185,6 +191,10 @@ class SparkEnv ( releasePythonWorker( pythonExec, workerModule, PythonWorkerFactory.defaultDaemonModule, envVars, worker) } + + private[spark] def setShuffleManager(shuffleManager: ShuffleManager): Unit = { +_shuffleManager = shuffleManager + } Review Comment: Instead of setting it, expose an initialize method. We use `initializeShuffleManager` in the driver and executor after the classpath has been fixed up (see more below in the comment for `shuffleBlockGetterFn`).
```suggestion
  private[spark] def initializeShuffleManager(): Unit = {
    Preconditions.checkState(null == _shuffleManager,
      "Shuffle manager already initialized to %s", _shuffleManager)
    _shuffleManager = ShuffleManager.create(conf, executorId == SparkContext.DRIVER_IDENTIFIER)
  }
```
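The suggested `initializeShuffleManager` replaces an unrestricted setter with a create-exactly-once method guarded by a state check. The sketch below shows that guard in plain Scala, using `IllegalStateException` in place of Guava's `Preconditions.checkState`. `Env`, `Shuffle`, the `"shuffle.class"` key, and the literal `"driver"` id are all hypothetical stand-ins.

```scala
object InitOnceSketch {
  // Hypothetical stand-in for a pluggable shuffle implementation.
  final case class Shuffle(className: String, isDriver: Boolean)

  final class Env(val executorId: String, conf: Map[String, String]) {
    @volatile private var _shuffle: Shuffle = null

    def shuffle: Shuffle = _shuffle

    // Creates the component exactly once. A second call is a programming
    // error, mirroring the state check in the suggestion. The config key and
    // the "driver" id below are hypothetical.
    def initializeShuffle(): Unit = synchronized {
      if (_shuffle != null) {
        throw new IllegalStateException(s"Shuffle already initialized to ${_shuffle}")
      }
      val cls = conf.getOrElse("shuffle.class", "sort")
      _shuffle = Shuffle(cls, isDriver = executorId == "driver")
    }
  }
}
```

Compared with a public setter, the environment owns creation: callers cannot install a different instance, and accidental double initialization fails loudly instead of silently replacing state.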
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1801296392 @abellina, given [SPARK-45792](https://issues.apache.org/jira/browse/SPARK-45792) is an SPIP, can you please surface it on spark-dev@ and initiate a discussion? I don't remember seeing it there.
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1386033849 ## core/src/main/scala/org/apache/spark/executor/Executor.scala: ## @@ -329,14 +329,22 @@ private[spark] class Executor( } updateDependencies(initialUserFiles, initialUserJars, initialUserArchives, defaultSessionState) - // Plugins need to load using a class loader that includes the executor's user classpath. - // Plugins also needs to be initialized after the heartbeater started - // to avoid blocking to send heartbeat (see SPARK-32175). + // Plugins and shuffle managers need to load using a class loader that includes the executor's + // user classpath. Plugins also needs to be initialized after the heartbeater started + // to avoid blocking to send heartbeat (see SPARK-32175 and SPARK-45762). private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(defaultSessionState.replClassLoader) { PluginContainer(env, resources.asJava) } + private val shuffleManager = +Utils.withContextClassLoader(defaultSessionState.replClassLoader) { + ShuffleManager.create(conf, true) +} + + env.setShuffleManager(shuffleManager) + env.blockManager.setShuffleManager(shuffleManager) Review Comment:
```suggestion
  if (!isLocal) {
    Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
      env.initializeShuffleManager()
    }
  }
```
I have not tested this, but I think this should work. If it does not, most of my suggestions will need to be discarded :-)
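Both the plugin and the shuffle manager are created inside `Utils.withContextClassLoader(defaultSessionState.replClassLoader) { ... }`, so that reflective class lookup can see user jars. The sketch below is a standalone re-implementation of that idea for illustration, not Spark's code. `ContextLoaderSketch` and `instantiate` are hypothetical names.

```scala
object ContextLoaderSketch {
  // Runs `body` with `loader` as the thread's context class loader and always
  // restores the previous loader, even if `body` throws.
  def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
    val thread = Thread.currentThread()
    val previous = thread.getContextClassLoader
    thread.setContextClassLoader(loader)
    try body
    finally thread.setContextClassLoader(previous)
  }

  // Reflectively instantiates a class by name through the current context
  // loader, which is how a user-supplied implementation becomes visible.
  def instantiate[T](className: String): T = {
    val cls = Class.forName(className, true, Thread.currentThread().getContextClassLoader)
    cls.getDeclaredConstructor().newInstance().asInstanceOf[T]
  }
}
```

The `try`/`finally` is what makes this safe on long-lived executor threads: a failure while constructing the component does not leave the thread pointing at the wrong loader.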
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1386032325 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -355,16 +370,6 @@ object SparkEnv extends Logging { new MapOutputTrackerMasterEndpoint( rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf)) -// Let the user specify short names for shuffle managers -val shortShuffleMgrNames = Map( - "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, - "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) -val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER) -val shuffleMgrClass = - shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName) -val shuffleManager = Utils.instantiateSerializerOrShuffleManager[ShuffleManager]( - shuffleMgrClass, conf, isDriver) - val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores) Review Comment: ~Instead, do:~ ~`val shuffleManager: ShuffleManager = if (isDriver) ShuffleManager.create(conf, true) else null`~ ~and keep rest of this method the same.~ Simply pass `shuffleManager = null`
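The removed block resolves short aliases (`sort`, `tungsten-sort`) to a fully qualified class name before anything is instantiated. Keeping that step separate is what allows class names to be computed before the `shuffleManager` instance exists, as abellina notes. The sketch below uses string literals instead of `classOf[...].getName` so that it does not depend on Spark. `ShuffleNameSketch` and `resolveClassName` are hypothetical names.

```scala
import java.util.Locale

object ShuffleNameSketch {
  // Short aliases, mirroring the map in the removed lines of the diff.
  private val shortNames: Map[String, String] = Map(
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  // Resolves a configured value to a fully qualified class name without
  // loading or instantiating anything. Aliases are matched case-insensitively
  // using Locale.ROOT, and unknown values pass through unchanged.
  def resolveClassName(configured: String): String =
    shortNames.getOrElse(configured.toLowerCase(Locale.ROOT), configured)
}
```

Using `Locale.ROOT` avoids locale-sensitive lowercasing surprises (for example, the Turkish dotless i) when matching aliases.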
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1386166263 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -277,6 +287,11 @@ object SparkEnv extends Logging { hostname, numCores, ioEncryptionKey, isLocal) } + private def shuffleBlockGetterFn(shuffleId: Int, mapId: Long): Seq[BlockId] = { +val env = SparkEnv.get +env.shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapId) + } + Review Comment: Looking at this later, preserving this is mainly to minimize test code.
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1386011282 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -627,6 +631,7 @@ class SparkContext(config: SparkConf) extends Logging { } _ui.foreach(_.setAppId(_applicationId)) _env.blockManager.initialize(_applicationId) +_env.blockManager.setShuffleManager(shuffleManager) FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, _conf) Review Comment: We only need `_env.initializeShuffleManager()` (to replace `env.setShuffleManager`) in this class; we can revert the rest.
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1386016225 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -185,6 +191,10 @@ class SparkEnv ( releasePythonWorker( pythonExec, workerModule, PythonWorkerFactory.defaultDaemonModule, envVars, worker) } + + private[spark] def setShuffleManager(shuffleManager: ShuffleManager): Unit = { +_shuffleManager = shuffleManager + } Review Comment: Instead of setting it, expose an initialize method. In the driver, we should directly set the variable as part of `create`, while we use `initializeShuffleManager` for the executor after the classpath has been fixed up (see more below in the comment for `shuffleBlockGetterFn`).
```suggestion
  private[spark] def initializeShuffleManager(): Unit = {
    Preconditions.checkState(null == _shuffleManager,
      "Shuffle manager already initialized to %s", _shuffleManager)
    _shuffleManager = ShuffleManager.create(conf, executorId == SparkContext.DRIVER_IDENTIFIER)
  }
```
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1386011282 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -627,6 +631,7 @@ class SparkContext(config: SparkConf) extends Logging { } _ui.foreach(_.setAppId(_applicationId)) _env.blockManager.initialize(_applicationId) +_env.blockManager.setShuffleManager(shuffleManager) FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, _conf) Review Comment: The changes in this class are not related to driver classpath, and so we can revert all changes here and move the logic to `SparkEnv.create` to initialize `shuffleManager` when it is `driver`. ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -185,6 +191,10 @@ class SparkEnv ( releasePythonWorker( pythonExec, workerModule, PythonWorkerFactory.defaultDaemonModule, envVars, worker) } + + private[spark] def setShuffleManager(shuffleManager: ShuffleManager): Unit = { +_shuffleManager = shuffleManager + } Review Comment: Instead of setting it, expose an initialize method. In driver, we should directly set the variable as part of create - while we use `initializeShuffleManager` for executor after classpath has been fixed up (see more below in comment for `shuffleBlockGetterFn`). 
```suggestion
  private[spark] def initializeShuffleManager(): Unit = {
    Preconditions.checkState(null == _shuffleManager,
      "Shuffle manager already initialized")
    // Must not be driver
    Preconditions.checkState(executorId != SparkContext.DRIVER_IDENTIFIER,
      "Should not be called in driver")
    _shuffleManager = ShuffleManager.create(conf, false)
  }
```
## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -355,16 +370,6 @@ object SparkEnv extends Logging { new MapOutputTrackerMasterEndpoint( rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf)) -// Let the user specify short names for shuffle managers -val shortShuffleMgrNames = Map( - "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, - "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) -val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER) -val shuffleMgrClass = - shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName) -val shuffleManager = Utils.instantiateSerializerOrShuffleManager[ShuffleManager]( - shuffleMgrClass, conf, isDriver) - val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores) Review Comment: Instead, do: `val shuffleManager: ShuffleManager = if (isDriver) ShuffleManager.create(conf, true) else null` and keep the rest of this method the same.
## core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala: ## @@ -81,9 +81,10 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1) val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1)) Review Comment: With the proposed changes, we can revert all changes to this file. ## core/src/main/scala/org/apache/spark/executor/Executor.scala: ## @@ -329,14 +329,22 @@ private[spark] class Executor( } updateDependencies(initialUserFiles, initialUserJars, initialUserArchives, defaultSessionState) - // Plugins need to load using a class loader that includes the executor's user classpath. - // Plugins also needs to be initialized after the heartbeater started - // to avoid blocking to send heartbeat (see SPARK-32175). + // Plugins and shuffle managers need to load using a class loader that includes the executor's + // user classpath. Plugins also needs to be initialized after the heartbeater started + // to avoid blocking to send heartbeat (see SPARK-32175 and SPARK-45762). private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(defaultSessionState.replClassLoader) { PluginContainer(env, resources.asJava) } + private val shuffleManager = +Utils.withContextClassLoader(defaultSessionState.replClassLoader) { + ShuffleManager.create(conf, true) +} + + env.setShuffleManager(shuffleManager) + env.blockManager.setShuffleManager(shuffleManager) Review Comment:
```suggestion
  Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
    env.initializeShuffleManager()
  }
```
I have not tested this, but I think this should work. If it does not, most of my suggestions will need to be discarded :-) ## streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala: ## @@ -93,7 +93,8 @@ abstract class
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1797888003 @tgravescs The `SparkEnv`-related change is what gave me pause; I am less concerned about the Executor side of things.
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1796931246 @tgravescs @mridulm @beliefer I made a small tweak so that the `executorEnvs` map in the `SparkContext` is populated from configuration entries with the prefix `spark.executorEnv.*` after the driver plugin is instantiated (see the last two commits).
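The ordering matters because `executorEnvs` is a snapshot of the `spark.executorEnv.*` entries. Any entry a driver plugin adds after the snapshot is taken would be missed. The sketch below shows the prefix extraction over a plain `Map` rather than a real `SparkConf`. `ExecutorEnvSketch` and `executorEnvs` are hypothetical names.

```scala
object ExecutorEnvSketch {
  val Prefix = "spark.executorEnv."

  // Collects `spark.executorEnv.NAME -> value` entries into `NAME -> value`.
  // The result is a snapshot: entries added to the configuration later (for
  // example by a driver plugin) are only included if this runs afterwards.
  def executorEnvs(conf: Map[String, String]): Map[String, String] =
    conf.collect { case (k, v) if k.startsWith(Prefix) => k.stripPrefix(Prefix) -> v }
}
```

Computing the snapshot only after plugin initialization picks up plugin-contributed variables without requiring plugins to mutate the map directly.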
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
tgravescs commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1796408368 I agree that ideally we would finish SPARK-25299, but I don't see that happening anytime soon. I also don't think it covers the case of people replacing the entire ShuffleManager rather than just the storage piece. The ShuffleManager API isn't public either, but we have multiple implementations doing that now (Uber's RSS, Project Gluten, Spark Rapids, and I thought Cosco was too, although it's not open source, etc.). One note is that SPARK-25299 had a sub-issue that was going to use the SparkPlugin for configuration (https://issues.apache.org/jira/browse/SPARK-30033, https://github.com/apache/spark/pull/26670), and its PR mentions the weird interaction with initialization and works around it in a different way. Overall, while there are a bunch of changes here, most of them just move initialization code around and shouldn't impact anything else. The one user-impacting change is the SparkEnv API change, which, if we only do it in 4.0, shouldn't be a big deal, unless there is some usage I'm not aware of. @mridulm Is there a specific part you are concerned with?
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1794940813 @beliefer thanks for the comments. I handled most of them in the last commit (except for the one about passing the function, which we can discuss further there).
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1383413842
## core/src/main/scala/org/apache/spark/SparkEnv.scala: ##
@@ -402,7 +405,7 @@ object SparkEnv extends Logging {
         None
       },
       blockManagerInfo,
       mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
-      shuffleManager,
+      shuffleBlockGetterFn,
Review Comment: I can see this being an issue in tests where the `SparkEnv` would not be set, so I'd have to make sure that the env is set and cleared in the tests. That said, if you feel strongly about this, I can look into it more.
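This sketch illustrates the testability trade-off. If the endpoint receives a narrow function rather than the whole `ShuffleManager` (or reads it from `SparkEnv`), a test can pass a stub without setting up any global environment. `BlockId`, `ShuffleBlockGetter`, and `MasterEndpoint` are hypothetical simplifications. They are loosely modeled on `ShuffleBlockResolver.getBlocksForShuffle`, assumed here to take a shuffle ID and a map ID.

```scala
// Hypothetical sketch: inject only the capability the endpoint needs.
object GetterInjectionSketch {
  // Simplified stand-in for Spark's BlockId hierarchy.
  final case class BlockId(name: String)

  // (shuffleId, mapId) => blocks belonging to that map output.
  type ShuffleBlockGetter = (Int, Long) => Seq[BlockId]

  final class MasterEndpoint(getBlocksForShuffle: ShuffleBlockGetter) {
    // Collects the blocks for every map output of a shuffle.
    def blocksToRemove(shuffleId: Int, mapIds: Seq[Long]): Seq[BlockId] =
      mapIds.flatMap(mapId => getBlocksForShuffle(shuffleId, mapId))
  }
}
```

The test below constructs the endpoint with a plain lambda. No `SparkEnv` needs to be set or cleared.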
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1793950325 While I am not opposed to a way to create a short name for the shuffle manager, I am not very inclined towards it if it results in nontrivial changes to Spark. IMO this is better handled in the context of SPARK-25299, which unfortunately is only partly done.
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
beliefer commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1382560338
## core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala: ##
@@ -17,7 +17,30 @@
 package org.apache.spark.shuffle
-import org.apache.spark.{ShuffleDependency, TaskContext}
+import java.util.Locale
+
+import org.apache.spark.{ShuffleDependency, SparkConf, TaskContext}
+import org.apache.spark.internal.config
+import org.apache.spark.util.Utils
+
+/**
+ * Utility companion object to create a ShuffleManager given a spark configuration.
+ */
+private[spark] object ShuffleManager {
Review Comment: Shall we put the companion object at the end of the file?
## core/src/main/scala/org/apache/spark/SparkEnv.scala: ##
@@ -402,7 +405,7 @@ object SparkEnv extends Logging {
         None
       },
       blockManagerInfo,
       mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
-      shuffleManager,
+      shuffleBlockGetterFn,
Review Comment: Why not define `shuffleBlockGetterFn` in `BlockManagerMasterEndpoint`?
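The new `ShuffleManager` companion object imports `Locale`, `config`, and `Utils`. That suggests it resolves the configured `spark.shuffle.manager` value to a class name before instantiating it. The sketch below covers only the name-resolution step and leaves out reflective instantiation. It assumes the built-in short names `sort` and `tungsten-sort`, both of which map to `SortShuffleManager` in Spark. `ShuffleManagerNames` is a hypothetical name.

```scala
import java.util.Locale

// Hypothetical sketch of short-name resolution for spark.shuffle.manager.
object ShuffleManagerNames {
  private val SortManager = "org.apache.spark.shuffle.sort.SortShuffleManager"

  // Built-in aliases; both resolve to the sort-based manager.
  private val shortNames = Map(
    "sort" -> SortManager,
    "tungsten-sort" -> SortManager)

  // Aliases match case-insensitively using Locale.ROOT, so the result does not
  // depend on the JVM default locale. Any other value is returned unchanged and
  // treated as a fully qualified class name (e.g. one from a user jar).
  def resolve(configured: String): String =
    shortNames.getOrElse(configured.toLowerCase(Locale.ROOT), configured)
}
```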
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1793562824 @mridulm thanks for the comments. I have published a SPIP here https://issues.apache.org/jira/browse/SPARK-45792 that aims to show the bigger picture. Without the change of initialization order in this PR, we couldn't carry out the linked SPIP, because the ShuffleManager is initialized very early in the executors today. I split this into a separate PR so as not to introduce too much change at once, but your point is well taken. I would like to hear your thoughts on the SPIP and how we can proceed. Note there is an alternative I can easily try: instantiate a ShuffleManager wrapper, which would avoid the change to `SparkEnv` (we would instantiate the wrapper instead of the actual implementation). We could then set the implementation on this wrapper later, once jars are localized and plugins are loaded. This felt a bit worse than the approach in this PR, but I am happy to hear opinions. Thanks again!!
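The wrapper alternative could look roughly like this. `MiniShuffleManager` and `DeferredShuffleManager` are hypothetical stand-ins with only two methods. The real `ShuffleManager` trait has many more, and each would need to be forwarded the same way. The set-once check is an added assumption.

```scala
// Hypothetical, heavily reduced stand-in for the ShuffleManager trait.
trait MiniShuffleManager {
  def name: String
  def stop(): Unit
}

// Created early, before user jars are available. The real implementation is
// plugged in later, and every call is forwarded to it.
final class DeferredShuffleManager extends MiniShuffleManager {
  @volatile private var delegate: Option[MiniShuffleManager] = None

  // Allow the implementation to be set exactly once.
  def setDelegate(impl: MiniShuffleManager): Unit = synchronized {
    require(delegate.isEmpty, "delegate already set")
    delegate = Some(impl)
  }

  private def impl: MiniShuffleManager =
    delegate.getOrElse(throw new IllegalStateException("ShuffleManager not initialized yet"))

  override def name: String = impl.name
  override def stop(): Unit = impl.stop()
}
```

Even at this size, every call pays an extra indirection and must handle the not-yet-set case. That may be part of why this option felt worse than changing the startup order.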
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1382469931
## core/src/main/scala/org/apache/spark/SparkEnv.scala: ##
@@ -71,6 +69,10 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
+  // We initialize the ShuffleManager later, in SparkContext and Executor, to allow
+  // user jars to define custom ShuffleManagers.
+  var shuffleManager: ShuffleManager = _
Review Comment: Will fix!!
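The comment in this hunk states the reason for the move. A custom ShuffleManager is resolved by class name, and that lookup can only succeed once the class loader can see the user jar. Below is a minimal sketch of reflective instantiation through an explicit class loader. `ReflectiveLoad` is hypothetical and is not Spark's own helper.

```scala
// Hypothetical sketch: instantiate a class by name through a given loader.
object ReflectiveLoad {
  // Succeeds only if `loader` can see `className`. For a ShuffleManager that
  // ships in a user jar, the jar must already be on the loader, which is why
  // the manager has to be created after user jars are made available.
  def instantiate[T](className: String, loader: ClassLoader): T =
    Class.forName(className, true, loader)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[T]
}
```

If this runs before the jar is added, `Class.forName` throws `ClassNotFoundException`. That is the failure an early-initialized ShuffleManager would hit.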
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1381167046
## core/src/main/scala/org/apache/spark/SparkEnv.scala: ##
@@ -71,6 +69,10 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
+  // We initialize the ShuffleManager later, in SparkContext and Executor, to allow
+  // user jars to define custom ShuffleManagers.
+  var shuffleManager: ShuffleManager = _
Review Comment: Given that `SparkEnv` is a `DeveloperApi`, let us not expose this for mutation.
```suggestion
  private var _shuffleManager: ShuffleManager = _
  def shuffleManager: ShuffleManager = _shuffleManager
```
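Here is a self-contained illustration of the suggested encapsulation. Readers get a plain getter, and assignment goes through one controlled entry point. `EnvSketch` is hypothetical and `String` stands in for `ShuffleManager`. The companion object plays the role a package-private initializer (for example `private[spark]`) would presumably play inside Spark. The set-once check is an added assumption.

```scala
// Hypothetical sketch: public read-only accessor, restricted one-time setter.
final class EnvSketch {
  private var _shuffleManager: String = null // String stands in for ShuffleManager

  // Callers can read the manager but cannot assign it.
  def shuffleManager: String = _shuffleManager

  // Private to the class; only the companion object below can call it.
  private def initializeShuffleManager(sm: String): Unit = {
    require(_shuffleManager == null, "shuffle manager already initialized")
    _shuffleManager = sm
  }
}

object EnvSketch {
  // Companion objects may access the class's private members.
  def init(env: EnvSketch, sm: String): Unit = env.initializeShuffleManager(sm)
}
```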
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1791035949 @tgravescs thanks for the review. I have handled your comments in this commit: https://github.com/apache/spark/pull/43627/commits/0bd7e990812d23166509ad6585c8d352f78e569f
Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
tgravescs commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1380262372
## core/src/main/scala/org/apache/spark/SparkEnv.scala: ##
@@ -423,7 +425,6 @@ object SparkEnv extends Logging {
       conf,
       memoryManager,
Review Comment: There is a comment above that says "// NB: blockManager is not valid until initialize() is called later." We may want to update it to mention that the shuffle manager has to be set as well.
## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ##
@@ -186,12 +186,14 @@ private[spark] class BlockManager(
     val conf: SparkConf,
     memoryManager: MemoryManager,
     mapOutputTracker: MapOutputTracker,
-    shuffleManager: ShuffleManager,
     val blockTransferService: BlockTransferService,
     securityManager: SecurityManager,
     externalBlockStoreClient: Option[ExternalBlockStoreClient])
   extends BlockDataManager with BlockEvictionHandler with Logging {
+  // this is set after the ShuffleManager is instantiated in SparkContext and Executor
+  private var shuffleManager: ShuffleManager = _
Review Comment: Update the description above to mention that the shuffle manager has to be set as well.
## core/src/main/scala/org/apache/spark/SparkEnv.scala: ##
@@ -71,6 +69,9 @@ class SparkEnv (
     val outputCommitCoordinator: OutputCommitCoordinator,
     val conf: SparkConf) extends Logging {
+  // We set the ShuffleManager in SparkContext and Executor
Review Comment: nit: update the comment to say something like: the ShuffleManager is initialized later in... to allow it to be defined in user-specified jars.
## core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala: ##
@@ -104,8 +105,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, sc.env.shuffleManager,
-        isDriver = true)),
+        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker,
+        sc.env.shuffleManager.shuffleBlockResolver.getBlocksForShuffle, true)),
Review Comment: Put back `isDriver = true` as the last parameter.
Re: [PR] [SPARK-45762][CORE]: Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1789727698 @tgravescs fyi
Re: [PR] [SPARK-45762][CORE]: Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1789508860 OK, given the MiMa code, I believe I need to add a temporary exclusion here: https://github.com/apache/spark/blob/master/project/MimaExcludes.scala#L37.
Re: [PR] [SPARK-45762][CORE]: Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1789408786 The MiMa tests are failing due to:
```
[error] spark-core: Failed binary compatibility check against org.apache.spark:spark-core_2.13:3.5.0! Found 1 potential problems (filtered 3908)
[error]  * method this(java.lang.String,org.apache.spark.rpc.RpcEnv,org.apache.spark.serializer.Serializer,org.apache.spark.serializer.Serializer,org.apache.spark.serializer.SerializerManager,org.apache.spark.MapOutputTracker,org.apache.spark.shuffle.ShuffleManager,org.apache.spark.broadcast.BroadcastManager,org.apache.spark.storage.BlockManager,org.apache.spark.SecurityManager,org.apache.spark.metrics.MetricsSystem,org.apache.spark.memory.MemoryManager,org.apache.spark.scheduler.OutputCommitCoordinator,org.apache.spark.SparkConf)Unit in class org.apache.spark.SparkEnv does not have a correspondent in current version
```
This makes sense, since I changed `SparkEnv`. I am not entirely sure whether adding this to `MimaExcludes` is the right approach here, and I think I need some help.
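If the exclusion route were taken, the entry would presumably look like the fragment below. This assumes MiMa's standard `ProblemFilters` API, and that the reported "does not have a correspondent in current version" problem for a constructor maps to `DirectMissingMethodProblem`. The fragment is not standalone: it belongs inside the exclusion list in `project/MimaExcludes.scala`. The alternative discussed in this thread is to keep a constructor with the old signature, so that no exclusion is needed.

```scala
// Fragment for project/MimaExcludes.scala (assumed placement inside the
// version's list of problem filters); not a standalone program.
import com.typesafe.tools.mima.core._

// Accept the removed SparkEnv constructor whose signature took a ShuffleManager.
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.this")
```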
Re: [PR] [SPARK-45762][CORE]: Support shuffle managers defined in user jars by changing startup order [spark]
abellina commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1789318235 I'll have to figure out the CI. It seems my fork is running things, but I am getting some failures on this page (`AppVeyor` and `Notify test workflow`).