The below expr might work:

df.groupBy($"id").agg(max(struct($"amount", 
$"my_timestamp")).as("data")).select($"id", $"data.*")

Thanks,
Arun

From:  Jungtaek Lim <[email protected]>
Date:  Wednesday, April 18, 2018 at 4:54 PM
To:  Michael Armbrust <[email protected]>
Cc:  kant kodali <[email protected]>, Arun Iyer <[email protected]>, Tathagata 
Das <[email protected]>, "user @spark" <[email protected]>
Subject:  Re: can we use mapGroupsWithState in raw sql?

Thanks Michael for providing great solution. Great to remove UDAF and any needs 
to provide fields manually. 

Btw, your code has compilation error. ')' is missing, and after I fix it, it 
complains again with other issue.

<console>:66: error: overloaded method value max with alternatives:
  (columnName: String)org.apache.spark.sql.Column <and>
  (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
 cannot be applied to (org.apache.spark.sql.ColumnName, 
org.apache.spark.sql.Column)

Could you check your code to see it works with Spark 2.3 (via spark-shell or 
whatever)?

Thanks!
Jungtaek Lim (HeartSaVioR)

2018년 4월 19일 (목) 오전 8:34, Michael Armbrust <[email protected]>님이 작성:
You can calculate argmax using a struct.

df.groupBy($"id").agg(max($"my_timestamp", 
struct($"*").as("data")).getField("data").select($"data.*")

You could transcode this to SQL, it'll just be complicated nested queries.


On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <[email protected]> wrote:
Hi Arun, 

I want to select the entire row with the max timestamp for each group. I have 
modified my data set below to avoid any confusion.

Input:
id | amount     | my_timestamp
-------------------------------------------
1  |      5     |  2018-04-01T01:00:00.000Z
1  |     10     |  2018-04-01T01:10:00.000Z
1  |      6     |  2018-04-01T01:20:00.000Z
2  |     30     |  2018-04-01T01:25:00.000Z
2  |     40     |  2018-04-01T01:30:00.000Z
Expected Output:
id | amount     | my_timestamp
-------------------------------------------
1  |     10     |  2018-04-01T01:10:00.000Z
2  |     40     |  2018-04-01T01:30:00.000Z
Looking for a streaming solution using either raw sql like 
sparkSession.sql("sql query") or similar to raw sql but not something like 
mapGroupWithState


On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <[email protected]> wrote:
Cant the “max” function used here ? Something like..

stream.groupBy($"id").max("amount").writeStream.outputMode(“complete”/“update")….
 

Unless the “stream” is already a grouped stream, in which case the above would 
not work since the support for multiple aggregate operations is not there yet.

Thanks,
Arun

From: kant kodali <[email protected]>
Date: Tuesday, April 17, 2018 at 11:41 AM
To: Tathagata Das <[email protected]>
Cc: "user @spark" <[email protected]>
Subject: Re: can we use mapGroupsWithState in raw sql?

Hi TD, 

Thanks for that. The only reason I ask is I don't see any alternative solution 
to solve the problem below using raw sql.


How to select the max row for every group in spark structured streaming 2.3.0 
without using order by since it requires complete mode or mapGroupWithState?

Input:
id | amount     | my_timestamp
-------------------------------------------
1  |      5     |  2018-04-01T01:00:00.000Z
1  |     10     |  2018-04-01T01:10:00.000Z
2  |     20     |  2018-04-01T01:20:00.000Z
2  |     30     |  2018-04-01T01:25:00.000Z
2  |     40     |  2018-04-01T01:30:00.000Z
Expected Output:
id | amount     | my_timestamp
-------------------------------------------
1  |     10     |  2018-04-01T01:10:00.000Z
2  |     40     |  2018-04-01T01:30:00.000Z
Looking for a streaming solution using either raw sql like 
sparkSession.sql("sql query") or similar to raw sql but not something like 
mapGroupWithState


On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <[email protected]> 
wrote:
Unfortunately no. Honestly it does not make sense as for type-aware operations 
like map, mapGroups, etc., you have to provide an actual JVM function. That 
does not fit in with the SQL language structure.

On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <[email protected]> wrote:
Hi All, 

can we use mapGroupsWithState in raw SQL? or is it in the roadmap?

Thanks!







Reply via email to