I am trying to understand what the treeReduce function on an RDD does, and
how it differs from the normal reduce function. My current understanding is
that treeReduce splits the reduce into multiple steps: a partial reduce is
done on different nodes, and then a final reduce combines those partial
results. Is this correct? If so, what I am curious about is: how does Spark
decide how many nodes will be on each level, and how many partitions will be
sent to a given node?
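To make my mental model concrete, here is a plain-Scala sketch (no Spark
involved; the names are my own) of the two phases I think are happening:

```scala
object TwoPhaseSketch extends App {
  // Stand-in for an RDD's partitions: 4 "partitions" of 25 elements each.
  val data = 1 to 100
  val partitions: Seq[Seq[Int]] = data.grouped(25).toSeq

  val op = (a: Int, b: Int) => a + b

  // Phase 1: partial reduce within each partition (would run on executors).
  val partials: Seq[Int] = partitions.map(_.reduce(op))

  // Phase 2: final reduce over the partial results (possibly in several
  // tree levels, controlled by the depth parameter).
  val result = partials.reduce(op)
}
```

Is the difference from plain reduce essentially that phase 2 itself can be
split into multiple levels instead of happening all at once on the driver?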

The bulk of the implementation is within this function:

    partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
      .getOrElse(throw new UnsupportedOperationException("empty collection"))
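As I read it, op here is the user's reduce function lifted to Option[T], so
that an empty partition can be represented as None instead of failing. A
self-contained sketch of what I mean (my own names, not Spark's):

```scala
object LiftSketch extends App {
  // Lift a reduce function f: (T, T) => T to Option[T], treating None as
  // "empty partition": combining with None just keeps the other side.
  def lift[T](f: (T, T) => T): (Option[T], Option[T]) => Option[T] =
    (c, x) => (c, x) match {
      case (Some(a), Some(b)) => Some(f(a, b))
      case (Some(_), None)    => c
      case (None, Some(_))    => x
      case (None, None)       => None
    }

  val op = lift[Int](_ + _)
}
```

That would explain the getOrElse at the end: if every partition was empty,
the whole aggregation yields None.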

That call to treeAggregate expands to:

    val cleanSeqOp = context.clean(seqOp)
    val cleanCombOp = context.clean(combOp)
    val aggregatePartition =
      (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
    var numPartitions = partiallyAggregated.partitions.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    // If creating an extra level doesn't help reduce
    // the wall-clock time, we stop tree aggregation.
    while (numPartitions > scale + numPartitions / scale) {
      numPartitions /= scale
      val curNumPartitions = numPartitions
      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
        (i, iter) => iter.map((i % curNumPartitions, _))
      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
    }
    partiallyAggregated.reduce(cleanCombOp)
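Trying to decode the loop, the part that seems to answer my "how many nodes
per level" question is the scale / numPartitions arithmetic. Here is a plain
Scala simulation of just that schedule, extracted by hand from the loop above
(no Spark; levelSchedule is my own name):

```scala
object ScheduleSketch extends App {
  // Given an initial partition count and a depth, return the partition
  // counts after each tree level, following the while-loop's arithmetic.
  def levelSchedule(initialPartitions: Int, depth: Int): Seq[Int] = {
    var numPartitions = initialPartitions
    // Branching factor: roughly the depth-th root of the partition count.
    val scale =
      math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    val levels = scala.collection.mutable.ArrayBuffer.empty[Int]
    // Stop adding levels once another level would not shrink the work
    // enough to beat its overhead.
    while (numPartitions > scale + numPartitions / scale) {
      numPartitions /= scale
      levels += numPartitions
    }
    levels.toSeq
  }
}
```

If I run this by hand, 1000 partitions at depth 2 gives scale = 32 and a
single intermediate level of 31 partitions; 100 partitions at depth 3 gives
scale = 5 and levels of 20 and then 4 partitions. Is that a correct reading
of how the tree shape is decided?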

Beyond that, I am fairly lost as to what is happening in this function,
particularly the mapPartitionsWithIndex / reduceByKey step. I would greatly
appreciate an explanation.

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
