That is not correct. In SQL land, your query would look like this:

select * from (
  select train, dest, time,
         row_number() over (partition by train order by time desc) r
  from TrainTable
) x where r = 1

All of these constructs are supported in the DataFrame API as well.
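
As an aside, a DataFrame version of the same window-function query could look
like this (a sketch, assuming trainTimesDataset has columns train, dest, and
time; note that row_number-style window functions are only supported on batch
Datasets, not streaming ones):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank rows within each train by descending time, then keep the top row.
val w = Window.partitionBy("train").orderBy(col("time").desc)
val latestPerTrain = trainTimesDataset
  .withColumn("r", row_number().over(w))
  .filter(col("r") === 1)
  .drop("r")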

On Wed, Aug 30, 2017 at 1:08 PM, kant kodali <kanth...@gmail.com> wrote:

> yes in a relational db one could just do this
>
> SELECT t.Train, t.Dest, r.MaxTime
> FROM (
>       SELECT Train, MAX(Time) as MaxTime
>       FROM TrainTable
>       GROUP BY Train) r
> INNER JOIN TrainTable t
> ON t.Train = r.Train AND t.Time = r.MaxTime
>
> (copied from an answer to a Stack Overflow question someone asked)
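>
> In DataFrame terms, that batch join might look roughly like this (a sketch,
> assuming a static trainTable DataFrame; the names are illustrative):
>
> import org.apache.spark.sql.functions.max
>
> // Max time per train, then join back to recover the full rows.
> val maxTimes = trainTable.groupBy("Train").agg(max("Time").as("MaxTime"))
> val result = trainTable.join(maxTimes,
>   trainTable("Train") === maxTimes("Train") &&
>   trainTable("Time") === maxTimes("MaxTime"))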
>
> but I am still thinking about how to do this in a streaming setting.
> mapGroupsWithState looks interesting, but it is just not available in 2.1.1,
> if I am not wrong.
>
> Thanks!
>
> On Tue, Aug 29, 2017 at 7:50 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> That just gives you the max time for each train. If I understood the
>> question correctly, the OP wants the whole row with the max time. That's
>> generally solved through joins or subqueries, which would be hard to do in
>> a streaming setting.
>>
>> On Aug 29, 2017 7:29 PM, "ayan guha" <guha.a...@gmail.com> wrote:
>>
>>> Why wouldn't removing the destination from the grouping work? Like this:
>>>
>>>   trainTimesDataset
>>>     .withWatermark("activity_timestamp", "5 days")
>>>     .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train")
>>>     .max("time")
>>>
>>> On Wed, Aug 30, 2017 at 10:38 AM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> @Burak, what would the transformation or query look like for the
>>>> above example? I don't see flatMapGroupsWithState in the Dataset API in
>>>> Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.
>>>>
>>>>
>>>>
>>>> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>>>
>>>>> Hey TD,
>>>>>
>>>>> If I understood the question correctly, your solution wouldn't return
>>>>> the exact answer, since it also groups by destination. I would say the
>>>>> easiest solution would be to use flatMapGroupsWithState, where you:
>>>>> .groupByKey(_.train)
>>>>>
>>>>> and keep in state the row with the maximum time.
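>>>>>
>>>>> A minimal sketch of that (assuming Spark 2.2+, a case class
>>>>> TrainEvent(train: Int, dest: String, time: java.sql.Timestamp), and
>>>>> spark.implicits._ in scope; the names are illustrative):
>>>>>
>>>>> import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
>>>>>
>>>>> val latestPerTrain = trainTimesDataset   // Dataset[TrainEvent]
>>>>>   .groupByKey(_.train)
>>>>>   .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout) {
>>>>>     (train: Int, events: Iterator[TrainEvent], state: GroupState[TrainEvent]) =>
>>>>>       // Keep in state the row with the maximum time seen so far.
>>>>>       val best = (events ++ state.getOption.iterator).maxBy(_.time.getTime)
>>>>>       state.update(best)
>>>>>       Iterator(best)
>>>>>   }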
>>>>>
>>>>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>
>>>>>> Yes. And in that case, if you only care about the max over the last few
>>>>>> days, then you should set a watermark on the timestamp column.
>>>>>>
>>>>>>   trainTimesDataset
>>>>>>     .withWatermark("activity_timestamp", "5 days")
>>>>>>     .groupBy(window($"activity_timestamp", "24 hours", "24 hours"),
>>>>>>              $"train", $"dest")
>>>>>>     .max("time")
>>>>>>
>>>>>> Any aggregation state that is more than 5 days old will be dropped from
>>>>>> the streaming state.
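>>>>>>
>>>>>> End to end, the query might be wired up like this (a sketch; the console
>>>>>> sink and update output mode are just illustrative choices):
>>>>>>
>>>>>> import org.apache.spark.sql.functions.window
>>>>>>
>>>>>> val query = trainTimesDataset
>>>>>>   .withWatermark("activity_timestamp", "5 days")
>>>>>>   .groupBy(window($"activity_timestamp", "24 hours", "24 hours"),
>>>>>>            $"train", $"dest")
>>>>>>   .max("time")
>>>>>>   .writeStream
>>>>>>   .outputMode("update")
>>>>>>   .format("console")
>>>>>>   .start()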
>>>>>>
>>>>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <kanth...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for the response. This is a streaming query, and in my case I
>>>>>>> need to hold state for 24 hours, which I forgot to mention in my
>>>>>>> previous email. Can I do this?
>>>>>>>
>>>>>>>   trainTimesDataset
>>>>>>>     .groupBy(window($"activity_timestamp", "24 hours", "24 hours"),
>>>>>>>              $"train", $"dest")
>>>>>>>     .max("time")
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Say trainTimesDataset is the streaming Dataset with schema
>>>>>>>> [train: Int, dest: String, time: Timestamp].
>>>>>>>>
>>>>>>>>
>>>>>>>> Scala: trainTimesDataset.groupBy("train", "dest").max("time")
>>>>>>>>
>>>>>>>>
>>>>>>>> SQL: "select train, dest, max(time) from trainTimesView group by
>>>>>>>> train, dest"    // after calling
>>>>>>>> trainTimesDataset.createOrReplaceTempView("trainTimesView")
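>>>>>>>>
>>>>>>>> Spelled out, the SQL route would be (a sketch; spark here is assumed to
>>>>>>>> be the active SparkSession):
>>>>>>>>
>>>>>>>> trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>>>>>> val maxTimes = spark.sql(
>>>>>>>>   "select train, dest, max(time) from trainTimesView group by train, dest")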
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <kanth...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I am wondering what is the easiest and most concise way to express the
>>>>>>>>> computation below in Spark Structured Streaming, given that it supports
>>>>>>>>> both imperative and declarative styles.
>>>>>>>>> I am just trying to select the rows that have the max timestamp for
>>>>>>>>> each train. Instead of doing some sort of nested query like we normally
>>>>>>>>> would in a relational database, I am trying to see if I can leverage
>>>>>>>>> both imperative and declarative styles at the same time. If nested
>>>>>>>>> queries or joins are not required, then I would like to see how this
>>>>>>>>> can be done. I am using Spark 2.1.1.
>>>>>>>>>
>>>>>>>>> Dataset
>>>>>>>>>
>>>>>>>>> Train    Dest    Time
>>>>>>>>> 1        HK      10:00
>>>>>>>>> 1        SH      12:00
>>>>>>>>> 1        SZ      14:00
>>>>>>>>> 2        HK      13:00
>>>>>>>>> 2        SH      09:00
>>>>>>>>> 2        SZ      07:00
>>>>>>>>>
>>>>>>>>> The desired result should be:
>>>>>>>>>
>>>>>>>>> Train    Dest    Time
>>>>>>>>> 1        SZ      14:00
>>>>>>>>> 2        HK      13:00
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>


-- 
Best Regards,
Ayan Guha
