Hi,

If you asked any DB developer, s/he would tell you to use the construct:
    select userid, time, state from (
      select userid, time, state,
             rank() over (partition by userid order by time desc) r
      from event
    ) where r = 1

I am not sure if DataFrame supports it yet, though I am sure the functions API could be extended to implement it. But here is a version not using DF, so we can get access to repartitionAndSortWithinPartitions, which is just the tool we need. The idea is:

- Partition by userId, making sure we get all records of a userId in one partition:

    def customPartitioner(k):
        # partition using the userId part of the "userId~time" key
        return int(k.split("~")[0])

- Sort by userId and then by time descending:

    def getSortingKey(k):
        # sort on the whole "userId~time" key
        return k

- Pick up just the first record for each userId (the latest, since we sorted descending):

    def getFinal(itr):
        prev = (0, 0, "NULL")
        for v in itr:
            curr = (v[0].split("~")[0], v[0].split("~")[1], v[1])
            if prev[0] != curr[0]:
                # first row of a new userId group: the latest record
                yield curr
            prev = curr

Let's set up data and test:

    data = [[1, 20150101, "S1"], [2, 20150101, "S2"], [3, 20150101, "S3"],
            [1, 20150102, "S1C1"], [1, 20150112, "S1C2"], [1, 20150303, "S1C3"],
            [2, 20150105, "S2C1"], [2, 20150501, "S2C2"], [3, 20150201, "S3C1"]]
    userSchemaRDD = sc.parallelize(data).map(lambda t: (str(t[0]) + "~" + str(t[1]), t[2]))
    for i in userSchemaRDD.collect():
        print(i)

    rep = userSchemaRDD.repartitionAndSortWithinPartitions(
        numPartitions=2,
        partitionFunc=customPartitioner,
        ascending=False,
        keyfunc=getSortingKey)
    res = rep.mapPartitions(getFinal)
    for j in res.collect():
        print(j)

Data:

    ('1~20150101', 'S1')
    ('2~20150101', 'S2')
    ('3~20150101', 'S3')
    ('1~20150102', 'S1C1')
    ('1~20150112', 'S1C2')
    ('1~20150303', 'S1C3')
    ('2~20150105', 'S2C1')
    ('2~20150501', 'S2C2')
    ('3~20150201', 'S3C1')

Result:

    ('2', '20150501', 'S2C2')
    ('3', '20150201', 'S3C1')
    ('1', '20150303', 'S1C3')

Of course this is rough code, and I really did not try to get any fancier than needed, but it should convey the idea. Kindly let me know if this works (or not :) ). To devs: any idea if analytical functions (as they are known in the DB world) are on the roadmap?
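For anyone who wants to sanity-check the logic without a Spark shell handy, here is a minimal plain-Python sketch of the same sort-then-take-first idea (my own illustration, not Spark code: sorted plus itertools.groupby stands in for the per-partition sorted iteration, and taking the head of each group mirrors where r = 1):

```python
from itertools import groupby

def latest_per_user(records):
    # records: iterable of (userId, time, state) tuples.
    # Sort by (userId, time descending), then take the first row of
    # each userId group -- the user's latest record.
    ordered = sorted(records, key=lambda r: (r[0], -r[1]))
    return [next(grp) for _, grp in groupby(ordered, key=lambda r: r[0])]

data = [(1, 20150101, "S1"), (2, 20150101, "S2"), (3, 20150201, "S3C1"),
        (1, 20150303, "S1C3"), (2, 20150501, "S2C2")]
print(latest_per_user(data))
# [(1, 20150303, 'S1C3'), (2, 20150501, 'S2C2'), (3, 20150201, 'S3C1')]
```

The Spark version above distributes exactly this: customPartitioner plays the role of the group key, and the descending sort within each partition guarantees the first row seen per userId is the latest one.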
Best
Ayan

On Sat, May 16, 2015 at 7:49 AM, Justin Yip <yipjus...@prediction.io> wrote:

> Hi Ayan,
>
> I have a DF constructed from the following case class Event:
>
>     case class State(attr1: String, ....)
>
>     case class Event(
>       userId: String,
>       time: Long,
>       state: State
>     )
>
> I would like to generate a DF which contains the latest state of each
> userId. I could first compute the latest time of each user, and join
> it back to the original data frame. But that involves two shuffles. Hence
> I would like to see if there are ways to improve the performance.
>
> Thanks.
>
> Justin
>
> On Fri, May 15, 2015 at 6:32 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Can you kindly elaborate on this? It should be possible to write UDAFs
>> along similar lines to sum/min etc.
>>
>> On Fri, May 15, 2015 at 5:49 AM, Justin Yip <yipjus...@prediction.io> wrote:
>>
>>> Hello,
>>>
>>> May I know if there is a way to implement an aggregate function for
>>> grouped data in DataFrame? I dug into the doc but didn't find any apart
>>> from the UDF functions which apply on a Row. Maybe I have missed
>>> something. Thanks.
>>>
>>> Justin
>>>
>>> View this message in context: Custom Aggregate Function for DataFrame
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Aggregate-Function-for-DataFrame-tp22893.html>
>>> Sent from the Apache Spark User List mailing list archive
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.

-- 
Best Regards,
Ayan Guha