I would suspect that rows are never evicted in state in second join. To
determine whether the row is NOT matched to other side, Spark should check
whether the row is ever matched before evicted. You need to set watermark
either B_LAST_MOD or C_LAST_MOD.

If you already did but not exposed to here, please paste all codes
(assuming you've already redacted) to gist or attach zipped file for
project.

Btw, there's known "correctness" issue on streaming-streaming left/right
outer join. Please refer SPARK-26154 [1] for details. That's not a same
case, but should be good to know once you're dealing with
streaming-streaming join.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-26154

On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann <j...@pyx.ch> wrote:

> Hi all
>
> sorry, tl;dr
>
> I'm on my first Python Spark structured streaming app, in the end joining
> messages from ~10 different Kafka topics. I've recently upgraded to Spark
> 2.4.3, which has resolved all my issues with the time handling (watermarks,
> join windows) I had before with Spark 2.3.2.
>
> My current problem happens during a leftOuter join, where messages from 3
> topics are joined, the results are then aggregated with a groupBy and
> finally put onto a result Kafka topic. On the 3 input topics involved, all
> messages have ID and LAST_MOD fields. I use the ID for joining, and the
> LAST_MOD as event timestamp on all incoming streams. Since the fields on
> the incoming messages are all named the same (ID and LAST_MOD), I rename
> them on all incoming streams with
>
>      aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as
> A_LAST_MOD").drop(*["ID", "LAST_MOD"])
>
> For those data frames, I then take the watermark with the A/B/C_LAST_MOD
> as event time, before joining. I know that the LAST_MOD timestamps are
> equal on the messages that I want to join together.
>
> The first join is an inner join, where a field on stream A links with the
> ID of stream B. So I have
>
>      aDf
>         .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
>         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
>         .agg(
>             collect_list(struct("*")).alias("RESULTS"),
>             count("A_ID").alias("NUM_RESULTS"),
>             # just add a timestamp to watermark on, they are all the
>             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
>             )
>         .withWatermark("RESULT_LAST_MOD", "30 seconds")
>         )
>
> This works perfectly and generates (on my current data set) some 10'000
> records. This is the expected result.
>
> When I add the leftOuter join of the third topic as follows
>
>      aDf
>         .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> # here the additional left join
>         -join(cDF, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD",
> "leftOuter)) # C_FK is the field in stream B
>         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
>         .agg(
>             collect_list(struct("*")).alias("RESULTS"),
>             count("A_ID").alias("NUM_RESULTS"),
>             # just add a timestamp to watermark on, they are all the
>             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
>             )
>         .withWatermark("RESULT_LAST_MOD", "30 seconds")
>         )
>
> then what I would expect is that I get the same number of output records
> (~10'000), and some of them have the additional fields from the C stream.
>
> But what happens is that my output is reduced to ~1'500 records, exactly
> those which have a successful join on records on topic C. The other are not
> shown on the output.
>
> I already tried
>
>    * make sure that the optional FK on topic B is never null, by using an
> NVL2(C_FK, C_FK, 'FFFF')
>    * widen the time window join on the leftOuter to "B_LAST_MOD <
> C_LAST_LAST_MOD - interval 5 seconds ..."
>    * use various combinations of joinWindows and watermarkLateThreshold
>
> The result is always the same: I'm "losing" the ~8'500 records for which
> the optional join FK is NULL on topic B.
>
> Did I totally misunderstand the concept of stream-stream left outer join?
> Or what could be wrong
>
> --
> CU, Joe
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Reply via email to