Re: Why does join use rows that were sent after watermark of 20 seconds?

2018-12-10 Thread Jungtaek Lim
Please refer the structured streaming guide doc which is very clear of
representing when the query will have unbounded state.

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking

Quoting the doc:



In other words, you will have to do the following additional steps in the
join.

   1.

   Define watermark delays on both inputs such that the engine knows how
   delayed the input can be (similar to streaming aggregations)
   2.

   Define a constraint on event-time across the two inputs such that the
   engine can figure out when old rows of one input is not going to be
   required (i.e. will not satisfy the time constraint) for matches with the
   other input. This constraint can be defined in one of the two ways.
   1.

  Time range join conditions (e.g. ...JOIN ON leftTime BETWEEN
  rightTime AND rightTime + INTERVAL 1 HOUR),
  2.

  Join on event-time windows (e.g. ...JOIN ON leftTimeWindow =
  rightTimeWindow).



So yes, join condition should directly deal with timestamp column,
otherwise state will grow infinitely.

Thanks,
Jungtaek Lim (HeartSaVioR)

2018년 12월 11일 (화) 오후 2:52, Abhijeet Kumar 님이
작성:

> You mean to say that Spark will store all the data in memory forever :)
>
>
> On 10-Dec-2018, at 6:16 PM, Sandeep Katta <
> sandeep0102.opensou...@gmail.com> wrote:
>
> Hi Abhijeet,
>
> You are using inner join with unbounded state which means every data in
> stream ll match with  other stream infinitely,
>   If you want the intended behaviour you should add time stamp conditions
> or window operator in join condition
>
>
>
> On Mon, 10 Dec 2018 at 5:23 PM, Abhijeet Kumar <
> abhijeet.ku...@sentienz.com> wrote:
>
>> Hello,
>>
>> I’m using watermark to join two streams as you can see below:
>>
>> val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")val 
>> invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")val 
>> join_df = order_wm
>>   .join(invoice_wm, order_wm.col("s_order_id") === 
>> invoice_wm.col("order_id"))
>>
>> My understanding with the above code, it will keep each of the stream for
>> 20 secs. After it comes but, when I’m giving one stream now and the another
>> after 20secs then also both are getting joined. It seems like even after
>> watermark got finished Spark is holding the data in memory. I even tried
>> after 45 seconds and that was getting joined too.
>> I’m sending streams from two Kafka queues and tstamp_trans I’m creating
>> with current timestamp values.
>>
>> This is creating confusion in my mind regarding watermark.
>>
>> Thank you,
>> Abhijeet Kumar
>>
>
>


Re: Why does join use rows that were sent after watermark of 20 seconds?

2018-12-10 Thread Abhijeet Kumar
You mean to say that Spark will store all the data in memory forever :)

> On 10-Dec-2018, at 6:16 PM, Sandeep Katta  
> wrote:
> 
> Hi Abhijeet,
> 
> You are using inner join with unbounded state which means every data in 
> stream ll match with  other stream infinitely, 
>   If you want the intended behaviour you should add time stamp conditions or 
> window operator in join condition
> 
> 
> 
> On Mon, 10 Dec 2018 at 5:23 PM, Abhijeet Kumar  > wrote:
> Hello,
> 
> I’m using watermark to join two streams as you can see below:
> 
> val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
> val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
> val join_df = order_wm
>   .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
> My understanding with the above code, it will keep each of the stream for 20 
> secs. After it comes but, when I’m giving one stream now and the another 
> after 20secs then also both are getting joined. It seems like even after 
> watermark got finished Spark is holding the data in memory. I even tried 
> after 45 seconds and that was getting joined too.
> 
> I’m sending streams from two Kafka queues and tstamp_trans I’m creating with 
> current timestamp values.
> This is creating confusion in my mind regarding watermark.
> 
> 
> Thank you,
> Abhijeet Kumar