Thank you for sharing this, Cheng! This is fantastic. I was able to implement it and it seems to be working quite well. I'm definitely on the right track now!
I'm still having a small problem with the rows inside each partition being out of order. I suspect this is because the code currently calls sortByKey and then partitionBy with a RangePartitioner, and I think the shuffle that partitionBy triggers does not maintain row order within each partition. I suspect I can work around this by doing the operations in this order:

- partition with RangePartitioner
- mapPartitions (with preservesPartitioning = true) to sort each partition in memory, maintaining the partitioning
- aggregate

Michael

On Thu, Oct 16, 2014 at 12:35 PM, Cheng Lian <lian.cs....@gmail.com> wrote:

> RDD.aggregate doesn't require the RDD elements to be pairs, so you don't
> need to use user_id as the key of the RDD. For example, you can use an
> empty Map as the zero value of the aggregation. The key of the Map is the
> user_id you extracted from each tuple, and the value is the aggregated
> value.
>
> // user_id is a Long in keyByTimestamp; 0.0f keeps the map's value type Float
> keyByTimestamp.aggregate(Map.empty[Long, Float].withDefaultValue(0.0f))({
>   (agg, rec) =>
>     val (time, (user, amount)) = rec
>     agg.updated(user, agg(user) + amount)
> }, { (lhs, rhs) =>
>   lhs.keys.foldLeft(rhs) { (combined, user) =>
>     combined.updated(user, lhs(user) + rhs(user))
>   }
> })
>
> Of course, you may use a mutable Map for better performance. One thing to
> note: foldByKey is a transformation, while aggregate is an action. The
> final result of the code above is a single Map object rather than an RDD.
> If this map can be very large (say you have billions of users), then
> aggregate may OOM.
>
> On 10/17/14 12:01 AM, Michael Misiewicz wrote:
>
> Thanks for the suggestion! That does look really helpful, and I see what
> you mean about it being more general than fold. I think I will replace my
> fold with aggregate; it should give me more control over the process.
>
> I think the problem will still exist, though: I can't get the
> partitioning I need. When I change my key to user_id, I lose the
> timestamp partitioning.
> My problem is that I'm trying to retain a parent RDD's partitioning in an
> RDD that no longer has the same keys as its parent.
>
> On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian <lian.cs....@gmail.com> wrote:
>
>> Hi Michael,
>>
>> I'm not sure I fully understood your question, but I think RDD.aggregate
>> can be helpful in your case. You can see it as a more general version of
>> fold.
>>
>> Cheng
>>
>> On 10/16/14 11:15 PM, Michael Misiewicz wrote:
>>
>> Hi,
>>
>> I'm working on a problem where I'd like to sum items in an RDD *in
>> order* (approximately). I am currently trying to implement this using
>> a fold, but I'm having some issues because the sorting key of my data is
>> not the same as the folding key for my data. I have data that looks like
>> this:
>>
>> user_id, transaction_timestamp, transaction_amount
>>
>> I'm interested in doing a foldByKey on user_id to sum transaction
>> amounts, taking care to note approximately when a user surpasses a total
>> transaction threshold. I'm using RangePartitioner to make sure that data
>> is ordered sequentially between partitions, and I'd also like to make
>> sure that data is sorted within partitions, though I'm not sure exactly
>> how to do this (I was going to look at the code for sortByKey to figure
>> this out; I believe sorting in place in a mapPartitions should work).
>> What do you think about the approach?
>> Here's some sample code that demonstrates what I'm thinking:
>>
>> def myFold(V1: Float, V2: Float): Float = {
>>   val partialSum = V1 + V2
>>   if (partialSum >= 500) {
>>     // make a note of it, do things
>>   }
>>   partialSum
>> }
>>
>> val rawData = sc.textFile("hdfs://path/to/data").map { x => // load data
>>   val l = x.split(",").map(_.trim) // assumes comma-separated fields
>>   (l(0).toLong, l(1).toLong, l(2).toFloat) // user_id: Long, transaction_timestamp: Long, transaction_amount: Float
>> }
>> // rearrange to make timestamp the key (for sorting), convert to PairRDD
>> val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3)))
>> val sortedByTimestamp = keyByTimestamp.sortByKey()
>> val partitionedByTimestamp = sortedByTimestamp.partitionBy(
>>   new org.apache.spark.RangePartitioner(partitions = 500, rdd = sortedByTimestamp)).persist()
>> // By this point, the RDD should be sorted and partitioned according to the timestamp.
>> // However, I now need to make user_id the key, because the output must be per user.
>> // Since I change the keys of the PairRDD here, I understand that I lose the partitioning;
>> // as a consequence, I can no longer be sure in my fold function that the ordering is retained.
>>
>> val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
>> val finalResult = keyByUser.foldByKey(zeroValue = 0.0f)(myFold)
>> finalResult.saveAsTextFile("hdfs://...")
>>
>> The problem, as you'd expect, takes place in the folding function, after
>> I've rearranged my RDD to no longer be keyed by timestamp (when I produce
>> keyByUser, I lose the correct partitioning). As I've read in the
>> documentation, partitioning is not preserved when keys are changed (which
>> makes sense).
>>
>> Reading this thread:
>> https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 it
>> appears that one possible solution might be to subclass RDD (à la
>> MappedValuesRDD) to define my own RDD that retains the partitions of its
>> parent.
>> This seems simple enough, but I've never done anything like that before,
>> and I'm not sure where to start. I'm also willing to write my own custom
>> partitioner class, but it appears that the getPartition method only
>> accepts a "key" argument, and since the value I need to partition on in
>> the final step (the timestamp) would be in the value, my partitioner
>> class doesn't have the data it needs to make the right decision. I cannot
>> have the timestamp in my key.
>>
>> Alternatively, has anyone else encountered a problem like this (i.e. an
>> approximately ordered sum) and found a good solution? Does my approach of
>> subclassing RDD make sense? Would there be some way to finagle a custom
>> partitioner into making this work? Perhaps this might be a job for some
>> other tool, like Spark Streaming?
>>
>> Thanks,
>> Michael
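Cheng's aggregate suggestion can be tried without a cluster. The following is a minimal sketch in plain Scala (no Spark dependency). It models an RDD as a Seq of partitions, each a Seq of (timestamp, (user_id, amount)) records like keyByTimestamp. The object and method names are hypothetical. It follows the shape of RDD.aggregate: every partition is folded from the zero value with seqOp, and the per-partition maps are then merged with combOp. Spark merges partition results in whatever order tasks finish, while this sketch simply merges them left to right.

```scala
// Plain Scala model of RDD.aggregate; partitions are just a Seq of Seqs.
object AggregateSketch {
  // (timestamp, (userId, amount)), the layout of keyByTimestamp in the thread
  type Rec = (Long, (Long, Float))

  // Zero value: an empty map whose missing users read as 0.0f
  val zero: Map[Long, Float] = Map.empty[Long, Float].withDefaultValue(0.0f)

  // seqOp: add one record's amount to its user's entry
  def seqOp(agg: Map[Long, Float], rec: Rec): Map[Long, Float] = {
    val (_, (user, amount)) = rec
    agg.updated(user, agg(user) + amount)
  }

  // combOp: merge two partial maps; rhs supplies 0.0f for users it lacks
  def combOp(lhs: Map[Long, Float], rhs: Map[Long, Float]): Map[Long, Float] =
    lhs.keys.foldLeft(rhs) { (combined, user) =>
      combined.updated(user, lhs(user) + rhs(user))
    }

  // Fold each partition from `zero` with seqOp, then merge results with combOp
  def aggregate(partitions: Seq[Seq[Rec]]): Map[Long, Float] =
    partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)
}
```

For example, `AggregateSketch.aggregate(Seq(Seq((1L, (7L, 100.0f))), Seq((3L, (7L, 200.0f)))))` sums user 7 across both partitions. Because the result keeps the default value, looking up an unseen user returns 0.0f.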
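Michael's proposed order of operations can be modelled the same way: range-partition by timestamp, then sort each partition in memory without moving records between partitions, then aggregate. The sketch below is plain Scala, not Spark. partitionIndex and its inclusive upper bounds are a hypothetical stand-in for the boundaries a RangePartitioner derives by sampling. sortWithinPartitions corresponds to an in-memory sort inside mapPartitions. In Spark itself, sortByKey already range-partitions its input and sorts each partition, so following it with partitionBy and a different RangePartitioner can reshuffle the data and lose that order. Spark 1.2 and later also offer repartitionAndSortWithinPartitions, which performs the shuffle and the per-partition sort in one step.

```scala
object PartitionSortSketch {
  // (timestamp, (userId, amount)), the layout of keyByTimestamp in the thread
  type Rec = (Long, (Long, Float))

  // Hypothetical stand-in for RangePartitioner: `bounds` are ascending,
  // inclusive upper bounds for every partition except the last one.
  def partitionIndex(ts: Long, bounds: IndexedSeq[Long]): Int = {
    val i = bounds.indexWhere(b => ts <= b)
    if (i == -1) bounds.length else i
  }

  // "Shuffle": each record lands in its range partition, but order inside a
  // partition just follows the input order, so it is not sorted yet.
  def rangePartition(recs: Seq[Rec], bounds: IndexedSeq[Long]): IndexedSeq[Seq[Rec]] = {
    val grouped = recs.groupBy(r => partitionIndex(r._1, bounds))
    (0 to bounds.length).map(i => grouped.getOrElse(i, Seq.empty[Rec]))
  }

  // In-memory sort of each partition. No record changes partition, which is
  // why passing preservesPartitioning = true to mapPartitions is safe here.
  def sortWithinPartitions(parts: IndexedSeq[Seq[Rec]]): IndexedSeq[Seq[Rec]] =
    parts.map(_.sortBy(_._1))
}
```

With a single bound of 20L, timestamps 30, 10, 25 and 5 land in partitions [10, 5] and [30, 25] after the "shuffle". After sorting, they become [5, 10] and [25, 30], so reading the partitions in order yields a globally sorted sequence.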
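The "make a note of it" step in myFold can only identify the record that pushed a user past the threshold if records are visited in timestamp order, which is why the ordering matters. Below is a minimal sketch of that bookkeeping over records that are already sorted by timestamp, for example a single sorted partition. firstCrossing is a hypothetical name, and it records only the first crossing per user.

```scala
object ThresholdSketch {
  // (timestamp, (userId, amount)), assumed to be sorted by timestamp already
  type Rec = (Long, (Long, Float))

  // For each user whose running total reaches `threshold`, returns the
  // timestamp of the record that first brought the total to or past it.
  def firstCrossing(sorted: Seq[Rec], threshold: Float): Map[Long, Long] = {
    val start = (Map.empty[Long, Float], Map.empty[Long, Long])
    val (_, firstHits) = sorted.foldLeft(start) {
      case ((totals, hits), (ts, (user, amount))) =>
        val total = totals.getOrElse(user, 0.0f) + amount
        // Only the first crossing is recorded; later records leave it as-is.
        val newHits =
          if (!hits.contains(user) && total >= threshold) hits.updated(user, ts)
          else hits
        (totals.updated(user, total), newHits)
    }
    firstHits
  }
}
```

Applied to each range partition separately, this only sees the totals accumulated within that partition. Crossings that depend on amounts from earlier partitions would need those partitions' running totals carried forward, and that is where the result becomes approximate if they are not.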