Can I do subqueries using the Dataset or DataFrame APIs, and avoid raw SQL unless it is a last resort?
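A minimal sketch of how the row_number() query in ayan's reply below might be written with the DataFrame API instead of raw SQL, assuming a static table with the Train/Dest/Time columns from the example at the bottom of this thread (the table name and variable names are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    val spark = SparkSession.builder.appName("latest-per-train").getOrCreate()

    // Assumed static table with columns Train, Dest, Time.
    val trainTable = spark.table("TrainTable")

    // Rank rows within each train by descending time; rank 1 is the latest row.
    val byTrainLatestFirst = Window.partitionBy("Train").orderBy(col("Time").desc)

    val latestPerTrain = trainTable
      .withColumn("r", row_number().over(byTrainLatestFirst))
      .where(col("r") === 1)
      .drop("r")

Note that this only works on a static DataFrame: ranking window functions such as row_number() are not supported on streaming Datasets, which is why the streaming part of the thread turns to flatMapGroupsWithState instead.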
On Tue, Aug 29, 2017 at 8:47 PM, ayan guha <guha.a...@gmail.com> wrote:

> This is not correct. In SQL land, your query should be like below:
>
>   select * from (
>     select Train, Dest, Time,
>            row_number() over (partition by Train order by Time desc) r
>     from TrainTable
>   ) x where r = 1
>
> All these constructs are supported in DataFrame functions.
>
> On Wed, Aug 30, 2017 at 1:08 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Yes, in a relational DB one could just do this:
>>
>>   SELECT t.Train, t.Dest, r.MaxTime
>>   FROM (
>>     SELECT Train, MAX(Time) AS MaxTime
>>     FROM TrainTable
>>     GROUP BY Train
>>   ) r
>>   INNER JOIN TrainTable t
>>     ON t.Train = r.Train AND t.Time = r.MaxTime
>>
>> (copied from an answer to a Stack Overflow question someone asked)
>>
>> but I am still thinking about how to do this in a streaming setting.
>> mapGroupsWithState looks interesting, but it is just not available in
>> 2.1.1, if I am not wrong.
>>
>> Thanks!
>>
>> On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> That just gives you the max time for each train. If I understood the
>>> question correctly, the OP wants the whole row with the max time. That
>>> is generally solved through joins or subqueries, which would be hard to
>>> do in a streaming setting.
>>>
>>> On Aug 29, 2017 7:29 PM, "ayan guha" <guha.a...@gmail.com> wrote:
>>>
>>>> Why wouldn't removing the destination from the window work? Like this:
>>>>
>>>>   trainTimesDataset
>>>>     .withWatermark("activity_timestamp", "5 days")
>>>>     .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train")
>>>>     .max("time")
>>>>
>>>> On Wed, Aug 30, 2017 at 10:38 AM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> @Burak so how would the transformation or query look for the above
>>>>> example? I don't see flatMapGroupsWithState in the Dataset API in
>>>>> Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life
>>>>> easier.
>>>>>
>>>>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>>>
>>>>>> Hey TD,
>>>>>>
>>>>>> If I understood the question correctly, your solution wouldn't
>>>>>> return the exact answer, since it also groups by destination. I
>>>>>> would say the easiest solution would be to use
>>>>>> flatMapGroupsWithState, where you
>>>>>>
>>>>>>   .groupByKey(_.train)
>>>>>>
>>>>>> and keep in state the row with the maximum time.
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das
>>>>>> <tathagata.das1...@gmail.com> wrote:
>>>>>>
>>>>>>> Yes. And in that case, if you only care about the max over the
>>>>>>> last few days, then you should set a watermark on the timestamp
>>>>>>> column:
>>>>>>>
>>>>>>>   trainTimesDataset
>>>>>>>     .withWatermark("activity_timestamp", "5 days")
>>>>>>>     .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
>>>>>>>              "train", "dest")
>>>>>>>     .max("time")
>>>>>>>
>>>>>>> Any counts that are more than 5 days old will be dropped from the
>>>>>>> streaming state.
>>>>>>>
>>>>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Thanks for the response. Since this is a streaming query, and in
>>>>>>>> my case I need to hold state for 24 hours (which I forgot to
>>>>>>>> mention in my previous email), can I do this?
>>>>>>>>
>>>>>>>>   trainTimesDataset
>>>>>>>>     .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
>>>>>>>>              "train", "dest")
>>>>>>>>     .max("time")
>>>>>>>>
>>>>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das
>>>>>>>> <tathagata.das1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Say trainTimesDataset is the streaming Dataset of schema
>>>>>>>>> [train: Int, dest: String, time: Timestamp].
>>>>>>>>>
>>>>>>>>> Scala: trainTimesDataset.groupBy("train", "dest").max("time")
>>>>>>>>>
>>>>>>>>> SQL: "select train, dest, max(time) from trainTimesView group by
>>>>>>>>> train, dest"  // after calling
>>>>>>>>> trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>>>>>>>
>>>>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I am wondering what is the easiest and most concise way to
>>>>>>>>>> express the computation below in Spark Structured Streaming,
>>>>>>>>>> given that it supports both imperative and declarative styles.
>>>>>>>>>> I am just trying to select the rows that have the max timestamp
>>>>>>>>>> for each train. Instead of doing some sort of nested query like
>>>>>>>>>> we normally would in a relational database, I am trying to see
>>>>>>>>>> if I can leverage both imperative and declarative styles at the
>>>>>>>>>> same time. If nested queries or joins are not required, then I
>>>>>>>>>> would like to see how this can be possible. I am using Spark
>>>>>>>>>> 2.1.1.
>>>>>>>>>>
>>>>>>>>>> Dataset:
>>>>>>>>>>
>>>>>>>>>>   Train  Dest  Time
>>>>>>>>>>   1      HK    10:00
>>>>>>>>>>   1      SH    12:00
>>>>>>>>>>   1      SZ    14:00
>>>>>>>>>>   2      HK    13:00
>>>>>>>>>>   2      SH    09:00
>>>>>>>>>>   2      SZ    07:00
>>>>>>>>>>
>>>>>>>>>> The desired result should be:
>>>>>>>>>>
>>>>>>>>>>   Train  Dest  Time
>>>>>>>>>>   1      SZ    14:00
>>>>>>>>>>   2      HK    13:00
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>
> --
> Best Regards,
> Ayan Guha
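For the streaming side of the thread, a rough sketch of the flatMapGroupsWithState approach Burak describes, keeping the max-time row per train in state. This assumes Spark 2.2+; the TrainEvent case class, the variable names, and the placeholder source are illustrative and not taken from the thread:

    import java.sql.Timestamp

    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

    // Illustrative event type matching the example schema [train, dest, time].
    case class TrainEvent(train: Int, dest: String, time: Timestamp)

    val spark = SparkSession.builder.appName("max-time-per-train").getOrCreate()
    import spark.implicits._

    // Placeholder: obtain this from your actual streaming source,
    // e.g. spark.readStream.format(...)...load()...as[TrainEvent]
    val trainTimesDataset: Dataset[TrainEvent] = ???

    val latestPerTrain = trainTimesDataset
      .groupByKey(_.train)
      .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
        (train: Int, events: Iterator[TrainEvent], state: GroupState[TrainEvent]) =>
          // Compare the new events against the stored max-time row for this train.
          val newest = (events ++ state.getOption.iterator).maxBy(_.time.getTime)
          state.update(newest)
          Iterator(newest) // emit the current max-time row for this train
      }

The query would then be run with update output mode. The state here is a single row per train, so it stays small; if the key space were unbounded, a GroupStateTimeout would be the usual way to evict stale keys.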