I assume its going to compare by the first column and if equal compare the second column and so on.
From: kant kodali <kanth...@gmail.com> Date: Wednesday, April 18, 2018 at 6:26 PM To: Jungtaek Lim <kabh...@gmail.com> Cc: Arun Iyer <ar...@apache.org>, Michael Armbrust <mich...@databricks.com>, Tathagata Das <tathagata.das1...@gmail.com>, "user @spark" <user@spark.apache.org> Subject: Re: can we use mapGroupsWithState in raw sql? This is cool! Looks to me this works too select data.* from (SELECT max(struct(my_timestamp,*)) as data from view1 group by id) but I got naive question again. what does max of a struct mean? Does it always take the max of the first column and ignore the rest of the fields in the struct? On Wed, Apr 18, 2018 at 6:06 PM, Jungtaek Lim <kabh...@gmail.com> wrote: Thanks Arun, I modified a bit to try my best to avoid enumerating fields: val query = socketDF .selectExpr("CAST(value AS STRING) as value") .as[String] .select(from_json($"value", schema=schema).as("data")) .select($"data.*") .groupBy($"ID") .agg(max(struct($"AMOUNT", $"*")).as("data")) .select($"data.*") .writeStream .format("console") .trigger(Trigger.ProcessingTime("1 seconds")) .outputMode(OutputMode.Update()) .start() It still have a minor issue: the column "AMOUNT" is showing twice in result table, but everything works like a charm. -Jungtaek Lim (HeartSaVioR) 2018년 4월 19일 (목) 오전 9:43, Arun Mahadevan <ar...@apache.org>님이 작성: The below expr might work: df.groupBy($"id").agg(max(struct($"amount", $"my_timestamp")).as("data")).select($"id", $"data.*") Thanks, Arun From: Jungtaek Lim <kabh...@gmail.com> Date: Wednesday, April 18, 2018 at 4:54 PM To: Michael Armbrust <mich...@databricks.com> Cc: kant kodali <kanth...@gmail.com>, Arun Iyer <ar...@apache.org>, Tathagata Das <tathagata.das1...@gmail.com>, "user @spark" <user@spark.apache.org> Subject: Re: can we use mapGroupsWithState in raw sql? Thanks Michael for providing great solution. Great to remove UDAF and any needs to provide fields manually. Btw, your code has compilation error. ')' is missing, and after I fix it, it complains again with other issue. <console>:66: error: overloaded method value max with alternatives: (columnName: String)org.apache.spark.sql.Column <and> (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column cannot be applied to (org.apache.spark.sql.ColumnName, org.apache.spark.sql.Column) Could you check your code to see it works with Spark 2.3 (via spark-shell or whatever)? Thanks! Jungtaek Lim (HeartSaVioR) 2018년 4월 19일 (목) 오전 8:34, Michael Armbrust <mich...@databricks.com>님이 작성: You can calculate argmax using a struct. df.groupBy($"id").agg(max($"my_timestamp", struct($"*").as("data")).getField("data").select($"data.*") You could transcode this to SQL, it'll just be complicated nested queries. On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <kanth...@gmail.com> wrote: Hi Arun, I want to select the entire row with the max timestamp for each group. I have modified my data set below to avoid any confusion. Input: id | amount | my_timestamp ------------------------------------------- 1 | 5 | 2018-04-01T01:00:00.000Z 1 | 10 | 2018-04-01T01:10:00.000Z 1 | 6 | 2018-04-01T01:20:00.000Z 2 | 30 | 2018-04-01T01:25:00.000Z 2 | 40 | 2018-04-01T01:30:00.000Z Expected Output: id | amount | my_timestamp ------------------------------------------- 1 | 10 | 2018-04-01T01:10:00.000Z 2 | 40 | 2018-04-01T01:30:00.000Z Looking for a streaming solution using either raw sql like sparkSession.sql("sql query") or similar to raw sql but not something like mapGroupWithState On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <ar...@apache.org> wrote: Cant the “max” function used here ? Something like.. stream.groupBy($"id").max("amount").writeStream.outputMode(“complete”/“update")…. Unless the “stream” is already a grouped stream, in which case the above would not work since the support for multiple aggregate operations is not there yet. Thanks, Arun From: kant kodali <kanth...@gmail.com> Date: Tuesday, April 17, 2018 at 11:41 AM To: Tathagata Das <tathagata.das1...@gmail.com> Cc: "user @spark" <user@spark.apache.org> Subject: Re: can we use mapGroupsWithState in raw sql? Hi TD, Thanks for that. The only reason I ask is I don't see any alternative solution to solve the problem below using raw sql. How to select the max row for every group in spark structured streaming 2.3.0 without using order by since it requires complete mode or mapGroupWithState? Input: id | amount | my_timestamp ------------------------------------------- 1 | 5 | 2018-04-01T01:00:00.000Z 1 | 10 | 2018-04-01T01:10:00.000Z 2 | 20 | 2018-04-01T01:20:00.000Z 2 | 30 | 2018-04-01T01:25:00.000Z 2 | 40 | 2018-04-01T01:30:00.000Z Expected Output: id | amount | my_timestamp ------------------------------------------- 1 | 10 | 2018-04-01T01:10:00.000Z 2 | 40 | 2018-04-01T01:30:00.000Z Looking for a streaming solution using either raw sql like sparkSession.sql("sql query") or similar to raw sql but not something like mapGroupWithState On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote: Unfortunately no. Honestly it does not make sense as for type-aware operations like map, mapGroups, etc., you have to provide an actual JVM function. That does not fit in with the SQL language structure. On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <kanth...@gmail.com> wrote: Hi All, can we use mapGroupsWithState in raw SQL? or is it in the roadmap? Thanks!