Nice to hear you're investigating the issue deeply. Btw, if attaching code is not easy, maybe you could share the logical/physical plan of any batch: "detail" in the SQL tab shows the plan as a string. Plans from sequential batches would be very helpful, and the streaming query status in those batches (especially the watermark) would help too.
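If sharing the code is hard, much of the same information can also be collected programmatically. This is a minimal sketch, assuming a running PySpark `StreamingQuery` object (called `query` here, a hypothetical name). PySpark's `query.recentProgress` returns a list of progress dicts, and `query.explain(extended=True)` prints the logical and physical plans. The helper `summarize_progress` is hypothetical. It reads only the `eventTime.watermark` and `stateOperators[].numRowsTotal` fields of those dicts. That is enough to see, batch by batch, whether the watermark moves and whether the state of each join keeps growing, which would suggest rows are never evicted.

```python
from typing import Any, Dict, Iterable, List


def summarize_progress(progress: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Extract per-batch watermark and state-store row counts from
    StreamingQueryProgress dicts (e.g. PySpark's query.recentProgress)."""
    summary = []
    for p in progress:
        event_time = p.get("eventTime") or {}
        operators = p.get("stateOperators") or []
        summary.append({
            "batchId": p.get("batchId"),
            # Missing until the query has a watermark; reported as an ISO-8601 string.
            "watermark": event_time.get("watermark"),
            # One entry per stateful operator (each stream-stream join, each aggregation).
            "stateRows": [op.get("numRowsTotal") for op in operators],
        })
    return summary
```

Usage against a live query would be something like `for row in summarize_progress(query.recentProgress): print(row)`, together with `query.explain(extended=True)` after a few batches. If the row count of the second join only ever grows while the watermark advances, that points at the eviction problem described further down in the thread.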
On Wed, Jun 5, 2019 at 11:57 PM Joe Ammann <j...@pyx.ch> wrote:
> Hi Jungtaek
>
> Thanks for your response!
>
> I have actually set watermarks on all the streams A/B/C with the
> respective event time column A/B/C_LAST_MOD. So I think this should not
> be the reason.
>
> Of course, the event time on the C stream (the "optional one") progresses
> much slower than on the other 2. I try to adjust for this by setting
>
> spark.sql.streaming.multipleWatermarkPolicy=max
>
> and judging from the microbatch results, this also works. The global
> watermark seems to progress as expected with the event time from the A/B
> streams.
>
> I will try to put together an isolated test case to reproduce the issue;
> the whole code is embedded in a larger app and hence not easy to rip out.
>
> I did some more testing, and for now these are my observations:
> - inner join followed by aggregation works as expected
> - inner join with 1 left outer (and no aggregation) works as expected
> - inner join with 2 left outer only produces results where both outer
>   have a match
> - inner join with 1 left outer followed by aggregation only produces the
>   messages with a match
>
> Of course, all are stream-stream joins.
>
> CU, Joe
>
> On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim <kabh...@gmail.com>
> wrote:
> > I would suspect that rows are never evicted from state in the second
> > join. To determine that a row is NOT matched to the other side, Spark
> > has to check whether the row was ever matched before it is evicted. You
> > need to set a watermark on either B_LAST_MOD or C_LAST_MOD.
> >
> > If you already did but didn't show it here, please paste all the code
> > (assuming you've already redacted it) to a gist or attach a zipped file
> > of the project.
> >
> > Btw, there's a known "correctness" issue with streaming-streaming
> > left/right outer joins. Please refer to SPARK-26154 [1] for details.
> > It's not the same case, but it's good to know about once you're dealing
> > with streaming-streaming joins.
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
> > 1. https://issues.apache.org/jira/browse/SPARK-26154
> >
> > On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann <j...@pyx.ch> wrote:
> >
> > > Hi all
> > >
> > > sorry, tl;dr
> > >
> > > I'm working on my first Python Spark Structured Streaming app, in the
> > > end joining messages from ~10 different Kafka topics. I've recently
> > > upgraded to Spark 2.4.3, which has resolved all the issues with time
> > > handling (watermarks, join windows) I had before with Spark 2.3.2.
> > >
> > > My current problem happens during a leftOuter join, where messages
> > > from 3 topics are joined, the results are then aggregated with a
> > > groupBy and finally put onto a result Kafka topic. On the 3 input
> > > topics involved, all messages have ID and LAST_MOD fields. I use the ID
> > > for joining, and LAST_MOD as the event timestamp on all incoming
> > > streams. Since the fields on the incoming messages are all named the
> > > same (ID and LAST_MOD), I rename them on all incoming streams with
> > >
> > > aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD").drop(*["ID", "LAST_MOD"])
> > >
> > > For those data frames, I then set the watermark with A/B/C_LAST_MOD
> > > as the event time, before joining. I know that the LAST_MOD timestamps
> > > are equal on the messages that I want to join together.
> > >
> > > The first join is an inner join, where a field on stream A links with
> > > the ID of stream B.
> > > So I have
> > >
> > > # imports the snippets rely on (note: min shadows Python's built-in min)
> > > from pyspark.sql.functions import collect_list, count, expr, min, struct, window
> > >
> > > (
> > >     aDf
> > >     .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
> > >     .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> > >     .agg(
> > >         collect_list(struct("*")).alias("RESULTS"),
> > >         count("A_ID").alias("NUM_RESULTS"),
> > >         # just add a timestamp to watermark on, they are all the same
> > >         min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> > >     )
> > >     .withWatermark("RESULT_LAST_MOD", "30 seconds")
> > > )
> > >
> > > This works perfectly and generates (on my current data set) some 10'000
> > > records. This is the expected result.
> > >
> > > When I add the leftOuter join of the third topic as follows
> > >
> > > (
> > >     aDf
> > >     .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
> > >     # here the additional left join; C_FK is the field in stream B
> > >     .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
> > >     .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> > >     .agg(
> > >         collect_list(struct("*")).alias("RESULTS"),
> > >         count("A_ID").alias("NUM_RESULTS"),
> > >         # just add a timestamp to watermark on, they are all the same
> > >         min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> > >     )
> > >     .withWatermark("RESULT_LAST_MOD", "30 seconds")
> > > )
> > >
> > > then what I would expect is that I get the same number of output
> > > records (~10'000), and some of them have the additional fields from the
> > > C stream.
> > >
> > > But what happens is that my output is reduced to ~1'500 records,
> > > exactly those which have a successful join on records on topic C. The
> > > others are not shown in the output.
> > >
> > > I already tried
> > >
> > > * make sure that the optional FK on topic B is never null, by using an
> > >   NVL2(C_FK, C_FK, 'FFFF')
> > > * widen the time window join on the leftOuter to "B_LAST_MOD <
> > >   C_LAST_MOD - interval 5 seconds ..."
> > > * use various combinations of joinWindows and watermarkLateThreshold
> > >
> > > The result is always the same: I'm "losing" the ~8'500 records for
> > > which the optional join FK is NULL on topic B.
> > >
> > > Did I totally misunderstand the concept of stream-stream left outer
> > > join? Or what could be wrong?
> > >
> > > --
> > > CU, Joe
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> > --
> > Name : Jungtaek Lim
> > Blog : http://medium.com/@heartsavior
> > Twitter : http://twitter.com/heartsavior
> > LinkedIn : http://www.linkedin.com/in/heartsavior

--
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
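To make the eviction argument from the thread concrete, here is a toy, pure-Python model of a stream-stream left outer join with a time bound. It is a simplification and not Spark's implementation; the class `ToyLeftOuterJoin` and its methods are hypothetical. Left rows (the A-B join result) are buffered in state. A row that never finds a C partner is emitted with a null C side only when the watermark passes its event time plus the bound, i.e. when it is evicted. If that watermark never moves past the buffered rows, the null-padded rows never appear, which is what Jungtaek suspects is happening in the second join.

```python
from typing import Dict, List, Optional, Tuple


class ToyLeftOuterJoin:
    """Hypothetical, simplified model of a time-bounded left outer join.

    Unmatched left rows are emitted with a null right side only when the
    watermark has passed event_time + bound (i.e. on eviction from state).
    """

    def __init__(self, bound: int) -> None:
        self.bound = bound  # max event-time distance (seconds) between matching rows
        self.state: Dict[str, Tuple[int, bool]] = {}  # key -> (event_time, matched?)
        self.output: List[Tuple[str, Optional[str]]] = []

    def on_left(self, key: str, event_time: int) -> None:
        # Buffer the left row; it cannot be declared "unmatched" yet.
        self.state[key] = (event_time, False)

    def on_right(self, key: str, event_time: int) -> None:
        # Emit a joined row right away if a buffered left row is within the bound.
        buffered = self.state.get(key)
        if buffered is not None and abs(buffered[0] - event_time) <= self.bound:
            self.state[key] = (buffered[0], True)
            self.output.append((key, key))

    def advance_watermark(self, watermark: int) -> None:
        # Evict rows that can no longer match; only now does a left row that
        # never found a partner get emitted with a null right side.
        for key, (event_time, matched) in list(self.state.items()):
            if watermark > event_time + self.bound:
                del self.state[key]
                if not matched:
                    self.output.append((key, None))


if __name__ == "__main__":
    join = ToyLeftOuterJoin(bound=5)
    join.on_left("b1", 100)
    join.on_left("b2", 100)
    join.on_right("b1", 102)     # b1 matches a C row; b2 never will
    join.advance_watermark(104)  # not past 100 + 5 yet: b2 stays buffered
    join.advance_watermark(106)  # b2 is evicted and emitted with a null C side
    print(join.output)           # → [('b1', 'b1'), ('b2', None)]
```

In this model, stopping the watermark at 104 leaves `b2` in state forever, so only the matched rows ever reach the output.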