Thanks Arun, I modified it a bit to avoid enumerating fields:

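// socketDF and schema are defined elsewhere; spark is the active SparkSession.
import org.apache.spark.sql.functions.{from_json, max, struct}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import spark.implicits._

val query = socketDF
  .selectExpr("CAST(value AS STRING) as value")
  .as[String]
  .select(from_json($"value", schema=schema).as("data"))
  .select($"data.*")
  .groupBy($"ID")
  // structs compare field by field, so AMOUNT goes first to act as the sort key
  .agg(max(struct($"AMOUNT", $"*")).as("data"))
  .select($"data.*")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .start()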

It still has a minor issue: the column "AMOUNT" shows up twice in the result
table, but otherwise everything works like a charm.
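
To make it clearer why the struct trick works, here is a small sketch in plain
Scala collections (no Spark involved) using kant's sample rows. Tuples, like
Spark structs, are compared field by field, so putting the sort key first makes
max return the whole row with the largest key. The names Payment, ArgmaxSketch
and maxRowPerId are made up for illustration only.

```scala
// Plain Scala stand-in for the Spark query; Payment is a hypothetical row type.
final case class Payment(id: Int, amount: Int, myTimestamp: String)

object ArgmaxSketch {
  // Sample rows taken from kant's second example in this thread.
  val input: Seq[Payment] = Seq(
    Payment(1, 5, "2018-04-01T01:00:00.000Z"),
    Payment(1, 10, "2018-04-01T01:10:00.000Z"),
    Payment(1, 6, "2018-04-01T01:20:00.000Z"),
    Payment(2, 30, "2018-04-01T01:25:00.000Z"),
    Payment(2, 40, "2018-04-01T01:30:00.000Z")
  )

  // Group by id, then keep the row whose (amount, timestamp) tuple is largest.
  // Tuple ordering is lexicographic, so amount decides and timestamp breaks ties.
  def maxRowPerId(rows: Seq[Payment]): Seq[Payment] =
    rows
      .groupBy(_.id)
      .values
      .map(_.maxBy(r => (r.amount, r.myTimestamp)))
      .toSeq
      .sortBy(_.id)
}
```

In the Spark query the second copy of AMOUNT comes from the sort key being
placed in front of $"*". One possible workaround, which I have not tested, is
to alias the key (for example $"AMOUNT".as("sort_key")) and drop that column
after select($"data.*").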

-Jungtaek Lim (HeartSaVioR)

On Thu, Apr 19, 2018 at 9:43 AM, Arun Mahadevan <ar...@apache.org> wrote:

> The below expr might work:
>
> df.groupBy($"id").agg(max(struct($"amount", 
> $"my_timestamp")).as("data")).select($"id", $"data.*")
>
>
> Thanks,
> Arun
>
> From: Jungtaek Lim <kabh...@gmail.com>
> Date: Wednesday, April 18, 2018 at 4:54 PM
> To: Michael Armbrust <mich...@databricks.com>
> Cc: kant kodali <kanth...@gmail.com>, Arun Iyer <ar...@apache.org>,
> Tathagata Das <tathagata.das1...@gmail.com>, "user @spark" <
> user@spark.apache.org>
>
> Subject: Re: can we use mapGroupsWithState in raw sql?
>
> Thanks Michael for providing a great solution. It's great to remove the UDAF
> and any need to provide fields manually.
>
> Btw, your code has a compilation error. A ')' is missing, and after I fix it,
> it complains again about another issue.
>
> <console>:66: error: overloaded method value max with alternatives:
>   (columnName: String)org.apache.spark.sql.Column <and>
>   (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
>  cannot be applied to (org.apache.spark.sql.ColumnName,
> org.apache.spark.sql.Column)
>
> Could you check whether your code works with Spark 2.3 (via spark-shell
> or whatever)?
>
> Thanks!
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Apr 19, 2018 at 8:34 AM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> You can calculate argmax using a struct.
>>
>> df.groupBy($"id").agg(max($"my_timestamp",
>> struct($"*").as("data")).getField("data").select($"data.*")
>>
>> You could transcode this to SQL, it'll just be complicated nested queries.
>>
>>
>> On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi Arun,
>>>
>>> I want to select the entire row with the max timestamp for each group. I
>>> have modified my data set below to avoid any confusion.
>>>
>>> *Input:*
>>>
>>> id | amount     | my_timestamp
>>> -------------------------------------------
>>> 1  |      5     |  2018-04-01T01:00:00.000Z
>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>> 1  |      6     |  2018-04-01T01:20:00.000Z
>>> 2  |     30     |  2018-04-01T01:25:00.000Z
>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>>
>>> *Expected Output:*
>>>
>>> id | amount     | my_timestamp
>>> -------------------------------------------
>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>>
>>> Looking for a streaming solution using either raw SQL like
>>> sparkSession.sql("sql query") or something similar to raw SQL, but not
>>> something like mapGroupsWithState.
>>>
>>> On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <ar...@apache.org>
>>> wrote:
>>>
>>>> Can't the "max" function be used here? Something like:
>>>>
>>>>
>>>> stream.groupBy($"id").max("amount").writeStream.outputMode("complete"/"update")….
>>>>
>>>> Unless the “stream” is already a grouped stream, in which case the
>>>> above would not work since the support for multiple aggregate operations is
>>>> not there yet.
>>>>
>>>> Thanks,
>>>> Arun
>>>>
>>>> From: kant kodali <kanth...@gmail.com>
>>>> Date: Tuesday, April 17, 2018 at 11:41 AM
>>>> To: Tathagata Das <tathagata.das1...@gmail.com>
>>>> Cc: "user @spark" <user@spark.apache.org>
>>>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>>>
>>>> Hi TD,
>>>>
>>>> Thanks for that. The only reason I ask is I don't see any alternative
>>>> solution to solve the problem below using raw sql.
>>>>
>>>>
>>>> How to select the max row for every group in Spark Structured Streaming
>>>> 2.3.0 without using ORDER BY (since it requires complete mode) or
>>>> mapGroupsWithState?
>>>>
>>>> *Input:*
>>>>
>>>> id | amount     | my_timestamp
>>>> -------------------------------------------
>>>> 1  |      5     |  2018-04-01T01:00:00.000Z
>>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>>> 2  |     20     |  2018-04-01T01:20:00.000Z
>>>> 2  |     30     |  2018-04-01T01:25:00.000Z
>>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>>>
>>>> *Expected Output:*
>>>>
>>>> id | amount     | my_timestamp
>>>> -------------------------------------------
>>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>>>
>>>> Looking for a streaming solution using either raw SQL like
>>>> sparkSession.sql("sql query") or something similar to raw SQL, but not
>>>> something like mapGroupsWithState.
>>>>
>>>> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Unfortunately no. Honestly it does not make sense as for type-aware
>>>>> operations like map, mapGroups, etc., you have to provide an actual JVM
>>>>> function. That does not fit in with the SQL language structure.
>>>>>
>>>>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <kanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> Can we use mapGroupsWithState in raw SQL? Or is it on the roadmap?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
