Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16399782
  
    --- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -109,99 +137,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
       private def readObject(in: ObjectInputStream) {
         in.defaultReadObject()
         TorrentBroadcast.synchronized {
    -      SparkEnv.get.blockManager.getSingle(broadcastId) match {
    +      SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) 
match {
             case Some(x) =>
    -          value_ = x.asInstanceOf[T]
    +          _value = x.asInstanceOf[T]
     
             case None =>
    -          val start = System.nanoTime
               logInfo("Started reading broadcast variable " + id)
    -
    -          // Initialize @transient variables that will receive garbage 
values from the master.
    -          resetWorkerVariables()
    -
    -          if (receiveBroadcast()) {
    -            value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, 
totalBytes, totalBlocks)
    -
    -            /* Store the merged copy in cache so that the next worker 
doesn't need to rebuild it.
    -             * This creates a trade-off between memory usage and latency. 
Storing copy doubles
    -             * the memory footprint; not storing doubles deserialization 
cost. Also,
    -             * this does not need to be reported to BlockManagerMaster 
since other executors
    -             * does not need to access this block (they only need to fetch 
the chunks,
    -             * which are reported).
    -             */
    -            SparkEnv.get.blockManager.putSingle(
    -              broadcastId, value_, StorageLevel.MEMORY_AND_DISK, 
tellMaster = false)
    -
    -            // Remove arrayOfBlocks from memory once value_ is on local 
cache
    -            resetWorkerVariables()
    -          } else {
    -            logError("Reading broadcast variable " + id + " failed")
    -          }
    -
    -          val time = (System.nanoTime - start) / 1e9
    +          val start = System.nanoTime()
    +          val blocks = readBlocks()
    +          val time = (System.nanoTime() - start) / 1e9
               logInfo("Reading broadcast variable " + id + " took " + time + " 
s")
    -      }
    -    }
    -  }
    -
    -  private def resetWorkerVariables() {
    -    arrayOfBlocks = null
    -    totalBytes = -1
    -    totalBlocks = -1
    -    hasBlocks = 0
    -  }
    -
    -  private def receiveBroadcast(): Boolean = {
    -    // Receive meta-info about the size of broadcast data,
    -    // the number of chunks it is divided into, etc.
    -    val metaId = BroadcastBlockId(id, "meta")
    -    var attemptId = 10
    -    while (attemptId > 0 && totalBlocks == -1) {
    -      SparkEnv.get.blockManager.getSingle(metaId) match {
    -        case Some(x) =>
    -          val tInfo = x.asInstanceOf[TorrentInfo]
    -          totalBlocks = tInfo.totalBlocks
    -          totalBytes = tInfo.totalBytes
    -          arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
    -          hasBlocks = 0
    -
    -        case None =>
    -          Thread.sleep(500)
    -      }
    -      attemptId -= 1
    -    }
    -
    -    if (totalBlocks == -1) {
    -      return false
    -    }
     
    -    /*
    -     * Fetch actual chunks of data. Note that all these chunks are stored 
in
    -     * the BlockManager and reported to the master, so that other executors
    -     * can find out and pull the chunks from this executor.
    -     */
    -    val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 
1).toList)
    -    for (pid <- recvOrder) {
    -      val pieceId = BroadcastBlockId(id, "piece" + pid)
    -      SparkEnv.get.blockManager.getSingle(pieceId) match {
    -        case Some(x) =>
    -          arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
    -          hasBlocks += 1
    +          _value = TorrentBroadcast.unBlockifyObject[T](blocks)
    +          // Store the merged copy in BlockManager so other tasks on this 
executor doesn't
    --- End diff --
    
    nit: "doesn't" -> "don't" (the subject "other tasks" is plural)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to