Hello All -

I'm using Apache Spark Structured Streaming to read data from Kafka topic,
and do some processing. I'm using watermark to account for late-coming
records and the code works fine.

Here is the working(sample) code:

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import
from_json, col, to_timestamp, window, max,exprfrom pyspark.sql.types
import StructType, StructField, StringType, DoubleType,IntegerType

spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("Sliding Window Demo") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.sql.shuffle.partitions", 1) \

stock_schema = StructType([
        StructField("LogType", StringType()),
        StructField("CreatedTime", StringType()),
        StructField("Type", StringType()),
        StructField("Amount", IntegerType()),
        StructField("BrokerCode", StringType())

kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "trades") \
        .option("startingOffsets", "earliest") \

value_df = kafka_df.select(from_json(col("value").cast("string"),

trade_df = value_df.select("value.*") \
        .withColumn("CreatedTime", to_timestamp(col("CreatedTime"),
"yyyy-MM-dd HH:mm:ss")) \
        .withColumn("Buy", expr("case when Type == 'BUY' then Amount
else 0 end")) \
        .withColumn("Sell", expr("case when Type == 'SELL' then Amount
else 0 end"))

window_agg_df = trade_df \
        .withWatermark("CreatedTime", "10 minute") \
        .groupBy(window(col("CreatedTime"), "10 minute")) \
"TotalBuy").withColumnRenamed("sum(Sell)", "TotalSell")

output_df = window_agg_df.select("window.start", "window.end",
"TotalBuy", "TotalSell")

window_query = output_df.writeStream \
        .format("console") \
        .outputMode("append") \
        .option("checkpointLocation", "chk-point-dir-mar28") \
        .trigger(processingTime="30 second") \



Currently, I'm processing a single LogType, the requirement is to process
multiple LogTypes in the same flow .. LogTypes will be config driven (not
hard-coded). Objective is to have generic code that can process all

As an example, for LogType X, I will need to get groupby columns col1, col2
and get the sum of values 'sent' & 'received'. for LogType Y, the groupBy
columns will remain the same but the sum will be on column col3 instead.

w/o the watermark, I can look at the LogType and do the processing in batch
mode (using foreachBatch). However, with watermark - i'm unable to figure
out how to process based on LogType.

Any inputs on this ?

