You might be able to do this with multiple aggregations like avg(col("col1") === "cat1") etc., but how about pivoting the DataFrame first so that you get indicator columns like "cat1" that are 1 or 0? You would end up with columns x categories new columns if you want to count all categories in all cols, but then it's just a simple aggregation on numeric values. Something like the sketch below.
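For example (a minimal, untested sketch on a static DataFrame; it assumes a DataFrame df with the columns from your sample, and names like cols, cats, indicators are just placeholders):

    import org.apache.spark.sql.functions._

    // One 0/1 indicator column per (column, category) pair, e.g.
    // "col1_cat1" is 1 when col1 == "cat1" and 0 otherwise. With 5
    // columns and 5 categories this yields 25 indicator columns.
    val cols = Seq("col1", "col2", "col3", "col4", "col5")
    val cats = Seq("cat1", "cat2", "cat3", "cat4", "cat5")

    val indicators = df.select(
      col("sqltimestamp") +:
        (for { c <- cols; k <- cats }
          yield when(col(c) === k, 1).otherwise(0).as(s"${c}_$k")): _*
    )

    // avg() of a 0/1 column per window is the fraction of rows in
    // that window where the category occurred; sum() would give the
    // plain count instead.
    val perWindow = indicators
      .groupBy(window(col("sqltimestamp"), "5 minutes").as("time_frame"))
      .agg(avg(col("col1_cat1")).as("col1_cat1_ratio"))

Whether you want avg() (the fraction of rows that matched) or sum() (the count of matches) per window depends on what you mean by the moving average of a category, but either is a single numeric aggregation.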
On Mon, Apr 26, 2021 at 9:29 AM halil <halil.a...@gmail.com> wrote:

> Hello everyone,
>
> I am trying to apply a moving average to categorical data like the sample
> below, which is synthetic data I generated myself.
>
> sqltimestamp,col1,col2,col3,col4,col5
> 1618574879,cat1,cat4,cat2,cat5,cat3
> 1618574880,cat1,cat3,cat4,cat2,cat5
> 1618574881,cat5,cat3,cat4,cat2,cat1
> 1618574882,cat2,cat3,cat5,cat1,cat4
> 1618574883,cat2,cat4,cat1,cat3,cat5
> 1618574884,cat1,cat2,cat5,cat4,cat3
> 1618574885,cat5,cat3,cat2,cat1,cat4
> 1618574886,cat3,cat5,cat4,cat2,cat1
> 1618574887,cat3,cat2,cat5,cat4,cat1
> 1618574888,cat1,cat5,cat3,cat2,cat4
>
> I'd like to take the average of the number of "cat1" values in the column
> "col1" for each 5-minute window according to the column "sqltimestamp".
> I solved this when the column is numeric, but I couldn't solve it when
> the column is categorical as above.
>
> The code below produces rows of tuples (timestamp, count), and I cannot
> apply the avg aggregate function to the result because Spark does not
> support multiple aggregations on one streaming query.
>
> val movavgDF = spark
>   .readStream
>   .schema(schema)
>   .option("failOnDataLoss", true)
>   .option("delimiter", ",")
>   .csv(inputParameters.csvSinkDir)
>   .withWatermark("sqltimestamp", "5 seconds")
>   .groupBy(window(col("sqltimestamp"), "1 minute").as("time_frame"))
>   .agg(
>     count(when(col("col1") === "cat1", 1)).as("count")
>   )
>   .withColumn("window_start", col("time_frame")("start").cast(TimestampType))
>   .drop("time_frame")
>   .orderBy("window_start")
>
> After searching the net, I have come to the conclusion that this can be
> done when it is not Structured Streaming, but I need it while streaming.
>
> I would be very happy if you could provide me a solution for this problem.
>
> Thank you very much in advance.
>
> Best,
> -halil.
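Applied to the streaming query quoted above, that could look roughly like this (again an untested sketch: I kept your schema and inputParameters names, used the 5-minute window from your description rather than the 1-minute one in your code, and assumed sqltimestamp is a TimestampType column in your schema):

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.TimestampType

    // Your query with a 0/1 indicator column added before the
    // aggregation, so the windowed moving average is a single
    // numeric avg() rather than a count needing a second pass.
    val movavgDF = spark
      .readStream
      .schema(schema)
      .option("failOnDataLoss", true)
      .option("delimiter", ",")
      .csv(inputParameters.csvSinkDir)
      .withColumn("is_cat1", when(col("col1") === "cat1", 1).otherwise(0))
      .withWatermark("sqltimestamp", "5 seconds")
      .groupBy(window(col("sqltimestamp"), "5 minutes").as("time_frame"))
      .agg(avg(col("is_cat1")).as("cat1_ratio"))
      .withColumn("window_start", col("time_frame")("start").cast(TimestampType))
      .drop("time_frame")
    // orderBy is left out here: sorting a streaming DataFrame is
    // only supported in complete output mode.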