Hi Jungtaek,

Thanks for your response!
I actually have set watermarks on all the streams A/B/C, with the respective event time columns A/B/C_LAST_MOD, so I don't think this is the cause. The event time on the C stream (the "optional" one) does progress much more slowly than on the other two. I try to adjust for this by setting spark.sql.streaming.multipleWatermarkPolicy=max, and judging from the micro-batch results this also works: the global watermark seems to progress as expected with the event time from the A/B streams.

I will try to put together an isolated test case to reproduce the issue; the code is embedded in a larger app and hence not easy to rip out.

I did some more testing, and for now these are my observations:

- inner join followed by aggregation works as expected
- inner join with 1 left outer (and no aggregation) works as expected
- inner join with 2 left outers only produces results where both outers have a match
- inner join with 1 left outer followed by aggregation only produces the messages with a match

All of these are stream-stream joins, of course.
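For reference, here is a stripped-down sketch of the watermark setup described above (the topic names, value schema, broker address, and the 30-second threshold are simplified placeholders, not the actual app code):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StructType, StructField, StringType, TimestampType

    spark = (SparkSession.builder
        .appName("stream-join-sketch")
        # take the furthest-ahead of the per-stream watermarks as the global one
        .config("spark.sql.streaming.multipleWatermarkPolicy", "max")
        .getOrCreate())

    # placeholder schema -- the real messages carry more fields (FKs etc.)
    schema = StructType([
        StructField("ID", StringType()),
        StructField("LAST_MOD", TimestampType()),
    ])

    def prefixed_stream(topic, prefix):
        # read one Kafka topic, rename the common ID/LAST_MOD fields with a
        # per-stream prefix, and watermark on the renamed event-time column
        raw = (spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
            .option("subscribe", topic)
            .load())
        parsed = (raw
            .select(from_json(col("value").cast("string"), schema).alias("msg"))
            .select("msg.*"))
        return (parsed
            .selectExpr("*", "ID as {0}_ID".format(prefix),
                        "LAST_MOD as {0}_LAST_MOD".format(prefix))
            .drop(*["ID", "LAST_MOD"])
            .withWatermark("{0}_LAST_MOD".format(prefix), "30 seconds"))

    aDf = prefixed_stream("topic_a", "A")
    bDf = prefixed_stream("topic_b", "B")
    cDf = prefixed_stream("topic_c", "C")  # event time here advances much more slowly

The key point is that each stream gets its own watermark on its renamed event-time column before any join.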
CU, Joe

On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim <kabh...@gmail.com> wrote:

> I would suspect that rows are never evicted from state in the second join. To
> determine whether a row is NOT matched on the other side, Spark has to check
> whether the row was ever matched before it is evicted. You need to set a
> watermark on either B_LAST_MOD or C_LAST_MOD.
>
> If you already did but just didn't show it here, please paste all the code
> (redacted as needed) to a gist, or attach a zipped file of the project.
>
> Btw, there's a known "correctness" issue with stream-stream left/right outer
> joins. Please refer to SPARK-26154 [1] for details. It's not the same case,
> but it's good to know about when you're dealing with stream-stream joins.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1. https://issues.apache.org/jira/browse/SPARK-26154
>
> On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann <j...@pyx.ch> wrote:
>
> > Hi all
> >
> > sorry, tl;dr
> >
> > I'm on my first Python Spark structured streaming app, in the end joining
> > messages from ~10 different Kafka topics. I've recently upgraded to Spark
> > 2.4.3, which has resolved all the issues with time handling (watermarks,
> > join windows) that I had before with Spark 2.3.2.
> >
> > My current problem happens during a leftOuter join, where messages from 3
> > topics are joined, the results are then aggregated with a groupBy and
> > finally put onto a result Kafka topic. On the 3 input topics involved, all
> > messages have ID and LAST_MOD fields. I use the ID for joining, and the
> > LAST_MOD as event timestamp on all incoming streams. Since the fields on
> > the incoming messages are all named the same (ID and LAST_MOD), I rename
> > them on all incoming streams with
> >
> >     aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD").drop(*["ID", "LAST_MOD"])
> >
> > For those data frames, I then set the watermark with A/B/C_LAST_MOD as the
> > event time, before joining. I know that the LAST_MOD timestamps are equal
> > on the messages that I want to join together.
> >
> > The first join is an inner join, where a field on stream A links with the
> > ID of stream B. So I have
> >
> >     (aDf
> >       .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
> >       .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> >       .agg(
> >           collect_list(struct("*")).alias("RESULTS"),
> >           count("A_ID").alias("NUM_RESULTS"),
> >           # just add a timestamp to watermark on, they are all the same
> >           min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> >       )
> >       .withWatermark("RESULT_LAST_MOD", "30 seconds")
> >     )
> >
> > This works perfectly and generates (on my current data set) some 10'000
> > records. This is the expected result.
> >
> > When I add the leftOuter join of the third topic as follows
> >
> >     (aDf
> >       .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
> >       # here the additional left join
> >       .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")  # C_FK is the field in stream B
> >       .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> >       .agg(
> >           collect_list(struct("*")).alias("RESULTS"),
> >           count("A_ID").alias("NUM_RESULTS"),
> >           # just add a timestamp to watermark on, they are all the same
> >           min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> >       )
> >       .withWatermark("RESULT_LAST_MOD", "30 seconds")
> >     )
> >
> > then what I would expect is that I get the same number of output records
> > (~10'000), and that some of them have the additional fields from the C
> > stream.
> >
> > But what happens is that my output is reduced to ~1'500 records, exactly
> > those which have a successful join on records from topic C. The others are
> > not shown in the output.
> >
> > I already tried to
> >
> > * make sure that the optional FK on topic B is never null, by using
> >   NVL2(C_FK, C_FK, 'FFFF')
> > * widen the time window join on the leftOuter to "B_LAST_MOD <
> >   C_LAST_MOD - interval 5 seconds ..."
> > * use various combinations of joinWindows and watermarkLateThreshold
> >
> > The result is always the same: I'm "losing" the ~8'500 records for which
> > the optional join FK is NULL on topic B.
> >
> > Did I totally misunderstand the concept of stream-stream left outer joins?
> > Or what else could be wrong?
> >
> > --
> > CU, Joe
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org