Re: Is sorting persisted after pair rdd transformations?
If something is persisted, you can easily see it under the Storage tab in the web UI.

Thanks
Best Regards

On Tue, Nov 18, 2014 at 7:26 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote:

> I am trying to figure out if sorting is persisted after applying Pair RDD transformations and I am not able to decisively tell after reading the documentation. For example:
>
>   val numbers = .. // RDD of numbers
>   val pairedNumbers = numbers.map(number => (number % 100, number))
>   val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber => pairedNumber._2) // Sort by values in the pair
>   val aggregates = sortedPairedNumbers.combineByKey(..)
>
> In this example, will the combine functions see values in sorted order? What if I had done groupByKey and then combineByKey? What transformations can unsort already sorted data?
Re: Is sorting persisted after pair rdd transformations?
Akhil, I think Aniket uses the word "persisted" in a different way than you mean, i.e. not in the RDD.persist() sense. Aniket asks if running combineByKey on a sorted RDD will result in a sorted RDD (i.e. whether the sorting is preserved).

I think the answer is no. combineByKey uses AppendOnlyMap, which is a hash map. That will scramble the order of your keys. You can quickly verify it in spark-shell:

  scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
  res0: Array[(Int, Int)] = Array((8,1), (7,1))

(The initial size of the AppendOnlyMap seems to be 8, so 8 is the first number that demonstrates this.)

On Wed, Nov 19, 2014 at 9:05 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

> If something is persisted you can easily see them under the Storage tab in the web ui.
>
> Thanks
> Best Regards
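To illustrate the point above about hash-map ordering without needing a cluster, here is a minimal plain-Scala sketch. It is not Spark's AppendOnlyMap; it assumes a standard scala.collection.mutable.HashMap as a stand-in, and the names KeyOrderSketch and sumByKey are made up for this example. It shows that aggregating by key through a hash map guarantees the contents of the result but not the order of the keys.

```scala
import scala.collection.mutable

object KeyOrderSketch {
  // Sum the values per key with a hash map, consuming the input strictly in order.
  // The stand-in HashMap is an assumption; Spark uses its own AppendOnlyMap.
  def sumByKey(records: Iterator[(Int, Int)]): Seq[(Int, Int)] = {
    val sums = mutable.HashMap.empty[Int, Int]
    records.foreach { case (key, value) =>
      sums(key) = sums.getOrElse(key, 0) + value
    }
    // Iteration order comes from the hash table layout, not from the input order.
    sums.toSeq
  }

  def main(args: Array[String]): Unit = {
    val sortedInput = (1 to 20).map(n => (n, 1))
    val result = sumByKey(sortedInput.iterator)
    // The key order printed here depends on the hash map implementation.
    println(result.map(_._1).mkString(", "))
    // Only the contents are guaranteed; sort explicitly if the order matters.
    println(result.sortBy(_._1) == sortedInput)
  }
}
```

If sorted keys are needed after an aggregation like this, the safe option is to sort the result again rather than rely on the input order.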
Re: Is sorting persisted after pair rdd transformations?
Thanks Daniel. I understand that the keys will not be in sorted order, but what I am trying to understand is whether the functions are passed values in sorted order within a given partition. For example:

  sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t => t._2).foldByKey(0)((a, b) => b).collect
  res0: Array[(Int, Int)] = Array((1,8))

The fold always gives me the last value, 8, which suggests that values keep the sort order defined in an earlier stage of the DAG?

On Wed Nov 19 2014 at 18:10:11 Daniel Darabos daniel.dara...@lynxanalytics.com wrote:

> Akhil, I think Aniket uses the word persisted in a different way than what you mean. I.e. not in the RDD.persist() way. Aniket asks if running combineByKey on a sorted RDD will result in a sorted RDD. (I.e. the sorting is preserved.)
>
> I think the answer is no. combineByKey uses AppendOnlyMap, which is a hashmap. That will shuffle your keys. You can quickly verify it in spark-shell:
>
>   scala> sc.parallelize(7 to 8).map(_ -> 1).reduceByKey(_ + _).collect
>   res0: Array[(Int, Int)] = Array((8,1), (7,1))
>
> (The initial size of the AppendOnlyMap seems to be 8, so 8 is the first number that demonstrates this.)
Re: Is sorting persisted after pair rdd transformations?
Ah, so I misunderstood you too :).

My reading of org/apache/spark/Aggregator.scala is that your function will always see the items in the order that they are in the input RDD. An RDD partition is always accessed as an iterator, so it will not be read out of order.

On Wed, Nov 19, 2014 at 2:28 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote:

> Thanks Daniel. I can understand that the keys will not be in sorted order but what I am trying to understand is whether the functions are passed values in sorted order in a given partition. For example:
>
>   sc.parallelize(1 to 8).map(i => (1, i)).sortBy(t => t._2).foldByKey(0)((a, b) => b).collect
>   res0: Array[(Int, Int)] = Array((1,8))
>
> The fold always gives me last value as 8 which suggests values preserve sorting earlier defined in stage in DAG?
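The iterator argument above can be sketched in plain Scala, without Spark. This is not the code in Aggregator.scala; ValueOrderSketch and foldPartition are hypothetical names, and the sketch models a single partition only. It says nothing about the order in which Spark merges results from several partitions after a shuffle.

```scala
object ValueOrderSketch {
  // Fold the values of each key while reading one partition strictly front to back.
  // `zero` and `f` play the role of the arguments to foldByKey in the thread's example.
  def foldPartition[V](partition: Iterator[(Int, V)], zero: V)(f: (V, V) => V): Map[Int, V] =
    partition.foldLeft(Map.empty[Int, V]) { case (acc, (key, value)) =>
      acc.updated(key, f(acc.getOrElse(key, zero), value))
    }

  def main(args: Array[String]): Unit = {
    // One sorted partition where every record has the same key, as in the thread.
    val partition = (1 to 8).map(i => (1, i))
    // Keeping the right-hand argument yields the last value seen for the key.
    println(foldPartition(partition.iterator, 0)((_, b) => b)) // prints Map(1 -> 8)
  }
}
```

Because an iterator can only be consumed in sequence, the function is applied to the values in exactly the order the partition yields them, which is why the last element wins here.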
Re: Is sorting persisted after pair rdd transformations?
Thanks Daniel :-). That makes sense and is what I was hoping for. I will proceed with this assumption and report back if I see any anomalies.

On Wed Nov 19 2014 at 19:30:02 Daniel Darabos daniel.dara...@lynxanalytics.com wrote:

> Ah, so I misunderstood you too :).
>
> My reading of org/apache/spark/Aggregator.scala is that your function will always see the items in the order that they are in the input RDD. An RDD partition is always accessed as an iterator, so it will not be read out of order.
Is sorting persisted after pair rdd transformations?
I am trying to figure out if sorting is persisted after applying Pair RDD transformations and I am not able to decisively tell after reading the documentation. For example:

  val numbers = .. // RDD of numbers
  val pairedNumbers = numbers.map(number => (number % 100, number))
  val sortedPairedNumbers = pairedNumbers.sortBy(pairedNumber => pairedNumber._2) // Sort by values in the pair
  val aggregates = sortedPairedNumbers.combineByKey(..)

In this example, will the combine functions see values in sorted order? What if I had done groupByKey and then combineByKey? What transformations can unsort already sorted data?