Hi

If you asked any DB developer, they would tell you the construct:

select userid, time, state from (
    select userid, time, state,
           rank() over (partition by userId order by time desc) r
    from event) where r = 1

I am not sure if the DataFrame API supports it yet, though I am sure the
functions can be extended to implement it.
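
For what it is worth, if your Spark version exposes window functions on
DataFrames (I believe that needs 1.4+ and a HiveContext), the same rank()
trick might look roughly like the sketch below. Treat it as untested; the
DataFrame df and its column names are assumptions on my part:

# Untested sketch, assuming Spark 1.4+ with window function support and a
# DataFrame df with columns userId, time, state
from pyspark.sql import Window
from pyspark.sql.functions import rank

w = Window.partitionBy("userId").orderBy(df.time.desc())
latest = (df.select("userId", "time", "state", rank().over(w).alias("r"))
            .filter("r = 1")
            .drop("r"))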

But here is a version not using DataFrames, so we can access
repartitionAndSortWithinPartitions, which is just the tool we need.

The idea is:
- Partition by userId, making sure we get all records of a userId in one
partition
def customPartitioner(k):
    #partition using user id
    return int(k.split("~")[0])

- Sort data by userId and then by time desc (a more robust numeric sort key is sketched after these steps)
def getSortingKey(k):
    #sort using user id~time
    return k

- Just pick the first record for each userId
def getFinal(itr):
    # records arrive sorted by userId desc, then time desc, within the
    # partition, so the first record seen for each userId is its latest state
    prev = None
    for v in itr:
        curr = (v[0].split("~")[0], v[0].split("~")[1], v[1])
        if prev is None or prev[0] != curr[0]:
            prev = curr
            yield curr
        else:
            prev = curr
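
One caveat on the sort key: it relies on lexicographic string ordering, which
works here because the dates are fixed-width yyyymmdd. If the time part can
vary in width, a keyfunc that sorts numerically should be safer; a rough,
untested sketch:

def getSortingKey(k):
    # untested variant: sort on a numeric (userId, time) tuple instead of the
    # raw "userId~time" string, so ordering does not depend on fixed-width dates
    uid, ts = k.split("~")
    return (int(uid), int(ts))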


Let's set up the data and test.

data = [[1,20150101,"S1"],[2,20150101,"S2"],[3,20150101,"S3"],
        [1,20150102,"S1C1"],[1,20150112,"S1C2"],[1,20150303,"S1C3"],
        [2,20150105,"S2C1"],[2,20150501,"S2C2"],[3,20150201,"S3C1"]]
userSchemaRDD = sc.parallelize(data).map(lambda t: (str(t[0])+"~"+str(t[1]), t[2]))
for i in userSchemaRDD.collect():
    print i
rep = userSchemaRDD.repartitionAndSortWithinPartitions(numPartitions=2,
        partitionFunc=customPartitioner, ascending=False, keyfunc=getSortingKey)
res = rep.mapPartitions(getFinal)
for j in res.collect():
    print j

Data:
('1~20150101', 'S1')
('2~20150101', 'S2')
('3~20150101', 'S3')
('1~20150102', 'S1C1')
('1~20150112', 'S1C2')
('1~20150303', 'S1C3')
('2~20150105', 'S2C1')
('2~20150501', 'S2C2')
('3~20150201', 'S3C1')
Result
('2', '20150501', 'S2C2')
('3', '20150201', 'S3C1')
('1', '20150303', 'S1C3')

Of course this is rough code, and I did not try to get any fancier than
needed, but it should convey the idea.

Kindly let me know if this works (or not :) ) .

To the devs: any idea whether analytical functions (as they are known in the
DB world) are on the roadmap?

Best
Ayan

On Sat, May 16, 2015 at 7:49 AM, Justin Yip <yipjus...@prediction.io> wrote:

> Hi Ayan,
>
> I have a DF constructed from the following case class Event:
>
> case class State { attr1: String, ....}
>
> case class Event {
>   userId: String,
>   time: Long,
>   state: State
> }
>
> I would like to generate a DF which contains the latest state of each
> userId. I could have first compute the latest time of each user, and join
> it back to the original data frame. But that involves two shuffles. Hence
> would like to see if there are ways to improve the performance.
>
> Thanks.
>
> Justin
>
>
> On Fri, May 15, 2015 at 6:32 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> can you kindly elaborate on this? it should be possible to write udafs in
>> similar lines of sum/min etc.
>>
>> On Fri, May 15, 2015 at 5:49 AM, Justin Yip <yipjus...@prediction.io>
>> wrote:
>>
>>> Hello,
>>>
>>> May I know if these is way to implement aggregate function for grouped
>>> data in DataFrame? I dug into the doc but didn't find any apart from the
>>> UDF functions which applies on a Row. Maybe I have missed something. Thanks.
>>>
>>> Justin
>>>
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha
