That makes a lot of sense now! I am looking for a tumbling window, so my
window interval and batch interval are both 24 hours. Every day I want to
start with a fresh state.

Finally, since you said I need to do bookkeeping of the "last 24 hours": do
you mean I need to do this in some external store and then compute the max?
Unless I store the history in some external store, I don't see a way to
retrieve all of the history, especially since GroupState.get() seems to
return only the most recently updated state value, not the entire history.


On Wed, Aug 30, 2017 at 4:09 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> The per-key state S is kept in memory. It has to be of a type that can be
> encoded by Datasets. All you have to do is update S every time the function
> is called, and the engine takes care of serializing/checkpointing the state
> value and retrieving the correct version of the value when restarting from
> failures. So you explicitly don't have to "store" the state anywhere; the
> engine takes care of it under the hood. Internally, there is an interface
> called StateStore
> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala>,
> which defines the component that is actually responsible for checkpointing
> the values, etc. And there is a single implementation
> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala>
> of the store that keeps the values in a hashmap and writes all changes to
> the values to an HDFS-API-compatible fault-tolerant filesystem for
> checkpointing. With this, by default, you really don't have to worry about
> externalizing the state, and you don't have to override anything in
> GroupState. You just use it as the example shows.
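>
> As a tiny sketch of what "just update S" means (this is not the train
> example; assume ds is a Dataset[String], spark.implicits._ is in scope, and
> the state is simply a per-key count of type Long):
>
> import org.apache.spark.sql.streaming.GroupState
>
> def updateCount(key: String, values: Iterator[String],
>                 state: GroupState[Long]): (String, Long) = {
>   val newCount = state.getOption.getOrElse(0L) + values.size  // previous state + new values
>   state.update(newCount)   // the engine checkpoints/restores this for you
>   (key, newCount)
> }
>
> ds.groupByKey(s => s).mapGroupsWithState(updateCount _)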
>
> It's important to note that the state of all the keys is distributed over
> the executors. So each executor will have in its memory a fraction of all
> the train state. Depending on the number of trains and the amount of data in
> the state, you will have to size the cluster and the workers accordingly. If
> you keep a lot of state for each train, then your overall memory
> requirements are going to increase, so you have to be judicious about how
> much data to keep as state for each key.
>
> Regarding aggregation vs. mapGroupsWithState, it's a trade-off between
> efficiency and flexibility. With aggregation, you can do a sliding window of
> "24 hours" sliding every "1 hour", which will give the max in the "last 24
> hours" updated every "1 hour". If you are okay with this approximation, then
> this is the easiest to implement (don't forget to set watermarks) and the
> most efficient. If you really want something more precise than that, then
> mapGroupsWithState is the ultimate flexible tool. However, you have to do
> the bookkeeping of the "last 24 hours" and calculate the max yourself. :)
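>
> For example, the sliding-window version would look roughly like this (a
> sketch, reusing the activity_timestamp column and the "5 days" watermark
> from the earlier mails):
>
> import org.apache.spark.sql.functions.{col, max, window}
>
> trainTimesDataset
>   .withWatermark("activity_timestamp", "5 days")   // lets old windows be dropped from state
>   .groupBy(window(col("activity_timestamp"), "24 hours", "1 hour"), col("train"))
>   .agg(max("time"))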
>
> Hope this helps.
>
> On Wed, Aug 30, 2017 at 10:58 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> I think I understand groupByKey/mapGroupsWithState, and I am still trying
>> to wrap my head around GroupState<S>, so I believe I have a naive question
>> to ask about GroupState<S>.
>>
>> If I were to represent a state that has a history of events (say 24 hours),
>> and the number of events can be big for a given 24-hour period, where do I
>> store the state S? An external store like Kafka, a database, or a
>> distributed file system? I wonder if I can represent the state S using a
>> Dataset that represents the history of events? GroupState also has
>> .exists() and .get(); if I am not wrong, I should override these methods,
>> right, so that comparisons and retrieval from the external store can work?
>>
>> Thanks!
>>
>>
>>
>> On Wed, Aug 30, 2017 at 1:39 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi TD,
>>>
>>> Thanks for the explanation and for the clear pseudo code and an example!
>>>
>>> mapGroupsWithState is cool and looks very flexible; however, I have a few
>>> concerns and questions. For example:
>>>
>>> Say I store TrainHistory as a max heap from the Java Collections library,
>>> and I keep adding to this heap for 24 hours; at some point I will run out
>>> of Java heap space, right? Do I need to store TrainHistory as a Dataset or
>>> DataFrame instead of as an in-memory max heap object from the Java
>>> Collections library?
>>>
>>> I also wonder, between a nested query vs. groupByKey/mapGroupsWithState,
>>> which approach is more efficient for solving this particular problem?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Aah, I might have misinterpreted. The groupBy + window solution would
>>>> give the max time for each train over each 24-hour (non-overlapping)
>>>> window of event data (timestamped by activity_timestamp). So the output
>>>> would look like this:
>>>>
>>>> Train   Dest   Window(activity_timestamp)    max(Time)
>>>> 1       HK     Aug28-00:00 to Aug29-00:00    10:00   <- updating currently through Aug29
>>>> 1       HK     Aug27-00:00 to Aug28-00:00    09:00   <- not updating, as no new events arrive with activity_timestamp in this range
>>>>
>>>> The drawback of this approach is that, as soon as Aug28 starts, you have
>>>> to wait for a new event about a train to get a new max(time). You may
>>>> rather want a rolling 24-hour period, that is, the max time known over
>>>> events in the last 24 hours.
>>>> In that case, maintaining your own custom state using mapGroupsWithState/
>>>> flatMapGroupsWithState() is the best and most flexible option.
>>>> It is available in Spark 2.2 in Scala and Java.
>>>>
>>>> Here is an example that tracks sessions based on events.
>>>> Scala - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
>>>>
>>>> You will have to create a custom per-train state which keeps track of the
>>>> last 24 hours of that train's history, and use that state to calculate the
>>>> max time for each train.
>>>>
>>>>
>>>> def updateHistoryAndGetMax(
>>>>     train: String,
>>>>     events: Iterator[TrainEvents],
>>>>     state: GroupState[TrainHistory]): Long = {
>>>>   // for every event, update the history (i.e. the last 24 hours of events)
>>>>   // and return the max time from the history
>>>> }
>>>>
>>>> trainTimesDataset     // Dataset[TrainEvents]
>>>>   .groupByKey(_.train)
>>>>   .mapGroupsWithState(updateHistoryAndGetMax)
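>>>>
>>>> A rough, untested sketch of what that could look like (assuming
>>>> TrainEvents is a case class like the one below, spark.implicits._ is in
>>>> scope, and the "last 24 hours" is judged by processing time for
>>>> simplicity):
>>>>
>>>> import org.apache.spark.sql.streaming.GroupState
>>>>
>>>> case class TrainEvents(train: String, dest: String, time: java.sql.Timestamp)
>>>> case class TrainHistory(events: Seq[TrainEvents])
>>>>
>>>> def updateHistoryAndGetMax(
>>>>     train: String,
>>>>     events: Iterator[TrainEvents],
>>>>     state: GroupState[TrainHistory]): Long = {
>>>>   val cutoff = System.currentTimeMillis() - 24L * 60 * 60 * 1000
>>>>   val previous = state.getOption.map(_.events).getOrElse(Seq.empty)
>>>>   // keep only the last 24 hours: old history plus this batch of events
>>>>   val kept = (previous ++ events).filter(_.time.getTime >= cutoff)
>>>>   state.update(TrainHistory(kept))
>>>>   // max event time (epoch millis) over the retained history, 0 if empty
>>>>   kept.map(_.time.getTime).reduceOption(_ max _).getOrElse(0L)
>>>> }
>>>>
>>>> The groupByKey/mapGroupsWithState call stays exactly as above.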
>>>>
>>>> Hope this helps.
>>>>
>>>>
>>>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Hey TD,
>>>>>
>>>>> If I understood the question correctly, your solution wouldn't return the
>>>>> exact answer, since it also groups by destination. I would say the
>>>>> easiest solution would be to use flatMapGroupsWithState, where you
>>>>> .groupByKey(_.train)
>>>>>
>>>>> and keep in state the row with the maximum time.
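>>>>>
>>>>> Something along these lines (untested sketch, assuming a case class
>>>>> TrainEvent as below and spark.implicits._ in scope):
>>>>>
>>>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
>>>>>
>>>>> case class TrainEvent(train: Int, dest: String, time: java.sql.Timestamp)
>>>>>
>>>>> def keepMaxRow(
>>>>>     train: Int,
>>>>>     events: Iterator[TrainEvent],
>>>>>     state: GroupState[TrainEvent]): Iterator[TrainEvent] = {
>>>>>   // the new max row is whichever of (old state, new events) has the largest time
>>>>>   val maxRow = (state.getOption.iterator ++ events).maxBy(_.time.getTime)
>>>>>   state.update(maxRow)
>>>>>   Iterator(maxRow)   // emit the row with the max time seen so far for this train
>>>>> }
>>>>>
>>>>> trainTimesDataset    // Dataset[TrainEvent]
>>>>>   .groupByKey(_.train)
>>>>>   .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout())(keepMaxRow _)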
>>>>>
>>>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>
>>>>>> Yes. And in that case, if you only care about the max over the last few
>>>>>> days, then you should set a watermark on the timestamp column:
>>>>>>
>>>>>> trainTimesDataset
>>>>>>   .withWatermark("activity_timestamp", "5 days")
>>>>>>   .groupBy(window(col("activity_timestamp"), "24 hours", "24 hours"), col("train"), col("dest"))
>>>>>>   .max("time")
>>>>>>
>>>>>> Any counts which are more than 5 days old will be dropped from the
>>>>>> streaming state.
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for the response. Since this is a streaming query, and in my
>>>>>>> case I need to hold state for 24 hours (which I forgot to mention in my
>>>>>>> previous email), can I do the following?
>>>>>>>
>>>>>>> trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
>>>>>>> "24 hours"), "train", "dest").max("time")
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Say trainTimesDataset is the streaming Dataset of schema [train: Int,
>>>>>>>> dest: String, time: Timestamp].
>>>>>>>>
>>>>>>>> Scala: trainTimesDataset.groupBy("train", "dest").max("time")
>>>>>>>>
>>>>>>>> SQL: "select train, dest, max(time) from trainTimesView group by
>>>>>>>> train, dest"    // after calling
>>>>>>>> trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I am wondering what is the easiest and most concise way to express
>>>>>>>>> the computation below in Spark Structured Streaming, given that it
>>>>>>>>> supports both imperative and declarative styles?
>>>>>>>>> I am just trying to select the rows that have the max timestamp for
>>>>>>>>> each train. Instead of doing some sort of nested query like we
>>>>>>>>> normally would in any relational database, I am trying to see if I
>>>>>>>>> can leverage both imperative and declarative styles at the same time.
>>>>>>>>> If nested queries or joins are not required, then I would like to see
>>>>>>>>> how this can be done. I am using Spark 2.1.1.
>>>>>>>>>
>>>>>>>>> Dataset:
>>>>>>>>>
>>>>>>>>> Train    Dest      Time
>>>>>>>>> 1        HK        10:00
>>>>>>>>> 1        SH        12:00
>>>>>>>>> 1        SZ        14:00
>>>>>>>>> 2        HK        13:00
>>>>>>>>> 2        SH        09:00
>>>>>>>>> 2        SZ        07:00
>>>>>>>>>
>>>>>>>>> The desired result should be:
>>>>>>>>>
>>>>>>>>> Train    Dest      Time
>>>>>>>>> 1        SZ        14:00
>>>>>>>>> 2        HK        13:00
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
