Hi TD,

Thanks for the explanation and for the clear pseudocode and example!
mapGroupsWithState is cool and looks very flexible, but I have a few
concerns and questions. For example, say I store TrainHistory as a max heap
from the Java Collections library and I keep adding to this heap for 24
hours; at some point I will run out of Java heap space, right? Do I need to
store TrainHistory as a Dataset or DataFrame instead of an in-memory
max-heap object from the Java Collections library? I also wonder which
approach is more efficient for this particular problem: a *nested query* or
*groupByKey*/*mapGroupsWithState*?

Thanks!

On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Aah, I might have misinterpreted. The groupBy + window solution would give
> the max time for each train over 24 hours (non-overlapping window) of
> event data (timestamped by activity_timestamp). So the output would look
> like:
>
> Train  Dest  Window(activity_timestamp)   max(Time)
> 1      HK    Aug28-00:00 to Aug29-00:00   10:00  <- still updating through Aug29
> 1      HK    Aug27-00:00 to Aug28-00:00   09:00  <- not updating, as no new
>                                                     events arrive with an
>                                                     activity_timestamp in
>                                                     this range
>
> The drawback of this approach is that as soon as Aug28 starts, you have to
> wait for a new event about a train to get a new max(time). You may rather
> want a rolling 24-hour period, that is, the max time known over events in
> the last 24 hours.
>
> In that case, maintaining your own custom state using
> mapGroupsWithState/flatMapGroupsWithState is the best and most flexible
> option. It is available in Spark 2.2 in Scala and Java.
>
> Here is an example that tracks sessions based on events:
> Scala - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
>
> You will have to create a custom per-train state which keeps track of the
> last 24 hours of train history, and use that state to calculate the max
> time for each train.
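[Editor's note: the bounded per-train state TD describes can be sketched
outside of Spark in plain Java. This also speaks to the heap-space concern
above: old events are evicted on every update, so the state stays bounded by
24 hours of events per train. The TrainHistory/TrainEvent names follow TD's
pseudocode, but the class itself is an illustrative assumption, not Spark
API.]

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of a bounded per-train history: every update evicts events older
// than 24 hours, so memory does not grow without bound over the stream.
public class TrainHistory {
    public static final class TrainEvent {
        public final Instant time;
        public TrainEvent(Instant time) { this.time = time; }
    }

    private static final Duration WINDOW = Duration.ofHours(24);

    // Events are assumed to arrive roughly in order here; a priority queue
    // (max heap) keyed on time would also work for out-of-order arrival.
    private final Deque<TrainEvent> events = new ArrayDeque<>();

    // Add one event, drop everything older than 24 hours before it, and
    // return the max time among the events still inside the window.
    public Instant updateAndGetMax(TrainEvent e) {
        events.addLast(e);
        Instant cutoff = e.time.minus(WINDOW);
        while (!events.isEmpty() && events.peekFirst().time.isBefore(cutoff)) {
            events.removeFirst();
        }
        return events.stream()
                     .map(ev -> ev.time)
                     .max(Instant::compareTo)
                     .get(); // non-empty: the event just added is in range
    }

    public int size() { return events.size(); }
}
```

In a mapGroupsWithState function, one such object per train would live in the
GroupState, updated for each batch of events for that key.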
> def updateHistoryAndGetMax(
>     train: String,
>     events: Iterator[TrainEvents],
>     state: GroupState[TrainHistory]): Long = {
>   // for every event, update the history (i.e. the last 24 hours of
>   // events) and return the max time from the history
> }
>
> trainTimesDataset                            // Dataset[TrainEvents]
>   .groupByKey(_.train)
>   .mapGroupsWithState(updateHistoryAndGetMax)
>
> Hope this helps.
>
> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hey TD,
>>
>> If I understood the question correctly, your solution wouldn't return the
>> exact answer, since it also groups by destination. I would say the
>> easiest solution would be to use flatMapGroupsWithState, where you:
>>
>>   .groupByKey(_.train)
>>
>> and keep in state the row with the maximum time.
>>
>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> Yes. And in that case, if you care about only the last few days of max,
>>> then you should set a watermark on the timestamp column:
>>>
>>> trainTimesDataset
>>>   .withWatermark("activity_timestamp", "5 days")
>>>   .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest")
>>>   .max("time")
>>>
>>> Any counts that are more than 5 days old will be dropped from the
>>> streaming state.
>>>
>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for the response. Since this is a streaming query, in my case I
>>>> need to hold state for 24 hours, which I forgot to mention in my
>>>> previous email. Can I do the following?
>>>> trainTimesDataset
>>>>   .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest")
>>>>   .max("time")
>>>>
>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Say trainTimesDataset is the streaming Dataset of schema
>>>>> [train: Int, dest: String, time: Timestamp]
>>>>>
>>>>> Scala: trainTimesDataset.groupBy("train", "dest").max("time")
>>>>>
>>>>> SQL: "select train, dest, max(time) from trainTimesView group by train, dest"
>>>>> // after calling trainTimesData.createOrReplaceTempView("trainTimesView")
>>>>>
>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am wondering what is the easiest and most concise way to express
>>>>>> the computation below in Spark Structured Streaming, given that it
>>>>>> supports both imperative and declarative styles. I am trying to
>>>>>> select the rows that have the max timestamp for each train. Instead
>>>>>> of writing some sort of nested query, as we normally would in a
>>>>>> relational database, I would like to see whether this is possible
>>>>>> without nested queries or joins, by leveraging both the imperative
>>>>>> and declarative styles together. I am using Spark 2.1.1.
>>>>>>
>>>>>> Dataset:
>>>>>>
>>>>>> Train  Dest  Time
>>>>>> 1      HK    10:00
>>>>>> 1      SH    12:00
>>>>>> 1      SZ    14:00
>>>>>> 2      HK    13:00
>>>>>> 2      SH    09:00
>>>>>> 2      SZ    07:00
>>>>>>
>>>>>> The desired result:
>>>>>>
>>>>>> Train  Dest  Time
>>>>>> 1      SZ    14:00
>>>>>> 2      HK    13:00
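[Editor's note: Burak's suggestion in the thread, keeping in state only the
row with the maximum time per train, can be sketched in plain Java against
the example dataset above. The Row/MaxRowState names are illustrative
assumptions, not Spark API, and note this yields a running global max per
train, not the rolling 24-hour max discussed later in the thread.]

```java
import java.util.HashMap;
import java.util.Map;

// Per-train state holding only the single best row seen so far: constant
// state per train, in contrast to keeping a full 24-hour history.
public class MaxRowState {
    public static final class Row {
        public final int train;
        public final String dest;
        public final String time; // "HH:mm"; compares correctly as text

        public Row(int train, String dest, String time) {
            this.train = train;
            this.dest = dest;
            this.time = time;
        }
    }

    private final Map<Integer, Row> maxByTrain = new HashMap<>();

    // Fold one incoming row into the state and return the current max row
    // for that train.
    public Row update(Row r) {
        return maxByTrain.merge(r.train, r,
                (old, nu) -> nu.time.compareTo(old.time) > 0 ? nu : old);
    }
}
```

Feeding the six example rows through update() leaves exactly the two desired
rows in state: (1, SZ, 14:00) and (2, HK, 13:00).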