I note that one of the listing variants of aggregateByKey accepts a
partitioner as an argument:

def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V)
⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

Would it be possible to extract my sorted parent's partitioner and pass
that into aggregateByKey on the re-keyed data being aggregated?

On Thu, Oct 16, 2014 at 12:01 PM, Michael Misiewicz <mmisiew...@gmail.com>
wrote:

> Thanks for the suggestion! That does look really helpful, I see what you
> mean about it being more general than fold. I think I will replace my fold
> with aggregate - it should give me more control over the process.
>
> I think the problem will still exist though - which is that I can't get
> the correct partitioning I need. When I change my key to user_id, I lose
> the timestamp partitioning. My problem is that I'm trying to retain a
> parent RDD's partitioning in an RDD that no longer has the same keys as its
> parent.
>
> On Thu, Oct 16, 2014 at 11:46 AM, Cheng Lian <lian.cs....@gmail.com>
> wrote:
>
>>  Hi Michael,
>>
>> I'm not sure I fully understood your question, but I think RDD.aggregate
>> can be helpful in your case. You can see it as a more general version of
>> fold.
>>
>> Cheng
>>
>>
>>
>> On 10/16/14 11:15 PM, Michael Misiewicz wrote:
>>
>> Hi,
>>
>>  I'm working on a problem where I'd like to sum items in an RDD *in
>> order (*approximately*)*. I am currently trying to implement this using
>> a fold, but I'm having some issues because the sorting key of my data is
>> not the same as the folding key for my data. I have data that looks like
>> this:
>>
>>  user_id, transaction_timestamp, transaction_amount
>>
>>  And I'm interested in doing a foldByKey on user_id to sum transaction
>> amounts - taking care to note approximately when a user surpasses a total
>> transaction threshold. I'm using RangePartitioner to make sure that data
>> is ordered sequentially between partitions, and I'd also make sure that
>> data is sorted within partitions, though I'm not sure how to do this
>> exactly (I was going to look at the code for sortByKey to figure this
>> out - I believe sorting in place in a mapPartitions should work). What
>> do you think about the approach? Here's some sample code that demonstrates
>> what I'm thinking:
>>
>>  def myFold(V1:Float, V2:Float) : Float = {
>>  val partialSum = V1 + V2
>>  if (partialSum >= 500) {
>>  // make a note of it, do things
>>  }
>>  return partialSum
>> }
>>
>>  val rawData = sc.textFile("hdfs://path/to/data").map{ x => // load data
>>  l = x.split()
>>  (l(0).toLong, l(1).toLong, l(2).toFloat) // user_id:long,
>> transaction_timestamp:long, transaction_amount:float
>>  }
>>  val keyByTimestamp = rawData.map(x=> (x._2, (x._1, x._3))) // rearrange
>> to make timestamp the key (for sorting), convert to PairRDD
>> val sortedByTimestamp = keyByTimestamp.sortByKey()
>> val partitionedByTimestamp = sortedByTimestamp.partitionBy(
>>  new org.apache.spark.RangePartitioner(partitions=500,
>> rdd=sortedByTimestamp)).persist()
>> // By this point, the RDD should be sorted and partitioned according to
>> the timestamp. However, I need to now make user_id the key,
>> // because the output must be per user. At this point, since I change the
>> keys of the PairRDD, I understand that I lose the partitioning
>> // the consequence of this is that I can no longer be sure in my fold
>> function that the ordering is retained.
>>
>>  val keyByUser = partitionedByTimestamp.map(x => (x._2._1, x._2._2))
>> val finalResult = keyByUser.foldByKey(zeroValue=0)(myFold)
>> finalResult.saveAsTextFile("hdfs://...")
>>
>>  The problem as you'd expect takes place in the folding function, after
>> I've re-arranged my RDD to no longer be keyed by timestamp (when I produce
>> keyByUser, I lose the correct partitioning). As I've read in the
>> documentation, partitioning is not preserved when keys are changed (which
>> makes sense).
>>
>>  Reading this thread:
>> https://groups.google.com/forum/#!topic/spark-users/Fx7DNtWiSx4 it
>> appears that one possible solution might be to subclass RDD (à la
>> MappedValuesRDD) to define my own RDD that retains the partitions of its
>> parent. This seems simple enough, but I've never done anything like that
>> before, but I'm not sure where to start. I'm also willing to write my own
>> custom partitioner class, but it appears that the getPartition method
>> only accepts a "key" argument - and since the value I need to partition on
>> in the final step (the timestamp) would be in the Value, my
>> partitioner class doesn't have the data it needs to make the right
>> decision. I cannot have timestamp in my key.
>>
>>  Alternatively, has anyone else encountered a problem like this (i.e. an
>> approximately ordered sum) and did they find a good solution? Does my
>> approach of subclassing RDD make sense? Would there be some way to
>> finagle a custom partitioner into making this work? Perhaps this might be a
>> job for some other tool, like spark streaming?
>>
>>  Thanks,
>> Michael
>>
>>
>>
>

Reply via email to