Thanks Arun, I modified it a bit to avoid enumerating fields as much as possible:

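import org.apache.spark.sql.functions._, org.apache.spark.sql.streaming.Trigger
// socketDF and schema come from earlier setup; spark-shell auto-imports spark.implicits._
val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .select(from_json($"value", schema).as("data"))
  .select($"data.*")                             // flatten the parsed JSON into columns
  .agg(max(struct($"AMOUNT", $"*")).as("data"))  // structs compare field by field, AMOUNT first
  .writeStream.format("console").outputMode("complete")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .start()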

It still has a minor issue: the column "AMOUNT" shows up twice in the result
table, but otherwise everything works like a charm.
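A plain-Scala analogy shows where the duplicate comes from. It does not use Spark; the `Row` alias and sample values are hypothetical, borrowed from the id 1 rows quoted further down. Assuming Spark orders structs field by field from left to right, the way Scala orders tuples, `struct($"AMOUNT", $"*")` behaves like a tuple whose first element is a copy of a field that `*` already contains. Keeping the comparison key and the full row as two separate components, and returning only the row, removes the duplicate. The Spark counterpart would presumably be `max(struct($"AMOUNT", struct($"*").as("row")))` followed by `.select($"data.row.*")`, which is untested here.

```scala
object DuplicateKeyDemo {
  // Hypothetical row shape standing in for the parsed JSON: (id, amount, timestamp)
  type Row = (Int, Int, String)

  val rows: Seq[Row] = Seq(
    (1, 5, "2018-04-01T01:00:00.000Z"),
    (1, 10, "2018-04-01T01:10:00.000Z"),
    (1, 6, "2018-04-01T01:20:00.000Z")
  )

  // Analogue of struct($"AMOUNT", $"*"): the sort key followed by every field,
  // so amount appears twice in the result
  def flatKeyed(r: Row): (Int, Int, Int, String) = (r._2, r._1, r._2, r._3)

  // Analogue of struct($"AMOUNT", struct($"*").as("row")): key and row kept apart
  def nestedKeyed(r: Row): (Int, Row) = (r._2, r)

  // Tuples compare left to right, so amount decides the winner
  val flatWinner: (Int, Int, Int, String) = rows.map(flatKeyed).max

  // Keep only the row component after picking the maximum
  val nestedWinner: Row = rows.map(nestedKeyed).max._2
}
```

Here `flatWinner` still carries amount twice, while `nestedWinner` is just the original row.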

-Jungtaek Lim (HeartSaVioR)

On Thu, Apr 19, 2018 at 9:43 AM, Arun Mahadevan <> wrote:

> The expression below might work:
> df.groupBy($"id")
>   .agg(max(struct($"amount", $"my_timestamp")).as("data"))
>   .select($"id", $"data.*")
> Thanks,
> Arun
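Arun's expression relies on the same struct ordering, applied per group. The plain-Scala sketch below is an analogy, not Spark code, and its object and method names are hypothetical. It runs on the modified input from kant's message quoted further down: group by id, take the largest (amount, my_timestamp) pair, then spread it back into columns the way `.select($"id", $"data.*")` does.

```scala
object GroupArgMax {
  // Modified input from kant's message quoted further down: (id, amount, my_timestamp)
  val input: Seq[(Int, Int, String)] = Seq(
    (1, 5, "2018-04-01T01:00:00.000Z"),
    (1, 10, "2018-04-01T01:10:00.000Z"),
    (1, 6, "2018-04-01T01:20:00.000Z"),
    (2, 30, "2018-04-01T01:25:00.000Z"),
    (2, 40, "2018-04-01T01:30:00.000Z")
  )

  // Analogue of groupBy($"id").agg(max(struct($"amount", $"my_timestamp")))
  // followed by .select($"id", $"data.*"): per id, keep the (amount, my_timestamp)
  // pair that is largest when compared left to right, then flatten it into one row
  def argMaxPerId(rows: Seq[(Int, Int, String)]): Seq[(Int, Int, String)] =
    rows
      .groupBy(_._1)
      .toSeq
      .sortBy(_._1) // deterministic order for this sketch; Spark makes no such promise
      .map { case (id, group) =>
        val (amount, ts) = group.map(r => (r._2, r._3)).max
        (id, amount, ts)
      }
}
```

For that input, `GroupArgMax.argMaxPerId(GroupArgMax.input)` yields one row per id: `(1, 10, 01:10)` and `(2, 40, 01:30)`.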
> From: Jungtaek Lim <>
> Date: Wednesday, April 18, 2018 at 4:54 PM
> To: Michael Armbrust <>
> Cc: kant kodali <>, Arun Iyer <>,
> Tathagata Das <>, "user @spark" <
> Subject: Re: can we use mapGroupsWithState in raw sql?
> Thanks, Michael, for providing a great solution. It's great that it removes
> the UDAF and any need to list fields manually.
> Btw, your code has a compilation error: a ')' is missing, and after I fix
> that, it fails again with another error:
> <console>:66: error: overloaded method value max with alternatives:
>   (columnName: String)org.apache.spark.sql.Column <and>
>   (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
>  cannot be applied to (org.apache.spark.sql.ColumnName, org.apache.spark.sql.Column)
> Could you check whether your code works with Spark 2.3 (via spark-shell
> or similar)?
> Thanks!
> Jungtaek Lim (HeartSaVioR)
> On Thu, Apr 19, 2018 at 8:34 AM, Michael Armbrust <> wrote:
>> You can calculate argmax using a struct.
>> df.groupBy($"id").agg(max($"my_timestamp",
>> struct($"*").as("data")).getField("data").select($"data.*")
>> You could transcode this to SQL; it'll just involve complicated nested queries.
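One possible shape for such a nested query is an assumption, since the message does not spell one out: a subquery computes the per-group maximum and is joined back to the table, for example `SELECT t.* FROM t JOIN (SELECT id, max(amount) AS m FROM t GROUP BY id) g ON t.id = g.id AND t.amount = g.m`, where `t` is a hypothetical table name. The plain-Scala sketch below mimics that batch query; `Event`, `joinBack`, and the sample rows are made up. It also shows one behavioral difference: the join keeps every row that ties for the maximum, whereas `max(struct(...))` returns a single row per group. It says nothing about whether Structured Streaming 2.3 would accept such a query on a stream.

```scala
object JoinBackArgMax {
  final case class Event(id: Int, amount: Int, ts: String)

  // Hypothetical data with a tie on amount in group 2
  val events: Seq[Event] = Seq(
    Event(1, 5, "2018-04-01T01:00:00.000Z"),
    Event(1, 10, "2018-04-01T01:10:00.000Z"),
    Event(2, 40, "2018-04-01T01:25:00.000Z"),
    Event(2, 40, "2018-04-01T01:30:00.000Z")
  )

  // Inner query: SELECT id, max(amount) AS m FROM t GROUP BY id
  def maxPerId(es: Seq[Event]): Map[Int, Int] =
    es.groupBy(_.id).map { case (id, g) => id -> g.map(_.amount).max }

  // Outer query: join back on id and amount = m, keeping the full rows
  def joinBack(es: Seq[Event]): Seq[Event] = {
    val m = maxPerId(es)
    es.filter(e => m.get(e.id).contains(e.amount))
  }
}
```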
>> On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <> wrote:
>>> Hi Arun,
>>> I want to select the entire row with the max timestamp for each group. I
>>> have modified my data set below to avoid any confusion.
>>> *Input:*
>>> id | amount     | my_timestamp
>>> -------------------------------------------
>>> 1  |      5     |  2018-04-01T01:00:00.000Z
>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>> 1  |      6     |  2018-04-01T01:20:00.000Z
>>> 2  |     30     |  2018-04-01T01:25:00.000Z
>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>> *Expected Output:*
>>> id | amount     | my_timestamp
>>> -------------------------------------------
>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>> I'm looking for a streaming solution using either raw SQL, like
>>> sparkSession.sql("sql query"), or something similar to raw SQL, but not
>>> something like mapGroupsWithState.
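The plain-Scala sketch below reuses the modified input above and has hypothetical names. It shows why the entire row matters. Taking `max(amount)` and `max(my_timestamp)` as separate aggregates yields 10 and 01:20 for id 1, a combination that appears in no input row. It also shows that comparing `my_timestamp` first picks the 01:20 row (amount 6). The expected output above is the row with the largest amount, which is what comparing `amount` first gives. The sketch relies on ISO-8601 timestamps in one fixed format comparing correctly as plain strings.

```scala
object IndependentMaxVsRow {
  // The modified input quoted above: (id, amount, my_timestamp)
  val input: Seq[(Int, Int, String)] = Seq(
    (1, 5, "2018-04-01T01:00:00.000Z"),
    (1, 10, "2018-04-01T01:10:00.000Z"),
    (1, 6, "2018-04-01T01:20:00.000Z"),
    (2, 30, "2018-04-01T01:25:00.000Z"),
    (2, 40, "2018-04-01T01:30:00.000Z")
  )

  private def group(id: Int): Seq[(Int, Int, String)] = input.filter(_._1 == id)

  // Like max(amount) and max(my_timestamp) as two separate aggregates:
  // each column's maximum may come from a different row
  def independentMaxes(id: Int): (Int, String) =
    (group(id).map(_._2).max, group(id).map(_._3).max)

  // Whole row with the latest timestamp (timestamp compared first)
  def latestRow(id: Int): (Int, Int, String) = group(id).maxBy(_._3)

  // Whole row with the largest amount (amount compared first, then timestamp)
  def largestAmountRow(id: Int): (Int, Int, String) = group(id).maxBy(r => (r._2, r._3))
}
```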
>>> On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <>
>>> wrote:
>>>> Can't the “max” function be used here? Something like:
>>>> stream.groupBy($"id").max("amount").writeStream.outputMode("complete")... // or "update"
>>>> Unless the “stream” is already a grouped stream, in which case the
>>>> above would not work since the support for multiple aggregate operations is
>>>> not there yet.
>>>> Thanks,
>>>> Arun
>>>> From: kant kodali <>
>>>> Date: Tuesday, April 17, 2018 at 11:41 AM
>>>> To: Tathagata Das <>
>>>> Cc: "user @spark" <>
>>>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>>> Hi TD,
>>>> Thanks for that. The only reason I ask is that I don't see any alternative
>>>> way to solve the problem below using raw SQL.
>>>> How can I select the max row for every group in Spark Structured Streaming
>>>> 2.3.0 without using ORDER BY (since it requires complete mode) or
>>>> mapGroupsWithState?
>>>> *Input:*
>>>> id | amount     | my_timestamp
>>>> -------------------------------------------
>>>> 1  |      5     |  2018-04-01T01:00:00.000Z
>>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>>> 2  |     20     |  2018-04-01T01:20:00.000Z
>>>> 2  |     30     |  2018-04-01T01:25:00.000Z
>>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>>> *Expected Output:*
>>>> id | amount     | my_timestamp
>>>> -------------------------------------------
>>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>>> I'm looking for a streaming solution using either raw SQL, like
>>>> sparkSession.sql("sql query"), or something similar to raw SQL, but not
>>>> something like mapGroupsWithState.
>>>> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <
>>>>> wrote:
>>>>> Unfortunately, no. Honestly, it does not make sense: for type-aware
>>>>> operations like map, mapGroups, etc., you have to provide an actual JVM
>>>>> function, and that does not fit in with the SQL language structure.
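To make that point concrete, here is the kind of function such an operator needs. It is sketched in plain Scala rather than with Spark's actual `mapGroupsWithState`/`GroupState` API; the `Event` class, `update`, and the `run` driver are hypothetical. Per id, it keeps the row with the largest (amount, ts) across micro-batches. That is arbitrary JVM code with no natural place in a SQL statement.

```scala
object StatefulArgMax {
  final case class Event(id: Int, amount: Int, ts: String)

  // Shape of the user function a mapGroupsWithState-style operator needs:
  // key, this batch's values for the key, previous state => new state.
  // A plain Option stands in for Spark's GroupState here.
  def update(id: Int, events: Iterator[Event], state: Option[Event]): Event =
    (state.iterator ++ events).maxBy(e => (e.amount, e.ts))

  // Hypothetical micro-batch driver: run `update` once per key per batch,
  // carrying each key's state into the next batch
  def run(batches: Seq[Seq[Event]]): Map[Int, Event] =
    batches.foldLeft(Map.empty[Int, Event]) { (store, batch) =>
      batch.groupBy(_.id).foldLeft(store) { case (acc, (id, evs)) =>
        acc.updated(id, update(id, evs.iterator, acc.get(id)))
      }
    }
}
```

Feeding the original input from the thread in two batches leaves id 1 at amount 10 and id 2 at amount 40 (01:30).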
>>>>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <>
>>>>> wrote:
>>>>>> Hi All,
>>>>>> Can we use mapGroupsWithState in raw SQL? Or is it on the roadmap?
>>>>>> Thanks!
