Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12113#discussion_r58698221
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -492,16 +624,51 @@ private[spark] object MapOutputTracker extends Logging {
         } {
           objOut.close()
         }
    -    out.toByteArray
    +    val arr = out.toByteArray
    +    if (minBroadcastSize >= 0 && arr.length >= minBroadcastSize) {
    +      // Use broadcast instead.
     +      // Important arr(0) is the tag == DIRECT, ignore that while deserializing !
    +      val bcast = broadcastManager.newBroadcast(arr, isLocal)
    +      // toByteArray creates copy, so we can reuse out
    +      out.reset()
    +      out.write(BROADCAST)
    +      val oos = new ObjectOutputStream(new GZIPOutputStream(out))
    +      oos.writeObject(bcast)
    +      oos.close()
    +      val outArr = out.toByteArray
     +      logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length)
    +      (outArr, bcast)
    +    } else {
    +      (arr, null)
    +    }
       }
     
       // Opposite of serializeMapStatuses.
       def deserializeMapStatuses(bytes: Array[Byte]): Array[MapStatus] = {
     -    val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
    -    Utils.tryWithSafeFinally {
    -      objIn.readObject().asInstanceOf[Array[MapStatus]]
    -    } {
    -      objIn.close()
    +    assert (bytes.length > 0)
    +
    +    def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = {
    +      val objIn = new ObjectInputStream(new GZIPInputStream(
    +        new ByteArrayInputStream(arr, off, len)))
    +      Utils.tryWithSafeFinally {
    +        objIn.readObject()
    +      } {
    +        objIn.close()
    +      }
    +    }
    +
    +    bytes(0) match {
    +      case DIRECT =>
     +        deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[MapStatus]]
    +      case BROADCAST =>
     +        // deserialize the Broadcast, pull .value array out of it, and then deserialize that
    +        val bcast = deserializeObject(bytes, 1, bytes.length - 1).
    +          asInstanceOf[Broadcast[Array[Byte]]]
    +        logInfo("Broadcast mapstatuses size = " + bytes.length +
    +          ", actual size = " + bcast.value.length)
    +        // Important - ignore the DIRECT tag ! Start from offset 1
     +        deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]]
    --- End diff --
    
    Sorry, I'm not sure I follow the question.
    
    The deserialize side receives the small message, whose tag indicates it's a broadcast; each executor then fetches the broadcast variable just like any other broadcast variable (e.g. one created by the user). Multiple executors can fetch at the same time, and since this uses the torrent broadcast, the load is shared as executors that have already fetched pieces serve them to the others.
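    
    To spell the flow out, here is a minimal, self-contained sketch of the tag scheme (plain Scala; a hypothetical FakeBroadcast and Array[String] stand in for Spark's Broadcast/broadcastManager and Array[MapStatus], so it runs without a SparkContext). The driver serializes directly, switches to a BROADCAST-tagged handle once the direct form crosses the size threshold, and the executor dispatches on the first byte; note the DIRECT tag has to be skipped again inside the broadcast payload.
    
        import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
        import java.util.zip.{GZIPInputStream, GZIPOutputStream}
    
        object TaggedSerializationSketch {
          // Tags mirroring the patch: the first byte says whether the map statuses follow
          // inline (DIRECT) or as a serialized broadcast handle (BROADCAST).
          val DIRECT: Byte = 0
          val BROADCAST: Byte = 1
    
          // Hypothetical stand-in for Spark's Broadcast[Array[Byte]]; it embeds the payload
          // so the sketch runs standalone (a real Broadcast only ships a small reference).
          case class FakeBroadcast(value: Array[Byte])
    
          private def gzipSerialize(tag: Byte, obj: AnyRef): Array[Byte] = {
            val out = new ByteArrayOutputStream()
            out.write(tag)  // the tag sits *outside* the gzip stream, as byte 0
            val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
            try objOut.writeObject(obj) finally objOut.close()
            out.toByteArray
          }
    
          private def gzipDeserialize(arr: Array[Byte], off: Int, len: Int): AnyRef = {
            val objIn = new ObjectInputStream(new GZIPInputStream(
              new ByteArrayInputStream(arr, off, len)))
            try objIn.readObject() finally objIn.close()
          }
    
          // Driver side: serialize directly; if the result is too large, wrap it in a
          // "broadcast" and send only the small BROADCAST-tagged handle.
          def serialize(statuses: Array[String], minBroadcastSize: Int): Array[Byte] = {
            val direct = gzipSerialize(DIRECT, statuses)
            if (direct.length >= minBroadcastSize) {
              val bcast = FakeBroadcast(direct)  // the real code calls broadcastManager.newBroadcast
              gzipSerialize(BROADCAST, bcast)
            } else {
              direct
            }
          }
    
          // Executor side: byte 0 tells us which path the driver took.
          def deserialize(bytes: Array[Byte]): Array[String] = bytes(0) match {
            case DIRECT =>
              gzipDeserialize(bytes, 1, bytes.length - 1).asInstanceOf[Array[String]]
            case BROADCAST =>
              val bcast = gzipDeserialize(bytes, 1, bytes.length - 1).asInstanceOf[FakeBroadcast]
              // bcast.value still starts with the DIRECT tag, so skip offset 0 here as well
              gzipDeserialize(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[String]]
          }
    
          def main(args: Array[String]): Unit = {
            val statuses = Array.tabulate(1000)(i => s"mapStatus-$i")
            val msg = serialize(statuses, minBroadcastSize = 512)
            println(s"message size = ${msg.length} bytes, tag = ${msg(0)}")
            assert(deserialize(msg).sameElements(statuses))
          }
        }
    
    With a real Broadcast the serialized handle stays tiny because the payload lives in the block manager and is fetched torrent-style on demand; the stand-in above just embeds it so the example stays runnable.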


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to