Thanks Michael for providing a great solution. It's great to remove the UDAF and any need to provide fields manually.
Btw, your code has a compilation error: a ')' is missing, and after I fix
that, it fails again with another issue:

<console>:66: error: overloaded method value max with alternatives:
  (columnName: String)org.apache.spark.sql.Column <and>
  (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
cannot be applied to (org.apache.spark.sql.ColumnName, org.apache.spark.sql.Column)

Could you check that your code works with Spark 2.3 (via spark-shell or
whatever)? Thanks!

Jungtaek Lim (HeartSaVioR)

On Thu, Apr 19, 2018 at 8:34 AM, Michael Armbrust <mich...@databricks.com> wrote:

> You can calculate argmax using a struct.
>
> df.groupBy($"id").agg(max($"my_timestamp",
> struct($"*").as("data")).getField("data").select($"data.*")
>
> You could transcode this to SQL, it'll just be complicated nested queries.
>
> On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> Hi Arun,
>>
>> I want to select the entire row with the max timestamp for each group. I
>> have modified my data set below to avoid any confusion.
>>
>> *Input:*
>>
>> id | amount | my_timestamp
>> -------------------------------------------
>> 1  | 5      | 2018-04-01T01:00:00.000Z
>> 1  | 10     | 2018-04-01T01:10:00.000Z
>> 1  | 6      | 2018-04-01T01:20:00.000Z
>> 2  | 30     | 2018-04-01T01:25:00.000Z
>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>
>> *Expected Output:*
>>
>> id | amount | my_timestamp
>> -------------------------------------------
>> 1  | 10     | 2018-04-01T01:10:00.000Z
>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>
>> I am looking for a streaming solution using either raw SQL like
>> sparkSession.sql("sql query"), or something similar to raw SQL, but not
>> something like mapGroupsWithState.
>>
>> On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <ar...@apache.org> wrote:
>>
>>> Can't the "max" function be used here? Something like..
>>>
>>> stream.groupBy($"id").max("amount").writeStream.outputMode("complete"/"update")….
>>>
>>> Unless the "stream" is already a grouped stream, in which case the above
>>> would not work, since the support for multiple aggregate operations is
>>> not there yet.
>>>
>>> Thanks,
>>> Arun
>>>
>>> From: kant kodali <kanth...@gmail.com>
>>> Date: Tuesday, April 17, 2018 at 11:41 AM
>>> To: Tathagata Das <tathagata.das1...@gmail.com>
>>> Cc: "user @spark" <user@spark.apache.org>
>>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>>
>>> Hi TD,
>>>
>>> Thanks for that. The only reason I ask is that I don't see any
>>> alternative solution to the problem below using raw SQL.
>>>
>>> How do I select the max row for every group in Spark Structured
>>> Streaming 2.3.0 without using order by (since it requires complete
>>> mode) or mapGroupsWithState?
>>>
>>> *Input:*
>>>
>>> id | amount | my_timestamp
>>> -------------------------------------------
>>> 1  | 5      | 2018-04-01T01:00:00.000Z
>>> 1  | 10     | 2018-04-01T01:10:00.000Z
>>> 2  | 20     | 2018-04-01T01:20:00.000Z
>>> 2  | 30     | 2018-04-01T01:25:00.000Z
>>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>>
>>> *Expected Output:*
>>>
>>> id | amount | my_timestamp
>>> -------------------------------------------
>>> 1  | 10     | 2018-04-01T01:10:00.000Z
>>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>>
>>> I am looking for a streaming solution using either raw SQL like
>>> sparkSession.sql("sql query"), or something similar to raw SQL, but not
>>> something like mapGroupsWithState.
>>>
>>> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das
>>> <tathagata.das1...@gmail.com> wrote:
>>>
>>>> Unfortunately no. Honestly, it does not make sense, as for type-aware
>>>> operations like map, mapGroups, etc., you have to provide an actual
>>>> JVM function. That does not fit in with the SQL language structure.
>>>>
>>>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Can we use mapGroupsWithState in raw SQL? Or is it on the roadmap?
>>>>>
>>>>> Thanks!
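
For reference, here is a minimal sketch of the struct-based argmax pattern
that avoids the overload error reported above. max takes a single Column,
which is why the two-argument call in the quoted code does not compile;
wrapping the timestamp and the payload column into one struct sidesteps
that. This is a sketch, not a verified reply from the thread, and assumes
a DataFrame df with columns id, amount, and my_timestamp:

    import org.apache.spark.sql.functions.{max, struct}
    // In spark-shell the $"..." column syntax is available by default;
    // in a standalone application you would also need:
    // import spark.implicits._

    // Structs compare field by field from left to right, so putting
    // my_timestamp first makes max pick, per id, the row with the
    // latest timestamp, carrying amount along inside the struct.
    val result = df
      .groupBy($"id")
      .agg(max(struct($"my_timestamp", $"amount")).as("data"))
      .select($"id", $"data.amount", $"data.my_timestamp")

Because this is a plain aggregation (no mapGroupsWithState), the same
query should also run on a streaming DataFrame with outputMode("update")
or outputMode("complete").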