My goal is for rows to be partitioned according to timestamp bins (e.g. with each partition representing an even interval of time), and then ordered by timestamp *within* each partition. Ordering by user ID is not important. In the seqOp function of my aggregate I check for this ordering, and I'm seeing that rows within partitions are not in order - but I think this should be easy to solve with mapPartitions(preservesPartitioning = true) prior to aggregate(), which should maintain the ranges produced by RangePartitioner.
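A minimal sketch of that plan, assuming the (timestamp, (user_id, amount)) row shape of keyByTimestamp from the quoted code. The object name SortWithinPartitions and the helpers sortByTimestamp, isOrdered, partitionAndSort and allPartitionsOrdered are hypothetical names made up for illustration; RangePartitioner, partitionBy and mapPartitions are the actual Spark calls. One caveat: RangePartitioner picks its boundaries by sampling keys, so partitions should hold roughly equal numbers of rows rather than equal spans of time.

```scala
import org.apache.spark.RangePartitioner
import org.apache.spark.SparkContext._ // pair-RDD implicits; needed on Spark releases before 1.3
import org.apache.spark.rdd.RDD

object SortWithinPartitions {
  // (timestamp, (user_id, amount)): assumed row shape, taken from keyByTimestamp
  type Row = (Long, (Long, Float))

  // Sort one partition's rows by timestamp. Materializes the whole partition in memory.
  def sortByTimestamp(rows: Iterator[Row]): Iterator[Row] =
    rows.toVector.sortBy(_._1).iterator

  // True if timestamps never decrease from one row to the next.
  def isOrdered(rows: Iterator[Row]): Boolean =
    rows.map(_._1).sliding(2).forall {
      case Seq(a, b) => a <= b
      case _         => true // zero or one row is trivially ordered
    }

  // Range-partition by timestamp, then sort inside each partition.
  // preservesPartitioning = true keeps the RangePartitioner attached to the result.
  def partitionAndSort(keyByTimestamp: RDD[Row], numPartitions: Int): RDD[Row] =
    keyByTimestamp
      .partitionBy(new RangePartitioner[Long, (Long, Float)](numPartitions, keyByTimestamp))
      .mapPartitions(rows => sortByTimestamp(rows), preservesPartitioning = true)

  // Check every partition on the cluster and report whether all of them are ordered.
  def allPartitionsOrdered(rdd: RDD[Row]): Boolean =
    rdd.mapPartitions(rows => Iterator(isOrdered(rows))).collect().forall(identity)
}
```

Sorting with toVector pulls a whole partition into memory, so this only suits partitions that fit on one executor. After something like partitionAndSort(keyByTimestamp, 500), allPartitionsOrdered on the result should come back true if the sort took effect.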
On Fri, Oct 17, 2014 at 11:04 AM, Cheng Lian <lian.cs....@gmail.com> wrote:
> Hm, a little confused here. What exactly is the ordering you expect? It
> seems that you want all the elements in the RDD to be sorted first by
> timestamp and then by user_id. If this is true, then you can simply do this:
>
> rawData.map { case (time, user, amount) => (time, user) -> amount
> }.sortByKey().aggregate(…)
>
> On 10/17/14 10:44 PM, Michael Misiewicz wrote:
>
> Thank you for sharing this Cheng! This is fantastic. I was able to
> implement it and it seems like it's working quite well. I'm definitely on
> the right track now!
>
> I'm still having a small problem with the rows inside each partition
> being out of order - but I suspect this is because in the code currently, I
> sortByKey, then use RangePartitioner (which I think does not maintain row
> order within each partition - due to the shuffle in RangePartitioner). I
> suspect I can work around this by doing operations in this order:
>
> - RangePartitioner
> - mapPartitions to sort each partition in memory, maintaining partitioning
> - aggregate
>
> Michael
>
> On Thu, Oct 16, 2014 at 12:35 PM, Cheng Lian <lian.cs....@gmail.com>
> wrote:
>
>> RDD.aggregate doesn’t require the RDD elements to be pairs, so you
>> don’t need to use user_id as the key of the RDD. For example, you can
>> use an empty Map as the zero value of the aggregation. The key of the Map
>> is the user_id you extracted from each tuple, and the value is the
>> aggregated value.
>>
>> keyByTimestamp.aggregate(Map.empty[Long, Float].withDefaultValue(0.0f))({
>>   (agg, rec) =>
>>     val (time, (user, amount)) = rec
>>     agg.updated(user, agg(user) + amount)
>> }, { (lhs, rhs) =>
>>   lhs.keys.foldLeft(rhs) { (combined, user) =>
>>     combined.updated(user, lhs(user) + rhs(user))
>>   }
>> })
>>
>> Of course, you may use a mutable Map for optimized performance. One thing
>> to note: foldByKey is a transformation, while aggregate is an action.
>> The final result of the code above is a single Map object rather than an RDD.
>> If this map can be very large (say you have billions of users), then
>> aggregate may OOM.
>>
>> On 10/17/14 12:01 AM, Michael Misiewicz wrote:
>>
>> Thanks for the suggestion! That does look really helpful, I see what you
>> mean about it being more general than fold. I think I will replace my fold
>> with aggregate - it should give me more control over the process.
>>
>> I think the problem will still exist though - which is that I can't get
>> the correct partitioning I need. When I change my key to user_id, I lose
>> the timestamp partitioning. My problem is that I'm trying to retain a
>> parent RDD's partitioning in an RDD that no longer has the same keys as its
>> parent.
>>
>> On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian <lian.cs....@gmail.com>
>> wrote:
>>
>>> Hi Michael,
>>>
>>> I'm not sure I fully understood your question, but I think RDD.aggregate
>>> can be helpful in your case. You can see it as a more general version of
>>> fold.
>>>
>>> Cheng
>>>
>>> On 10/16/14 11:15 PM, Michael Misiewicz wrote:
>>>
>>> Hi,
>>>
>>> I'm working on a problem where I'd like to sum items in an RDD *in
>>> order (approximately)*. I am currently trying to implement this using
>>> a fold, but I'm having some issues because the sorting key of my data is
>>> not the same as the folding key for my data. I have data that looks like
>>> this:
>>>
>>> user_id, transaction_timestamp, transaction_amount
>>>
>>> And I'm interested in doing a foldByKey on user_id to sum transaction
>>> amounts - taking care to note approximately when a user surpasses a total
>>> transaction threshold.
>>> I'm using RangePartitioner to make sure that data is ordered
>>> sequentially between partitions, and I'd also like to make sure
>>> that data is sorted within partitions, though I'm not sure how to do this
>>> exactly (I was going to look at the code for sortByKey to figure this
>>> out - I believe sorting in place in a mapPartitions should work). What
>>> do you think about the approach? Here's some sample code that demonstrates
>>> what I'm thinking:
>>>
>>> def myFold(v1: Float, v2: Float): Float = {
>>>   val partialSum = v1 + v2
>>>   if (partialSum >= 500) {
>>>     // make a note of it, do things
>>>   }
>>>   partialSum
>>> }
>>>
>>> val rawData = sc.textFile("hdfs://path/to/data").map { x => // load data
>>>   // assumes comma-separated fields, as in the sample row
>>>   val l = x.split(",").map(_.trim)
>>>   // user_id: Long, transaction_timestamp: Long, transaction_amount: Float
>>>   (l(0).toLong, l(1).toLong, l(2).toFloat)
>>> }
>>> // rearrange to make timestamp the key (for sorting), convert to PairRDD
>>> val keyByTimestamp = rawData.map(x => (x._2, (x._1, x._3)))
>>> val sortedByTimestamp = keyByTimestamp.sortByKey()
>>> val partitionedByTimestamp = sortedByTimestamp.partitionBy(
>>>   new org.apache.spark.RangePartitioner(partitions = 500,
>>>     rdd = sortedByTimestamp)).persist()
>>> // By this point, the RDD should be sorted and partitioned according to
>>> // the timestamp. However, I need to now make user_id the key,
>>> // because the output must be per user. At this point, since I change
>>> // the keys of the PairRDD, I understand that I lose the partitioning;
>>> // the consequence of this is that I can no longer be sure in my fold
>>> // function that the ordering is retained.
>>>
>>> val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
>>> val finalResult = keyByUser.foldByKey(zeroValue = 0f)(myFold)
>>> finalResult.saveAsTextFile("hdfs://...")
>>>
>>> The problem, as you'd expect, takes place in the folding function, after
>>> I've re-arranged my RDD to no longer be keyed by timestamp (when I produce
>>> keyByUser, I lose the correct partitioning). As I've read in the
>>> documentation, partitioning is not preserved when keys are changed (which
>>> makes sense).
>>>
>>> Reading this thread:
>>> https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 it
>>> appears that one possible solution might be to subclass RDD (à la
>>> MappedValuesRDD) to define my own RDD that retains the partitions of
>>> its parent. This seems simple enough, but I've never done anything like
>>> that before, and I'm not sure where to start. I'm also willing to write my
>>> own custom partitioner class, but it appears that the getPartition
>>> method only accepts a "key" argument - and since the value I need to
>>> partition on in the final step (the timestamp) would be in the Value, my
>>> partitioner class doesn't have the data it needs to make the right
>>> decision. I cannot have timestamp in my key.
>>>
>>> Alternatively, has anyone else encountered a problem like this (i.e.
>>> an approximately ordered sum) and did they find a good solution? Does my
>>> approach of subclassing RDD make sense? Would there be some way to
>>> finagle a custom partitioner into making this work? Perhaps this might be a
>>> job for some other tool, like Spark Streaming?
>>>
>>> Thanks,
>>> Michael