Yes, in a relational DB one could just do this:

SELECT t.Train, t.Dest, r.MaxTime
FROM (
    SELECT Train, MAX(Time) AS MaxTime
    FROM TrainTable
    GROUP BY Train
) r
INNER JOIN TrainTable t
    ON t.Train = r.Train AND t.Time = r.MaxTime

(copied from an answer to a Stack Overflow question), but I am still wondering how to do this in a streaming setting. mapGroupsWithState looks interesting, but it is just not available in 2.1.1, if I am not wrong.

Thanks!

On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz <brk...@gmail.com> wrote:

> That just gives you the max time for each train. If I understood the
> question correctly, OP wants the whole row with the max time. That's
> generally solved through joins or subqueries, which would be hard to do in
> a streaming setting.
>
> On Aug 29, 2017 7:29 PM, "ayan guha" <guha.a...@gmail.com> wrote:
>
>> Why won't removing the destination from the grouping work? Like this:
>>
>> trainTimesDataset
>>   .withWatermark("activity_timestamp", "5 days")
>>   .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train")
>>   .max("time")
>>
>> On Wed, Aug 30, 2017 at 10:38 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> @Burak so what would the transformation or query look like for the
>>> above example? I don't see flatMapGroupsWithState in the Dataset API in
>>> Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.
>>>
>>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Hey TD,
>>>>
>>>> If I understood the question correctly, your solution wouldn't return
>>>> the exact solution, since it also groups by destination. I would say the
>>>> easiest solution would be to use flatMapGroupsWithState, where you:
>>>> .groupByKey(_.train)
>>>>
>>>> and keep in state the row with the maximum time.
>>>>
>>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Yes. And in that case, if you care only about the max over the last few
>>>>> days, then you should set a watermark on the timestamp column.
>>>>>
>>>>> trainTimesDataset
>>>>>   .withWatermark("activity_timestamp", "5 days")
>>>>>   .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
>>>>>   .max("time")
>>>>>
>>>>> Any aggregates that are more than 5 days old will be dropped from the
>>>>> streaming state.
>>>>>
>>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the response. This is a streaming query, and in my case I
>>>>>> need to hold state for 24 hours, which I forgot to mention in my
>>>>>> previous email. Can I do this?
>>>>>>
>>>>>> trainTimesDataset.groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest").max("time")
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>>>
>>>>>>> Say trainTimesDataset is the streaming Dataset with schema [train:
>>>>>>> Int, dest: String, time: Timestamp].
>>>>>>>
>>>>>>> Scala: trainTimesDataset.groupBy("train", "dest").max("time")
>>>>>>>
>>>>>>> SQL: "select train, dest, max(time) from trainTimesView group by
>>>>>>> train, dest" // after calling
>>>>>>> trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>>>>>
>>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I am wondering what the easiest and most concise way is to express the
>>>>>>>> computation below in Spark Structured Streaming, given that it supports
>>>>>>>> both imperative and declarative styles.
>>>>>>>> I am just trying to select the rows that have the max timestamp for each
>>>>>>>> train. Instead of doing some sort of nested query like we normally do in
>>>>>>>> any relational database, I am trying to see if I can leverage both
>>>>>>>> imperative and declarative styles at the same time.
>>>>>>>> If nested queries or joins are not required, then I would like to see
>>>>>>>> how this is possible. I am using Spark 2.1.1.
>>>>>>>>
>>>>>>>> Dataset:
>>>>>>>>
>>>>>>>> Train  Dest  Time
>>>>>>>> 1      HK    10:00
>>>>>>>> 1      SH    12:00
>>>>>>>> 1      SZ    14:00
>>>>>>>> 2      HK    13:00
>>>>>>>> 2      SH    09:00
>>>>>>>> 2      SZ    07:00
>>>>>>>>
>>>>>>>> The desired result should be:
>>>>>>>>
>>>>>>>> Train  Dest  Time
>>>>>>>> 1      SZ    14:00
>>>>>>>> 2      HK    13:00
>>
>> --
>> Best Regards,
>> Ayan Guha
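
For what it's worth, the "group by train and keep in state the row with the maximum time" idea from the thread can be sketched in plain Scala, without any Spark dependency. This is only an illustration of the per-key state update that a flatMapGroupsWithState function would presumably perform; TrainRow, updateLatest and latestPerTrain are hypothetical names, and the time is assumed to be a zero-padded "HH:mm" string so that string comparison matches time order.

```scala
// Plain-Scala sketch (no Spark): the per-train "keep the latest row" update.
// TrainRow, updateLatest and latestPerTrain are hypothetical names.
// Assumption: time is a zero-padded "HH:mm" string, so string order == time order.
final case class TrainRow(train: Int, dest: String, time: String)

object LatestPerTrain {
  // Fold newly arrived rows for one train into the stored state.
  // The state is None until the first row for that train is seen.
  def updateLatest(state: Option[TrainRow], rows: Iterator[TrainRow]): Option[TrainRow] =
    rows.foldLeft(state) {
      case (Some(best), row) if best.time >= row.time => Some(best) // keep the current max
      case (_, row)                                    => Some(row)  // newer row wins
    }

  // Batch equivalent: group by train (the groupByKey(_.train) step),
  // then reduce each group to its whole row with the max time.
  def latestPerTrain(rows: Seq[TrainRow]): Seq[TrainRow] =
    rows.groupBy(_.train).toSeq.sortBy(_._1).flatMap { case (_, group) =>
      updateLatest(None, group.iterator)
    }
}
```

Applied to the six rows of the example dataset, latestPerTrain should yield the two rows of the desired result. Because updateLatest takes the previous state as an argument, the same function can be called once per micro-batch with whatever was stored for that train.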