That makes a lot of sense now! I am looking for a tumbling window, so my window interval and batch interval are both 24 hours, and every day I want to start with a fresh state. Finally, since you said I need to do bookkeeping of the "last 24 hours": do you mean I need to do this in some external store and then compute the max? Unless I store the history in an external store, I am not seeing a way to retrieve it, especially since GroupState.get() seems to return only the most recently updated state value, not the entire history.
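To make my question concrete, here is a minimal sketch of the tumbling-day bookkeeping I think you are describing, with the running max kept in GroupState itself rather than in an external store. The TrainEvent and DayMax case classes and their field names are my own assumptions, not from your example, and the encoders come from `import spark.implicits._`:

    import java.sql.Timestamp
    import org.apache.spark.sql.streaming.GroupState

    case class TrainEvent(train: Int, dest: String, time: Timestamp)  // assumed schema
    case class DayMax(day: Long, maxTime: Timestamp)  // per-train state: current day + running max

    def dailyMax(train: Int, events: Iterator[TrainEvent],
                 state: GroupState[DayMax]): (Int, Timestamp) = {
      val dayMs = 24L * 60 * 60 * 1000
      var s = state.getOption
      for (e <- events) {
        val day = e.time.getTime / dayMs
        s = s match {
          // Same day and the stored max is still later: keep the state as-is.
          case Some(cur) if cur.day == day && cur.maxTime.after(e.time) => s
          // Later time on the same day, or a new day: start fresh from this event.
          case _ => Some(DayMax(day, e.time))
        }
      }
      s.foreach(state.update)
      (train, s.map(_.maxTime).orNull)
    }

    trainTimesDataset            // Dataset[TrainEvent]
      .groupByKey(_.train)
      .mapGroupsWithState(dailyMax _)

If something like this is what you meant, then I suppose I never need the full history for a max, only the running value?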
On Wed, Aug 30, 2017 at 4:09 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> The per-key state S is kept in memory. It has to be of a type that can be
> encoded by Datasets. All you have to do is update S every time the function
> is called, and the engine takes care of serializing/checkpointing the state
> value and of retrieving the correct version of the value when restarting
> from failures. So you explicitly don't have to "store" the state anywhere;
> the engine takes care of it under the hood. Internally, there is an
> interface called StateStore
> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala>,
> which defines the component that is actually responsible for checkpointing
> the values, and there is a single implementation
> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala>
> of the store that keeps the values in a hashmap and writes all changes to
> the values to an HDFS-API-compatible fault-tolerant filesystem for
> checkpointing. With this, by default, you really don't have to worry about
> externalizing the state, and you don't have to override anything in
> GroupState. You just use it as the example shows.
>
> It's important to note that the state of all the keys is distributed over
> the executors, so each executor holds in its memory a fraction of all the
> train state. Depending on the number of trains and the amount of data in
> the state, you will have to size the cluster and the workers accordingly.
> If you keep a lot of state for each train, your overall memory requirements
> will increase, so you have to be judicious about how much data to keep as
> state for each key.
>
> Regarding aggregation vs. mapGroupsWithState, it's a trade-off between
> efficiency and flexibility. With aggregation, you can do a sliding window
> of "24 hours" sliding every "1 hour", which gives the max over the "last 24
> hours" updated every "1 hour". If you are okay with this approximation, it
> is the easiest to implement (don't forget to set watermarks) and the most
> efficient. If you really want something more precise than that, then
> mapGroupsWithState is the ultimate flexible tool. However, you have to do
> the bookkeeping of the "last 24 hours" and calculate the max yourself. :)
>
> Hope this helps.
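Just to confirm I understand the aggregation alternative mentioned above, I take it to mean roughly the following (a sketch only; the activity_timestamp and time column names are taken from the snippets further down the thread):

    import org.apache.spark.sql.functions.{col, max, window}

    trainTimesDataset
      .withWatermark("activity_timestamp", "5 days")                     // bounds the kept state
      .groupBy(window(col("activity_timestamp"), "24 hours", "1 hour"),  // 24h window sliding hourly
               col("train"))
      .agg(max(col("time")))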
> On Wed, Aug 30, 2017 at 10:58 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> I think I understand *groupByKey/mapGroupsWithState*, and I am still
>> trying to wrap my head around *GroupState<S>*, so I have a naive question
>> to ask about *GroupState<S>*.
>>
>> If I were to represent a state that holds a history of events (say 24
>> hours), and the number of events can be big for a given 24-hour period,
>> where do I store the state S? In an external store like Kafka, a database,
>> or a distributed file system? I wonder if I can represent the state S
>> using a Dataset that holds the history of events. GroupState also has
>> .exists() and .get(); if I am not wrong, should I override these methods
>> so that comparisons and retrieval from an external store can work?
>>
>> Thanks!
>>
>> On Wed, Aug 30, 2017 at 1:39 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi TD,
>>>
>>> Thanks for the explanation and for the clear pseudocode and example!
>>>
>>> mapGroupsWithState is cool and looks very flexible; however, I have a
>>> few concerns and questions. For example:
>>>
>>> Say I store TrainHistory as a max heap from the Java Collections
>>> library, and I keep adding to this heap for 24 hours. At some point I
>>> will run out of Java heap space, right? Do I need to store TrainHistory
>>> as a Dataset or DataFrame instead of as an in-memory max-heap object
>>> from the Java Collections library?
>>>
>>> I also wonder which approach is more efficient for solving this
>>> particular problem: a *nested query* or *groupByKey/mapGroupsWithState*?
>>>
>>> Thanks!
>>>
>>> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>
>>>> Aah, I might have misinterpreted. The groupBy + window solution would
>>>> give the max time for each train over 24-hour (non-overlapping) windows
>>>> of event data (timestamped by activity_timestamp). So the output would
>>>> look like this:
>>>>
>>>> Train  Dest  Window(activity_timestamp)    max(Time)
>>>> 1      HK    Aug28-00:00 to Aug29-00:00    10:00  <- updating currently through Aug 29
>>>> 1      HK    Aug27-00:00 to Aug28-00:00    09:00  <- not updating, as no new events arrive with activity_timestamp in this range
>>>>
>>>> The drawback of this approach is that as soon as Aug 28 starts, you
>>>> have to wait for a new event about a train to get a new max(time). You
>>>> may rather want a rolling 24-hour period, that is, the max time known
>>>> over events in the last 24 hours. Then maintaining your own custom
>>>> state using mapGroupsWithState/flatMapGroupsWithState() is the best and
>>>> most flexible option. It is available in Spark 2.2 in Scala and Java.
>>>>
>>>> Here is an example that tracks sessions based on events.
>>>> Scala - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
>>>>
>>>> You will have to create custom per-train state that keeps track of the
>>>> last 24 hours of the train's history, and use that state to calculate
>>>> the max time for each train.
>>>>
>>>> def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents],
>>>>     state: GroupState[TrainHistory]): Long = {
>>>>   // for every event, update the history (i.e. the last 24 hours of
>>>>   // events) and return the max time from the history
>>>> }
>>>>
>>>> trainTimesDataset // Dataset[TrainEvents]
>>>>   .groupByKey(_.train)
>>>>   .mapGroupsWithState(updateHistoryAndGetMax)
>>>>
>>>> Hope this helps.
>>>>
>>>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Hey TD,
>>>>>
>>>>> If I understood the question correctly, your solution wouldn't return
>>>>> the exact answer, since it also groups by destination. I would say
>>>>> the easiest solution would be to use flatMapGroupsWithState, where you
>>>>> .groupByKey(_.train)
>>>>> and keep in state the row with the maximum time.
>>>>>
>>>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>>
>>>>>> Yes. And in that case, if you only care about the max over the last
>>>>>> few days, then you should set a watermark on the timestamp column.
>>>>>>
>>>>>> trainTimesDataset
>>>>>>   .withWatermark("activity_timestamp", "5 days")
>>>>>>   .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest")
>>>>>>   .max("time")
>>>>>>
>>>>>> Any counts that are more than 5 days old will be dropped from the
>>>>>> streaming state.
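Coming back to the rolling-24-hours part of the thread: here is how I would flesh out that updateHistoryAndGetMax body, just as a sketch. The TrainEvents and TrainHistory shapes below are my own assumptions, since they are not defined in the example:

    import java.sql.Timestamp
    import org.apache.spark.sql.streaming.GroupState

    case class TrainEvents(train: String, dest: String, time: Timestamp)  // assumed shape
    case class TrainHistory(events: List[TrainEvents])                    // assumed shape

    def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents],
                               state: GroupState[TrainHistory]): Long = {
      val windowMs = 24L * 60 * 60 * 1000
      // Merge the stored history with this batch's events.
      val all = state.getOption.map(_.events).getOrElse(Nil) ++ events
      val maxTime = all.map(_.time.getTime).max
      // Drop anything older than 24 hours before the newest event,
      // so the state stays bounded to one day of history.
      state.update(TrainHistory(all.filter(_.time.getTime >= maxTime - windowMs)))
      maxTime
    }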
>>>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for the response. Since this is a streaming query, in my
>>>>>>> case I need to hold state for 24 hours, which I forgot to mention
>>>>>>> in my previous email. Can I do this?
>>>>>>>
>>>>>>> trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest").max("time")
>>>>>>>
>>>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Say *trainTimesDataset* is the streaming Dataset of schema
>>>>>>>> *[train: Int, dest: String, time: Timestamp]*.
>>>>>>>>
>>>>>>>> *Scala*: trainTimesDataset.groupBy("train", "dest").max("time")
>>>>>>>>
>>>>>>>> *SQL*: "select train, dest, max(time) from trainTimesView group by train, dest"
>>>>>>>> // after calling trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>>>>>>
>>>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I am wondering what is the easiest and most concise way to express
>>>>>>>>> the computation below in Spark Structured Streaming, given that it
>>>>>>>>> supports both imperative and declarative styles? I am just trying
>>>>>>>>> to select the rows that have the max timestamp for each train.
>>>>>>>>> Instead of doing some sort of nested query like we normally would
>>>>>>>>> in a relational database, I am trying to see if I can leverage
>>>>>>>>> both styles at the same time. If nested queries or joins are not
>>>>>>>>> required, then I would like to see how this can be done. I am
>>>>>>>>> using Spark 2.1.1.
>>>>>>>>>
>>>>>>>>> Dataset:
>>>>>>>>>
>>>>>>>>> Train  Dest  Time
>>>>>>>>> 1      HK    10:00
>>>>>>>>> 1      SH    12:00
>>>>>>>>> 1      SZ    14:00
>>>>>>>>> 2      HK    13:00
>>>>>>>>> 2      SH    09:00
>>>>>>>>> 2      SZ    07:00
>>>>>>>>>
>>>>>>>>> The desired result should be:
>>>>>>>>>
>>>>>>>>> Train  Dest  Time
>>>>>>>>> 1      SZ    14:00
>>>>>>>>> 2      HK    13:00
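PS: For completeness, here is roughly how I read Burak's suggestion above (keep the whole max-time row per train in state, so dest is preserved in the output). Again only a sketch, with an assumed TrainEvent case class and Update output mode:

    import java.sql.Timestamp
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

    case class TrainEvent(train: Int, dest: String, time: Timestamp)  // assumed schema

    def keepMaxRow(train: Int, events: Iterator[TrainEvent],
                   state: GroupState[TrainEvent]): Iterator[TrainEvent] = {
      // The best row so far is either the stored one or a newer event from this batch.
      val best = (state.getOption.iterator ++ events).maxBy(_.time.getTime)
      state.update(best)
      Iterator(best)  // emit the current max-time row for this train
    }

    trainTimesDataset  // Dataset[TrainEvent]
      .groupByKey(_.train)
      .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout)(keepMaxRow _)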