Hi TD,

Thanks for the explanation and for the clear pseudo code and an example!

mapGroupsWithState is cool and looks very flexible however I have few
concerns and questions. For example

Say I store TrainHistory as max heap from the Java Collections library and
I keep adding to to this heap for 24 hours and at some point I will run out
of Java heap space right? Do I need to store TrainHistory as a DataSet or
DataFrame instead of in memory max heap object from Java Collections library
?

I wonder between *Nested query* vs  *groupByKey/**mapGroupsWithState* which
approach is more efficient to solve this particular problem ?

Thanks!





On Tue, Aug 29, 2017 at 9:50 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Aah, I might have misinterpreted. The groupBy + window solution would give
> the max time for each train over 24 hours (non-overlapping window) of event
> data (timestamped by activity_timestamp). So the output would be like.
>
> Train     Dest   Window(activity_timestamp)    max(Time)
> 1         HK     Aug28-00:00 to Aug29-00:00    10:00    <- updating
> currently through aug29
> 1         HK    Aug27-00:00 to Aug28-00:00     09:00    <- not updating as
> no new updates coming in with activity_timestamp in this range.
>
> The drawback of this approach is that as soon as Aug28 starts, you have
> wait for new event about a train to get a new max(time). You may rather
> want a rolling 24 hour period, that is, the max time known over events in
> the last 24 hours.
> Then maintaining our own custom state using 
> mapGroupsWithState/flatMapGroupsWithState()
> is the best and most flexible option.
> It is available in Spark 2.2 in Scala, Java.
>
> Here is an example that tracks sessions based on events.
> Scala - https://github.com/apache/spark/blob/master/examples/
> src/main/scala/org/apache/spark/examples/sql/streaming/
> StructuredSessionization.scala
>
> You will have to create a custom per-train state which keeps track of last
> 24 hours of trains history, and use that state to calculate the max time
> for each train.
>
>
> def updateHistoryAndGetMax(train: String, events: Iterator[TrainEvents],
> state: GroupState[TrainHistory]): Long = {
>     // for every event, update history (i.e. last 24 hours of events) and
> return the max time from the history
> }
>
> trainTimesDataset     // Dataset[TrainEvents]
>   .groupByKey(_.train)
>   .mapGroupsWithState(updateHistoryAndGetMax)
>
> Hope this helps.
>
>
> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hey TD,
>>
>> If I understood the question correctly, your solution wouldn't return the
>> exact solution, since it also groups by on destination. I would say the
>> easiest solution would be to use flatMapGroupsWithState, where you:
>> .groupByKey(_.train)
>>
>> and keep in state the row with the maximum time.
>>
>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Yes. And in that case, if you just care about only the last few days of
>>> max, then you should set watermark on the timestamp column.
>>>
>>>  *trainTimesDataset*
>>> *  .withWatermark("**activity_timestamp", "5 days")*
>>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train",
>>> "dest")*
>>> *  .max("time")*
>>>
>>> Any counts which are more than 5 days old will be dropped from the
>>> streaming state.
>>>
>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for the response. Since this is a streaming based query and in
>>>> my case I need to hold state for 24 hours which I forgot to mention in my
>>>> previous email. can I do ?
>>>>
>>>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24
>>>> hours"), "train", "dest").max("time")*
>>>>
>>>>
>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>>>>> Int, dest: String, time: Timestamp] *
>>>>>
>>>>>
>>>>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>>>>
>>>>>
>>>>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>>>>> train, dest"*    // after calling
>>>>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>>>>
>>>>>
>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am wondering what is the easiest and concise way to express the
>>>>>> computation below in Spark Structured streaming given that it supports 
>>>>>> both
>>>>>> imperative and declarative styles?
>>>>>> I am just trying to select rows that has max timestamp for each
>>>>>> train? Instead of doing some sort of nested queries like we normally do 
>>>>>> in
>>>>>> any relational database I am trying to see if I can leverage both
>>>>>> imperative and declarative at the same time. If nested queries or join 
>>>>>> are
>>>>>> not required then I would like to see how this can be possible? I am 
>>>>>> using
>>>>>> spark 2.1.1.
>>>>>>
>>>>>> Dataset
>>>>>>
>>>>>> Train    Dest      Time1        HK        10:001        SH        12:001 
>>>>>>        SZ        14:002        HK        13:002        SH        09:002  
>>>>>>       SZ        07:00
>>>>>>
>>>>>> The desired result should be:
>>>>>>
>>>>>> Train    Dest      Time1        SZ        14:002        HK        13:00
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to