Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/12113#discussion_r58698221 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -492,16 +624,51 @@ private[spark] object MapOutputTracker extends Logging { } { objOut.close() } - out.toByteArray + val arr = out.toByteArray + if (minBroadcastSize >= 0 && arr.length >= minBroadcastSize) { + // Use broadcast instead. + // Important arr(0) is the tag == DIRECT, ignore that while deserializing ! + val bcast = broadcastManager.newBroadcast(arr, isLocal) + // toByteArray creates copy, so we can reuse out + out.reset() + out.write(BROADCAST) + val oos = new ObjectOutputStream(new GZIPOutputStream(out)) + oos.writeObject(bcast) + oos.close() + val outArr = out.toByteArray + logInfo("Broadcast mapstatuses size = " + outArr.length + ", actual size = " + arr.length) + (outArr, bcast) + } else { + (arr, null) + } } // Opposite of serializeMapStatuses. def deserializeMapStatuses(bytes: Array[Byte]): Array[MapStatus] = { - val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes))) - Utils.tryWithSafeFinally { - objIn.readObject().asInstanceOf[Array[MapStatus]] - } { - objIn.close() + assert (bytes.length > 0) + + def deserializeObject(arr: Array[Byte], off: Int, len: Int): AnyRef = { + val objIn = new ObjectInputStream(new GZIPInputStream( + new ByteArrayInputStream(arr, off, len))) + Utils.tryWithSafeFinally { + objIn.readObject() + } { + objIn.close() + } + } + + bytes(0) match { + case DIRECT => + deserializeObject(bytes, 1, bytes.length - 1).asInstanceOf[Array[MapStatus]] + case BROADCAST => + // deserialize the Broadcast, pull .value array out of it, and then deserialize that + val bcast = deserializeObject(bytes, 1, bytes.length - 1). + asInstanceOf[Broadcast[Array[Byte]]] + logInfo("Broadcast mapstatuses size = " + bytes.length + + ", actual size = " + bcast.value.length) + // Important - ignore the DIRECT tag ! Start from offset 1 + deserializeObject(bcast.value, 1, bcast.value.length - 1).asInstanceOf[Array[MapStatus]] --- End diff -- sorry not sure I follow the question? The deserialize is getting the small message, which will indicate its a broadcast and then each executor will get the broadcast variable just like any other broadcast variable (done by the user). Multiple executors can fetch at the same time and it uses the torrent broadcast which will share the load as other executors get parts.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org