Hello everyone, I am trying to apply a moving average to categorical data like the sample below, which is synthetic data I generated myself.
sqltimestamp,col1,col2,col3,col4,col5
1618574879,cat1,cat4,cat2,cat5,cat3
1618574880,cat1,cat3,cat4,cat2,cat5
1618574881,cat5,cat3,cat4,cat2,cat1
1618574882,cat2,cat3,cat5,cat1,cat4
1618574883,cat2,cat4,cat1,cat3,cat5
1618574884,cat1,cat2,cat5,cat4,cat3
1618574885,cat5,cat3,cat2,cat1,cat4
1618574886,cat3,cat5,cat4,cat2,cat1
1618574887,cat3,cat2,cat5,cat4,cat1
1618574888,cat1,cat5,cat3,cat2,cat4

I would like to take the average of the number of "cat1" occurrences in the column "col1" over each 5-minute window, according to the column "sqltimestamp". I solved this when the column is numeric, but I could not solve it when the column is categorical as above. The code below produces rows of (timestamp, count) pairs, and I cannot apply the avg aggregate function to that result, because Spark does not support chaining multiple aggregations on one streaming query.

val movavgDF = spark
  .readStream
  .schema(schema)
  .option("failOnDataLoss", true)
  .option("delimiter", ",")
  .csv(inputParameters.csvSinkDir)
  .withWatermark("sqltimestamp", "5 seconds")
  .groupBy(window(col("sqltimestamp"), "1 minute").as("time_frame"))
  .agg(count(when(col("col1") === "cat1", 1)).as("count"))
  .withColumn("window_start", col("time_frame")("start").cast(TimestampType))
  .drop("time_frame")
  .orderBy("window_start")

After my searches on the net, I have come to the conclusion that this can be done when it is not Structured Streaming, but I need it while streaming. I would be very happy if you could provide a solution to this problem. Thank you very much in advance.

Best,
-halil.
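P.S. To make concrete what I mean by the two-level aggregation (per-minute counts of "cat1" in col1, then a 5-minute average of those counts), here is a plain-Scala batch sketch of the logic with no Spark involved. The Record shape, window sizes, and function names are just my own illustration, not Spark API:

// Plain-Scala sketch of the two-step computation I want in streaming:
//   step 1: per 60-second window, count rows whose col1 == "cat1"
//   step 2: average those per-window counts over a trailing 300-second span
case class Record(sqltimestamp: Long, col1: String)

// Step 1: bucket each row into its window start and count "cat1" per window.
def cat1CountsPerWindow(rows: Seq[Record], windowSec: Long = 60L): Map[Long, Int] =
  rows
    .groupBy(r => (r.sqltimestamp / windowSec) * windowSec) // window start (epoch seconds)
    .map { case (start, rs) => start -> rs.count(_.col1 == "cat1") }

// Step 2: for each window start, average the counts of all windows that
// fall inside the trailing span (start - spanSec, start].
def movingAvgOfCounts(counts: Map[Long, Int], spanSec: Long = 300L): Map[Long, Double] =
  counts.map { case (start, _) =>
    val inSpan = counts.filter { case (s, _) => s > start - spanSec && s <= start }
    start -> inSpan.values.sum.toDouble / inSpan.size
  }

In batch this is trivial; my problem is that in Structured Streaming the output of step 1 cannot be fed into the aggregation of step 2 on the same query.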