yes in a relational db one could just do this

SELECT t.Train, t.Dest, r.MaxTimeFROM (
      SELECT Train, MAX(Time) as MaxTime
      FROM TrainTable
      GROUP BY Train) rINNER JOIN TrainTable tON t.Train = r.Train AND
t.Time = r.MaxTime

(copied answer from a Stack overflow question someone asked)

but still thinking how to do this in a streaming setting? mapGroupWithState
looks interesting but its just not available in 2.1.1 If I am not wrong.

Thanks!

On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz <brk...@gmail.com> wrote:

> That just gives you the max time for each train. If I understood the
> question correctly, OP wants the whole row with the max time. That's
> generally solved through joins or subqueries, which would be hard to do in
> a streaming setting
>
> On Aug 29, 2017 7:29 PM, "ayan guha" <guha.a...@gmail.com> wrote:
>
>> Why removing the destination from the window wont work? Like this:
>>
>>  *trainTimesDataset*
>> *  .withWatermark("**activity_timestamp", "5 days")*
>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train")*
>> *  .max("time")*
>>
>> On Wed, Aug 30, 2017 at 10:38 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> @Burak so how would the transformation or query would look like for the
>>> above example? I don't see flatMapGroupsWithState in the DataSet API
>>> Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.
>>>
>>>
>>>
>>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>
>>>> Hey TD,
>>>>
>>>> If I understood the question correctly, your solution wouldn't return
>>>> the exact solution, since it also groups by on destination. I would say the
>>>> easiest solution would be to use flatMapGroupsWithState, where you:
>>>> .groupByKey(_.train)
>>>>
>>>> and keep in state the row with the maximum time.
>>>>
>>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Yes. And in that case, if you just care about only the last few days
>>>>> of max, then you should set watermark on the timestamp column.
>>>>>
>>>>>  *trainTimesDataset*
>>>>> *  .withWatermark("**activity_timestamp", "5 days")*
>>>>> *  .groupBy(window(activity_timestamp, "24 hours", "24 hours"),
>>>>> "train", "dest")*
>>>>> *  .max("time")*
>>>>>
>>>>> Any counts which are more than 5 days old will be dropped from the
>>>>> streaming state.
>>>>>
>>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the response. Since this is a streaming based query and in
>>>>>> my case I need to hold state for 24 hours which I forgot to mention in my
>>>>>> previous email. can I do ?
>>>>>>
>>>>>>  *trainTimesDataset.groupBy(window(activity_timestamp, "24 hours",
>>>>>> "24 hours"), "train", "dest").max("time")*
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>
>>>>>>> Say, *trainTimesDataset* is the streaming Dataset of schema *[train:
>>>>>>> Int, dest: String, time: Timestamp] *
>>>>>>>
>>>>>>>
>>>>>>> *Scala*: *trainTimesDataset.groupBy("train", "dest").max("time")*
>>>>>>>
>>>>>>>
>>>>>>> *SQL*: *"select train, dest, max(time) from trainTimesView group by
>>>>>>> train, dest"*    // after calling
>>>>>>> *trainTimesData.createOrReplaceTempView(trainTimesView)*
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I am wondering what is the easiest and concise way to express the
>>>>>>>> computation below in Spark Structured streaming given that it supports 
>>>>>>>> both
>>>>>>>> imperative and declarative styles?
>>>>>>>> I am just trying to select rows that has max timestamp for each
>>>>>>>> train? Instead of doing some sort of nested queries like we normally 
>>>>>>>> do in
>>>>>>>> any relational database I am trying to see if I can leverage both
>>>>>>>> imperative and declarative at the same time. If nested queries or join 
>>>>>>>> are
>>>>>>>> not required then I would like to see how this can be possible? I am 
>>>>>>>> using
>>>>>>>> spark 2.1.1.
>>>>>>>>
>>>>>>>> Dataset
>>>>>>>>
>>>>>>>> Train    Dest      Time1        HK        10:001        SH        
>>>>>>>> 12:001        SZ        14:002        HK        13:002        SH       
>>>>>>>>  09:002        SZ        07:00
>>>>>>>>
>>>>>>>> The desired result should be:
>>>>>>>>
>>>>>>>> Train    Dest      Time1        SZ        14:002        HK        13:00
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>

Reply via email to