This is cool! It looks to me like this works too:

select data.* from
  (select max(struct(my_timestamp, *)) as data from view1 group by id)

But I have a naive question again: what does max of a struct mean? Does it
always take the max of the first field and ignore the rest of the fields
in the struct?
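
I ran a quick toy check in spark-shell (the x/y columns are just
placeholders I made up); it looks like the comparison goes field by field
in order, with later fields only breaking ties, but I'd love a
confirmation:

import org.apache.spark.sql.functions._
import spark.implicits._

// two rows tie on the first struct field; if only the first field
// mattered, the winner between "a" and "b" would be arbitrary
Seq((1, "a"), (1, "b")).toDF("x", "y")
  .agg(max(struct($"x", $"y")).as("m"))
  .show(false)
// prints [1,b], so the tie on x seems to be broken by y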

On Wed, Apr 18, 2018 at 6:06 PM, Jungtaek Lim <kabh...@gmail.com> wrote:

> Thanks Arun, I modified it a bit, trying my best to avoid enumerating fields:
>
> val query = socketDF
>   .selectExpr("CAST(value AS STRING) as value")
>   .as[String]
>   .select(from_json($"value", schema=schema).as("data"))
>   .select($"data.*")
>   .groupBy($"ID")
>   .agg(max(struct($"AMOUNT", $"*")).as("data"))
>   .select($"data.*")
>   .writeStream
>   .format("console")
>   .trigger(Trigger.ProcessingTime("1 seconds"))
>   .outputMode(OutputMode.Update())
>   .start()
>
> It still has a minor issue: the column "AMOUNT" shows up twice in the
> result table, but everything works like a charm.
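>
> One workaround I can think of for the duplication (an untested sketch):
> alias the ordering column inside the struct and drop it afterwards, e.g.
>
>   .agg(max(struct($"AMOUNT".as("max_key"), $"*")).as("data"))
>   .select($"data.*")
>   .drop("max_key")
>
> where "max_key" is just a throwaway field name; the ordering is
> unchanged since it carries the same values as AMOUNT.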
>
> -Jungtaek Lim (HeartSaVioR)
>
> On Thu, Apr 19, 2018 at 9:43 AM, Arun Mahadevan <ar...@apache.org> wrote:
>
>> The below expr might work:
>>
>> df.groupBy($"id")
>>   .agg(max(struct($"amount", $"my_timestamp")).as("data"))
>>   .select($"id", $"data.*")
>>
>>
>> Thanks,
>> Arun
>>
>> From: Jungtaek Lim <kabh...@gmail.com>
>> Date: Wednesday, April 18, 2018 at 4:54 PM
>> To: Michael Armbrust <mich...@databricks.com>
>> Cc: kant kodali <kanth...@gmail.com>, Arun Iyer <ar...@apache.org>,
>> Tathagata Das <tathagata.das1...@gmail.com>, "user @spark" <
>> user@spark.apache.org>
>>
>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>
>> Thanks Michael for providing a great solution. It's great to remove the
>> UDAF and any need to provide fields manually.
>>
>> Btw, your code has a compilation error: a ')' is missing, and after I
>> fix that, it complains again with another issue.
>>
>> <console>:66: error: overloaded method value max with alternatives:
>>   (columnName: String)org.apache.spark.sql.Column <and>
>>   (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column
>>  cannot be applied to (org.apache.spark.sql.ColumnName,
>> org.apache.spark.sql.Column)
>>
>> Could you check that your code works with Spark 2.3 (via spark-shell
>> or whatever)?
>>
>> Thanks!
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Thu, Apr 19, 2018 at 8:34 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> You can calculate argmax using a struct.
>>>
>>> df.groupBy($"id").agg(max($"my_timestamp", struct($"*").as("data")).
>>> getField("data").select($"data.*")
>>>
>>> You could transcode this to SQL; it'll just be complicated nested
>>> queries.
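>>>
>>> For instance, something like this (an untested sketch; "events" stands
>>> in for whatever name the stream is registered under):
>>>
>>> spark.sql("""
>>>   SELECT id, data.*
>>>   FROM (SELECT id, max(struct(amount, my_timestamp)) AS data
>>>         FROM events
>>>         GROUP BY id) t
>>> """)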
>>>
>>>
>>> On Wed, Apr 18, 2018 at 3:40 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi Arun,
>>>>
>>>> I want to select the entire row with the max timestamp for each group.
>>>> I have modified my data set below to avoid any confusion.
>>>>
>>>> *Input:*
>>>>
>>>> id | amount     | my_timestamp
>>>> -------------------------------------------
>>>> 1  |      5     |  2018-04-01T01:00:00.000Z
>>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>>> 1  |      6     |  2018-04-01T01:20:00.000Z
>>>> 2  |     30     |  2018-04-01T01:25:00.000Z
>>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>>>
>>>> *Expected Output:*
>>>>
>>>> id | amount     | my_timestamp
>>>> -------------------------------------------
>>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>>>
>>>> I am looking for a streaming solution using either raw SQL, like
>>>> sparkSession.sql("sql query"), or something similar to raw SQL, but
>>>> not something like mapGroupsWithState.
>>>>
>>>> On Wed, Apr 18, 2018 at 9:36 AM, Arun Mahadevan <ar...@apache.org>
>>>> wrote:
>>>>
>>>>> Can't the "max" function be used here? Something like:
>>>>>
>>>>> stream.groupBy($"id").max("amount")
>>>>>   .writeStream.outputMode("complete" /* or "update" */)...
>>>>>
>>>>> Unless the "stream" is already a grouped stream, in which case the
>>>>> above would not work, since support for multiple aggregate operations
>>>>> is not there yet.
>>>>>
>>>>> Thanks,
>>>>> Arun
>>>>>
>>>>> From: kant kodali <kanth...@gmail.com>
>>>>> Date: Tuesday, April 17, 2018 at 11:41 AM
>>>>> To: Tathagata Das <tathagata.das1...@gmail.com>
>>>>> Cc: "user @spark" <user@spark.apache.org>
>>>>> Subject: Re: can we use mapGroupsWithState in raw sql?
>>>>>
>>>>> Hi TD,
>>>>>
>>>>> Thanks for that. The only reason I ask is that I don't see any
>>>>> alternative way to solve the problem below using raw SQL.
>>>>>
>>>>>
>>>>> How do I select the max row for every group in Spark Structured
>>>>> Streaming 2.3.0 without using order by (since that requires complete
>>>>> mode) or mapGroupsWithState?
>>>>>
>>>>> *Input:*
>>>>>
>>>>> id | amount     | my_timestamp
>>>>> -------------------------------------------
>>>>> 1  |      5     |  2018-04-01T01:00:00.000Z
>>>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>>>> 2  |     20     |  2018-04-01T01:20:00.000Z
>>>>> 2  |     30     |  2018-04-01T01:25:00.000Z
>>>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>>>>
>>>>> *Expected Output:*
>>>>>
>>>>> id | amount     | my_timestamp
>>>>> -------------------------------------------
>>>>> 1  |     10     |  2018-04-01T01:10:00.000Z
>>>>> 2  |     40     |  2018-04-01T01:30:00.000Z
>>>>>
>>>>> I am looking for a streaming solution using either raw SQL, like
>>>>> sparkSession.sql("sql query"), or something similar to raw SQL, but
>>>>> not something like mapGroupsWithState.
>>>>>
>>>>> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <
>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>
>>>>>> Unfortunately, no. Honestly, it does not make sense: for type-aware
>>>>>> operations like map, mapGroups, etc., you have to provide an actual
>>>>>> JVM function, and that does not fit with the SQL language structure.
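>>>>>>
>>>>>> For reference, the shape of function it needs looks roughly like
>>>>>> this (a sketch; Event and eventsDS are hypothetical names):
>>>>>>
>>>>>> import spark.implicits._
>>>>>> import org.apache.spark.sql.streaming.GroupState
>>>>>>
>>>>>> case class Event(id: Int, amount: Int, my_timestamp: java.sql.Timestamp)
>>>>>>
>>>>>> // keep the max-amount row per id across triggers; eventsDS is a
>>>>>> // Dataset[Event] built from the stream
>>>>>> val maxPerId = eventsDS
>>>>>>   .groupByKey(_.id)
>>>>>>   .mapGroupsWithState[Event, Event] { (id, rows, state) =>
>>>>>>     val best = (rows ++ state.getOption.iterator).maxBy(_.amount)
>>>>>>     state.update(best)  // remember the current winner for this id
>>>>>>     best
>>>>>>   }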
>>>>>>
>>>>>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <kanth...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> can we use mapGroupsWithState in raw SQL? or is it in the roadmap?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
