Sorry, I totally missed this email. It sat forgotten for 6 months, but I'm
happy that we have smart users reporting such complex edge-case issues!

I haven't had time to validate all of them, but OuterJoinTest is indeed a
valid correctness issue. Thanks for reporting it to us! I figured out the
root cause and now have a fix; I will submit it soon.

I also quickly looked into IntervalJoinTest, but that one looks like it's
due to how SS (Structured Streaming) works.

In the second time interval join, you may expect that the lower bound of
et1 = et3 - 5 mins, and that the WM for et3 isn't delayed by the first time
interval join, hence that the lower bound of et1 should be min(WM for et2 -
3 mins, WM for et3 - 5 mins).

But in SS, we have a simplified watermark model - the input watermark is
calculated at the "operator" level. (Also, we still calculate a global
watermark across all watermark definitions and apply the same value to
every definition.) So, in the second time interval join, the WM for et3 is
also considered delayed by the first time interval join, because an
operator's input watermark is the "min" of all output watermarks from
upstream, even though et3 did not participate in the first time interval
join. That said, the lower bound of et1 = et3 - 5 mins ~ et3, which is,
lower bound of et1 = (wm - 3 mins) - 5 mins ~ (wm - 3 mins) = wm - 8 mins
~ wm - 3 mins. That's why moving the watermark to window.end + 5 mins does
not produce the output and fails the test.
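To make the arithmetic concrete, here is a minimal Python sketch - not Spark code, just a toy model of the per-operator min-propagation described above. The stream names (et1/et2/et3) and the 3-/5-minute intervals come from the test; the 60-minute starting watermark is an arbitrary assumption for illustration:

```python
from datetime import timedelta

# Single global watermark, applied to every watermark definition
# (et1, et2 and et3 all see the same value).
global_wm = timedelta(minutes=60)

# First time interval join (et1 with et2, 3-min interval):
# its output watermark is delayed by the join interval.
join1_output_wm = global_wm - timedelta(minutes=3)

# Second time interval join (joined result with et3, 5-min interval):
# an operator's input watermark is the MIN of all upstream output
# watermarks, so et3 is effectively pulled down to join1's delayed
# value even though et3 did not participate in the first join.
join2_input_wm = min(join1_output_wm, global_wm)
join2_output_wm = join2_input_wm - timedelta(minutes=5)

# Total delay relative to the global watermark: 8 mins (3 + 5),
# not the 5 mins one might expect.
total_delay = global_wm - join2_output_wm
print(total_delay)  # 0:08:00
```

This mirrors why the test only produces output once the watermark moves past window.end + 8 mins.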

Please let me know if this does not make sense to you, and we can discuss
it further.

I haven't had time to look into SqlSyntaxTest - we don't have enough tests
on interop between DataFrame <-> SQL for streaming queries, so we might
have a non-trivial number of unknowns. I (or folks on my team) will take a
look sooner rather than later.

Thanks again for the valuable report!

Thanks,
Jungtaek Lim (HeartSaVioR)



On Tue, Mar 12, 2024 at 8:24 AM Andrzej Zera <andrzejz...@gmail.com> wrote:

> Hi,
>
> Do you think there is any chance for this issue to get resolved? Should I
> create another bug report? As mentioned in my message, there is one open
> already: https://issues.apache.org/jira/browse/SPARK-45637 but it covers
> only one of the problems.
>
> Andrzej
>
> wt., 27 lut 2024 o 09:58 Andrzej Zera <andrzejz...@gmail.com> napisał(a):
>
>> Hi,
>>
>> Yes, I tested all of them on spark 3.5.
>>
>> Regards,
>> Andrzej
>>
>>
>> pon., 26 lut 2024 o 23:24 Mich Talebzadeh <mich.talebza...@gmail.com>
>> napisał(a):
>>
>>> Hi,
>>>
>>> These are all on spark 3.5, correct?
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice, "one test result is worth one-thousand
>>> expert opinions" (Wernher von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>
>>>
>>> On Mon, 26 Feb 2024 at 22:18, Andrzej Zera <andrzejz...@gmail.com>
>>> wrote:
>>>
>>>> Hey all,
>>>>
>>>> I've been using Structured Streaming in production for almost a year
>>>> already and I want to share the bugs I found in this time. I created a test
>>>> for each of the issues and put them all here:
>>>> https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
>>>>
>>>> I split the issues into three groups: outer joins on event time,
>>>> interval joins and Spark SQL.
>>>>
>>>> Issues related to outer joins:
>>>>
>>>>    - When joining three or more input streams on event time, if two or
>>>>    more streams don't contain an event for a join key (which is event time),
>>>>    no row will be output even if other streams contain an event for this
>>>>    join key. Tests that check for this:
>>>>    
>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86
>>>>    and
>>>>    
>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
>>>>    - When joining a stream aggregated from raw events with a stream of
>>>>    already aggregated events (aggregation done outside of Spark), no row
>>>>    will be output if that second stream doesn't contain a corresponding
>>>>    event. Test that checks for this:
>>>>    
>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
>>>>    - When joining two aggregated streams (aggregated in Spark), no
>>>>    result is produced. Test that checks for this:
>>>>    
>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341.
>>>>    I've already reported this one here:
>>>>    https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't
>>>>    been handled yet.
>>>>
>>>> Issues related to interval joins:
>>>>
>>>>    - When joining three streams (A, B, C) using interval join on event
>>>>    time, in the way that B.eventTime is conditioned on A.eventTime and
>>>>    C.eventTime is also conditioned on A.eventTime, and then doing window
>>>>    aggregation based on A's event time, the result is output only after
>>>>    watermark crosses the window end + interval(A, B) + interval (A, C).
>>>>    However, I'd expect results to be output faster, i.e. when the watermark
>>>>    crosses window end + MAX(interval(A, B), interval(A, C)). If our case is
>>>>    that event B can happen 3 minutes after event A and event C can happen 5
>>>>    minutes after A, there is no point in suspending output for 8
>>>>    minutes (3+5) after the end of the window if we know that no more events
>>>>    can be matched after 5 min from the window end (assuming window end is
>>>>    based on A's event time). Test that checks for this:
>>>>    
>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
>>>>
>>>> SQL issues:
>>>>
>>>>    - WITH clause (in contrast to a subquery) seems to create a static
>>>>    DataFrame that can't be used in streaming joins. Test that checks for
>>>>    this:
>>>>    
>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
>>>>    - Two subqueries, each aggregating data using the window() function,
>>>>    break the output schema. Test that checks for this:
>>>>    
>>>> https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122
>>>>
>>>> I'm a beginner with Scala (I'm using Structured Streaming with PySpark)
>>>> so I won't be able to provide fixes. But I hope the test cases I provided
>>>> can be of some help.
>>>>
>>>> Regards,
>>>> Andrzej
>>>>
>>>
