Hi all
sorry, tl;dr
I'm working on my first Python Spark Structured Streaming app, which in the
end joins messages from ~10 different Kafka topics. I've recently upgraded to
Spark 2.4.3, which resolved all the issues with time handling (watermarks,
join windows) that I had before with Spark 2.3.2.
My current problem happens during a leftOuter join: messages from 3 topics
are joined, the results are then aggregated with a groupBy and finally written
to a result Kafka topic. All messages on the 3 input topics involved have ID
and LAST_MOD fields. I use the ID for joining and the LAST_MOD as the event
timestamp on all incoming streams. Since the fields on the incoming messages
are all named the same (ID and LAST_MOD), I rename them on each incoming
stream with
aDf = (aStream
    .selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD")
    .drop("ID", "LAST_MOD"))
For those data frames, I then set the watermark with A/B/C_LAST_MOD as the
event time, before joining. I know that the LAST_MOD timestamps are equal on
the messages that I want to join together.
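The watermark part looks roughly like this (the 30-second threshold is just a
placeholder here, the actual values in my app differ):

aDf = aDf.withWatermark("A_LAST_MOD", "30 seconds")
bDf = bDf.withWatermark("B_LAST_MOD", "30 seconds")
cDf = cDf.withWatermark("C_LAST_MOD", "30 seconds")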
The first join is an inner join, where a field on stream A links with the ID of
stream B. So I have
(aDf
    .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
    .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
    .agg(
        collect_list(struct("*")).alias("RESULTS"),
        count("A_ID").alias("NUM_RESULTS"),
        # just add a timestamp to watermark on, they are all the same
        min("A_LAST_MOD").alias("RESULT_LAST_MOD")
    )
    .withWatermark("RESULT_LAST_MOD", "30 seconds")
)
This works perfectly and generates (on my current data set) some 10'000
records. This is the expected result.
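For completeness: the aggregated result (here called resultDf as a stand-in
for the query above) is written to the output topic roughly like this; the
broker, topic name and checkpoint path are placeholders:

query = (resultDf
    .selectExpr("CAST(SOME_FIELD AS STRING) AS key",
                "to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("topic", "result-topic")
    .option("checkpointLocation", "/path/to/checkpoint")
    .outputMode("append")
    .start())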
When I add the leftOuter join of the third topic as follows

(aDf
    .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
    # here the additional left join; C_FK is the field in stream B
    .join(cDf, expr("C_FK = C_ID AND B_LAST_MOD = C_LAST_MOD"), "leftOuter")
    .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
    .agg(
        collect_list(struct("*")).alias("RESULTS"),
        count("A_ID").alias("NUM_RESULTS"),
        # just add a timestamp to watermark on, they are all the same
        min("A_LAST_MOD").alias("RESULT_LAST_MOD")
    )
    .withWatermark("RESULT_LAST_MOD", "30 seconds")
)
then what I would expect is that I get the same number of output records
(~10'000), with some of them carrying the additional fields from the C stream.
But what happens is that my output is reduced to ~1'500 records, exactly those
which have a successful join with records on topic C. The others do not show
up in the output.
I already tried
* make sure that the optional FK on topic B is never null, by using an
NVL2(C_FK, C_FK, 'FFFF')
* widen the time window join on the leftOuter to "B_LAST_MOD <
C_LAST_LAST_MOD - interval 5 seconds ..."
* use various combinations of joinWindows and watermarkLateThreshold
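The widened condition in the second bullet looked roughly like this (the
5-second bounds are just one example of what I tried):

.join(
    cDf,
    expr("""
        C_FK = C_ID
        AND C_LAST_MOD >= B_LAST_MOD - interval 5 seconds
        AND C_LAST_MOD <= B_LAST_MOD + interval 5 seconds
    """),
    "leftOuter"
)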
The result is always the same: I'm "losing" the ~8'500 records for which the
optional join FK is NULL on topic B.
Did I totally misunderstand the concept of a stream-stream left outer join? Or
what else could be wrong?
--
CU, Joe