Hi Jungtaek

Thanks for your response!

I actually have set watermarks on all three streams A/B/C, each with its
respective event time column A/B/C_LAST_MOD, so missing watermarks should
not be the reason.
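
Roughly like this (just a sketch; the real threshold values differ):

    aDf = aDf.withWatermark("A_LAST_MOD", "30 seconds")
    bDf = bDf.withWatermark("B_LAST_MOD", "30 seconds")
    cDf = cDf.withWatermark("C_LAST_MOD", "30 seconds")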

Of course, the event time on the C stream (the "optional" one) progresses
much more slowly than on the other two. I try to adjust for this by setting

   spark.sql.streaming.multipleWatermarkPolicy=max

and judging from the microbatch results, this also works: the global
watermark progresses as expected with the event time from the A/B streams.
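
For completeness, this is roughly how I set it on the session builder (the
app name here is just a placeholder):

    spark = (SparkSession.builder
             .appName("...")
             .config("spark.sql.streaming.multipleWatermarkPolicy", "max")
             .getOrCreate())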

I will try to put together an isolated test case to reproduce the issue;
the whole code is embedded in a larger app and hence not easy to rip out.
A first sketch of that test case follows the observations below.

I did some more testing, and for now these are my observations:
 - inner join followed by aggregation works as expected
 - inner join with 1 left outer (and no aggregation) works as expected
 - inner join with 2 left outers only produces results where both outer
   joins have a match
 - inner join with 1 left outer followed by aggregation only produces the
   messages with a match

Of course, all of these are stream-stream joins.
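
The isolated test case will probably go in this direction: a minimal
sketch with synthetic rate streams standing in for the Kafka topics. The
app name, rates and thresholds here are made up, not taken from the real
app:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import expr

    spark = (SparkSession.builder
             .appName("outer-join-repro")  # hypothetical name
             .config("spark.sql.streaming.multipleWatermarkPolicy", "max")
             .getOrCreate())

    # one synthetic stream per "topic"; the rate source's value/timestamp
    # columns stand in for the real ID/LAST_MOD fields
    def make_stream(p):
        return (spark.readStream.format("rate")
                .option("rowsPerSecond", 10).load()
                .selectExpr("value as {0}_ID".format(p),
                            "timestamp as {0}_LAST_MOD".format(p))
                .withWatermark("{0}_LAST_MOD".format(p), "30 seconds"))

    aDf, bDf = make_stream("A"), make_stream("B")
    # thin out C so that some rows on the left side stay unmatched
    cDf = make_stream("C").where("C_ID % 2 = 0")

    # inner join A-B, then the left outer join that loses rows in my app
    joined = (aDf
              .join(bDf, expr("A_ID = B_ID and A_LAST_MOD = B_LAST_MOD"))
              .join(cDf, expr("B_ID = C_ID and B_LAST_MOD = C_LAST_MOD"),
                    "leftOuter"))

    joined.writeStream.format("console").option("truncate", False).start()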

CU, Joe
 
On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim <kabh...@gmail.com> wrote: 
> I would suspect that rows are never evicted from state in the second join.
> To determine whether a row is NOT matched on the other side, Spark has to
> check whether the row was ever matched before it is evicted. You need to
> set a watermark on either B_LAST_MOD or C_LAST_MOD.
> 
> If you already did but it's not shown here, please paste the full code
> (assuming you've already redacted it) to a gist or attach a zipped project.
> 
> Btw, there's a known "correctness" issue in streaming-streaming left/right
> outer joins. Please refer to SPARK-26154 [1] for details. That's not the
> same case, but it's good to know about once you're dealing with
> streaming-streaming joins.
> 
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 
> 1. https://issues.apache.org/jira/browse/SPARK-26154
> 
> On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann <j...@pyx.ch> wrote:
> 
> > Hi all
> >
> > sorry, tl;dr
> >
> > I'm on my first Python Spark structured streaming app, which in the end
> > joins messages from ~10 different Kafka topics. I've recently upgraded to
> > Spark 2.4.3, which has resolved all the issues with time handling
> > (watermarks, join windows) that I had with Spark 2.3.2.
> >
> > My current problem happens during a leftOuter join, where messages from 3
> > topics are joined; the results are then aggregated with a groupBy and
> > finally written to a result Kafka topic. On the 3 input topics involved,
> > all messages have ID and LAST_MOD fields. I use the ID for joining, and
> > the LAST_MOD as event timestamp on all incoming streams. Since the fields
> > on the incoming messages are all named the same (ID and LAST_MOD), I
> > rename them on all incoming streams with
> >
> >      aDf = aStream.selectExpr("*", "ID as A_ID",
> >                               "LAST_MOD as A_LAST_MOD") \
> >                   .drop("ID", "LAST_MOD")
> >
> > For those data frames, I then take the watermark with the A/B/C_LAST_MOD
> > as event time, before joining. I know that the LAST_MOD timestamps are
> > equal on the messages that I want to join together.
> >
> > The first join is an inner join, where a field on stream A links with the
> > ID of stream B. So I have
> >
> >      (aDf
> >         .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> >         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> >         .agg(
> >             collect_list(struct("*")).alias("RESULTS"),
> >             count("A_ID").alias("NUM_RESULTS"),
> >             # just add a timestamp to watermark on, they are all the same
> >             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> >             )
> >         .withWatermark("RESULT_LAST_MOD", "30 seconds")
> >         )
> >
> > This works perfectly and generates (on my current data set) some 10'000
> > records. This is the expected result.
> >
> > When I add the leftOuter join of the third topic as follows
> >
> >      (aDf
> >         .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> >         # here the additional left join
> >         .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
> >               "leftOuter")                # C_FK is the field in stream B
> >         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> >         .agg(
> >             collect_list(struct("*")).alias("RESULTS"),
> >             count("A_ID").alias("NUM_RESULTS"),
> >             # just add a timestamp to watermark on, they are all the same
> >             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> >             )
> >         .withWatermark("RESULT_LAST_MOD", "30 seconds")
> >         )
> >
> > then what I would expect is that I get the same number of output records
> > (~10'000), with some of them carrying the additional fields from the C
> > stream.
> >
> > But what happens is that my output is reduced to ~1'500 records: exactly
> > those which have a successful join with records on topic C. The others
> > are not shown in the output.
> >
> > I already tried to
> >
> >    * make sure that the optional FK on topic B is never null, by using
> >      NVL2(C_FK, C_FK, 'FFFF')
> >    * widen the time window join on the leftOuter to "B_LAST_MOD <
> >      C_LAST_MOD - interval 5 seconds ..."
> >    * use various combinations of joinWindows and watermarkLateThreshold
> >
> > The result is always the same: I'm "losing" the ~8'500 records for which
> > the optional join FK is NULL on topic B.
> >
> > Did I totally misunderstand the concept of stream-stream left outer joins?
> > Or what else could be wrong?
> >
> > --
> > CU, Joe
> >
> >
> >
> 
> -- 
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior
 
 
 
 


---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
