mridulm commented on code in PR #43627:
URL: https://github.com/apache/spark/pull/43627#discussion_r1386011282


##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -627,6 +631,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     _ui.foreach(_.setAppId(_applicationId))
     _env.blockManager.initialize(_applicationId)
+    _env.blockManager.setShuffleManager(shuffleManager)
     FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master, 
_conf)

Review Comment:
   The changes in this class are not related to the driver classpath, so we can 
revert all changes here and instead move the logic into `SparkEnv.create`, 
initializing `shuffleManager` there when running as the driver.



##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -185,6 +191,10 @@ class SparkEnv (
     releasePythonWorker(
       pythonExec, workerModule, PythonWorkerFactory.defaultDaemonModule, 
envVars, worker)
   }
+
+  private[spark] def setShuffleManager(shuffleManager: ShuffleManager): Unit = 
{
+    _shuffleManager = shuffleManager
+  }

Review Comment:
   Instead of a setter, expose an initialize method.
   On the driver, we should set the variable directly as part of `create`, while 
on the executor we call `initializeShuffleManager` after the classpath has been 
fixed up (see the comment on `shuffleBlockGetterFn` below).
   
   ```suggestion
     private[spark] def initializeShuffleManager(): Unit = {
       Preconditions.checkState(null == _shuffleManager,
           "Shuffle manager already initialized")
       // Must not be driver
       Preconditions.checkState(executorId != SparkContext.DRIVER_IDENTIFIER,
           "Should not be called in driver")
       _shuffleManager = ShuffleManager.create(_conf, false)
     }
   ```



##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -355,16 +370,6 @@ object SparkEnv extends Logging {
       new MapOutputTrackerMasterEndpoint(
         rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
 
-    // Let the user specify short names for shuffle managers
-    val shortShuffleMgrNames = Map(
-      "sort" -> 
classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
-      "tungsten-sort" -> 
classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
-    val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
-    val shuffleMgrClass =
-      shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), 
shuffleMgrName)
-    val shuffleManager = 
Utils.instantiateSerializerOrShuffleManager[ShuffleManager](
-      shuffleMgrClass, conf, isDriver)
-
     val memoryManager: MemoryManager = UnifiedMemoryManager(conf, 
numUsableCores)

Review Comment:
   Instead, do:
   
   `val shuffleManager: ShuffleManager = if (isDriver) 
ShuffleManager.create(conf, true) else null`
   
   and keep the rest of this method the same.
   
   



##########
core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala:
##########
@@ -81,9 +81,10 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
       conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
     val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, 
numCores = 1))

Review Comment:
   With the proposed changes, we can revert all changes to this file



##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -329,14 +329,22 @@ private[spark] class Executor(
     }
   updateDependencies(initialUserFiles, initialUserJars, initialUserArchives, 
defaultSessionState)
 
-  // Plugins need to load using a class loader that includes the executor's 
user classpath.
-  // Plugins also needs to be initialized after the heartbeater started
-  // to avoid blocking to send heartbeat (see SPARK-32175).
+  // Plugins and shuffle managers need to load using a class loader that 
includes the executor's
+  // user classpath. Plugins also needs to be initialized after the 
heartbeater started
+  // to avoid blocking to send heartbeat (see SPARK-32175 and SPARK-45762).
   private val plugins: Option[PluginContainer] =
     Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
       PluginContainer(env, resources.asJava)
     }
 
+  private val shuffleManager =
+    Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
+      ShuffleManager.create(conf, true)
+    }
+
+  env.setShuffleManager(shuffleManager)
+  env.blockManager.setShuffleManager(shuffleManager)

Review Comment:
   ```suggestion
       Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
         env.initializeShuffleManager()
       }
   ```
   
   I have not tested this, but I think this should work. If it does not, most 
of my suggestions will need to be discarded :-)



##########
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala:
##########
@@ -93,7 +93,8 @@ abstract class 
BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
     val blockManagerInfo = new mutable.HashMap[BlockManagerId, 
BlockManagerInfo]()
     blockManagerMaster = new 
BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",

Review Comment:
   Here as well, revert all changes.



##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -277,6 +287,11 @@ object SparkEnv extends Logging {
       hostname, numCores, ioEncryptionKey, isLocal)
   }
 
+  private def shuffleBlockGetterFn(shuffleId: Int, mapId: Long): Seq[BlockId] 
= {
+    val env = SparkEnv.get
+    env.shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, 
mapId)
+  }
+

Review Comment:
   Drop this?
   In `BlockManagerMasterEndpoint`:
   * We change constructor to:
   `private val _shuffleManager: ShuffleManager,`
   * And add a field:
   `private lazy val shuffleManager = 
Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)`
   
   Do the same for `BlockManager` as well.
   See more below in `create`.



##########
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala:
##########
@@ -143,10 +143,11 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with PrivateMethodTe
       None
     }

Review Comment:
   Same as with `BlockManagerReplicationSuite`, all changes can be reverted 
here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to