This is cool! It looks to me like this works too:

select data.* from (SELECT max(struct(my_timestamp, *)) as data from view1 group by id)
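[Editor's note] The query above relies on how Spark orders structs: field by field, left to right, so the struct's first field decides the max and the remaining fields just ride along (breaking ties, and carrying the rest of the row). A minimal plain-Scala sketch of that ordering, using the thread's sample data and Scala tuple ordering as a stand-in for max(struct(...)) — no Spark required, and the names (`Row`, `winnerPerId`) are illustrative:

```scala
// Model of max(struct(amount, my_timestamp)): structs compare like tuples,
// first field first, later fields only break ties.
case class Row(id: Int, amount: Int, ts: String)

val rows = List(
  Row(1, 5,  "2018-04-01T01:00:00.000Z"),
  Row(1, 10, "2018-04-01T01:10:00.000Z"),
  Row(1, 6,  "2018-04-01T01:20:00.000Z"),
  Row(2, 30, "2018-04-01T01:25:00.000Z"),
  Row(2, 40, "2018-04-01T01:30:00.000Z")
)

// argmax by amount: amount is the "struct's" first field, so it drives the
// max; ts rides along and is recovered from the winning row.
val winnerPerId: Map[Int, Row] =
  rows.groupBy(_.id).map { case (id, rs) =>
    id -> rs.maxBy(r => (r.amount, r.ts)) // lexicographic, like max(struct(...))
  }
```

Putting my_timestamp first instead (as in the query above) would pick the latest row per id rather than the largest amount.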
But I have a naive question again: what does max of a struct mean? Does it always take the max of the first column and ignore the rest of the fields in the struct?

On Wed, Apr 18, 2018 at 6:06 PM, Jungtaek Lim <kabh...@gmail.com> wrote:
> Thanks Arun. I modified it a bit, trying my best to avoid enumerating fields:
>
> val query = socketDF
>   .selectExpr("CAST(value AS STRING) as value")
>   .as[String]
>   .select(from_json($"value", schema=schema).as("data"))
>   .select($"data.*")
>   .groupBy($"ID")
>   .agg(max(struct($"AMOUNT", $"*")).as("data"))
>   .select($"data.*")
>   .writeStream
>   .format("console")
>   .trigger(Trigger.ProcessingTime("1 seconds"))
>   .outputMode(OutputMode.Update())
>   .start()
>
> It still has a minor issue: the column "AMOUNT" shows up twice in the
> result table, but everything works like a charm.
>
> -Jungtaek Lim (HeartSaVioR)
>
> On Thu, Apr 19, 2018 at 9:43 AM, Arun Mahadevan <ar...@apache.org> wrote:
>
>> The below expr might work:
>>
>> df.groupBy($"id")
>>   .agg(max(struct($"amount", $"my_timestamp")).as("data"))
>>   .select($"id", $"data.*")
>>
>> Thanks,
>> Arun
>>
>> From: Jungtaek Lim <kabh...@gmail.com>
>> Date: Wednesday, April 18, 2018 at 4:54 PM
>> To: Michael Armbrust <mich...@databricks.com>
>> Cc: kant kodali <kanth...@gmail.com>, Arun Iyer <ar...@apache.org>,
>> Tathagata Das <tathagata.das1...@gmail.com>, "user @spark" <user@spark.apache.org>
>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>
>> Thanks Michael for providing a great solution. It's great to remove the UDAF
>> and any need to enumerate fields manually.
>>
>> Btw, your code has a compilation error: a ')' is missing, and after I fix
>> that, it complains again with another issue.
>>
>> <console>:66: error: overloaded method value max with alternatives:
>>   (columnName: String)org.apache.spark.sql.Column <and>
>>   (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
>>  cannot be applied to (org.apache.spark.sql.ColumnName, org.apache.spark.sql.Column)
>>
>> Could you check your code to see whether it works with Spark 2.3 (via
>> spark-shell or whatever)?
>>
>> Thanks!
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, Apr 19, 2018 at 8:34 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> You can calculate argmax using a struct:
>>>
>>> df.groupBy($"id").agg(max($"my_timestamp", struct($"*").as("data")).
>>> getField("data").select($"data.*")
>>>
>>> You could transcode this to SQL; it'll just be complicated nested
>>> queries.
>>>
>>> On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi Arun,
>>>>
>>>> I want to select the entire row with the max timestamp for each group.
>>>> I have modified my data set below to avoid any confusion.
>>>>
>>>> *Input:*
>>>>
>>>> id | amount | my_timestamp
>>>> -------------------------------------------
>>>> 1  | 5      | 2018-04-01T01:00:00.000Z
>>>> 1  | 10     | 2018-04-01T01:10:00.000Z
>>>> 1  | 6      | 2018-04-01T01:20:00.000Z
>>>> 2  | 30     | 2018-04-01T01:25:00.000Z
>>>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>>>
>>>> *Expected Output:*
>>>>
>>>> id | amount | my_timestamp
>>>> -------------------------------------------
>>>> 1  | 10     | 2018-04-01T01:10:00.000Z
>>>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>>>
>>>> I am looking for a streaming solution using either raw SQL like
>>>> sparkSession.sql("sql query"), or something similar to raw SQL, but not
>>>> something like mapGroupsWithState.
>>>>
>>>> On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <ar...@apache.org> wrote:
>>>>
>>>>> Can't the "max" function be used here? Something like:
>>>>>
>>>>> stream.groupBy($"id").max("amount").writeStream
>>>>>   .outputMode("complete"/"update")...
>>>>>
>>>>> Unless the "stream" is already a grouped stream, in which case the
>>>>> above would not work, since support for multiple aggregate operations
>>>>> is not there yet.
>>>>>
>>>>> Thanks,
>>>>> Arun
>>>>>
>>>>> From: kant kodali <kanth...@gmail.com>
>>>>> Date: Tuesday, April 17, 2018 at 11:41 AM
>>>>> To: Tathagata Das <tathagata.das1...@gmail.com>
>>>>> Cc: "user @spark" <user@spark.apache.org>
>>>>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>>>>
>>>>> Hi TD,
>>>>>
>>>>> Thanks for that. The only reason I ask is that I don't see any
>>>>> alternative way to solve the problem below using raw SQL.
>>>>>
>>>>> How do I select the max row for every group in Spark Structured
>>>>> Streaming 2.3.0, without using order by (since it requires complete
>>>>> mode) or mapGroupsWithState?
>>>>>
>>>>> *Input:*
>>>>>
>>>>> id | amount | my_timestamp
>>>>> -------------------------------------------
>>>>> 1  | 5      | 2018-04-01T01:00:00.000Z
>>>>> 1  | 10     | 2018-04-01T01:10:00.000Z
>>>>> 2  | 20     | 2018-04-01T01:20:00.000Z
>>>>> 2  | 30     | 2018-04-01T01:25:00.000Z
>>>>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>>>>
>>>>> *Expected Output:*
>>>>>
>>>>> id | amount | my_timestamp
>>>>> -------------------------------------------
>>>>> 1  | 10     | 2018-04-01T01:10:00.000Z
>>>>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>>>>
>>>>> I am looking for a streaming solution using either raw SQL like
>>>>> sparkSession.sql("sql query"), or something similar to raw SQL, but not
>>>>> something like mapGroupsWithState.
>>>>>
>>>>> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>>
>>>>>> Unfortunately, no. Honestly, it would not make sense: for type-aware
>>>>>> operations like map, mapGroups, etc., you have to provide an actual
>>>>>> JVM function. That does not fit in with the SQL language structure.
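[Editor's note] TD's point — that mapGroupsWithState needs an actual JVM function, of roughly the shape (key, new values for the key, state) => result — can be made concrete. Below is a hypothetical plain-Scala model of that contract applied to the thread's running argmax problem; this is not the Spark API (there is no `GroupState` here, and `updateAcrossBatch` is an illustrative name), just a sketch of why such a function has no natural raw-SQL surface:

```scala
// Model of the mapGroupsWithState contract: a JVM function that folds each
// micro-batch's new rows for a key into mutable per-key state.
case class Event(id: Int, amount: Int, ts: String)

// State is the best row seen so far for the key; each call keeps the
// running argmax by amount (ts breaks ties).
def updateAcrossBatch(key: Int, newEvents: Iterator[Event],
                      state: Option[Event]): Event =
  (state.iterator ++ newEvents).maxBy(e => (e.amount, e.ts))

// Two micro-batches arriving for key 1:
val afterBatch1 = updateAcrossBatch(1, Iterator(Event(1, 5, "t0")), None)
val afterBatch2 = updateAcrossBatch(1,
  Iterator(Event(1, 10, "t1"), Event(1, 6, "t2")), Some(afterBatch1))
```

Because the state-update logic is an arbitrary function like this, not an expression, it cannot be written as a SQL string — which is exactly why the struct-max trick above is attractive: it stays inside the expression language.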
>>>>>>
>>>>>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> Can we use mapGroupsWithState in raw SQL? Or is it on the roadmap?
>>>>>>>
>>>>>>> Thanks!