Hi Abhijeet,

You are using an inner join with unbounded state, which means every row in
one stream can match rows from the other stream indefinitely. The watermark
alone does not restrict which rows are joined.
If you want the intended behaviour, you should add a timestamp condition
or a window to the join condition.
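
As a rough sketch of the timestamp condition, something like the following
might work. It assumes order_details and invoice_details are the streaming
DataFrames from your mail; the function name, the renamed columns order_time
and invoice_time, the aliases "o" and "i", and the 20 second range are my own
choices for illustration, so adjust them to your case.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

// Hypothetical helper: joins the two streams only when the invoice event time
// falls within 20 seconds after the order event time.
def joinWithinTimeRange(orderDetails: DataFrame, invoiceDetails: DataFrame): DataFrame = {
  // Rename the event-time columns so they can be told apart in the join condition.
  val orders = orderDetails
    .withColumnRenamed("tstamp_trans", "order_time")
    .withWatermark("order_time", "20 seconds")
    .as("o")
  val invoices = invoiceDetails
    .withColumnRenamed("tstamp_trans", "invoice_time")
    .withWatermark("invoice_time", "20 seconds")
    .as("i")

  // Equality on the order id plus an event-time range. The range lets Spark
  // work out when buffered rows can no longer match and drop them from state.
  orders.join(
    invoices,
    expr("""
      o.s_order_id = i.order_id AND
      i.invoice_time >= o.order_time AND
      i.invoice_time <= o.order_time + interval 20 seconds
    """)
  )
}
```

You would call it as joinWithinTimeRange(order_details, invoice_details).
Note that the condition is evaluated on event time (tstamp_trans), not on
arrival time, so a late-arriving row whose timestamp is still inside the
range can be joined as long as its counterpart is still held in state.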



On Mon, 10 Dec 2018 at 5:23 PM, Abhijeet Kumar <abhijeet.ku...@sentienz.com>
wrote:

> Hello,
>
> I’m using watermark to join two streams as you can see below:
>
> val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
> val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
> val join_df = order_wm
>   .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
>
> My understanding is that with the above code, Spark will keep each stream's
> data for 20 seconds after it arrives. But when I send one stream now and the
> other after 20 seconds, the two are still joined. It seems that even after
> the watermark has passed, Spark is holding the data in memory. I even tried
> after 45 seconds, and those records were joined too.
> I’m sending the streams from two Kafka queues, and I’m creating tstamp_trans
> from the current timestamp.
>
> This is creating confusion in my mind regarding watermark.
>
> Thank you,
> Abhijeet Kumar
>
