Hello everyone,

I am trying to compute a moving average over categorical data like the
sample below, which is synthetic data I generated myself.

sqltimestamp,col1,col2,col3,col4,col5
1618574879,cat1,cat4,cat2,cat5,cat3
1618574880,cat1,cat3,cat4,cat2,cat5
1618574881,cat5,cat3,cat4,cat2,cat1
1618574882,cat2,cat3,cat5,cat1,cat4
1618574883,cat2,cat4,cat1,cat3,cat5
1618574884,cat1,cat2,cat5,cat4,cat3
1618574885,cat5,cat3,cat2,cat1,cat4
1618574886,cat3,cat5,cat4,cat2,cat1
1618574887,cat3,cat2,cat5,cat4,cat1
1618574888,cat1,cat5,cat3,cat2,cat4

I would like to take the average of the number of "cat1" values in the
column "col1" over each 5-minute window, based on the column
"sqltimestamp". I solved this when the column is numeric, but I could not
solve it when the column is categorical, as above.
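
To make the goal concrete, here is a minimal plain-Scala sketch (no Spark)
of the calculation I have in mind: count "cat1" per tumbling window, then
take a trailing moving average of those counts. The object name, the helper
names countsPerWindow and movingAverage, and the values windowSeconds = 5
and span = 2 are illustrative assumptions, chosen only so that the ten-row
sample produces more than one window. They are not taken from my job.

```scala
// Plain-Scala sketch (no Spark): count a category per tumbling window, then
// smooth the per-window counts with a trailing moving average.
object MovingAverageSketch {
  // (sqltimestamp in epoch seconds, col1) copied from the sample rows.
  val rows: Seq[(Long, String)] = Seq(
    1618574879L -> "cat1", 1618574880L -> "cat1", 1618574881L -> "cat5",
    1618574882L -> "cat2", 1618574883L -> "cat2", 1618574884L -> "cat1",
    1618574885L -> "cat5", 1618574886L -> "cat3", 1618574887L -> "cat3",
    1618574888L -> "cat1"
  )

  // Count occurrences of `category` per tumbling window, keyed by the
  // window start (timestamp rounded down to a multiple of windowSeconds).
  def countsPerWindow(
      data: Seq[(Long, String)],
      category: String,
      windowSeconds: Long
  ): Seq[(Long, Int)] =
    data
      .groupBy { case (ts, _) => ts - ts % windowSeconds }
      .map { case (start, group) => start -> group.count(_._2 == category) }
      .toSeq
      .sortBy(_._1)

  // Trailing moving average over the last `span` windows; the first
  // windows use however many predecessors are available.
  def movingAverage(counts: Seq[(Long, Int)], span: Int): Seq[(Long, Double)] =
    counts.indices.map { i =>
      val slice = counts.slice(math.max(0, i - span + 1), i + 1).map(_._2)
      counts(i)._1 -> slice.sum.toDouble / slice.size
    }

  def main(args: Array[String]): Unit = {
    // Illustrative parameters only (assumption): 5-second windows, span of 2.
    val counts = countsPerWindow(rows, "cat1", windowSeconds = 5L)
    movingAverage(counts, span = 2).foreach { case (start, avg) =>
      println(s"$start $avg")
    }
  }
}
```

Note that in this sketch a window containing no rows at all is simply
absent from the result rather than counted as zero.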


The code below produces rows of (timestamp, count) tuples, and I cannot
apply the avg aggregate function to that result because Spark does not
support multiple aggregations on a single streaming query.

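import org.apache.spark.sql.functions.{col, count, when, window}
import org.apache.spark.sql.types.TimestampType

val movavgDF = spark
  .readStream
  .schema(schema)
  .option("failOnDataLoss", true)
  .option("delimiter", ",")
  .csv(inputParameters.csvSinkDir)
  .withWatermark("sqltimestamp", "5 seconds")
  // one row per 1-minute tumbling window
  .groupBy(window(col("sqltimestamp"), "1 minute").as("time_frame"))
  .agg(
    // when() without otherwise() yields null for non-matching rows,
    // and count() skips nulls, so this counts only the "cat1" rows
    count(when(col("col1") === "cat1", 1)).as("count")
  )
  .withColumn("window_start", col("time_frame")("start").cast(TimestampType))
  .drop("time_frame")
  // sorting a streaming aggregate is only supported in complete output mode
  .orderBy("window_start")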


After searching the net, I have come to the conclusion that this can be
done when it is not Structured Streaming, but I need it to work while
streaming.
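
For comparison, here is a rough sketch of the batch (non-streaming)
variant, where a second aggregation over the windowed counts is allowed.
It assumes Spark 3.1 or later (for timestamp_seconds), a local
SparkSession, and only the sqltimestamp and col1 columns of the sample.
The 5-second window, the 2-window span, and the object and method names
are illustrative assumptions, not part of my job.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, count, timestamp_seconds, when, window}

object BatchMovingAverage {
  // Batch sketch: windowed count of "cat1", then a trailing average of the
  // counts using a SQL window function (not available on streaming frames).
  def movingAverages(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Sample rows reduced to (epoch seconds, col1); the epoch value is
    // converted to a timestamp so it can be used with window().
    val df = Seq(
      (1618574879L, "cat1"), (1618574880L, "cat1"), (1618574881L, "cat5"),
      (1618574882L, "cat2"), (1618574883L, "cat2"), (1618574884L, "cat1"),
      (1618574885L, "cat5"), (1618574886L, "cat3"), (1618574887L, "cat3"),
      (1618574888L, "cat1")
    ).toDF("epoch", "col1")
      .withColumn("sqltimestamp", timestamp_seconds(col("epoch")))

    // First aggregation: count "cat1" per tumbling window
    // (5 seconds here is an illustrative value for the tiny sample).
    val counts = df
      .groupBy(window(col("sqltimestamp"), "5 seconds").as("time_frame"))
      .agg(count(when(col("col1") === "cat1", 1)).as("count"))
      .withColumn("window_start", col("time_frame")("start"))
      .drop("time_frame")

    // Second step: average the current and the previous window's count.
    val lastTwo = Window.orderBy("window_start").rowsBetween(-1, 0)
    counts
      .withColumn("moving_avg", avg(col("count")).over(lastTwo))
      .orderBy("window_start")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("batch-moving-average")
      .getOrCreate()
    movingAverages(spark).show(truncate = false)
    spark.stop()
  }
}
```

This is exactly the second aggregation that I cannot express on the
streaming DataFrame.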

I would be very grateful if you could suggest a solution to this problem.

Thank you very much in advance.

Best,

-halil.
