Hi all, sorry for the long post.
I'm working on my first Python Spark Structured Streaming app, which in the end joins messages from ~10 different Kafka topics. I've recently upgraded to Spark 2.4.3, which resolved all the issues with time handling (watermarks, join windows) that I had before with Spark 2.3.2.

My current problem happens during a leftOuter join, where messages from 3 topics are joined, the results are then aggregated with a groupBy, and finally written to a result Kafka topic.

On the 3 input topics involved, all messages have ID and LAST_MOD fields. I use ID for joining and LAST_MOD as the event timestamp on all incoming streams. Since the fields on the incoming messages all have the same names (ID and LAST_MOD), I rename them on all incoming streams with

    aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD").drop(*["ID", "LAST_MOD"])

For these data frames, I then set a watermark with A/B/C_LAST_MOD as the event time before joining. I know that the LAST_MOD timestamps are equal on the messages that I want to join together.

The first join is an inner join, where a field on stream A links to the ID of stream B. So I have

    from pyspark.sql.functions import collect_list, count, expr, min, struct, window

    (aDf
     .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
     .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
     .agg(
         collect_list(struct("*")).alias("RESULTS"),
         count("A_ID").alias("NUM_RESULTS"),
         # just add a timestamp to watermark on; they are all the same
         min("A_LAST_MOD").alias("RESULT_LAST_MOD"))
     .withWatermark("RESULT_LAST_MOD", "30 seconds")
    )

This works perfectly and generates some 10'000 records on my current data set, which is the expected result.
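Since the same rename is repeated on ~10 incoming streams, it could be factored into a small helper. This is only a sketch: `prefixed_select_exprs` and `SHARED_FIELDS` are hypothetical names introduced here (not PySpark APIs), and the 30-second input watermark in the usage comment is an illustrative value, not one taken from my setup. The helper builds the same `selectExpr` arguments as the rename shown earlier.

```python
from typing import Iterable, List

# Hypothetical: the field names shared by every input topic.
SHARED_FIELDS = ("ID", "LAST_MOD")


def prefixed_select_exprs(prefix: str,
                          fields: Iterable[str] = SHARED_FIELDS) -> List[str]:
    """Hypothetical helper (not a PySpark API).

    Returns the selectExpr() arguments that keep all columns and add
    prefixed copies of the shared fields, e.g. for prefix "A":
    ["*", "ID as A_ID", "LAST_MOD as A_LAST_MOD"].
    """
    return ["*"] + [f"{name} as {prefix}_{name}" for name in fields]


# Intended use with PySpark (not executed here; the watermark delay is
# an illustrative assumption):
#   aDf = (aStream
#          .selectExpr(*prefixed_select_exprs("A"))
#          .drop(*SHARED_FIELDS)
#          .withWatermark("A_LAST_MOD", "30 seconds"))
```

Keeping the original columns via `"*"` and dropping the unprefixed ones afterwards mirrors the pattern above, so every stream ends up with distinct `X_ID` / `X_LAST_MOD` names before the joins.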
When I add the leftOuter join of the third topic as follows

    (aDf
     .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
     # here is the additional left join; C_FK is the field in stream B
     .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
     .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
     .agg(
         collect_list(struct("*")).alias("RESULTS"),
         count("A_ID").alias("NUM_RESULTS"),
         # just add a timestamp to watermark on; they are all the same
         min("A_LAST_MOD").alias("RESULT_LAST_MOD"))
     .withWatermark("RESULT_LAST_MOD", "30 seconds")
    )

then I would expect to get the same number of output records (~10'000), some of them with the additional fields from the C stream. Instead, my output is reduced to ~1'500 records, exactly those that have a successful join with records on topic C. The others do not appear in the output.

I already tried to

  * make sure that the optional FK on topic B is never null, by using NVL2(C_FK, C_FK, 'FFFF')
  * widen the time window of the leftOuter join to "B_LAST_MOD < C_LAST_MOD - interval 5 seconds ..."
  * use various combinations of joinWindows and watermarkLateThreshold

The result is always the same: I'm "losing" the ~8'500 records for which the optional join FK is NULL on topic B.

Did I totally misunderstand the concept of stream-stream left outer join? Or what could be wrong?

--
CU, Joe
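To make the expectation above concrete, here is a plain-Python sketch (no Spark involved) of batch left-outer-join semantics: every left row survives, and the right-hand fields are None when there is no match (a NULL key never matches, as in SQL). The function name `left_outer_join` and the example rows are hypothetical. Note that this models only the batch result; per the Spark Structured Streaming guide, a stream-stream outer join emits the NULL-padded rows only once the watermark guarantees no further match can arrive, which this sketch does not model.

```python
from typing import Dict, List, Optional

Row = Dict[str, Optional[str]]


def left_outer_join(left: List[Row], right: List[Row],
                    left_key: str, right_key: str) -> List[Row]:
    """Hypothetical batch left outer join on lists of dicts.

    Keeps every left row; pads the right-hand fields with None
    when no right row has an equal, non-NULL key.
    """
    right_fields = {field for rrow in right for field in rrow}
    out: List[Row] = []
    for lrow in left:
        key = lrow.get(left_key)
        matches = [rrow for rrow in right
                   if key is not None and rrow.get(right_key) == key]
        if matches:
            # One output row per matching right row.
            out.extend({**lrow, **rrow} for rrow in matches)
        else:
            # No partner: keep the left row with the right-hand fields as None.
            out.append({**lrow, **{field: None for field in right_fields}})
    return out
```

With this semantics, the number of output rows is never smaller than the number of left rows, which is why the drop from ~10'000 to ~1'500 is surprising.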