This is not correct. In SQL land, your query should look like this:

select * from (
  select Train, Dest, Time,
         row_number() over (partition by Train order by Time desc) r
  from TrainTable
) x where r = 1
All of these constructs are supported in DataFrame functions.

On Wed, Aug 30, 2017 at 1:08 PM, kant kodali <kanth...@gmail.com> wrote:
> Yes, in a relational DB one could just do this:
>
> SELECT t.Train, t.Dest, r.MaxTime
> FROM (
>     SELECT Train, MAX(Time) AS MaxTime
>     FROM TrainTable
>     GROUP BY Train) r
> INNER JOIN TrainTable t
> ON t.Train = r.Train AND t.Time = r.MaxTime
>
> (answer copied from a Stack Overflow question someone asked)
>
> But I am still thinking about how to do this in a streaming setting.
> mapGroupsWithState looks interesting, but it's just not available in
> 2.1.1, if I am not wrong.
>
> Thanks!
>
> On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> That just gives you the max time for each train. If I understood the
>> question correctly, OP wants the whole row with the max time. That's
>> generally solved through joins or subqueries, which would be hard to do in
>> a streaming setting.
>>
>> On Aug 29, 2017 7:29 PM, "ayan guha" <guha.a...@gmail.com> wrote:
>>
>>> Why won't removing the destination from the window work? Like this:
>>>
>>> trainTimesDataset
>>>   .withWatermark("activity_timestamp", "5 days")
>>>   .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train")
>>>   .agg(max("time"))
>>>
>>> On Wed, Aug 30, 2017 at 10:38 AM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> @Burak, so what would the transformation or query look like for the
>>>> above example? I don't see flatMapGroupsWithState in the Dataset API in
>>>> Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.
>>>>
>>>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Hey TD,
>>>>>
>>>>> If I understood the question correctly, your solution wouldn't return
>>>>> the exact result, since it also groups by destination.
>>>>> I would say the easiest solution would be to use flatMapGroupsWithState,
>>>>> where you:
>>>>>
>>>>> .groupByKey(_.train)
>>>>>
>>>>> and keep in state the row with the maximum time.
>>>>>
>>>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>>
>>>>>> Yes. And in that case, if you only care about the max over the last few
>>>>>> days, then you should set a watermark on the timestamp column.
>>>>>>
>>>>>> trainTimesDataset
>>>>>>   .withWatermark("activity_timestamp", "5 days")
>>>>>>   .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
>>>>>>   .agg(max("time"))
>>>>>>
>>>>>> Any aggregates that are more than 5 days old will be dropped from the
>>>>>> streaming state.
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for the response. Since this is a streaming query, and in my
>>>>>>> case I need to hold state for 24 hours (which I forgot to mention in my
>>>>>>> previous email), can I do this?
>>>>>>>
>>>>>>> trainTimesDataset
>>>>>>>   .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
>>>>>>>   .agg(max("time"))
>>>>>>>
>>>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Say trainTimesDataset is the streaming Dataset of schema
>>>>>>>> [train: Int, dest: String, time: Timestamp].
>>>>>>>>
>>>>>>>> Scala: trainTimesDataset.groupBy("train", "dest").agg(max("time"))
>>>>>>>>
>>>>>>>> SQL: "select train, dest, max(time) from trainTimesView group by train, dest"
>>>>>>>> // after calling trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>>>>>>
>>>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I am wondering what the easiest and most concise way is to express the
>>>>>>>>> computation below in Spark Structured Streaming, given that it supports
>>>>>>>>> both imperative and declarative styles.
>>>>>>>>> I am just trying to select the row that has the max timestamp for each
>>>>>>>>> train. Instead of doing some sort of nested query like we normally do in
>>>>>>>>> any relational database, I am trying to see if I can leverage both
>>>>>>>>> imperative and declarative styles at the same time. If nested queries or
>>>>>>>>> joins are not required, I would like to see how this is possible. I am
>>>>>>>>> using Spark 2.1.1.
>>>>>>>>>
>>>>>>>>> Dataset
>>>>>>>>>
>>>>>>>>> Train  Dest  Time
>>>>>>>>> 1      HK    10:00
>>>>>>>>> 1      SH    12:00
>>>>>>>>> 1      SZ    14:00
>>>>>>>>> 2      HK    13:00
>>>>>>>>> 2      SH    09:00
>>>>>>>>> 2      SZ    07:00
>>>>>>>>>
>>>>>>>>> The desired result should be:
>>>>>>>>>
>>>>>>>>> Train  Dest  Time
>>>>>>>>> 1      SZ    14:00
>>>>>>>>> 2      HK    13:00
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>
>
--
Best Regards,
Ayan Guha
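To make the difference between the proposals in this thread concrete, here is a minimal plain-Scala sketch (ordinary collections, no Spark) run against the example table from the original question. The hypothetical `latestPerTrain` keeps, for each train, the whole row with the latest time, which is what the `row_number() ... where r = 1` query selects when there are no ties. The hypothetical `maxPerTrainAndDest` mimics grouping by train and dest and taking the max time, which here returns every row because each (train, dest) pair occurs only once. On a static DataFrame, Spark can express the first one with `row_number()` over `Window.partitionBy(...).orderBy(...)`; Structured Streaming, however, does not support such non-time-based window functions on streaming Datasets.

```scala
import java.time.LocalTime

// Hypothetical row type mirroring the Train / Dest / Time table in the question.
case class TrainTime(train: Int, dest: String, time: LocalTime)

// Hypothetical helper to build rows from "HH:mm" strings.
def row(train: Int, dest: String, time: String): TrainTime =
  TrainTime(train, dest, LocalTime.parse(time))

val rows = Seq(
  row(1, "HK", "10:00"), row(1, "SH", "12:00"), row(1, "SZ", "14:00"),
  row(2, "HK", "13:00"), row(2, "SH", "09:00"), row(2, "SZ", "07:00")
)

// Whole row with the latest time per train (like row_number() ... r = 1).
// On ties, maxBy keeps the first maximal row it encounters.
def latestPerTrain(rs: Seq[TrainTime]): Seq[TrainTime] =
  rs.groupBy(_.train).values
    .map(_.maxBy(_.time.toSecondOfDay))
    .toSeq
    .sortBy(_.train)

// Like grouping by (train, dest) and taking max(time): one result per pair.
def maxPerTrainAndDest(rs: Seq[TrainTime]): Seq[TrainTime] =
  rs.groupBy(r => (r.train, r.dest)).values
    .map(_.maxBy(_.time.toSecondOfDay))
    .toSeq

// Prints "1 SZ 14:00" and "2 HK 13:00", matching the desired result.
latestPerTrain(rows).foreach(r => println(s"${r.train} ${r.dest} ${r.time}"))
```

The contrast shows why Burak objected to grouping by destination: with this data, `maxPerTrainAndDest` returns all six input rows, while only `latestPerTrain` produces the two-row result the question asks for.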
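Burak's suggestion keeps one piece of state per train: the row with the latest time seen so far. Below is a minimal plain-Scala sketch of that state update, with Spark's state store replaced by an ordinary immutable `Map` and the stream replaced by a sequence of micro-batches. In a real `groupByKey(_.train).mapGroupsWithState(...)` or `flatMapGroupsWithState(...)` job (available from Spark 2.2.0, not 2.1.1), the logic of the hypothetical `updateLatest` helper would run inside the user function, reading the previous row with `state.getOption` and storing the new one with `state.update(...)`. The `processBatch` function is likewise hypothetical and only stands in for what Spark does per key. As written, the model never forgets a train; in Spark one would typically configure a `GroupStateTimeout` to evict idle keys.

```scala
import java.time.LocalTime

// Hypothetical row type mirroring the Train / Dest / Time table in the question.
case class TrainTime(train: Int, dest: String, time: LocalTime)

// Hypothetical helper to build rows from "HH:mm" strings.
def row(train: Int, dest: String, time: String): TrainTime =
  TrainTime(train, dest, LocalTime.parse(time))

// Per-key update: fold one train's new rows into the stored "latest row".
// `stored` plays the role of GroupState.getOption; the result is what would
// be handed to GroupState.update.
def updateLatest(stored: Option[TrainTime], newRows: Iterator[TrainTime]): Option[TrainTime] =
  newRows.foldLeft(stored) { (best, r) =>
    best match {
      case Some(b) if b.time.toSecondOfDay >= r.time.toSecondOfDay => best
      case _                                                        => Some(r)
    }
  }

// Stand-in for Spark's state store: apply updateLatest to every train in a batch.
def processBatch(state: Map[Int, TrainTime], batch: Seq[TrainTime]): Map[Int, TrainTime] =
  batch.groupBy(_.train).foldLeft(state) { case (st, (train, trainRows)) =>
    updateLatest(st.get(train), trainRows.iterator) match {
      case Some(latest) => st.updated(train, latest)
      case None         => st
    }
  }

// The question's six rows, split across two arbitrary micro-batches.
val batch1 = Seq(row(1, "HK", "10:00"), row(2, "SZ", "07:00"), row(1, "SH", "12:00"))
val batch2 = Seq(row(2, "HK", "13:00"), row(1, "SZ", "14:00"), row(2, "SH", "09:00"))

val afterBatch1 = processBatch(Map.empty, batch1)
val afterBatch2 = processBatch(afterBatch1, batch2)
```

After the second batch the state holds exactly the desired result (train 1 at SZ 14:00, train 2 at HK 13:00). Because keeping a maximum is order-independent, splitting the same rows differently across batches yields the same final state, apart from which row wins a tie.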