Yes. And in that case, if you only care about the max over the last few days, you should set a watermark on the timestamp column.
    import org.apache.spark.sql.functions.window
    import spark.implicits._  // for the $"column" syntax

    trainTimesDataset
      .withWatermark("activity_timestamp", "5 days")  // bound the state to roughly 5 days of event time
      .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
      .max("time")

State for any window that falls more than 5 days behind the latest activity_timestamp seen so far will be dropped from the streaming state.

On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:
> Hi,
>
> Thanks for the response. This is a streaming query, and in my case I need to hold state for 24 hours, which I forgot to mention in my previous email. Can I do this?
>
>     trainTimesDataset.groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest").max("time")
>
> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> Say trainTimesDataset is the streaming Dataset with schema [train: Int, dest: String, time: Timestamp].
>>
>> Scala:
>>
>>     trainTimesDataset.groupBy("train", "dest").max("time")
>>
>> SQL:
>>
>>     "select train, dest, max(time) from trainTimesView group by train, dest"
>>     // after calling trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>
>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am wondering what is the easiest and most concise way to express the computation below in Spark Structured Streaming, given that it supports both imperative and declarative styles.
>>> I am just trying to select the rows that have the max timestamp for each train. Instead of writing some sort of nested query like we normally do in a relational database, I am trying to see if I can leverage both imperative and declarative styles at the same time. If nested queries or joins are not required, I would like to see how this is possible. I am using Spark 2.1.1.
>>>
>>> Dataset
>>>
>>>     Train  Dest  Time
>>>     1      HK    10:00
>>>     1      SH    12:00
>>>     1      SZ    14:00
>>>     2      HK    13:00
>>>     2      SH    09:00
>>>     2      SZ    07:00
>>>
>>> The desired result should be:
>>>
>>>     Train  Dest  Time
>>>     1      SZ    14:00
>>>     2      HK    13:00
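Note that `groupBy("train", "dest").max("time")` returns the latest time for every train/destination pair (six rows for the sample data), not one row per train. One way to get the desired result without a join, which is not taken from the thread, is to take the max of a struct whose first field is the time: Spark orders struct values field by field from left to right, so `max(struct(time, dest))` should pick the latest row for each train. The sketch below runs that idea as a batch query on the sample data; the object `LatestRowPerTrain`, the case class `TrainTime` and the calendar date attached to the times are hypothetical, and you should verify that struct max behaves the same way on Spark 2.1.1.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, struct}

object LatestRowPerTrain {
  // Hypothetical case class matching the schema from the thread: [train: Int, dest: String, time: Timestamp]
  case class TrainTime(train: Int, dest: String, time: Timestamp)

  // Sample rows from the question; the date part (2017-08-29) is an arbitrary assumption.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    def at(hhmm: String): Timestamp = Timestamp.valueOf(s"2017-08-29 $hhmm:00")
    Seq(
      TrainTime(1, "HK", at("10:00")),
      TrainTime(1, "SH", at("12:00")),
      TrainTime(1, "SZ", at("14:00")),
      TrainTime(2, "HK", at("13:00")),
      TrainTime(2, "SH", at("09:00")),
      TrainTime(2, "SZ", at("07:00"))
    ).toDF()
  }

  // Structs compare field by field, left to right, so the max of struct(time, dest)
  // is the row with the latest time for each train (ties broken by dest).
  def latestPerTrain(trainTimes: DataFrame): DataFrame =
    trainTimes
      .groupBy("train")
      .agg(max(struct(col("time"), col("dest"))).as("latest"))
      .select(col("train"), col("latest.dest").as("dest"), col("latest.time").as("time"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("latest-row-per-train").getOrCreate()
    try {
      // Expected: train 1 -> SZ 14:00, train 2 -> HK 13:00
      latestPerTrain(sample(spark)).orderBy("train").show(false)
    } finally {
      spark.stop()
    }
  }
}
```

On a streaming Dataset the same aggregation would presumably need an output mode that supports aggregations, such as "complete". To bound the state as suggested above, the grouping could likely become `window($"activity_timestamp", "24 hours")` plus `$"train"` after `.withWatermark("activity_timestamp", "5 days")`; that variant is untested here.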