You might be able to do this with multiple aggregations such as
avg(col("col1") === "cat1") and so on, but how about pivoting the DataFrame
first, so that you get indicator columns like "cat1" holding 1 or 0? You would
end up with columns x categories new columns if you want to count every
category in every column, but then it is just a simple aggregation on numeric
values.
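
Something like this, as a rough sketch (untested; it reuses the spark, schema,
and inputParameters.csvSinkDir names from your snippet, hard-codes the category
list, and only handles col1 for brevity):

    import org.apache.spark.sql.functions.{avg, col, when, window}
    import org.apache.spark.sql.types.TimestampType

    val categories = Seq("cat1", "cat2", "cat3", "cat4", "cat5")

    // Add a 1/0 indicator column per category for col1; repeat for col2..col5
    // if you want all columns.
    val withIndicators = categories.foldLeft(
      spark.readStream
        .schema(schema)
        .option("failOnDataLoss", true)
        .option("delimiter", ",")
        .csv(inputParameters.csvSinkDir)
    ) { (df, cat) =>
      df.withColumn(s"col1_$cat", when(col("col1") === cat, 1).otherwise(0))
    }

    // With numeric indicator columns, the windowed average is a single
    // streaming aggregation.
    val movavgDF = withIndicators
      .withWatermark("sqltimestamp", "5 seconds")
      .groupBy(window(col("sqltimestamp"), "5 minutes").as("time_frame"))
      .agg(avg(col("col1_cat1")).as("cat1_ratio"))
      .withColumn("window_start", col("time_frame")("start").cast(TimestampType))
      .drop("time_frame")

The avg of a 1/0 indicator is the fraction of rows in each window where col1 is
"cat1"; multiply by the window's row count if you want an absolute number
instead of a ratio.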

On Mon, Apr 26, 2021 at 9:29 AM halil <halil.a...@gmail.com> wrote:

> Hello everyone,
>
> I am trying to apply a moving average to categorical data like the sample
> below, which is synthetic data I generated myself.
>
> sqltimestamp,col1,col2,col3,col4,col5
> 1618574879,cat1,cat4,cat2,cat5,cat3
> 1618574880,cat1,cat3,cat4,cat2,cat5
> 1618574881,cat5,cat3,cat4,cat2,cat1
> 1618574882,cat2,cat3,cat5,cat1,cat4
> 1618574883,cat2,cat4,cat1,cat3,cat5
> 1618574884,cat1,cat2,cat5,cat4,cat3
> 1618574885,cat5,cat3,cat2,cat1,cat4
> 1618574886,cat3,cat5,cat4,cat2,cat1
> 1618574887,cat3,cat2,cat5,cat4,cat1
> 1618574888,cat1,cat5,cat3,cat2,cat4
>
> I would like to take the average of the number of "cat1" values in the
> column "col1" for each 5-minute window according to the column
> "sqltimestamp". I solved it when the column is numeric, but I couldn't solve
> it when the column is categorical, as above.
>
>
> The code below produces rows of tuples (timestamp, count), and I cannot
> apply the avg aggregate function on the result because Spark does not
> support multiple aggregations on one streaming DataFrame.
>
> val movavgDF = spark
>   .readStream
>   .schema(schema)
>   .option("failOnDataLoss", true)
>   .option("delimiter", ",")
>   .csv(inputParameters.csvSinkDir)
>   .withWatermark("sqltimestamp", "5 seconds")
>   .groupBy(window(col("sqltimestamp"), "1 minute").as("time_frame"))
>   .agg(
>     count(when(col("col1") === "cat1", 1)).as("count")
>   )
>   .withColumn("window_start", col("time_frame")("start").cast(TimestampType))
>   .drop("time_frame")
>   .orderBy("window_start")
>
>
> After searching the net, I have come to the conclusion that this can be done
> when it is not Structured Streaming, but I need it while streaming.
>
> I would be very happy if you could provide a solution to this problem.
>
> Thank you very much in advance.
>
> Best,
>
> -halil.
