I would like to perform a Structured Streaming aggregation over a time 
window. Given the data schema below, the objective is to keep only the 
latest event for each user, and then count the events of each type for 
each location.

time   location   user   type
  1       A         1     one
  2       A         1     two
  1       B         2     one
  2       B         2     one
  1       A         3     two
  1       A         4     one

Sample output:

location   countOne   countTwo
    A          1         2
    B          1         0
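
To make the intended semantics concrete, here is a minimal sketch of the same two steps on plain Scala collections, with no Spark and no windowing involved. The names Event, TypeCounts, LatestEventCounts and eventType are made up for illustration (type is a reserved word in Scala, hence eventType); only the row values come from the table above.

```scala
// One input row; the fields follow the schema shown above.
final case class Event(time: Int, location: String, user: Int, eventType: String)

// One output row; countOne / countTwo mirror the sample output columns.
final case class TypeCounts(countOne: Int, countTwo: Int)

object LatestEventCounts {
  // The six sample rows from the table above.
  val sample: Seq[Event] = Seq(
    Event(1, "A", 1, "one"),
    Event(2, "A", 1, "two"),
    Event(1, "B", 2, "one"),
    Event(2, "B", 2, "one"),
    Event(1, "A", 3, "two"),
    Event(1, "A", 4, "one")
  )

  def countsByLocation(events: Seq[Event]): Map[String, TypeCounts] = {
    // Step 1: keep only the latest event of each user.
    val latestPerUser: Seq[Event] =
      events.groupBy(_.user).values.map(_.maxBy(_.time)).toSeq

    // Step 2: count the event types per location over the surviving rows.
    latestPerUser.groupBy(_.location).map { case (location, rows) =>
      location -> TypeCounts(
        countOne = rows.count(_.eventType == "one"),
        countTwo = rows.count(_.eventType == "two")
      )
    }
  }
}
```

Calling LatestEventCounts.countsByLocation(LatestEventCounts.sample) gives A -> TypeCounts(1, 2) and B -> TypeCounts(1, 0), which matches the sample output. The difficulty is expressing these two grouping steps in a streaming query.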

As a streaming query, I have in mind something like the following:

// Assumes spark.implicits._ and org.apache.spark.sql.functions (count, max, when) are in scope.
val aggTypes = df
  .select($"location", $"time", $"user", $"type")
  .groupBy($"user")
  .agg(max($"time") as 'timestamp)
  .select("*")
  .withWatermark("timestamp", conf.kafka.watermark.toString + " seconds")
  .groupBy(
    functions.window(
      $"timestamp",
      DataConstant.t15min.toString + " seconds",
      DataConstant.t1min.toString + " seconds"),
    $"location")
  .agg(
    count(when($"type" === "one", $"type")) as 'countOne,
    count(when($"type" === "two", $"type")) as 'countTwo)
  .drop($"window")

Because Structured Streaming does not support multiple (chained) 
aggregations, and non-time-based windows are not supported on streaming 
DataFrames/Datasets, I am not sure whether it is possible to achieve the 
desired output in a single streaming query.

Any help is appreciated.
