Re: TreeReduce Functionality in Spark

2015-06-04 Thread DB Tsai
For the first round, you will have 16 reducers working since you have 32
partitions: each pair of the 32 partitions is routed to the same reducer by
tagging both partitions with the same key and combining them with reduceByKey.

After this step is done, you will have 16 partitions, so the next
round will be 8 reducers.
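The round sizes can be traced without Spark. The sketch below (plain Scala, not Spark's code; `TreeReduceTrace` and `rounds` are hypothetical names) recomputes the scale and loop condition from the treeAggregate snippet quoted later in this thread, and lists how many reducers each intermediate level uses:

```scala
// Plain-Scala sketch (no Spark): trace how many reducers each round of
// treeAggregate would use, following its scale/while-loop logic.
object TreeReduceTrace {
  def rounds(initialPartitions: Int, depth: Int): Seq[Int] = {
    // scale = max(ceil(numPartitions^(1/depth)), 2), as in treeAggregate
    val scale =
      math.max(math.ceil(math.pow(initialPartitions, 1.0 / depth)).toInt, 2)
    val counts = scala.collection.mutable.ArrayBuffer.empty[Int]
    var numPartitions = initialPartitions
    // Stop adding levels once an extra level no longer helps wall-clock time.
    while (numPartitions > scale + numPartitions / scale) {
      numPartitions /= scale
      counts += numPartitions
    }
    counts.toSeq
  }
}
```

For example, `rounds(32, 2)` gives a single intermediate level of 5 reducers (scale 6), while a depth large enough to drive scale down to 2, such as `rounds(32, 5)`, gives levels of 16, 8, and 4 reducers, matching the halving described above.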

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com


On Thu, Jun 4, 2015 at 12:06 PM, Raghav Shankar raghav0110...@gmail.com wrote:
 Hey DB,

 Thanks for the reply!

 I still don't think this answers my question. For example, if I have a top()
 action being executed and I have 32 workers (32 partitions), and I choose a
 depth of 4, what does the overlay of intermediate reducers look like? How
 many reducers are there, excluding the master and the workers? How many
 partitions get sent to each of these intermediate reducers? Does this number
 vary at each level?

 Thanks!


 On Thursday, June 4, 2015, DB Tsai dbt...@dbtsai.com wrote:

 By default, the depth of the tree is 2. Each partition will be one node.

 Sincerely,

 DB Tsai
 ---
 Blog: https://www.dbtsai.com


 On Thu, Jun 4, 2015 at 10:46 AM, Raghav Shankar raghav0110...@gmail.com
 wrote:
  Hey Reza,
 
  Thanks for your response!
 
  Your response clarifies some of my initial thoughts. However, what I don't
  understand is how the depth of the tree is used to determine how many
  intermediate reducers there will be, and how many partitions are sent to
  the intermediate reducers. Could you provide some insight into this?
 
  Thanks,
  Raghav
 
  On Thursday, June 4, 2015, Reza Zadeh r...@databricks.com wrote:
 
  In a regular reduce, all partitions have to send their reduced value to a
  single machine, and that machine can become a bottleneck.
 
  In a treeReduce, the partitions talk to each other in a logarithmic number
  of rounds. Imagine a binary tree that has all the partitions at its leaves
  and the root contains the final reduced value. This way there is no single
  bottleneck machine.
 
  It remains to decide how many children each node should have and how deep
  the tree should be, which is some of the logic in the method you pasted.
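
The binary-tree picture can be sketched in a few lines of plain Scala (a toy illustration of the idea, not Spark's actual implementation; `treeStyleReduce` is a hypothetical name):

```scala
// Toy sketch (no Spark): reduce a collection of per-partition values in
// ~log2(n) rounds by pairing adjacent values, as in the binary-tree picture.
object BinaryTreeReduce {
  def treeStyleReduce[T](values: Seq[T])(op: (T, T) => T): T = {
    require(values.nonEmpty, "empty collection")
    var level = values
    while (level.size > 1) {
      // Each round roughly halves the values; an unpaired one passes through.
      level = level.grouped(2).map {
        case Seq(a, b) => op(a, b)
        case Seq(a)    => a
      }.toSeq
    }
    level.head
  }
}
```

With 32 leaf values this takes 5 rounds, and no single step combines more than a pair of values, which is the point: no one machine has to receive everything.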
 
  On Wed, Jun 3, 2015 at 7:10 PM, raggy raghav0110...@gmail.com wrote:
 
  I am trying to understand what the treeReduce function for an RDD does,
  and how it is different from the normal reduce function. My current
  understanding is that treeReduce tries to split up the reduce into
  multiple steps. We do a partial reduce on different nodes, and then a
  final reduce is done to get the final result. Is this correct? If so, I
  guess what I am curious about is: how does Spark decide how many nodes
  will be on each level, and how many partitions will be sent to a given
  node?
 
  The bulk of the implementation is within this function:
 
partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
  .getOrElse(throw new UnsupportedOperationException("empty collection"))
 
  The above function is expanded to
 
val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition =
  (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.length
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce
// the wall-clock time, we stop tree aggregation.
while (numPartitions > scale + numPartitions / scale) {
  numPartitions /= scale
  val curNumPartitions = numPartitions
  partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {
    (i, iter) => iter.map((i % curNumPartitions, _))
  }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}
partiallyAggregated.reduce(cleanCombOp)
 
  I am completely lost about what is happening in this function. I would
  greatly appreciate some sort of explanation.
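
To make the `i % curNumPartitions` keying in the loop concrete, here is a plain-Scala sketch (not Spark code; `PartitionAssignment` is a hypothetical name) of the first-round reducer assignment for the 32-partition, depth-4 case asked about earlier in the thread: scale = ceil(32^(1/4)) = 3, so the first round uses 32 / 3 = 10 reducers.

```scala
// Sketch (no Spark): first-round reducer assignment for 32 partitions at
// depth 4, where scale = 3 and curNumPartitions = 32 / 3 = 10.
object PartitionAssignment {
  val numPartitions = 32
  val curNumPartitions = 10
  // Partition i is keyed to reducer i % 10; reduceByKey then combines
  // everything that shares a key on one reducer.
  val assignment: Map[Int, Seq[Int]] =
    (0 until numPartitions)
      .groupBy(_ % curNumPartitions)
      .map { case (k, v) => k -> v.toSeq }
}
```

Reducers 0 and 1 each receive 4 partitions and reducers 2 through 9 receive 3 each, so the per-reducer load is slightly uneven whenever scale does not divide the partition count evenly.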
 
 
 
 
  --
  View this message in context:
 
  http://apache-spark-user-list.1001560.n3.nabble.com/TreeReduce-Functionality-in-Spark-tp23147.html
  Sent from the Apache Spark User List mailing list archive at
  Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 
 
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


