Hello All - I'm using Apache Spark Structured Streaming to read data from a Kafka topic and do some processing. I'm using a watermark to account for late-arriving records, and the code works fine.
Here is the working sample code:

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, window, expr
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("Sliding Window Demo") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.sql.shuffle.partitions", 1) \
    .getOrCreate()

stock_schema = StructType([
    StructField("LogType", StringType()),
    StructField("CreatedTime", StringType()),
    StructField("Type", StringType()),
    StructField("Amount", IntegerType()),
    StructField("BrokerCode", StringType())
])

kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "trades") \
    .option("startingOffsets", "earliest") \
    .load()

# Parse the JSON payload out of the Kafka value column.
value_df = kafka_df.select(from_json(col("value").cast("string"), stock_schema).alias("value"))

trade_df = value_df.select("value.*") \
    .withColumn("CreatedTime", to_timestamp(col("CreatedTime"), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("Buy", expr("case when Type == 'BUY' then Amount else 0 end")) \
    .withColumn("Sell", expr("case when Type == 'SELL' then Amount else 0 end"))

# 10-minute tumbling window with a 10-minute watermark for late records.
window_agg_df = trade_df \
    .withWatermark("CreatedTime", "10 minute") \
    .groupBy(window(col("CreatedTime"), "10 minute")) \
    .agg({"Buy": "sum", "Sell": "sum"}) \
    .withColumnRenamed("sum(Buy)", "TotalBuy") \
    .withColumnRenamed("sum(Sell)", "TotalSell")

output_df = window_agg_df.select("window.start", "window.end", "TotalBuy", "TotalSell")

window_query = output_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "chk-point-dir-mar28") \
    .trigger(processingTime="30 second") \
    .start()

window_query.awaitTermination()
```

Currently I'm processing a single LogType. The requirement is to process multiple LogTypes in the same flow, and the LogTypes will be config-driven (not hard-coded). The objective is generic code that can process all LogTypes. As an example, for LogType X I need to group by columns col1 and col2 and sum the values of 'sent' and 'received'; for LogType Y the groupBy columns stay the same, but the sum is over column col3 instead.

Without the watermark, I can look at the LogType and do the processing in batch mode (using foreachBatch). With the watermark, however, I'm unable to figure out how to process records based on their LogType. Any inputs on this?

Here is the Stack Overflow question for this: https://stackoverflow.com/questions/76547349/apache-spark-with-watermark-processing-data-different-logtypes-in-same-kafka-t

tia!
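To make the requirement concrete, here is a rough, untested sketch of one direction I've considered: filter the parsed stream once per LogType and start a separate windowed query for each, with the groupBy and sum columns driven by a config dict. The config structure (log_type_configs), the column names (col1, col2, sent, received, col3), and the checkpoint paths below are hypothetical placeholders, and the sketch assumes the parsed stream (trade_df above) actually contains those columns. I'm not sure whether running one query per LogType like this is the right pattern, or whether there is a way to keep it all in a single query.

```
from pyspark.sql.functions import col, expr, window

# Hypothetical config-driven mapping: per LogType, which columns to group by
# and which columns to sum. In reality this would be loaded from a config file.
log_type_configs = {
    "X": {"group_cols": ["col1", "col2"], "sum_cols": ["sent", "received"]},
    "Y": {"group_cols": ["col1", "col2"], "sum_cols": ["col3"]},
}

queries = []
for log_type, cfg in log_type_configs.items():
    # Build one sum expression per configured column, e.g. sum(sent) AS Total_sent.
    agg_exprs = [expr(f"sum({c})").alias(f"Total_{c}") for c in cfg["sum_cols"]]

    # Filter the shared parsed stream down to one LogType, then apply the
    # watermark and that LogType's windowed aggregation.
    per_type_df = trade_df \
        .filter(col("LogType") == log_type) \
        .withWatermark("CreatedTime", "10 minute") \
        .groupBy(window(col("CreatedTime"), "10 minute"),
                 *[col(c) for c in cfg["group_cols"]]) \
        .agg(*agg_exprs)

    # Each streaming query needs its own checkpoint directory.
    queries.append(
        per_type_df.writeStream
            .format("console")
            .outputMode("append")
            .option("checkpointLocation", f"chk-point-dir-{log_type}")
            .trigger(processingTime="30 second")
            .start()
    )

# Block until any of the per-LogType queries terminates.
spark.streams.awaitAnyTermination()
```

Since each per-LogType query is its own streaming DataFrame, each keeps its own watermark, which is what I couldn't get with a single foreachBatch flow.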