mridulm commented on a change in pull request #30480:
URL: https://github.com/apache/spark/pull/30480#discussion_r602972892



##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -449,21 +605,32 @@ private[spark] class MapOutputTrackerMaster(
       try {
         while (true) {
           try {
-            val data = mapOutputRequests.take()
-             if (data == PoisonPill) {
+            val data = mapOutputTrackerMasterMessages.take()
+            if (data == PoisonPill) {
               // Put PoisonPill back so that other MessageLoops can see it.
-              mapOutputRequests.offer(PoisonPill)
+              mapOutputTrackerMasterMessages.offer(PoisonPill)
               return
             }
-            val context = data.context
-            val shuffleId = data.shuffleId
-            val hostPort = context.senderAddress.hostPort
-            logDebug("Handling request to send map output locations for 
shuffle " + shuffleId +
-              " to " + hostPort)
-            val shuffleStatus = shuffleStatuses.get(shuffleId).head
-            context.reply(
-              shuffleStatus.serializedMapStatus(broadcastManager, isLocal, 
minSizeForBroadcast,
-                conf))
+
+            data match {
+              case GetMapStatusMessage(shuffleId, context) =>
+                val hostPort = context.senderAddress.hostPort
+                val shuffleStatus = shuffleStatuses.get(shuffleId).head
+                logDebug("Handling request to send map output locations for 
shuffle " + shuffleId +
+                  " to " + hostPort)
+                context.reply(
+                  shuffleStatus.serializedOutputStatus(broadcastManager, 
isLocal,
+                    minSizeForBroadcast, conf, isMapOutput = true))
+
+              case GetMergeStatusMessage(shuffleId, context) =>
+                val hostPort = context.senderAddress.hostPort
+                val shuffleStatus = shuffleStatuses.get(shuffleId).head
+                logDebug("Handling request to send merge output locations for" 
+
+                  " shuffle " + shuffleId + " to " + hostPort)
+                context.reply(
+                  shuffleStatus.serializedOutputStatus(broadcastManager, 
isLocal,
+                    minSizeForBroadcast, conf, isMapOutput = false))
+            }

Review comment:
       That would be for a new message, right? Will it have changes to these 
existing messages? If not, would something like this not work?
   
   ```
   
   def handleStatusMessage(shuffleId: Int, context: RpcCallContext, 
messageType: String, mapOutput: Boolean): Unit = {
     val hostPort = context.senderAddress.hostPort
     val shuffleStatus = shuffleStatuses.get(shuffleId).head
     logDebug(s"Handling request to send $messageType output locations for 
shuffle $shuffleId to $hostPort")
     context.reply(
       shuffleStatus.serializedOutputStatus(broadcastManager, isLocal, 
minSizeForBroadcast, conf, isMapOutput = mapOutput))
   }
   
   data match {
     case GetMapStatusMessage(shuffleId, context) => 
handleStatusMessage(shuffleId, context, "map", true)
     case GetMergeStatusMessage(shuffleId, context) => 
handleStatusMessage(shuffleId, context, "merge", false)
   }
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to