Why wouldn't removing the destination from the groupBy work? Like this:
trainTimesDataset
  .withWatermark("activity_timestamp", "5 days")
  .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train")
  .agg(max($"time"))
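
One possible reading of why this falls short, sketched in plain Scala with no Spark dependency: grouping by train alone and taking the max of time yields the latest time per train, but not the destination that goes with it, whereas the original question asks for the whole row. The `TrainTime` case class, the `MaxRowPerTrain` object and its method names are illustrative assumptions, and times are modelled as zero-padded "HH:mm" strings rather than Timestamps to keep the sketch self-contained.

```scala
// Plain-Scala model of the sample data from the original question.
// TrainTime and all names below are illustrative, not part of any Spark API.
final case class TrainTime(train: Int, dest: String, time: String)

object MaxRowPerTrain {
  val rows: Seq[TrainTime] = Seq(
    TrainTime(1, "HK", "10:00"), TrainTime(1, "SH", "12:00"), TrainTime(1, "SZ", "14:00"),
    TrainTime(2, "HK", "13:00"), TrainTime(2, "SH", "09:00"), TrainTime(2, "SZ", "07:00")
  )

  // Analogue of grouping by train and aggregating max(time):
  // the latest time per train survives, the destination does not.
  def maxTimePerTrain(rs: Seq[TrainTime]): Map[Int, String] =
    rs.groupBy(_.train).map { case (train, group) => train -> group.map(_.time).max }

  // What the question asks for: the whole row holding the latest time.
  // Zero-padded "HH:mm" strings compare correctly as plain strings.
  def latestRowPerTrain(rs: Seq[TrainTime]): Map[Int, TrainTime] =
    rs.groupBy(_.train).map { case (train, group) => train -> group.maxBy(_.time) }

  def main(args: Array[String]): Unit = {
    println(maxTimePerTrain(rows))
    println(latestRowPerTrain(rows))
  }
}
```

The second function is the batch analogue of "keep in state the row with the maximum time" from the quoted reply; in a streaming job that per-train comparison would have to be carried in state across micro-batches instead of being computed over a complete collection.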
On Wed, Aug 30, 2017 at 10:38 AM, kant kodali <[email protected]> wrote:
> @Burak, so what would the transformation or query look like for the
> above example? I don't see flatMapGroupsWithState in the Dataset API in
> Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier.
>
>
>
> On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz <[email protected]> wrote:
>
>> Hey TD,
>>
>> If I understood the question correctly, your solution wouldn't return the
>> exact result, since it also groups by destination. I would say the
>> easiest solution would be to use flatMapGroupsWithState, where you call
>> .groupByKey(_.train)
>>
>> and keep in state the row with the maximum time.
>>
>> On Tue, Aug 29, 2017 at 5:18 PM, Tathagata Das <
>> [email protected]> wrote:
>>
>>> Yes. And in that case, if you only care about the max over the last few
>>> days, then you should set a watermark on the timestamp column.
>>>
>>> trainTimesDataset
>>>   .withWatermark("activity_timestamp", "5 days")
>>>   .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
>>>   .agg(max($"time"))
>>>
>>> Any aggregates which are more than 5 days old will be dropped from the
>>> streaming state.
>>>
>>> On Tue, Aug 29, 2017 at 2:06 PM, kant kodali <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> Thanks for the response. This is a streaming query, and in my case I
>>>> need to hold state for 24 hours, which I forgot to mention in my
>>>> previous email. Can I do the following?
>>>>
>>>> trainTimesDataset
>>>>   .groupBy(window($"activity_timestamp", "24 hours", "24 hours"), $"train", $"dest")
>>>>   .agg(max($"time"))
>>>>
>>>>
>>>> On Tue, Aug 29, 2017 at 1:38 PM, Tathagata Das <
>>>> [email protected]> wrote:
>>>>
>>>>> Say trainTimesDataset is the streaming Dataset with schema [train:
>>>>> Int, dest: String, time: Timestamp].
>>>>>
>>>>>
>>>>> Scala: trainTimesDataset.groupBy("train", "dest").agg(max("time"))
>>>>>
>>>>>
>>>>> SQL: "select train, dest, max(time) from trainTimesView group by
>>>>> train, dest" // after calling
>>>>> trainTimesDataset.createOrReplaceTempView("trainTimesView")
>>>>>
>>>>>
>>>>> On Tue, Aug 29, 2017 at 12:59 PM, kant kodali <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am wondering what the easiest and most concise way is to express the
>>>>>> computation below in Spark Structured Streaming, given that it supports
>>>>>> both imperative and declarative styles.
>>>>>> I am just trying to select the rows that have the max timestamp for each
>>>>>> train. Instead of doing some sort of nested query like we normally do in
>>>>>> any relational database, I am trying to see if I can leverage both
>>>>>> imperative and declarative styles at the same time. If nested queries or
>>>>>> joins are not required, then I would like to see how this is possible. I
>>>>>> am using Spark 2.1.1.
>>>>>>
>>>>>> Dataset
>>>>>>
>>>>>> Train  Dest  Time
>>>>>> 1      HK    10:00
>>>>>> 1      SH    12:00
>>>>>> 1      SZ    14:00
>>>>>> 2      HK    13:00
>>>>>> 2      SH    09:00
>>>>>> 2      SZ    07:00
>>>>>>
>>>>>> The desired result should be:
>>>>>>
>>>>>> Train  Dest  Time
>>>>>> 1      SZ    14:00
>>>>>> 2      HK    13:00
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
--
Best Regards,
Ayan Guha