you may consider

- Increase Watermark Retention: Consider increasing the watermark retention
duration. This allows keeping records for a longer period before dropping
them. However, this might increase processing latency and violate
at-least-once semantics if the watermark lags behind real-time.

OR

- Use a separate stream for dropped records: Create a separate streaming
pipeline to process the dropped records. Try:


   - Filter: Filter out records older than the watermark in the main
   pipeline.  say

           resultC = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.timestamp").alias("timestamp") \
                   , col("parsed_value.temperature").alias("temperature"))

            """
            We work out the window and the AVG(temperature) in the window's
timeframe below
            This should return back the following Dataframe as struct

             root
             |-- window: struct (nullable = false)
             |    |-- start: timestamp (nullable = true)
             |    |-- end: timestamp (nullable = true)
             |-- avg(temperature): double (nullable = true)

            """
            resultM = resultC. \
                     *withWatermark("timestamp", "5 minutes").* \
                     groupBy(window(resultC.timestamp, "5 minutes", "5
minutes")). \
                     avg('temperature')

   - Write to Sink: Write the filtered records (dropped records) to a
   separate Kafka topic.
   - Consume and Store: Consume the dropped records topic with another
   streaming job and store them in a Postgres table or S3 using lib


HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed . It is essential to note
that, as with any advice, quote "one test result is worth one-thousand
expert opinions (Werner  <https://en.wikipedia.org/wiki/Wernher_von_Braun>Von
Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>)".


On Wed, 8 May 2024 at 05:13, Nandha Kumar <nandha1...@gmail.com> wrote:

> Hi Team,
>        We are trying to use *spark structured streaming *for our use
> case. We will be joining 2 streaming sources(from kafka topic) with
> watermarks. As time progresses, the records that are prior to the watermark
> timestamp are removed from the state. For our use case, we want to *store
> these dropped records* in some postgres table or s3.
>
> When searching, we found a similar question
> <https://stackoverflow.com/questions/60418632/how-to-save-the-records-that-are-dropped-by-watermarking-in-spark-structured-str>in
> StackOverflow which is unanswered.
> *We would like to know how to store these dropped records due to the
> watermark.*
>

Reply via email to