Hey TD,

If I understood the question correctly, your solution wouldn't return the exact result, since it also groups by destination. I would say the easiest solution is to use flatMapGroupsWithState, where you

  .groupByKey(_.train)

and keep in state the row with the maximum time.
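Roughly like the sketch below (untested; the TrainTime case class and the .as[TrainTime] cast are just my assumptions about the schema, and note that flatMapGroupsWithState only landed in Spark 2.2.0, so it won't work on the 2.1.1 mentioned further down):

  import java.sql.Timestamp
  import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

  // Hypothetical case class matching the schema described in the thread.
  case class TrainTime(train: Int, dest: String, time: Timestamp)

  // Assumes spark.implicits._ is in scope so the encoders resolve, and that
  // trainTimesDataset is the streaming Dataset from the thread below.
  val latestPerTrain = trainTimesDataset.as[TrainTime]
    .groupByKey(_.train)
    .flatMapGroupsWithState[TrainTime, TrainTime](
        OutputMode.Update(), GroupStateTimeout.NoTimeout()) { (train, rows, state) =>
      // Max over the new rows in this trigger plus the stored
      // max-time row from earlier triggers, if any.
      val newMax = (rows ++ state.getOption.iterator).maxBy(_.time.getTime)
      state.update(newMax) // remember the winner for the next trigger
      Iterator(newMax)     // emit the current max-time row for this train
    }

You would then run the query with outputMode("update"). One caveat: with NoTimeout the per-train state is kept forever; if only recent data matters, an event-time timeout would give you eviction similar in spirit to your watermark suggestion below.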
On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Yes. And in that case, if you care only about the last few days of the
> max, then you should set a watermark on the timestamp column:
>
>   trainTimesDataset
>     .withWatermark("activity_timestamp", "5 days")
>     .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
>     .max("time")
>
> Any counts that are more than 5 days old will be dropped from the
> streaming state.
>
> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for the response. Since this is a streaming query, in my case I
>> need to hold state for 24 hours, which I forgot to mention in my
>> previous email. Can I do the following?
>>
>>   trainTimesDataset
>>     .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
>>     .max("time")
>>
>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> Say trainTimesDataset is the streaming Dataset with schema
>>> [train: Int, dest: String, time: Timestamp].
>>>
>>> Scala:
>>>
>>>   trainTimesDataset.groupBy("train", "dest").max("time")
>>>
>>> SQL:
>>>
>>>   "select train, dest, max(time) from trainTimesView group by train, dest"
>>>   // after calling trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>
>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am wondering what is the easiest and most concise way to express the
>>>> computation below in Spark Structured Streaming, given that it supports
>>>> both imperative and declarative styles. I am just trying to select the
>>>> rows that have the max timestamp for each train. Instead of writing some
>>>> sort of nested query, as we normally would in a relational database, I am
>>>> trying to see if I can leverage both styles at the same time. If nested
>>>> queries or joins are not required, I would like to see how this can be
>>>> done. I am using Spark 2.1.1.
>>>>
>>>> Dataset:
>>>>
>>>>   Train  Dest  Time
>>>>   1      HK    10:00
>>>>   1      SH    12:00
>>>>   1      SZ    14:00
>>>>   2      HK    13:00
>>>>   2      SH    09:00
>>>>   2      SZ    07:00
>>>>
>>>> The desired result should be:
>>>>
>>>>   Train  Dest  Time
>>>>   1      SZ    14:00
>>>>   2      HK    13:00