Re: TreeReduce Functionality in Spark

2015-06-04 Thread DB Tsai
For the first round, you will have 16 reducers working, since you have
32 partitions: reduceByKey assigns every two of the 32 partitions the
same key, so each pair knows which reducer it will go to.

After this step is done, you will have 16 partitions, so the next
round will use 8 reducers.
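A quick sketch of that pairing (a pure-Python illustration of the keying described above, assuming a merge factor of 2; this is not the Spark API itself):

```python
from collections import Counter

# 32 partitions are keyed by (index % 16), so exactly two partitions
# share each key and meet at the same one of the 16 reducers.
num_partitions = 32
num_reducers = num_partitions // 2  # 16
keys = [i % num_reducers for i in range(num_partitions)]

counts = Counter(keys)
print(len(counts))                           # 16 distinct reducer keys
print(all(c == 2 for c in counts.values()))  # each reducer gets 2 partitions
```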

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com



Re: TreeReduce Functionality in Spark

2015-06-04 Thread Raghav Shankar
Hey DB,

Thanks for the reply!

I still don't think this answers my question. For example, if I have a
top() action being executed and I have 32 workers (32 partitions), and I
choose a depth of 4, what does the overlay of intermediate reducers look
like? How many reducers are there, excluding the master and the workers? How
many partitions get sent to each of these intermediate reducers? Does this
number vary at each level?

Thanks!



Re: TreeReduce Functionality in Spark

2015-06-04 Thread DB Tsai
By default, the depth of the tree is 2. Each partition will be one node.

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: TreeReduce Functionality in Spark

2015-06-04 Thread Raghav Shankar
Hey Reza,

Thanks for your response!

Your response clarifies some of my initial thoughts. However, what I don't
understand is how the depth of the tree is used to identify how many
intermediate reducers there will be, and how many partitions are sent to
the intermediate reducers. Could you provide some insight into this?

Thanks,
Raghav



Re: TreeReduce Functionality in Spark

2015-06-04 Thread Reza Zadeh
In a regular reduce, all partitions have to send their reduced value to a
single machine, and that machine can become a bottleneck.

In a treeReduce, the partitions talk to each other in a logarithmic number
of rounds. Imagine a binary tree that has all the partitions at its leaves
and the root will contain the final reduced value. This way there is no
single bottleneck machine.

It remains to decide the number of children each node should have and how
deep the tree should be, which is some of the logic in the method you
pasted.
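To make the fan-in concrete: the pasted method derives a per-level merge factor from the depth. A minimal Python sketch of that formula (scale = max(ceil(numPartitions^(1/depth)), 2), mirroring the Scala line in the pasted code; an illustration, not the Spark implementation):

```python
import math

def scale(num_partitions: int, depth: int) -> int:
    # Merge factor per tree level, floored at 2, as in the pasted code:
    # math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    return max(math.ceil(num_partitions ** (1.0 / depth)), 2)

print(scale(32, 2))  # 6: ceil(sqrt(32))
print(scale(32, 4))  # 3: ceil(32 ** 0.25)
```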



TreeReduce Functionality in Spark

2015-06-03 Thread raggy
I am trying to understand what the treeReduce function for an RDD does, and
how it is different from the normal reduce function. My current
understanding is that treeReduce tries to split up the reduce into multiple
steps. We do a partial reduce on different nodes, and then a final reduce is
done to get the final result. Is this correct? If so, I guess what I am
curious about is: how does Spark decide how many nodes will be on each
level, and how many partitions will be sent to a given node?

The bulk of the implementation is within this function:

partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
  .getOrElse(throw new UnsupportedOperationException("empty collection"))

The above function is expanded to

val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition =
  (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce
// the wall-clock time, we stop tree aggregation.
while (numPartitions > scale + numPartitions / scale) {
  numPartitions /= scale
  val curNumPartitions = numPartitions
  partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
    (i, iter) => iter.map((i % curNumPartitions, _))
  }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}
partiallyAggregated.reduce(cleanCombOp)
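The level schedule the loop above computes can be simulated without Spark. A pure-Python sketch (an illustration only; Python's // matches the integer division Scala uses here):

```python
import math

def tree_aggregate_levels(num_partitions: int, depth: int):
    """Simulate the per-level reducer counts chosen by the loop above."""
    scale = max(math.ceil(num_partitions ** (1.0 / depth)), 2)
    levels = []
    # Stop adding levels once an extra level no longer helps.
    while num_partitions > scale + num_partitions // scale:
        num_partitions //= scale
        levels.append(num_partitions)
    return scale, levels  # remaining partitions go to the final reduce

print(tree_aggregate_levels(32, 2))  # (6, [5])
print(tree_aggregate_levels(32, 4))  # (3, [10, 3])
```

Assuming this faithfully mirrors the Scala arithmetic, 32 partitions with depth 4 give a merge factor of 3 and intermediate levels of 10 and then 3 reducers, with the final reduce combining the last 3 values.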

I am completely lost about what is happening in this function. I would
greatly appreciate some sort of explanation. 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org