By default, the depth of the tree is 2, and each partition is one leaf node of the tree.
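You pass the depth explicitly as the second argument to treeReduce. A minimal sketch (the app name, data, and reduce function here are just placeholders):

  import org.apache.spark.{SparkConf, SparkContext}

  // Hypothetical job: sum 1..1000 spread over 64 partitions.
  val sc = new SparkContext(
    new SparkConf().setAppName("treeReduceDemo").setMaster("local[4]"))
  val rdd = sc.parallelize(1 to 1000, numSlices = 64)

  // Plain reduce: all 64 partial results go straight to the driver.
  val flat = rdd.reduce(_ + _)

  // treeReduce with the default depth of 2: with 64 partitions there is
  // one intermediate combine round (64 -> 8) before the final reduce.
  val tree2 = rdd.treeReduce(_ + _)

  // depth = 3 gives two intermediate rounds here (64 -> 16 -> 4), which
  // can help when there are many partitions or large reduced values.
  val tree3 = rdd.treeReduce(_ + _, depth = 3)

All three return the same value (500500); they differ only in how the partial results are combined on the way to the driver.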
Sincerely,

DB Tsai
-------------------------------------------------------
Blog: https://www.dbtsai.com

On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar <raghav0110...@gmail.com> wrote:
> Hey Reza,
>
> Thanks for your response!
>
> Your response clarifies some of my initial thoughts. However, what I still
> don't understand is how the depth of the tree is used to determine how many
> intermediate reducers there will be, and how many partitions are sent to
> each intermediate reducer. Could you provide some insight into this?
>
> Thanks,
> Raghav
>
> On Thursday, June 4, 2015, Reza Zadeh <r...@databricks.com> wrote:
>>
>> In a regular reduce, all partitions have to send their reduced value to a
>> single machine, and that machine can become a bottleneck.
>>
>> In a treeReduce, the partitions talk to each other in a logarithmic number
>> of rounds. Imagine a binary tree that has all the partitions at its leaves
>> and the final reduced value at its root. This way there is no single
>> bottleneck machine.
>>
>> It remains to decide how many children each node should have and how deep
>> the tree should be, which is some of the logic in the method you pasted.
>>
>> On Wed, Jun 3, 2015 at 7:10 PM, raggy <raghav0110...@gmail.com> wrote:
>>>
>>> I am trying to understand what the treeReduce function for an RDD does,
>>> and how it differs from the normal reduce function. My current
>>> understanding is that treeReduce splits the reduce into multiple steps:
>>> we do a partial reduce on different nodes, and then a final reduce is
>>> done to get the final result. Is this correct? If so, what I am curious
>>> about is how Spark decides how many nodes will be on each level, and how
>>> many partitions will be sent to a given node.
>>>
>>> The bulk of the implementation is within this call:
>>>
>>>   partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
>>>     .getOrElse(throw new UnsupportedOperationException("empty collection"))
>>>
>>> The above call expands to:
>>>
>>>   val cleanSeqOp = context.clean(seqOp)
>>>   val cleanCombOp = context.clean(combOp)
>>>   val aggregatePartition =
>>>     (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
>>>   var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
>>>   var numPartitions = partiallyAggregated.partitions.length
>>>   val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
>>>   // If creating an extra level doesn't help reduce
>>>   // the wall-clock time, we stop tree aggregation.
>>>   while (numPartitions > scale + numPartitions / scale) {
>>>     numPartitions /= scale
>>>     val curNumPartitions = numPartitions
>>>     partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
>>>       (i, iter) => iter.map((i % curNumPartitions, _))
>>>     }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
>>>   }
>>>   partiallyAggregated.reduce(cleanCombOp)
>>>
>>> I am completely lost about what is happening in this function. I would
>>> greatly appreciate some sort of explanation.
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
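To make the sizing logic in the quoted code concrete, here is a small standalone sketch (plain Scala, no Spark required; the object and method names are invented for illustration) that reproduces the partition-count arithmetic of the while loop:

  object TreeAggregateRounds {
    // Returns the number of partitions alive at each level of the tree,
    // using the same scale and stopping rule as the quoted Spark code.
    def rounds(initialPartitions: Int, depth: Int): List[Int] = {
      val scale =
        math.max(math.ceil(math.pow(initialPartitions, 1.0 / depth)).toInt, 2)
      var numPartitions = initialPartitions
      val levels = scala.collection.mutable.ArrayBuffer(numPartitions)
      // Keep adding levels only while doing so shrinks the partition
      // count enough to be worth another shuffle.
      while (numPartitions > scale + numPartitions / scale) {
        numPartitions /= scale
        levels += numPartitions
      }
      levels.toList
    }

    def main(args: Array[String]): Unit = {
      // 64 partitions, depth 2: scale = ceil(64^(1/2)) = 8, so one
      // intermediate round (64 > 8 + 8 continues; 8 <= 8 + 1 stops).
      println(rounds(64, 2))   // List(64, 8)
      // 1000 partitions, depth 3: scale = 10, two intermediate rounds.
      println(rounds(1000, 3)) // List(1000, 100, 10)
    }
  }

Each mapPartitionsWithIndex/reduceByKey pass in the real code corresponds to one step between adjacent entries of this list (partition i sends its value to reducer i % curNumPartitions), and the last few partial results are combined on the driver by the final reduce(cleanCombOp).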