Best to illustrate this with an example.

By using the foreachBatch function combined with the update output mode in
Spark Structured Streaming, you can effectively handle and integrate
late-arriving data into your aggregations. This approach lets you
continuously update your aggregated results with both on-time and late data.

Example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    current_timestamp,
    unix_timestamp,
    sum as spark_sum,
    max as spark_max,
)

# Create Spark session
spark = SparkSession.builder.appName("exampleWithRate").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Simulate a stream of data with an event time
stream = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# Derive an event time: offset each row by value * 60 seconds from now
stream = stream.withColumn(
    "event_time",
    (unix_timestamp(current_timestamp()) + col("value") * 60).cast("timestamp")
)
stream = stream.withColumn("value", col("value") % 10)

def process_batch(batch_df, batch_id):
    # Read current state from an external store (simulated here as a
    # static DataFrame)
    current_state = spark.createDataFrame(
        [(1, 10, '2024-06-13 10:00:00')],
        ["key", "total_value", "max_event_time"]
    ).withColumn("max_event_time", col("max_event_time").cast("timestamp"))

    # Aggregate this micro-batch; any late rows delivered in this batch
    # are included just like on-time rows
    aggregated_batch = batch_df.groupBy(col("value").alias("key")).agg(
        spark_sum("value").alias("total_value"),
        spark_max("event_time").alias("max_event_time")
    )

    # Merge with current state (a plain union here; see the note after
    # the output for a true upsert that folds overlapping keys together)
    merged_state = current_state.union(aggregated_batch)

    # Show the merged state
    merged_state.show(truncate=False)

# Define the streaming query. Note that the watermark only trims state for
# stateful operations; with foreachBatch you receive every row of each
# micro-batch, late or not, and decide how to merge it yourself
streaming_query = (
    stream
    .withWatermark("event_time", "10 minutes")
    .writeStream
    .foreachBatch(process_batch)
    .outputMode("update")
    .start()
)

# Await termination
streaming_query.awaitTermination()


And the output:

+---+-----------+-------------------+
|key|total_value|max_event_time     |
+---+-----------+-------------------+
|1  |10         |2024-06-13 10:00:00|
+---+-----------+-------------------+

+---+-----------+-----------------------+
|key|total_value|max_event_time         |
+---+-----------+-----------------------+
|1  |10         |2024-06-13 10:00:00    |
|0  |0          |2024-06-15 16:22:23.642|
|8  |8          |2024-06-15 16:20:23.642|
|2  |4          |2024-06-15 16:24:23.642|
|4  |8          |2024-06-15 16:26:23.642|
|9  |9          |2024-06-15 16:21:23.642|
|5  |5          |2024-06-15 16:17:23.642|
|1  |2          |2024-06-15 16:23:23.642|
|3  |6          |2024-06-15 16:25:23.642|
|6  |6          |2024-06-15 16:18:23.642|
|7  |7          |2024-06-15 16:19:23.642|
+---+-----------+-----------------------+
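
One caveat with the sketch above: the plain union leaves two rows for key 1
(the stored state plus the new batch aggregate). For late data to genuinely
update the stored totals, re-aggregate after the union. A minimal sketch of
that merge step, assuming the same current_state and aggregated_batch
DataFrames as in process_batch above (in production you would typically do
the equivalent upsert in the external store itself, e.g. a Delta MERGE):

# Fold the batch aggregates into the stored state so a late row for an
# existing key updates that key's totals instead of adding a second row
merged_state = (
    current_state.union(aggregated_batch)
    .groupBy("key")
    .agg(
        spark_sum("total_value").alias("total_value"),
        spark_max("max_event_time").alias("max_event_time")
    )
)
# Write merged_state back to the external store here

With that change, the second batch would show a single row for key 1
carrying the stored total plus the late contribution.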

HTH


Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
PhD <https://en.wikipedia.org/wiki/Doctor_of_Philosophy> Imperial College
London <https://en.wikipedia.org/wiki/Imperial_College_London>
London, United Kingdom


   view my LinkedIn profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one thousand expert
opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Fri, 14 Jun 2024 at 20:13, Om Prakash <ompr...@gmail.com> wrote:

> Hi Team,
>
> Hope you all are doing well. I have run into a use case in which I want to
> do the aggregation in foreachBatch and use update mode for handling late
> data in Structured Streaming. Will this approach work effectively in
> capturing late-arriving data in the aggregations? Please help.
>
>
>
> Thanking you,
> Om Prakash
>
