mridulm commented on code in PR #43627: URL: https://github.com/apache/spark/pull/43627#discussion_r1386011282
########## core/src/main/scala/org/apache/spark/SparkContext.scala: ########## @@ -627,6 +631,7 @@ class SparkContext(config: SparkConf) extends Logging { } _ui.foreach(_.setAppId(_applicationId)) _env.blockManager.initialize(_applicationId) + _env.blockManager.setShuffleManager(shuffleManager) FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, _conf) Review Comment: The changes in this class are not related to the driver classpath, so we can revert all changes here and move the logic into `SparkEnv.create`, initializing `shuffleManager` there when running on the driver. ########## core/src/main/scala/org/apache/spark/SparkEnv.scala: ########## @@ -185,6 +191,10 @@ class SparkEnv ( releasePythonWorker( pythonExec, workerModule, PythonWorkerFactory.defaultDaemonModule, envVars, worker) } + + private[spark] def setShuffleManager(shuffleManager: ShuffleManager): Unit = { + _shuffleManager = shuffleManager + } Review Comment: Instead of setting it, expose an initialize method. On the driver, we should set the variable directly as part of `create`, while on executors we use `initializeShuffleManager` after the classpath has been fixed up (see more below in the comment for `shuffleBlockGetterFn`). 
```suggestion private[spark] def initializeShuffleManager(): Unit = { Preconditions.checkState(null == _shuffleManager, "Shuffle manager already initialized") // Must not be driver Preconditions.checkState(executorId != SparkContext.DRIVER_IDENTIFIER, "Should not be called in driver") _shuffleManager = ShuffleManager.create(_conf, false) } ``` ########## core/src/main/scala/org/apache/spark/SparkEnv.scala: ########## @@ -355,16 +370,6 @@ object SparkEnv extends Logging { new MapOutputTrackerMasterEndpoint( rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf)) - // Let the user specify short names for shuffle managers - val shortShuffleMgrNames = Map( - "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, - "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) - val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER) - val shuffleMgrClass = - shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName) - val shuffleManager = Utils.instantiateSerializerOrShuffleManager[ShuffleManager]( - shuffleMgrClass, conf, isDriver) - val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores) Review Comment: Instead, do: `val shuffleManager: ShuffleManager = if (isDriver) ShuffleManager.create(conf, true) else null` and keep the rest of this method the same. 
########## core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala: ########## @@ -81,9 +81,10 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1) val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1)) Review Comment: With the proposed changes, we can revert all changes to this file. ########## core/src/main/scala/org/apache/spark/executor/Executor.scala: ########## @@ -329,14 +329,22 @@ private[spark] class Executor( } updateDependencies(initialUserFiles, initialUserJars, initialUserArchives, defaultSessionState) - // Plugins need to load using a class loader that includes the executor's user classpath. - // Plugins also needs to be initialized after the heartbeater started - // to avoid blocking to send heartbeat (see SPARK-32175). + // Plugins and shuffle managers need to load using a class loader that includes the executor's + // user classpath. Plugins also needs to be initialized after the heartbeater started + // to avoid blocking to send heartbeat (see SPARK-32175 and SPARK-45762). private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(defaultSessionState.replClassLoader) { PluginContainer(env, resources.asJava) } + private val shuffleManager = + Utils.withContextClassLoader(defaultSessionState.replClassLoader) { + ShuffleManager.create(conf, true) + } + + env.setShuffleManager(shuffleManager) + env.blockManager.setShuffleManager(shuffleManager) Review Comment: ```suggestion Utils.withContextClassLoader(defaultSessionState.replClassLoader) { env.initializeShuffleManager() } ``` I have not tested this, but I think this should work. 
If it does not, most of my suggestions will need to be discarded :-) ########## streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala: ########## @@ -93,7 +93,8 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]() blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", Review Comment: Here as well, revert all changes. ########## core/src/main/scala/org/apache/spark/SparkEnv.scala: ########## @@ -277,6 +287,11 @@ object SparkEnv extends Logging { hostname, numCores, ioEncryptionKey, isLocal) } + private def shuffleBlockGetterFn(shuffleId: Int, mapId: Long): Seq[BlockId] = { + val env = SparkEnv.get + env.shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapId) + } + Review Comment: Drop this? Instead, in `BlockManagerMasterEndpoint`: * Change the constructor to: `private val _shuffleManager: ShuffleManager,` * Add a field: `private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)` Do the same for `BlockManager`. See more below in `create`. ########## core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala: ########## @@ -143,10 +143,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe None } Review Comment: Same as with `BlockManagerReplicationSuite`, all changes can be reverted here as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org