You may consider one of the following:

- Increase watermark retention: increase the watermark delay so that late records are kept in state for longer before being dropped. The trade-off is larger state and higher end-to-end latency, since a window's result is only finalised once the watermark passes the end of that window.
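As a minimal sketch of this first option, reusing the same resultC DataFrame and aggregation from the example further down (the 30-minute delay is an arbitrary illustrative value, not a recommendation):

from pyspark.sql.functions import window

# Same windowed average as in the example below, but late records are
# now kept in state until the watermark is 30 minutes past the maximum
# event time seen, instead of 5 minutes.
resultM = resultC. \
    withWatermark("timestamp", "30 minutes"). \
    groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
    avg('temperature')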
OR

- Use a separate stream for dropped records: create a separate streaming pipeline to process the dropped records. Try the following:

- Filter: filter out records older than the watermark in the main pipeline, say

resultC = streamingDataFrame.select( \
               col("parsed_value.rowkey").alias("rowkey") \
             , col("parsed_value.timestamp").alias("timestamp") \
             , col("parsed_value.temperature").alias("temperature"))

"""
We work out the window and the AVG(temperature) in the window's
timeframe below. This should return the following DataFrame schema:

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- avg(temperature): double (nullable = true)
"""
resultM = resultC. \
    withWatermark("timestamp", "5 minutes"). \
    groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")). \
    avg('temperature')

- Write to sink: write the filtered (dropped) records to a separate Kafka topic.

- Consume and store: consume the dropped-records topic with another streaming job and store the records in a Postgres table or S3 using a suitable connector. A rough sketch of these two steps follows the quoted message below.

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London, United Kingdom

view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Wed, 8 May 2024 at 05:13, Nandha Kumar <nandha1...@gmail.com> wrote:

> Hi Team,
>       We are trying to use *spark structured streaming* for our use
> case. We will be joining 2 streaming sources (from Kafka topics) with
> watermarks. As time progresses, records that are prior to the watermark
> timestamp are removed from the state. For our use case, we want to *store
> these dropped records* in some Postgres table or S3.
>
> When searching, we found a similar question
> <https://stackoverflow.com/questions/60418632/how-to-save-the-records-that-are-dropped-by-watermarking-in-spark-structured-str>
> in StackOverflow which is unanswered.
> *We would like to know how to store these dropped records due to the
> watermark.*
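As mentioned above, a rough, untested sketch of the "write to sink" and "consume and store" steps. Note that Spark does not expose the rows its watermark actually drops, so the usual workaround is to split the stream yourself with a lateness cutoff that mirrors the watermark delay. The topic name dropped_records, the broker address, the JDBC URL, table and credentials, the checkpoint paths and the write_to_postgres helper are all illustrative assumptions, and the Postgres JDBC driver must be on the classpath.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("dropped_records_demo").getOrCreate()

# --- Job 1: in the main pipeline, divert records that the watermark
# would drop. current_timestamp() is evaluated per micro-batch, so this
# only approximates Spark's internal event-time watermark.
lateDF = resultC.where(
    F.expr("timestamp < current_timestamp() - INTERVAL 5 MINUTES"))

lateDF.selectExpr("CAST(rowkey AS STRING) AS key",
                  "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "dropped_records") \
    .option("checkpointLocation", "/tmp/chk/dropped_sink") \
    .start()

# --- Job 2: consume the side topic and store it in Postgres.
def write_to_postgres(batch_df, batch_id):
    # Plain JDBC append per micro-batch; URL, table and credentials
    # are placeholders.
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://dbhost:5432/mydb") \
        .option("dbtable", "dropped_records") \
        .option("user", "...") \
        .option("password", "...") \
        .mode("append") \
        .save()

spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "dropped_records") \
    .load() \
    .selectExpr("CAST(value AS STRING) AS value") \
    .writeStream \
    .foreachBatch(write_to_postgres) \
    .option("checkpointLocation", "/tmp/chk/dropped_store") \
    .start()

Bear in mind the rows captured this way only approximate what Spark drops: the real watermark is derived from the maximum event time seen minus the delay, not from the wall clock, so test against your own data.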