By default, the depth of the tree is 2, and each partition forms one leaf node of the tree.
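
To expand on that a little: the number of intermediate reducers is never set
directly. From the code Raghav pasted, a branching factor is derived as

    scale = max(ceil(numPartitions ^ (1.0 / depth)), 2)

and the partially aggregated RDD is then repeatedly shrunk by that factor
(via reduceByKey on the key "partition index % curNumPartitions") until
adding another level would no longer reduce wall-clock time, i.e. until
numPartitions <= scale + numPartitions / scale. Each partition that survives
a round acts as one intermediate reducer and receives roughly scale partial
results from the round below; the driver reduces whatever is left at the end.

As an illustration (the numbers are made up, not from the thread): with 1000
partitions and the default depth = 2, scale = ceil(sqrt(1000)) = 32, so there
is one intermediate round with 1000 / 32 = 31 reducers (integer division),
each combining roughly 32 partial results, and the driver then reduces those
31 values. You can ask for a deeper tree with e.g.
rdd.treeReduce(_ + _, depth = 3).

Below is a minimal standalone sketch of that arithmetic. The helper name
treeLevels is made up for illustration and is not part of Spark's API; it
just mirrors the loop condition in the code quoted further down:

    // Returns the number of intermediate reducers at each level of the tree
    // for a given starting partition count and tree depth.
    def treeLevels(startPartitions: Int, depth: Int = 2): Seq[Int] = {
      // Branching factor: roughly the depth-th root of the partition count, at least 2.
      val scale = math.max(math.ceil(math.pow(startPartitions, 1.0 / depth)).toInt, 2)
      var numPartitions = startPartitions
      val levels = scala.collection.mutable.ArrayBuffer.empty[Int]
      // Stop adding levels once an extra level no longer helps wall-clock time.
      while (numPartitions > scale + numPartitions / scale) {
        numPartitions /= scale
        levels += numPartitions // this many reducers run at this level
      }
      levels.toSeq
    }

    treeLevels(1000)     // Seq(31): one intermediate round with 31 reducers
    treeLevels(5000, 3)  // Seq(277, 15): two intermediate rounds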

Sincerely,

DB Tsai
-------------------------------------------------------
Blog: https://www.dbtsai.com


On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar <raghav0110...@gmail.com> wrote:
> Hey Reza,
>
> Thanks for your response!
>
> Your response clarifies some of my initial thoughts. However, what I still
> don't understand is how the depth of the tree determines how many
> intermediate reducers there will be, and how many partitions are sent to
> each intermediate reducer. Could you provide some insight into this?
>
> Thanks,
> Raghav
>
> On Thursday, June 4, 2015, Reza Zadeh <r...@databricks.com> wrote:
>>
>> In a regular reduce, all partitions have to send their reduced value to a
>> single machine, and that machine can become a bottleneck.
>>
>> In a treeReduce, the partitions talk to each other over a logarithmic
>> number of rounds. Imagine a binary tree that has all the partitions at its
>> leaves and whose root holds the final reduced value. This way there is no
>> single bottleneck machine.
>>
>> It remains to decide how many children each node should have and how deep
>> the tree should be, which is part of the logic in the method you pasted.
>>
>> On Wed, Jun 3, 2015 at 7:10 PM, raggy <raghav0110...@gmail.com> wrote:
>>>
>>> I am trying to understand what the treeReduce function on an RDD does,
>>> and how it differs from the normal reduce function. My current
>>> understanding is that treeReduce splits the reduce into multiple steps:
>>> a partial reduce is done on different nodes, and then a final reduce
>>> produces the final result. Is this correct? If so, what I am curious
>>> about is: how does Spark decide how many nodes there will be at each
>>> level, and how many partitions are sent to a given node?
>>>
>>> The bulk of the implementation is within this function:
>>>
>>>     partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
>>>       .getOrElse(throw new UnsupportedOperationException("empty collection"))
>>>
>>> The above call expands to the following:
>>>
>>>     val cleanSeqOp = context.clean(seqOp)
>>>     val cleanCombOp = context.clean(combOp)
>>>     val aggregatePartition =
>>>       (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
>>>     var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
>>>     var numPartitions = partiallyAggregated.partitions.length
>>>     val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
>>>     // If creating an extra level doesn't help reduce
>>>     // the wall-clock time, we stop tree aggregation.
>>>     while (numPartitions > scale + numPartitions / scale) {
>>>       numPartitions /= scale
>>>       val curNumPartitions = numPartitions
>>>       partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
>>>         (i, iter) => iter.map((i % curNumPartitions, _))
>>>       }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
>>>     }
>>>     partiallyAggregated.reduce(cleanCombOp)
>>>
>>> I am quite lost about what is happening in this code, and I would
>>> greatly appreciate an explanation.
>>>
>>
>

