Hello,

I’m using watermarks to join two streams, as shown below:

val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
val join_df = order_wm
  .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
My understanding is that, with the code above, each stream's data is kept for
only 20 seconds after it arrives. However, when I send data on one stream now
and on the other stream 20 seconds later, the two still get joined. It seems
as if Spark keeps holding the data in memory even after the watermark has
passed. I also tried a 45-second gap, and the rows were joined then too.

I’m reading the two streams from two Kafka topics, and I’m populating
tstamp_trans with the current timestamp when each record is created.
This has left me confused about how the watermark actually works.
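To check my reading of the docs, here is the watermark arithmetic for my test as a plain Scala sketch (no Spark involved). It assumes each input's watermark is the maximum event time seen on that input minus the 20-second delay, and that Spark combines the two inputs with its default "min" policy (spark.sql.streaming.multipleWatermarkPolicy). The times are hypothetical: one order row at 0 s and one invoice row 45 s later, and WatermarkSketch and its helpers are made-up names.

```scala
// Plain-Scala sketch, no Spark dependency. Assumptions (not Spark code):
//  - per-input watermark = max event time seen on that input - delay
//  - query-wide watermark = min over inputs (the default "min" policy)
// Event times are in seconds relative to when the order row was sent.
object WatermarkSketch {
  val delaySec: Long = 20L // matches withWatermark("tstamp_trans", "20 seconds")

  // Watermark for one input: it moves only with event times seen in the data,
  // not with wall-clock time.
  def inputWatermark(eventTimes: Seq[Long]): Long =
    eventTimes.max - delaySec

  // Query-wide watermark when several inputs carry watermarks ("min" policy).
  def globalWatermark(inputs: Seq[Seq[Long]]): Long =
    inputs.map(inputWatermark).min

  // True if a row's event time is older than the given watermark.
  def isOlderThan(eventTime: Long, watermark: Long): Boolean =
    eventTime < watermark

  def main(args: Array[String]): Unit = {
    val orderTimes   = Seq(0L)  // hypothetical: one order row at t = 0 s
    val invoiceTimes = Seq(45L) // hypothetical: one invoice row at t = 45 s
    val wm = globalWatermark(Seq(orderTimes, invoiceTimes))
    println(s"order watermark:   ${inputWatermark(orderTimes)} s")
    println(s"invoice watermark: ${inputWatermark(invoiceTimes)} s")
    println(s"global watermark:  $wm s")
    println(s"order row older than global watermark: ${isOlderThan(0L, wm)}")
  }
}
```

Running it prints -20 s, 25 s, -20 s and false, so under these assumptions the order row is never older than the watermark in my test.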


Thank you,
Abhijeet Kumar
