Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11327#discussion_r61798109
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala ---
    @@ -169,43 +169,41 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
     
       var noLocality = true  // true if no preferredLocations exist for the parent RDD
     
    -  // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
    -  def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = {
    -    prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
    -  }
    -
    -  // this class just keeps iterating and rotating infinitely over the partitions of the RDD
    -  // next() returns the next preferred machine that a partition is replicated on
    -  // the rotator first goes through the first replica copy of each partition, then second, third
    -  // the iterators return type is a tuple: (replicaString, partition)
    -  class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] {
    -
    -    var it: Iterator[(String, Partition)] = resetIterator()
    -
    -    override val isEmpty = !it.hasNext
    -
    -    // initializes/resets to start iterating from the beginning
    -    def resetIterator(): Iterator[(String, Partition)] = {
    -      val iterators = (0 to 2).map { x =>
    -        prev.partitions.iterator.flatMap { p =>
    -          if (currPrefLocs(p, prev).size > x) Some((currPrefLocs(p, prev)(x), p)) else None
    +  class PartitionLocations(prev: RDD[_]) {
    +
    +    // contains all the partitions from the previous RDD that don't have preferred locations
    +    val partsWithoutLocs = ArrayBuffer[Partition]()
    +    // contains all the partitions from the previous RDD that have preferred locations
    +    val partsWithLocs: Array[(String, Partition)] = getAllPrefLocs(prev)
    +
    +    // has the side effect of filling in partitions without locations as well
    +    def getAllPrefLocs(prev: RDD[_]): Array[(String, Partition)] = {
    +      val partsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]()
    +      // first get the locations for each partition, only do this once since it can be expensive
    +      prev.partitions.foreach(p => {
    +          val locs = currPrefLocs(p, prev)
    +          if (locs.size > 0) {
    +            partsWithLocs.put(p, locs)
    +          } else {
    +            partsWithoutLocs += p
    +          }
             }
    -      }
    -      iterators.reduceLeft((x, y) => x ++ y)
    +      )
    +      // convert it into an array of (host, partition) pairs
    +      val allLocs = (0 to 2).map(x =>
    +        partsWithLocs.toArray.flatMap(parts => {
    +          val p = parts._1
    +          val locs = parts._2
    +          if (locs.size > x) Some((locs(x), p)) else None
    +        } )
    +      )
    +      allLocs.reduceLeft((x, y) => x ++ y)
         }
    +  }
     
    -    // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
    -    override def hasNext: Boolean = { !isEmpty }
    -
    -    // return the next preferredLocation of some partition of the RDD
    -    override def next(): (String, Partition) = {
    -      if (it.hasNext) {
    -        it.next()
    -      } else {
    -        it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
    -        it.next()
    -      }
    -    }
    +  // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
    +  def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = {
    --- End diff ---
    
    Should this be made `private`, or inlined at its call sites?
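    
    A minimal sketch of the `private` option, assuming `currPrefLocs` has no callers outside `DefaultPartitionCoalescer`:
    
        // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
        // `private` is the suggested change here; the method body is unchanged from the PR
        private def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = {
          prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
        }
    
    Inlining would instead drop the helper and call `prev.context.getPreferredLocs(prev, part.index).map(_.host)` directly at each use, trading the named helper for one less indirection.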

