I figured out the issue that breaks the second test in SqlSyntaxTest. Unfortunately, it is also a correctness issue.

Issue and the fix for OuterJoinTest: https://issues.apache.org/jira/browse/SPARK-49829
Issue and the fix for SqlSyntaxTest: https://issues.apache.org/jira/browse/SPARK-49836

Thanks again for reporting. I wish I hadn't missed this in Feb...

On Mon, Sep 30, 2024 at 7:13 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:

> I just quickly looked into SqlSyntaxTest - the first broken test looks to be fixed via SPARK-46062 <https://issues.apache.org/jira/browse/SPARK-46062>, which was released in Spark 3.5.1. The second broken test is a valid issue, and I don't yet know why it is happening. I'll file a JIRA ticket and let me (or folks on my team) try to look into it. I'd be happy if there is a volunteer to look into this issue.
>
> On Sun, Sep 29, 2024 at 10:15 AM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>
>> Sorry, I totally missed this email. It sat forgotten for 6 months, but I'm happy that we have smart users reporting such complex edge-case issues!
>>
>> I haven't had time to validate all of them, but OuterJoinTest is indeed a valid correctness issue. Thanks for reporting it to us! I figured out the root cause and have a fix now. I will submit the fix soon.
>>
>> I also quickly looked into IntervalJoinTest, but that behavior comes from how SS works.
>>
>> In the second time interval join, you may expect that the lower bound of et1 is et3 - 5 mins, and that the WM for et3 isn't delayed by the first time interval join, hence the lower bound of et1 should be min(WM for et2 - 3 mins, WM for et3 - 5 mins).
>>
>> But in SS, we have simplified the watermark model - the input watermark is calculated per "operator". (Also, we still calculate the global watermark across all watermark definitions and apply the same value to every definition.) So, in the second time interval join, the WM for et3 is also considered delayed by the first time interval join, because an operator's input watermark is the "min" of all output watermarks from upstream - even though et3 did not participate in the first join. That is, et1 is bounded by et3 - 5 mins ~ et3, which is (wm - 3 mins) - 5 mins ~ (wm - 3 mins) = wm - 8 mins ~ wm - 3 mins. That's why moving the watermark to window.end + 5 mins does not produce the output and fails the test.
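
To make that arithmetic concrete, here is a tiny standalone sketch of the simplified watermark model described above (toy numbers in minutes and made-up names; an illustration, not Spark's actual internals):

```scala
// Input watermark of an operator = "min" of all upstream output watermarks.
def inputWatermark(upstreamOutputs: Seq[Long]): Long = upstreamOutputs.min

val wm = 60L                 // current global watermark (minutes, arbitrary origin)
val join1OutputWm = wm - 3   // the first interval join delays its output WM by 3 mins
val et3SideWm     = wm       // et3's own path is not delayed by any join

// The second join takes the min across its upstream outputs, so et3 is
// treated as delayed too, even though it never joined in the first join:
val join2InputWm = inputWatermark(Seq(join1OutputWm, et3SideWm)) // = wm - 3

// Effective range of et1 relative to the (delayed) et3 bound:
val et1Lower = join2InputWm - 5 // (wm - 3) - 5 mins = wm - 8 mins
val et1Upper = join2InputWm     //  wm - 3 mins
```
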
>> Please let me know if this does not make sense to you, and we can discuss it further.
>>
>> I haven't had time to look into SqlSyntaxTest - we don't have enough tests on the interop between DataFrame <-> SQL for streaming queries, so we may have a non-trivial number of unknowns. I (or folks on my team) will take a look sooner rather than later.
>>
>> Thanks again for the valuable report!
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> On Tue, Mar 12, 2024 at 8:24 AM Andrzej Zera <andrzejz...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Do you think there is any chance of this issue getting resolved? Should I create another bug report? As mentioned in my message, there is one open already: https://issues.apache.org/jira/browse/SPARK-45637 but it covers only one of the problems.
>>>
>>> Andrzej
>>>
>>> On Tue, 27 Feb 2024 at 09:58, Andrzej Zera <andrzejz...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Yes, I tested all of them on Spark 3.5.
>>>>
>>>> Regards,
>>>> Andrzej
>>>>
>>>> On Mon, 26 Feb 2024 at 23:24, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> These are all on Spark 3.5, correct?
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Dad | Technologist | Solutions Architect | Engineer
>>>>> London, United Kingdom
>>>>>
>>>>> On Mon, 26 Feb 2024 at 22:18, Andrzej Zera <andrzejz...@gmail.com> wrote:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> I've been using Structured Streaming in production for almost a year now, and I want to share the bugs I found during this time. I created a test for each of the issues and put them all here: https://github.com/andrzejzera/spark-bugs/tree/main/spark-3.5/src/test/scala
>>>>>>
>>>>>> I split the issues into three groups: outer joins on event time, interval joins, and Spark SQL.
>>>>>>
>>>>>> Issues related to outer joins:
>>>>>>
>>>>>> - When joining three or more input streams on event time, if two or more streams don't contain an event for a join key (which is event time), no row will be output even if the other streams contain an event for this join key. Tests that check for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L86 and https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L169
>>>>>> - When joining a stream aggregated from raw events with a stream of already-aggregated events (aggregation done outside of Spark), no row will be output if the second stream doesn't contain a corresponding event. Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L266
>>>>>> - When joining two aggregated streams (aggregated in Spark), no result is produced (the sketch after this list shows roughly the query shape). Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/OuterJoinTest.scala#L341. I've already reported this one here: https://issues.apache.org/jira/browse/SPARK-45637 but it hasn't been handled yet.
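
For reference, a minimal sketch of roughly the shape in the last bullet (spark-shell style; rate sources, window sizes, and column names are illustrative stand-ins, not the code from the linked repo):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a spark-shell session with `spark` in scope

// Two streams, each aggregated in Spark with a watermark and a
// tumbling-window count.
val left = spark.readStream.format("rate").load()
  .withColumnRenamed("timestamp", "eventTime")
  .withWatermark("eventTime", "10 seconds")
  .groupBy(window($"eventTime", "1 minute"))
  .agg(count("*").as("leftCount"))

val right = spark.readStream.format("rate").load()
  .withColumnRenamed("timestamp", "eventTime")
  .withWatermark("eventTime", "10 seconds")
  .groupBy(window($"eventTime", "1 minute"))
  .agg(count("*").as("rightCount"))

// Per the report (SPARK-45637), this outer join produces no result on
// Spark 3.5 even when both sides receive data for the same window.
val joined = left.join(right, Seq("window"), "full_outer")
```
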
>>>>>> Issues related to interval joins:
>>>>>>
>>>>>> - When joining three streams (A, B, C) using an interval join on event time, such that B.eventTime is conditioned on A.eventTime and C.eventTime is also conditioned on A.eventTime, and then doing window aggregation based on A's event time, the result is output only after the watermark crosses window end + interval(A, B) + interval(A, C). However, I'd expect results to be output faster, i.e. when the watermark crosses window end + MAX(interval(A, B), interval(A, C)). If our case is that event B can happen 3 minutes after event A and event C can happen 5 minutes after A, there is no point in suspending output for 8 minutes (3+5) after the end of the window if we know that no more events can be matched after 5 minutes from the window end (assuming the window end is based on A's event time). Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/IntervalJoinTest.scala#L32
>>>>>>
>>>>>> SQL issues:
>>>>>>
>>>>>> - A WITH clause (in contrast to a subquery) seems to create a static DataFrame that can't be used in streaming joins (the sketch after this list shows the two formulations). Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L31
>>>>>> - Two subqueries, each aggregating data using the window() function, break the output schema. Test that checks for this: https://github.com/andrzejzera/spark-bugs/blob/abae7a3839326a8eafc7516a51aca5e0c79282a6/spark-3.5/src/test/scala/SqlSyntaxTest.scala#L122
>>>>>>
>>>>>> I'm a beginner with Scala (I'm using Structured Streaming with PySpark), so I won't be able to provide fixes. But I hope the test cases I provided can be of some help.
>>>>>>
>>>>>> Regards,
>>>>>> Andrzej
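
And a minimal sketch of the WITH-clause issue from the first SQL bullet (stream_a/stream_b and their columns are assumptions standing in for streaming DataFrames registered as temp views; the real tests are in SqlSyntaxTest.scala at the links above):

```scala
// Assumes e.g. someStreamingDf.createOrReplaceTempView("stream_a") and
// likewise for "stream_b"; id/value/eventTime are illustrative columns.

// Subquery form - the join treats both sides as streaming, as expected:
val viaSubquery = spark.sql("""
  SELECT a.id, a.eventTime, b.value
  FROM stream_a a
  JOIN (SELECT id, value FROM stream_b) b
    ON a.id = b.id
""")

// WITH-clause (CTE) form - per the report, the CTE seems to be treated
// as a static DataFrame, breaking the streaming join:
val viaCte = spark.sql("""
  WITH b AS (SELECT id, value FROM stream_b)
  SELECT a.id, a.eventTime, b.value
  FROM stream_a a
  JOIN b ON a.id = b.id
""")
```
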