Thanks Arun, I modified it a bit to try my best to avoid enumerating fields:

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema = schema).as("data"))
  .select($"data.*")
  .groupBy($"ID")
  .agg(max(struct($"AMOUNT", $"*")).as("data"))
  .select($"data.*")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()

It still has a minor issue: the column "AMOUNT" shows up twice in the result table, but otherwise everything works like a charm.
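If the duplicated column ever matters, one possible workaround (just a rough sketch, not verified on 2.3; the field names "max_amount" and "row" are made up for illustration) is to nest the whole row into its own struct, so AMOUNT appears only once after the final select:

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema = schema).as("data"))
  .select($"data.*")
  .groupBy($"ID")
  // structs compare field by field, so AMOUNT still drives the max;
  // the full row just rides along inside the nested "row" struct
  .agg(max(struct($"AMOUNT".as("max_amount"), struct($"*").as("row"))).as("data"))
  // expand only the nested row, so each column shows up exactly once
  .select($"data.row.*")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()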
-Jungtaek Lim (HeartSaVioR)

On Thu, Apr 19, 2018 at 9:43 AM, Arun Mahadevan <ar...@apache.org> wrote:

> The below expr might work:
>
> df.groupBy($"id").agg(max(struct($"amount", $"my_timestamp")).as("data")).select($"id", $"data.*")
>
> Thanks,
> Arun
>
> From: Jungtaek Lim <kabh...@gmail.com>
> Date: Wednesday, April 18, 2018 at 4:54 PM
> To: Michael Armbrust <mich...@databricks.com>
> Cc: kant kodali <kanth...@gmail.com>, Arun Iyer <ar...@apache.org>,
> Tathagata Das <tathagata.das1...@gmail.com>, "user @spark" <user@spark.apache.org>
> Subject: Re: can we use mapGroupsWithState in raw sql?
>
> Thanks Michael for providing a great solution. It's great to remove the
> UDAF and any need to provide fields manually.
>
> Btw, your code has a compilation error: a ')' is missing, and after I fix
> that, it complains again with another issue:
>
> <console>:66: error: overloaded method value max with alternatives:
>   (columnName: String)org.apache.spark.sql.Column <and>
>   (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
>  cannot be applied to (org.apache.spark.sql.ColumnName, org.apache.spark.sql.Column)
>
> Could you check whether your code works with Spark 2.3 (via spark-shell
> or whatever)?
>
> Thanks!
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Apr 19, 2018 at 8:34 AM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> You can calculate argmax using a struct:
>>
>> df.groupBy($"id").agg(max($"my_timestamp", struct($"*").as("data")).getField("data").select($"data.*")
>>
>> You could transcode this to SQL; it'll just be complicated nested queries.
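>>
>> A rough, untested sketch of what that SQL might look like (assuming the
>> streaming DataFrame has been registered as a temp view, here called
>> "events"; note it enumerates the columns inside the struct):
>>
>> df.createOrReplaceTempView("events")
>> val result = sparkSession.sql("""
>>   SELECT data.*
>>   FROM (
>>     SELECT max(struct(my_timestamp, id, amount)) AS data
>>     FROM events
>>     GROUP BY id
>>   ) t
>> """)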
>>
>> On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi Arun,
>>>
>>> I want to select the entire row with the max timestamp for each group. I
>>> have modified my data set below to avoid any confusion.
>>>
>>> *Input:*
>>>
>>> id | amount | my_timestamp
>>> -------------------------------------------
>>> 1  | 5      | 2018-04-01T01:00:00.000Z
>>> 1  | 10     | 2018-04-01T01:10:00.000Z
>>> 1  | 6      | 2018-04-01T01:20:00.000Z
>>> 2  | 30     | 2018-04-01T01:25:00.000Z
>>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>>
>>> *Expected Output:*
>>>
>>> id | amount | my_timestamp
>>> -------------------------------------------
>>> 1  | 10     | 2018-04-01T01:10:00.000Z
>>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>>
>>> I'm looking for a streaming solution using either raw SQL, like
>>> sparkSession.sql("sql query"), or something similar to raw SQL, but not
>>> something like mapGroupsWithState.
>>>
>>> On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <ar...@apache.org> wrote:
>>>
>>>> Can't the "max" function be used here? Something like:
>>>>
>>>> stream.groupBy($"id").max("amount").writeStream.outputMode("complete"/"update")...
>>>>
>>>> Unless the "stream" is already a grouped stream, in which case the above
>>>> would not work, since the support for multiple aggregate operations is
>>>> not there yet.
>>>>
>>>> Thanks,
>>>> Arun
>>>>
>>>> From: kant kodali <kanth...@gmail.com>
>>>> Date: Tuesday, April 17, 2018 at 11:41 AM
>>>> To: Tathagata Das <tathagata.das1...@gmail.com>
>>>> Cc: "user @spark" <user@spark.apache.org>
>>>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>>>
>>>> Hi TD,
>>>>
>>>> Thanks for that. The only reason I ask is that I don't see any
>>>> alternative solution to the problem below using raw SQL.
>>>>
>>>> How do I select the max row for every group in Spark Structured
>>>> Streaming 2.3.0 without using order by (since it requires complete mode)
>>>> or mapGroupsWithState?
>>>>
>>>> *Input:*
>>>>
>>>> id | amount | my_timestamp
>>>> -------------------------------------------
>>>> 1  | 5      | 2018-04-01T01:00:00.000Z
>>>> 1  | 10     | 2018-04-01T01:10:00.000Z
>>>> 2  | 20     | 2018-04-01T01:20:00.000Z
>>>> 2  | 30     | 2018-04-01T01:25:00.000Z
>>>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>>>
>>>> *Expected Output:*
>>>>
>>>> id | amount | my_timestamp
>>>> -------------------------------------------
>>>> 1  | 10     | 2018-04-01T01:10:00.000Z
>>>> 2  | 40     | 2018-04-01T01:30:00.000Z
>>>>
>>>> I'm looking for a streaming solution using either raw SQL, like
>>>> sparkSession.sql("sql query"), or something similar to raw SQL, but not
>>>> something like mapGroupsWithState.
>>>>
>>>> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das
>>>> <tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Unfortunately, no. Honestly, it does not make sense: for type-aware
>>>>> operations like map, mapGroups, etc., you have to provide an actual JVM
>>>>> function, and that does not fit in with the SQL language structure.
>>>>>
>>>>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> Can we use mapGroupsWithState in raw SQL? Or is it on the roadmap?
>>>>>>
>>>>>> Thanks!