You can calculate argmax using a struct: take the max of a struct whose
first field is the timestamp, then pull the original row back out of the
second field.

df.groupBy($"id")
  .agg(max(struct($"my_timestamp", struct($"*").as("data"))).getField("data").as("data"))
  .select($"data.*")

You could transcode this to SQL; it will just turn into complicated nested queries.
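The trick works because max over a struct compares fields left to right, the
same way tuple ordering (or maxBy) works in plain Scala, and ISO-8601
timestamps sort correctly as strings. A minimal plain-Scala sketch of the
same argmax logic on the sample rows from the thread (no Spark required;
the Row case class is just for illustration):

```scala
// Argmax per group: for each id, keep the whole row with the max timestamp.
case class Row(id: Int, amount: Int, myTimestamp: String)

val rows = Seq(
  Row(1,  5, "2018-04-01T01:00:00.000Z"),
  Row(1, 10, "2018-04-01T01:10:00.000Z"),
  Row(2, 20, "2018-04-01T01:20:00.000Z"),
  Row(2, 30, "2018-04-01T01:25:00.000Z"),
  Row(2, 40, "2018-04-01T01:30:00.000Z")
)

// Group by id, then pick the row whose timestamp is maximal.
// ISO-8601 strings compare in chronological order, so maxBy on the
// raw string is safe here.
val argmax: Map[Int, Row] =
  rows.groupBy(_.id).map { case (id, rs) => id -> rs.maxBy(_.myTimestamp) }
```

Spark's struct max does the same comparison inside the aggregation, which is
why the timestamp must be the first field of the struct.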


On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Arun,
>
> I want to select the entire row with the max timestamp for each group. I
> have modified my data set below to avoid any confusion.
>
> *Input:*
>
> id | amount     | my_timestamp
> -------------------------------------------
> 1  |      5     |  2018-04-01T01:00:00.000Z
> 1  |     10     |  2018-04-01T01:10:00.000Z
> 1  |      6     |  2018-04-01T01:20:00.000Z
> 2  |     30     |  2018-04-01T01:25:00.000Z
> 2  |     40     |  2018-04-01T01:30:00.000Z
>
> *Expected Output:*
>
> id | amount     | my_timestamp
> -------------------------------------------
> 1  |     10     |  2018-04-01T01:10:00.000Z
> 2  |     40     |  2018-04-01T01:30:00.000Z
>
> Looking for a streaming solution using either raw sql like
> sparkSession.sql("sql query"), or something similar to raw sql, but not
> something like mapGroupsWithState.
>
> On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <ar...@apache.org> wrote:
>
>> Can't the "max" function be used here? Something like:
>>
>> stream.groupBy($"id").max("amount").writeStream.outputMode("complete"/"update")...
>>
>> Unless the "stream" is already a grouped stream, in which case the above
>> would not work, since support for multiple aggregate operations is not
>> there yet.
>>
>> Thanks,
>> Arun
>>
>> From: kant kodali <kanth...@gmail.com>
>> Date: Tuesday, April 17, 2018 at 11:41 AM
>> To: Tathagata Das <tathagata.das1...@gmail.com>
>> Cc: "user @spark" <user@spark.apache.org>
>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>
>> Hi TD,
>>
>> Thanks for that. The only reason I ask is I don't see any alternative
>> solution to solve the problem below using raw sql.
>>
>>
>> How do you select the max row for every group in Spark Structured
>> Streaming 2.3.0, without using order by (since it requires complete mode)
>> or mapGroupsWithState?
>>
>> *Input:*
>>
>> id | amount     | my_timestamp
>> -------------------------------------------
>> 1  |      5     |  2018-04-01T01:00:00.000Z
>> 1  |     10     |  2018-04-01T01:10:00.000Z
>> 2  |     20     |  2018-04-01T01:20:00.000Z
>> 2  |     30     |  2018-04-01T01:25:00.000Z
>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>
>> *Expected Output:*
>>
>> id | amount     | my_timestamp
>> -------------------------------------------
>> 1  |     10     |  2018-04-01T01:10:00.000Z
>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>
>> Looking for a streaming solution using either raw sql like
>> sparkSession.sql("sql query"), or something similar to raw sql, but not
>> something like mapGroupsWithState.
>>
>> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Unfortunately, no. Honestly, it does not make sense: for type-aware
>>> operations like map, mapGroups, etc., you have to provide an actual JVM
>>> function, and that does not fit into the SQL language structure.
>>>
>>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Can we use mapGroupsWithState in raw SQL? Or is it on the roadmap?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>
>>
>
