Hi all

sorry, tl;dr

I'm working on my first Python Spark Structured Streaming app, which in the
end joins messages from ~10 different Kafka topics. I've recently upgraded to
Spark 2.4.3, which resolved all the issues with time handling (watermarks,
join windows) that I had before with Spark 2.3.2.

My current problem happens during a leftOuter join, where messages from 3 
topics are joined, the results are then aggregated with a groupBy and finally 
put onto a result Kafka topic. On the 3 input topics involved, all messages 
have ID and LAST_MOD fields. I use the ID for joining, and the LAST_MOD as 
event timestamp on all incoming streams. Since the fields on the incoming 
messages are all named the same (ID and LAST_MOD), I rename them on all 
incoming streams with

     aDf = (aStream
         .selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD")
         .drop("ID", "LAST_MOD"))

On those DataFrames I then set the watermark, with A/B/C_LAST_MOD as the
event-time column, before joining. I know that the LAST_MOD timestamps are
equal on the messages that I want to join together.
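
For reference, the watermark setup looks roughly like this (the 30-second
delay is just an example value here, not necessarily what I run with):

     aDf = aDf.withWatermark("A_LAST_MOD", "30 seconds")
     bDf = bDf.withWatermark("B_LAST_MOD", "30 seconds")
     cDf = cDf.withWatermark("C_LAST_MOD", "30 seconds")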

The first join is an inner join, where a field on stream A links with the ID of 
stream B. So I have

     from pyspark.sql.functions import expr, window, collect_list, struct, count, min

     resultDf = (aDf
         .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
         .agg(
             collect_list(struct("*")).alias("RESULTS"),
             count("A_ID").alias("NUM_RESULTS"),
             # just add a timestamp to watermark on, they are all the same
             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
         )
         .withWatermark("RESULT_LAST_MOD", "30 seconds")
     )
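
For completeness, the aggregated frame (resultDf above) then goes to the
result topic roughly like this; broker, topic name and checkpoint path are
placeholders:

     (resultDf
         .selectExpr("to_json(struct(*)) AS value")
         .writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
         .option("topic", "result-topic")                    # placeholder
         .option("checkpointLocation", "/tmp/checkpoint")    # placeholder
         .start())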

This works perfectly and generates (on my current data set) some 10'000 
records. This is the expected result.

When I add the leftOuter join of the third topic as follows

     resultDf = (aDf
         .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
         # here the additional left join; C_FK is the field in stream B
         .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
         .agg(
             collect_list(struct("*")).alias("RESULTS"),
             count("A_ID").alias("NUM_RESULTS"),
             # just add a timestamp to watermark on, they are all the same
             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
         )
         .withWatermark("RESULT_LAST_MOD", "30 seconds")
     )

then what I would expect is to get the same number of output records
(~10'000), with some of them carrying the additional fields from the C stream.

But what happens is that my output is reduced to ~1'500 records: exactly
those that have a successful join with records on topic C. The others do not
show up in the output.

I already tried the following (a rough sketch of the first two points is
below):

   * making sure that the optional FK on topic B is never null, by using
NVL2(C_FK, C_FK, 'FFFF')
   * widening the time window on the leftOuter join to "B_LAST_MOD <
C_LAST_MOD - interval 5 seconds ..."
   * using various combinations of join windows and watermark late thresholds
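
For illustration, the first two points look roughly like this (the 'FFFF'
dummy value is from above; joinCond is just a name I use here, and the exact
range bounds varied between attempts):

     # 1) make sure the optional FK on topic B is never null
     bDf = bDf.withColumn("C_FK", expr("NVL2(C_FK, C_FK, 'FFFF')"))

     # 2) widen the time condition on the leftOuter join
     joinCond = expr("""
         C_FK = C_ID
         AND B_LAST_MOD >= C_LAST_MOD - interval 5 seconds
         AND B_LAST_MOD <= C_LAST_MOD + interval 5 seconds
     """)
     # used as: .join(cDf, joinCond, "leftOuter")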

The result is always the same: I'm "losing" the ~8'500 records for which the 
optional join FK is NULL on topic B.

Did I totally misunderstand the concept of stream-stream left outer joins? Or
what else could be wrong?

-- 
CU, Joe
