It is best to qualify this with an example. By using the foreachBatch function combined with the update output mode in Spark Structured Streaming, you can handle and integrate late-arriving data into your aggregations. This approach allows you to continuously update your aggregated results with both on-time and late data.
Example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum, max as spark_max, current_timestamp

# Create Spark session
spark = SparkSession.builder.appName("exampleWithRate").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Simulate a stream of data with an event time: each row's event_time is
# offset from the current timestamp by `value` minutes
stream = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
base_timestamp = current_timestamp()
stream = stream.withColumn(
    "event_time",
    (base_timestamp + (col("value") * 60).cast("interval second")).cast("timestamp")
)
stream = stream.withColumn("value", col("value") % 10)

def process_batch(batch_df, batch_id):
    # Read current state from an external store (simulated here as a static DataFrame)
    current_state = spark.createDataFrame(
        [(1, 10, '2024-06-13 10:00:00')],
        ["key", "total_value", "max_event_time"]
    ).withColumn("max_event_time", col("max_event_time").cast("timestamp"))

    # Aggregate this micro-batch; late rows are simply part of whichever
    # micro-batch they arrive in
    aggregated_batch = batch_df.groupBy("value").agg(
        spark_sum("value").alias("total_value"),
        spark_max("event_time").alias("max_event_time")
    )

    # Merge with current state (union is positional, so "value" lines up with "key")
    merged_state = current_state.union(aggregated_batch)

    # Show the merged state
    merged_state.show(truncate=False)

# Define your streaming query
streaming_query = (
    stream
    .withWatermark("event_time", "10 minutes")
    .writeStream
    .foreachBatch(process_batch)
    .outputMode("update")
    .start()
)

# Await termination
streaming_query.awaitTermination()

And the output:

+---+-----------+-------------------+
|key|total_value|max_event_time     |
+---+-----------+-------------------+
|1  |10         |2024-06-13 10:00:00|
+---+-----------+-------------------+

+---+-----------+-----------------------+
|key|total_value|max_event_time         |
+---+-----------+-----------------------+
|1  |10         |2024-06-13 10:00:00    |
|0  |0          |2024-06-15 16:22:23.642|
|8  |8          |2024-06-15 16:20:23.642|
|2  |4          |2024-06-15 16:24:23.642|
|4  |8          |2024-06-15 16:26:23.642|
|9  |9          |2024-06-15 16:21:23.642|
|5  |5          |2024-06-15 16:17:23.642|
|1  |2          |2024-06-15 16:23:23.642|
|3  |6          |2024-06-15 16:25:23.642|
|6  |6          |2024-06-15 16:18:23.642|
|7  |7          |2024-06-15 16:19:23.642|
+---+-----------+-----------------------+

Note that process_batch here only reads a simulated state and displays the merged result; see the P.S. below the quoted message for a sketch of writing the merged state back to the store.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Fri, 14 Jun 2024 at 20:13, Om Prakash <ompr...@gmail.com> wrote:

> Hi Team,
>
> Hope you all are doing well. I have run into a use case in which I want to
> do the aggregation in foreachbatch and use update mode for handling late
> data in structured streaming.
> Will this approach work in effectively
> capturing late arriving data in the aggregations? Please help.
>
> Thanking you,
> Om Prakash
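P.S. In a real pipeline, process_batch would also write the merged state back to the external store, so that the next micro-batch (and any late rows it contains) is merged into the accumulated totals rather than into a hard-coded snapshot. Below is a minimal sketch of that write-back, assuming the state is kept in a Delta table; the table name agg_state, its schema, and the merge rules (add the batch totals, keep the latest event time per key) are my assumptions, not part of the example above.

from delta.tables import DeltaTable
from pyspark.sql.functions import col, greatest, sum as spark_sum, max as spark_max

def process_batch(batch_df, batch_id):
    # Aggregate this micro-batch by key; late rows are part of whichever
    # micro-batch they arrive in
    aggregated_batch = batch_df.groupBy(col("value").alias("key")).agg(
        spark_sum("value").alias("total_value"),
        spark_max("event_time").alias("max_event_time")
    )

    # Upsert into the external state table (hypothetical Delta table
    # "agg_state" with columns key, total_value, max_event_time):
    # add the batch totals to the stored totals and keep the most recent
    # event time seen per key
    state = DeltaTable.forName(spark, "agg_state")
    (
        state.alias("s")
        .merge(aggregated_batch.alias("b"), "s.key = b.key")
        .whenMatchedUpdate(set={
            "total_value": col("s.total_value") + col("b.total_value"),
            "max_event_time": greatest(col("s.max_event_time"), col("b.max_event_time"))
        })
        .whenNotMatchedInsertAll()
        .execute()
    )

With this in place, the store itself accumulates the state across micro-batches, so both on-time and late rows are folded into the same running totals.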