Re: Is sorting persisted after pair rdd transformations?

2014-11-19 Thread Akhil Das
If something is persisted, you can easily see it under the Storage tab in
the web UI.

Thanks
Best Regards

On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar 
aniket.bhatna...@gmail.com wrote:

> I am trying to figure out if sorting is persisted after applying Pair RDD
> transformations and I am not able to decisively tell after reading the
> documentation.
>
> For example:
> val numbers = .. // RDD of numbers
> val pairedNumbers = numbers.map(number => (number % 100, number))
> val sortedPairedNumbers =
>   pairedNumbers.sortBy(pairedNumber => pairedNumber._2) // Sort by values in the pair
> val aggregates = sortedPairedNumbers.combineByKey(..)
>
> In this example, will the combine functions see values in sorted order?
> What if I had done groupByKey and then combineByKey? What transformations
> can unsort already sorted data?



Re: Is sorting persisted after pair rdd transformations?

2014-11-19 Thread Daniel Darabos
Akhil, I think Aniket uses the word "persisted" in a different way than
you mean, i.e. not in the RDD.persist() sense. Aniket asks whether running
combineByKey on a sorted RDD will result in a sorted RDD (i.e. whether the
sorting is preserved).

I think the answer is no. combineByKey uses AppendOnlyMap, which is a
hashmap. That will shuffle your keys. You can quickly verify it in
spark-shell:

scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
res0: Array[(Int, Int)] = Array((8,1), (7,1))

(The initial size of the AppendOnlyMap seems to be 8, so 8 is the first
number that demonstrates this.)
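
The same effect can be sketched without a cluster. The snippet below is only an illustration under assumptions: it uses scala.collection.mutable.HashMap as a stand-in for Spark's AppendOnlyMap (whose internals are not reproduced here), and HashOrderSketch and sumByKey are hypothetical names. It sums values per key while reading the input in order, then prints the order in which the map hands the keys back, which is not required to match the input order.

```scala
import scala.collection.mutable

object HashOrderSketch {
  // Hypothetical helper: sums values per key, roughly what reduceByKey(_ + _)
  // has to do for the records of one partition.
  // mutable.HashMap is an assumed stand-in for Spark's AppendOnlyMap.
  def sumByKey(pairs: Iterator[(Int, Int)]): mutable.HashMap[Int, Int] = {
    val acc = mutable.HashMap.empty[Int, Int]
    for ((k, v) <- pairs) acc(k) = acc.getOrElse(k, 0) + v
    acc
  }

  def main(args: Array[String]): Unit = {
    val input = (1 to 1000).map(_ -> 1) // keys arrive in ascending order
    val result = sumByKey(input.iterator)
    val keysOut = result.keysIterator.toVector

    // The sums are correct either way; only the key order is unspecified.
    println(keysOut.take(10))
    println(s"same order as input: ${keysOut == input.map(_._1)}")
  }
}
```

The key order that comes out depends on the hash table implementation, so the output should not be relied on in either direction.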



Re: Is sorting persisted after pair rdd transformations?

2014-11-19 Thread Aniket Bhatnagar
Thanks Daniel. I understand that the keys will not be in sorted order,
but what I am trying to understand is whether the functions are passed
values in sorted order within a given partition.

For example:

scala> sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t => t._2).foldByKey(0)((a, b) => b).collect
res0: Array[(Int, Int)] = Array((1,8))

The fold always gives me 8 as the last value, which suggests that values
preserve the sort order defined in an earlier stage of the DAG?



Re: Is sorting persisted after pair rdd transformations?

2014-11-19 Thread Daniel Darabos
Ah, so I misunderstood you too :).

My reading of org/apache/spark/Aggregator.scala is that your function will
always see the items in the order that they are in the input RDD. An RDD
partition is always accessed as an iterator, so it will not be read out of
order.
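
A rough model of that reading, in plain Scala rather than Spark: the sketch below walks one iterator front to back and folds the values per key, so the fold function sees values in whatever order the iterator yields them. PartitionOrderSketch and foldPartition are hypothetical names made up for this illustration, and the code is not taken from Aggregator.scala.

```scala
object PartitionOrderSketch {
  // Hypothetical helper: folds the values of each key while consuming the
  // iterator once, front to back, the way a single partition is read.
  def foldPartition[K, V](items: Iterator[(K, V)], zero: V)(f: (V, V) => V): Map[K, V] =
    items.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      acc.updated(k, f(acc.getOrElse(k, zero), v))
    }

  def main(args: Array[String]): Unit = {
    val sorted = (1 to 8).map(i => (1, i))

    // Keeping the right-hand argument returns the last value seen for the key.
    println(foldPartition(sorted.iterator, 0)((a, b) => b))         // Map(1 -> 8)
    println(foldPartition(sorted.reverse.iterator, 0)((a, b) => b)) // Map(1 -> 1)
  }
}
```

This only models a single partition. It says nothing about the order in which results from several partitions are merged.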



Re: Is sorting persisted after pair rdd transformations?

2014-11-19 Thread Aniket Bhatnagar
Thanks Daniel :-). That makes sense and is what I was hoping for. I
will proceed with this assumption and report back if I see any anomalies.



Is sorting persisted after pair rdd transformations?

2014-11-18 Thread Aniket Bhatnagar
I am trying to figure out if sorting is persisted after applying Pair RDD
transformations and I am not able to decisively tell after reading the
documentation.

For example:
val numbers = .. // RDD of numbers
val pairedNumbers = numbers.map(number => (number % 100, number))
val sortedPairedNumbers =
  pairedNumbers.sortBy(pairedNumber => pairedNumber._2) // Sort by values in the pair
val aggregates = sortedPairedNumbers.combineByKey(..)

In this example, will the combine functions see values in sorted order?
What if I had done groupByKey and then combineByKey? What transformations
can unsort already sorted data?