This is simple if you just want the aggregated values of both AMOUNT and
MY_TIMESTAMP:

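    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    // MY_TIMESTAMP carries a time of day, so use TimestampType, not DateType
    val schema = StructType(Seq(
      StructField("ID", IntegerType, true),
      StructField("AMOUNT", IntegerType, true),
      StructField("MY_TIMESTAMP", TimestampType, true)
    ))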

    val query = socketDF
      .selectExpr("CAST(value AS STRING) as value")
      .as[String]
      .select(from_json($"value", schema=schema).as("data"))
      .select($"data.ID", $"data.AMOUNT", $"data.MY_TIMESTAMP")
      .groupBy($"ID")
      .agg("AMOUNT" -> "max", "MY_TIMESTAMP" -> "max")

This requires you to set the output mode to Update or Complete.
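
For reference, a rough sketch of starting such an aggregation in Update mode. The object and method names (StartAggregation, startWithUpdateMode) and the console sink are only assumptions for illustration; they are not from this thread.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

object StartAggregation {
  // `aggregated` is assumed to be a streaming aggregation such as `query` above.
  def startWithUpdateMode(aggregated: DataFrame): StreamingQuery =
    aggregated.writeStream
      .outputMode("update") // or "complete" to re-emit the whole result table
      .format("console")    // console sink chosen only for illustration
      .start()
}
```

Calling StartAggregation.startWithUpdateMode(query) returns a StreamingQuery handle that you can await or stop.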

But I guess you actually want to select the row with the max AMOUNT and take
MY_TIMESTAMP from that row. In that case I think you would need an inner
self-join, like below:

    val query = socketDF
      .selectExpr("CAST(value AS STRING) as value")
      .as[String]
      .select(from_json($"value", schema=schema).as("data"))
      .select($"data.ID", $"data.AMOUNT")
      .groupBy($"ID")
      .agg("AMOUNT" -> "max")

    val query2 = socketDF
      .selectExpr("CAST(value AS STRING) as value")
      .as[String]
      .select(from_json($"value", schema=schema).as("data"))
      .select($"data.ID".as("SELF_ID"), $"data.AMOUNT".as("SELF_AMOUNT"),
        $"data.MY_TIMESTAMP")

    // Join each group's max back to the raw rows to recover MY_TIMESTAMP
    val query3 = query.join(query2, expr("""
       ID = SELF_ID AND
       `max(AMOUNT)` = SELF_AMOUNT
    """))

This is NOT valid, at least in Spark 2.3, because the aggregation requires
Update/Complete mode while the join requires Append mode.
(The Structured Streaming guide explains this limitation clearly:
"Cannot use streaming aggregation before joins.")

If you can achieve this with mapGroupsWithState, you may want to stick with that.
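
A rough sketch of how the mapGroupsWithState route could look, assuming the parsed stream is available as a typed Dataset. Event, MaxRowPerId, pickMax, updateMax and maxRows are hypothetical names used only for illustration, and ties on AMOUNT are resolved by keeping the earlier row.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.GroupState

// Hypothetical record type mirroring the ID / AMOUNT / MY_TIMESTAMP schema.
case class Event(ID: Int, AMOUNT: Int, MY_TIMESTAMP: Timestamp)

object MaxRowPerId {
  // Pure helper: the row with the largest AMOUNT among the stored max and new rows.
  // Assumes at least one row is available, which holds when no timeout is configured.
  def pickMax(previous: Option[Event], incoming: Iterator[Event]): Event =
    (previous.iterator ++ incoming).maxBy(_.AMOUNT)

  // State update function with the shape mapGroupsWithState expects.
  def updateMax(id: Int, rows: Iterator[Event], state: GroupState[Event]): Event = {
    val best = pickMax(state.getOption, rows)
    state.update(best) // remember the current max row for this ID
    best
  }

  // `events` is assumed to be the parsed stream as a Dataset[Event].
  def maxRows(spark: SparkSession, events: Dataset[Event]): Dataset[Event] = {
    import spark.implicits._
    events.groupByKey(_.ID).mapGroupsWithState[Event, Event](updateMax _)
  }
}
```

A query built this way has to run in Update output mode, and it emits the current max row for every ID that received data in the micro-batch.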

Btw, when you deal with streaming, you may want to define a logical batch for
all aggregations and joins by defining a window and a watermark. You wouldn't
want the result to differ depending on how records fall into micro-batches,
so you almost always want to work with event-time windows.
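
A minimal sketch of an event-time window with a watermark, assuming MY_TIMESTAMP has been parsed as a timestamp column. The 10-minute watermark, the 1-hour window and the names WindowedMax and maxPerWindow are arbitrary choices for illustration only.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, max, window}

object WindowedMax {
  // `parsed` is assumed to have ID, AMOUNT and a TimestampType MY_TIMESTAMP column.
  def maxPerWindow(parsed: DataFrame): DataFrame =
    parsed
      .withWatermark("MY_TIMESTAMP", "10 minutes")               // tolerated lateness (assumed)
      .groupBy(window(col("MY_TIMESTAMP"), "1 hour"), col("ID")) // event-time window (assumed size)
      .agg(max(col("AMOUNT")).as("MAX_AMOUNT"))
}
```

With this shape the result depends on the event time in the data, not on which micro-batch a record happens to arrive in.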

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, Apr 18, 2018 at 3:42 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi TD,
>
> Thanks for that. The only reason I ask is I don't see any alternative
> solution to solve the problem below using raw sql.
>
>
> How to select the max row for every group in spark structured streaming
> 2.3.0 without using order by since it requires complete mode or
> mapGroupsWithState?
>
> *Input:*
>
> id | amount     | my_timestamp
> -------------------------------------------
> 1  |      5     |  2018-04-01T01:00:00.000Z
> 1  |     10     |  2018-04-01T01:10:00.000Z
> 2  |     20     |  2018-04-01T01:20:00.000Z
> 2  |     30     |  2018-04-01T01:25:00.000Z
> 2  |     40     |  2018-04-01T01:30:00.000Z
>
> *Expected Output:*
>
> id | amount     | my_timestamp
> -------------------------------------------
> 1  |     10     |  2018-04-01T01:10:00.000Z
> 2  |     40     |  2018-04-01T01:30:00.000Z
>
> Looking for a streaming solution using either raw sql like
> sparkSession.sql("sql query") or similar to raw sql but not something like
> mapGroupsWithState
>
> On Mon, Apr 16, 2018 at 8:32 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Unfortunately no. Honestly it does not make sense as for type-aware
>> operations like map, mapGroups, etc., you have to provide an actual JVM
>> function. That does not fit in with the SQL language structure.
>>
>> On Mon, Apr 16, 2018 at 7:34 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> can we use mapGroupsWithState in raw SQL? or is it in the roadmap?
>>>
>>> Thanks!
>>>
>>>
>>>
>>
>
