You may consider the following options:
- Increase Watermark Retention: Consider increasing the watermark
duration. This keeps records in state for a longer period before they are
dropped. However, it may increase processing latency and can still
violate at-least-once semantics if the watermark lags behind real time.
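A minimal sketch, reusing resultC and the timestamp column from the
example further below, that lengthens the watermark from 5 to 30 minutes
(the figure is illustrative):

from pyspark.sql.functions import col, window

# A longer watermark keeps late rows eligible for aggregation for
# longer before they are dropped, at the cost of a larger state store
# and higher end-to-end latency.
resultM = resultC \
    .withWatermark("timestamp", "30 minutes") \
    .groupBy(window(col("timestamp"), "5 minutes")) \
    .avg("temperature")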
OR
- Use a separate stream for dropped records: Create a separate streaming
pipeline to process the dropped records. Try:
   - Filter: Filter out records older than the watermark in the main
pipeline (a sketch of such a filter follows the aggregation example
below). For example:
from pyspark.sql.functions import col, window

resultC = streamingDataFrame.select(
      col("parsed_value.rowkey").alias("rowkey")
    , col("parsed_value.timestamp").alias("timestamp")
    , col("parsed_value.temperature").alias("temperature"))
"""
We work out the window and the AVG(temperature) over the window's
timeframe below.
This should return a DataFrame with the following schema:
root
|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- avg(temperature): double (nullable = true)
"""
resultM = resultC \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window(resultC.timestamp, "5 minutes", "5 minutes")) \
    .avg("temperature")
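Note that Spark does not expose the rows it drops internally, so the
filter has to approximate the watermark cut-off itself. A hedged sketch,
comparing event time against processing time with the same 5-minute
delay (an approximation of Spark's internal watermark, not its exact
value):

from pyspark.sql.functions import col, current_timestamp, expr

# Approximate cut-off: anything older than (now - 5 minutes) would be
# dropped by the watermark above.
cutoff = current_timestamp() - expr("INTERVAL 5 MINUTES")

lateRecords = resultC.where(col("timestamp") < cutoff)
onTimeRecords = resultC.where(col("timestamp") >= cutoff)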
   - Write to Sink: Write the filtered records (the dropped records) to a
separate Kafka topic, as sketched below.
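A sketch of that sink; the broker address, the topic name
dropped_records and the checkpoint path are placeholders:

lateRecords.selectExpr("rowkey AS key", "to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "dropped_records") \
    .option("checkpointLocation", "/tmp/checkpoints/dropped_records") \
    .start()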
   - Consume and Store: Consume the dropped-records topic with another
streaming job and store the records in a Postgres table or S3 using a
suitable connector library; a sketch of the Postgres path follows.
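For the Postgres case, one option is foreachBatch with Spark's JDBC
writer (the Postgres JDBC driver must be on the classpath); the
connection details and table name below are placeholders:

def write_to_postgres(batch_df, batch_id):
    # Append each micro-batch of dropped records to a Postgres table.
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://dbhost:5432/mydb") \
        .option("dbtable", "dropped_records") \
        .option("user", "dbuser") \
        .option("password", "dbpassword") \
        .mode("append") \
        .save()

spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "dropped_records") \
    .load() \
    .selectExpr("CAST(value AS STRING) AS payload") \
    .writeStream \
    .foreachBatch(write_to_postgres) \
    .option("checkpointLocation", "/tmp/checkpoints/pg_sink") \
    .start()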
HTH
Mich Talebzadeh,
Technologist | Architect | Data Engineer | Generative AI | FinCrime
London
United Kingdom
View my LinkedIn profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh
*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice: "one test result is worth one-thousand expert
opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).
On Wed, 8 May 2024 at 05:13, Nandha Kumar <[email protected]> wrote:
> Hi Team,
> We are trying to use *Spark Structured Streaming* for our use
> case. We will be joining 2 streaming sources (from Kafka topics) with
> watermarks. As time progresses, the records that are prior to the watermark
> timestamp are removed from the state. For our use case, we want to *store
> these dropped records* in some Postgres table or S3.
>
> When searching, we found a similar question
> <https://stackoverflow.com/questions/60418632/how-to-save-the-records-that-are-dropped-by-watermarking-in-spark-structured-str>
> on StackOverflow, which is unanswered.
> *We would like to know how to store these dropped records due to the
> watermark.*
>