Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/11327#discussion_r61798109 --- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala --- @@ -169,43 +169,41 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) var noLocality = true // if true if no preferredLocations exists for parent RDD - // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) - def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = { - prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host) - } - - // this class just keeps iterating and rotating infinitely over the partitions of the RDD - // next() returns the next preferred machine that a partition is replicated on - // the rotator first goes through the first replica copy of each partition, then second, third - // the iterators return type is a tuple: (replicaString, partition) - class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] { - - var it: Iterator[(String, Partition)] = resetIterator() - - override val isEmpty = !it.hasNext - - // initializes/resets to start iterating from the beginning - def resetIterator(): Iterator[(String, Partition)] = { - val iterators = (0 to 2).map { x => - prev.partitions.iterator.flatMap { p => - if (currPrefLocs(p, prev).size > x) Some((currPrefLocs(p, prev)(x), p)) else None + class PartitionLocations(prev: RDD[_]) { + + // contains all the partitions from the previous RDD that don't have preferred locations + val partsWithoutLocs = ArrayBuffer[Partition]() + // contains all the partitions from the previous RDD that have preferred locations + val partsWithLocs: Array[(String, Partition)] = getAllPrefLocs(prev) + + // has side affect of filling in partitions without locations as well + def getAllPrefLocs(prev: RDD[_]): Array[(String, Partition)] = { + val partsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]() + // first get the locations 
for each partition, only do this once since it can be expensive + prev.partitions.foreach(p => { + val locs = currPrefLocs(p, prev) + if (locs.size > 0) { + partsWithLocs.put(p, locs) + } else { + partsWithoutLocs += p + } } - } - iterators.reduceLeft((x, y) => x ++ y) + ) + // convert it into an array of host to partition + val allLocs = (0 to 2).map(x => + partsWithLocs.toArray.flatMap(parts => { + val p = parts._1 + val locs = parts._2 + if (locs.size > x) Some((locs(x), p)) else None + } ) + ) + allLocs.reduceLeft((x, y) => x ++ y) } + } - // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD - override def hasNext: Boolean = { !isEmpty } - - // return the next preferredLocation of some partition of the RDD - override def next(): (String, Partition) = { - if (it.hasNext) { - it.next() - } else { - it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning - it.next() - } - } + // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) + def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = { --- End diff -- Could `currPrefLocs` be made private, or inlined at its call site?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org