I would like to perform a Structured Streaming aggregation over a time window. Given the data schema below, the objective is to keep only the latest event for each user, then count the events of each type per location.
    time  location  user  type
    1     A         1     one
    2     A         1     two
    1     B         2     one
    2     B         2     one
    1     A         3     two
    1     A         4     one

Sample output:

    location  countOne  countTwo
    A         1         2
    B         1         0

My attempt looks something like the following:

    val aggTypes = df
      .select($"location", $"time", $"user", $"type")
      // first aggregation: latest timestamp per user
      .groupBy($"user")
      .agg(max($"timestamp") as 'timestamp)
      .select("*")
      .withWatermark("timestamp", conf.kafka.watermark.toString + " seconds")
      // second aggregation: counts per sliding window and location
      .groupBy(
        functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds"),
        $"location")
      .agg(
        count(when($"type" === "one", $"type")) as 'countOne,
        count(when($"type" === "two", $"type")) as 'countTwo)
      .drop($"window")

Structured Streaming does not support multiple aggregations on a streaming DataFrame/Dataset, and non-time-based windows are not supported on streaming DataFrames/Datasets either, so I am not sure whether the desired output can be achieved in a single streaming query. Any help is appreciated.
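To pin down the semantics I am after, here is a minimal plain-Scala sketch (no Spark, no streaming, no windowing) that computes the sample output from the sample rows. The names `Event`, `LatestEventCounts` and `countsByLocation` are hypothetical and exist only for this illustration; it shows the intended batch result, not a streaming solution.

```scala
// Plain Scala collections only: illustrates the desired result, not a Spark query.
// `Event` is a hypothetical case class mirroring the sample schema.
final case class Event(time: Long, location: String, user: Int, eventType: String)

object LatestEventCounts {
  // The sample rows from the table in the question.
  val events: Seq[Event] = Seq(
    Event(1, "A", 1, "one"),
    Event(2, "A", 1, "two"),
    Event(1, "B", 2, "one"),
    Event(2, "B", 2, "one"),
    Event(1, "A", 3, "two"),
    Event(1, "A", 4, "one")
  )

  // Step 1: keep only the latest event per user.
  // Step 2: per location, count the remaining "one" and "two" events.
  def countsByLocation(rows: Seq[Event]): Map[String, (Int, Int)] = {
    val latestPerUser = rows.groupBy(_.user).values.map(_.maxBy(_.time))
    latestPerUser
      .groupBy(_.location)
      .map { case (location, es) =>
        location -> (es.count(_.eventType == "one"), es.count(_.eventType == "two"))
      }
  }

  def main(args: Array[String]): Unit =
    countsByLocation(events).toSeq.sortBy(_._1).foreach {
      case (location, (countOne, countTwo)) => println(s"$location $countOne $countTwo")
    }
}
```

On the sample rows this yields A with countOne 1 and countTwo 2, and B with countOne 1 and countTwo 0, which matches the sample output. The open question is how to get the same two-step result (latest per user, then counts per location) from a single streaming query.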