Re: Spark structured streaming leftOuter join not working as I expect
Got the point. If you would like to get "correct" output, you may need to set the global watermark policy to "min", because the watermark is not only used for evicting rows from state, but also for discarding input rows that arrive later than the watermark. Be aware that there are two stateful operators here, each of which receives input from the previous stage and discards late rows via the watermark before processing them.

Btw, you may also need to consider how the concept of a watermark differs between Spark and other frameworks:

1. Spark uses a high watermark (it picks the highest event timestamp of the input rows) even for a single watermark, whereas other frameworks use a low watermark (the lowest event timestamp of the input rows). So you may always need to set a sufficiently large delay on the watermark.

2. Spark uses a global watermark, whereas other frameworks normally use per-operator watermarks. This is a limitation of Spark (given that the outputs of the previous stateful operator become the inputs of the next stateful operator, they should have different watermarks), and one contributor has proposed an approach [1] which would fit Spark (unfortunately it hasn't been reviewed by committers for a long time).

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://github.com/apache/spark/pull/23576

On Tue, Jun 11, 2019 at 7:06 AM Joe Ammann wrote:

> Hi all
>
> it took me some time to get the issues extracted into a piece of
> standalone code. I created the following gist
>
> https://gist.github.com/jammann/b58bfbe0f4374b89ecea63c1e32c8f17
>
> It has messages for 4 topics A/B/C/D and a simple Python program which
> shows 6 use cases, with my expectations and observations with Spark 2.4.3
>
> It would be great if you could have a look and check if I'm doing
> something wrong, or this is indeed a limitation of Spark?
>
> On 6/5/19 5:35 PM, Jungtaek Lim wrote:
> > Nice to hear you're investigating the issue deeply.
> >
> > Btw, if attaching code is not easy, maybe you could share the
> > logical/physical plan of any batch: "detail" in the SQL tab shows the
> > plan as a string. Plans from sequential batches would be very helpful -
> > and the streaming query status in these batches (especially the
> > watermark) would be helpful too.
>
> --
> CU, Joe

--
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
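For readers of the archive: the policy referred to above is the spark.sql.streaming.multipleWatermarkPolicy setting (available since Spark 2.4, default "min"). A minimal sketch of applying it, assuming an existing SparkSession bound to the name spark:

    # "min" holds the global watermark at the slowest watermarked input,
    # so rows from a slow stream are not discarded as late input.
    # ("max" advances it with the fastest input, as used earlier in the
    # thread.)
    spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "min")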
Re: Spark structured streaming leftOuter join not working as I expect
Hi all

it took me some time to get the issues extracted into a piece of standalone code. I created the following gist

https://gist.github.com/jammann/b58bfbe0f4374b89ecea63c1e32c8f17

It has messages for 4 topics A/B/C/D and a simple Python program which shows 6 use cases, with my expectations and observations with Spark 2.4.3

It would be great if you could have a look and check if I'm doing something wrong, or this is indeed a limitation of Spark?

On 6/5/19 5:35 PM, Jungtaek Lim wrote:
> Nice to hear you're investigating the issue deeply.
>
> Btw, if attaching code is not easy, maybe you could share the
> logical/physical plan of any batch: "detail" in the SQL tab shows the
> plan as a string. Plans from sequential batches would be very helpful -
> and the streaming query status in these batches (especially the
> watermark) would be helpful too.

--
CU, Joe
Re: Spark structured streaming leftOuter join not working as I expect
Nice to hear you're investigating the issue deeply.

Btw, if attaching code is not easy, maybe you could share the logical/physical plan of any batch: "detail" in the SQL tab shows the plan as a string. Plans from sequential batches would be very helpful - and the streaming query status in these batches (especially the watermark) would be helpful too.

On Wed, Jun 5, 2019 at 11:57 PM Joe Ammann wrote:

> Hi Jungtaek
>
> Thanks for your response!
>
> I actually have set watermarks on all the streams A/B/C with the
> respective event time column A/B/C_LAST_MOD. So I think this should not
> be the reason.
>
> Of course, the event time on the C stream (the "optional" one) progresses
> much slower than on the other 2. I try to adjust for this by setting
>
>     spark.sql.streaming.multipleWatermarkPolicy=max
>
> and judging from the microbatch results, this also works. The global
> watermark seems to progress as expected with the event time from the A/B
> streams.
>
> I will try to put together an isolated test case to reproduce the issue;
> that whole code is embedded in a larger app and hence not easy to rip out.
>
> I did some more testing, and for now these are my observations
>
> - inner join followed by aggregation works as expected
> - inner join with 1 left outer (and no aggregation) works as expected
> - inner join with 2 left outer only produces results where both outer
>   joins have a match
> - inner join with 1 left outer followed by aggregation only produces the
>   messages with a match
>
> Of course, all are stream-stream joins
>
> CU, Joe
>
> On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim wrote:
>
> > I would suspect that rows are never evicted from state in the second
> > join. To determine whether a row is NOT matched to the other side,
> > Spark has to check whether the row was ever matched before it is
> > evicted. You need to set a watermark on either B_LAST_MOD or C_LAST_MOD.
> >
> > If you already did so but it's not shown here, please paste the full
> > code (assuming you've already redacted it) to a gist or attach a zipped
> > file of the project.
> >
> > Btw, there's a known "correctness" issue on stream-stream left/right
> > outer joins. Please refer to SPARK-26154 [1] for details. That's not
> > the same case, but it's good to know about once you're dealing with
> > stream-stream joins.
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
> > 1. https://issues.apache.org/jira/browse/SPARK-26154
> >
> > On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann wrote:
> >
> > > Hi all
> > >
> > > sorry, tl;dr
> > >
> > > I'm on my first Python Spark structured streaming app, in the end
> > > joining messages from ~10 different Kafka topics. I've recently
> > > upgraded to Spark 2.4.3, which has resolved all my issues with the
> > > time handling (watermarks, join windows) I had before with Spark 2.3.2.
> > >
> > > My current problem happens during a leftOuter join, where messages
> > > from 3 topics are joined, the results are then aggregated with a
> > > groupBy and finally put onto a result Kafka topic. On the 3 input
> > > topics involved, all messages have ID and LAST_MOD fields. I use the
> > > ID for joining, and the LAST_MOD as event timestamp on all incoming
> > > streams. Since the fields on the incoming messages are all named the
> > > same (ID and LAST_MOD), I rename them on all incoming streams with
> > >
> > >     aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD").drop(*["ID", "LAST_MOD"])
> > >
> > > For those data frames, I then take the watermark with the
> > > A/B/C_LAST_MOD as event time, before joining. I know that the
> > > LAST_MOD timestamps are equal on the messages that I want to join
> > > together.
> > >
> > > The first join is an inner join, where a field on stream A links with
> > > the ID of stream B. So I have
> > >
> > >     (aDf
> > >         .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
> > >         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> > >         .agg(
> > >             collect_list(struct("*")).alias("RESULTS"),
> > >             count("A_ID").alias("NUM_RESULTS"),
> > >             # just add a timestamp to watermark on, they are all the same
> > >             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> > >         )
> > >         .withWatermark("RESULT_LAST_MOD", "30 seconds")
> > >     )
> > >
> > > This works perfectly and generates (on my current data set) some
> > > 10'000 records. This is the expected result.
> > >
> > > When I add the leftOuter join of the third topic as follows
> > >
> > >     (aDf
> > >         .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
> > >         # here the additional left join
> > >         .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
> > >               "leftOuter")  # C_FK is the field in stream B
> > >         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> > >         .agg(
> > >             collect_list(struct("*")).alias("RESULTS"),
> > >             count("A_ID").alias("NUM_RESULTS"),
> > >             # just add a timestamp to watermark on, they are all the same
> > >             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> > >         )
> > >         .withWatermark("RESULT_LAST_MOD", "30 seconds")
> > >     )
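As an aside, the plan and watermark information requested above can also be read off the StreamingQuery handle programmatically. A minimal sketch, assuming a started query bound to the name query:

    # Print the physical plan of the last executed micro-batch
    # (similar information to "detail" in the SQL tab)
    query.explain()

    # The most recent progress report carries per-trigger event-time
    # statistics, including the current global watermark
    progress = query.lastProgress
    if progress is not None and "eventTime" in progress:
        print(progress["eventTime"].get("watermark"))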
Re: Spark structured streaming leftOuter join not working as I expect
Hi Jungtaek

Thanks for your response!

I actually have set watermarks on all the streams A/B/C with the respective event time column A/B/C_LAST_MOD. So I think this should not be the reason.

Of course, the event time on the C stream (the "optional" one) progresses much slower than on the other 2. I try to adjust for this by setting

    spark.sql.streaming.multipleWatermarkPolicy=max

and judging from the microbatch results, this also works. The global watermark seems to progress as expected with the event time from the A/B streams.

I will try to put together an isolated test case to reproduce the issue; that whole code is embedded in a larger app and hence not easy to rip out.

I did some more testing, and for now these are my observations

- inner join followed by aggregation works as expected
- inner join with 1 left outer (and no aggregation) works as expected
- inner join with 2 left outer only produces results where both outer joins have a match
- inner join with 1 left outer followed by aggregation only produces the messages with a match

Of course, all are stream-stream joins

CU, Joe

On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim wrote:

> I would suspect that rows are never evicted from state in the second join.
> To determine whether a row is NOT matched to the other side, Spark has to
> check whether the row was ever matched before it is evicted. You need to
> set a watermark on either B_LAST_MOD or C_LAST_MOD.
>
> If you already did so but it's not shown here, please paste the full code
> (assuming you've already redacted it) to a gist or attach a zipped file of
> the project.
>
> Btw, there's a known "correctness" issue on stream-stream left/right outer
> joins. Please refer to SPARK-26154 [1] for details. That's not the same
> case, but it's good to know about once you're dealing with stream-stream
> joins.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1. https://issues.apache.org/jira/browse/SPARK-26154
>
> On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann wrote:
>
> > Hi all
> >
> > sorry, tl;dr
> >
> > I'm on my first Python Spark structured streaming app, in the end
> > joining messages from ~10 different Kafka topics. I've recently upgraded
> > to Spark 2.4.3, which has resolved all my issues with the time handling
> > (watermarks, join windows) I had before with Spark 2.3.2.
> >
> > My current problem happens during a leftOuter join, where messages from
> > 3 topics are joined, the results are then aggregated with a groupBy and
> > finally put onto a result Kafka topic. On the 3 input topics involved,
> > all messages have ID and LAST_MOD fields. I use the ID for joining, and
> > the LAST_MOD as event timestamp on all incoming streams. Since the
> > fields on the incoming messages are all named the same (ID and
> > LAST_MOD), I rename them on all incoming streams with
> >
> >     aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD").drop(*["ID", "LAST_MOD"])
> >
> > For those data frames, I then take the watermark with the A/B/C_LAST_MOD
> > as event time, before joining. I know that the LAST_MOD timestamps are
> > equal on the messages that I want to join together.
> >
> > The first join is an inner join, where a field on stream A links with
> > the ID of stream B. So I have
> >
> >     (aDf
> >         .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
> >         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> >         .agg(
> >             collect_list(struct("*")).alias("RESULTS"),
> >             count("A_ID").alias("NUM_RESULTS"),
> >             # just add a timestamp to watermark on, they are all the same
> >             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> >         )
> >         .withWatermark("RESULT_LAST_MOD", "30 seconds")
> >     )
> >
> > This works perfectly and generates (on my current data set) some 10'000
> > records. This is the expected result.
> >
> > When I add the leftOuter join of the third topic as follows
> >
> >     (aDf
> >         .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
> >         # here the additional left join
> >         .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
> >               "leftOuter")  # C_FK is the field in stream B
> >         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> >         .agg(
> >             collect_list(struct("*")).alias("RESULTS"),
> >             count("A_ID").alias("NUM_RESULTS"),
> >             # just add a timestamp to watermark on, they are all the same
> >             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> >         )
> >         .withWatermark("RESULT_LAST_MOD", "30 seconds")
> >     )
> >
> > then what I would expect is that I get the same number of output records
> > (~10'000), and some of them have the additional fields from the C stream.
> >
> > But what happens is that my output is reduced to ~1'500 records, exactly
> > those which have a successful join on records on topic C. The others are
> > not shown in the output.
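A sketch of how the multipleWatermarkPolicy=max setting described above could be applied when building the session (the app name is a placeholder, not from the thread):

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
        .appName("stream-stream-joins")  # placeholder name
        .config("spark.sql.streaming.multipleWatermarkPolicy", "max")
        .getOrCreate())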
Re: Spark structured streaming leftOuter join not working as I expect
I would suspect that rows are never evicted from state in the second join. To determine whether a row is NOT matched to the other side, Spark has to check whether the row was ever matched before it is evicted. You need to set a watermark on either B_LAST_MOD or C_LAST_MOD.

If you already did so but it's not shown here, please paste the full code (assuming you've already redacted it) to a gist or attach a zipped file of the project.

Btw, there's a known "correctness" issue on stream-stream left/right outer joins. Please refer to SPARK-26154 [1] for details. That's not the same case, but it's good to know about once you're dealing with stream-stream joins.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-26154

On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann wrote:

> Hi all
>
> sorry, tl;dr
>
> I'm on my first Python Spark structured streaming app, in the end joining
> messages from ~10 different Kafka topics. I've recently upgraded to Spark
> 2.4.3, which has resolved all my issues with the time handling
> (watermarks, join windows) I had before with Spark 2.3.2.
>
> My current problem happens during a leftOuter join, where messages from 3
> topics are joined, the results are then aggregated with a groupBy and
> finally put onto a result Kafka topic. On the 3 input topics involved, all
> messages have ID and LAST_MOD fields. I use the ID for joining, and the
> LAST_MOD as event timestamp on all incoming streams. Since the fields on
> the incoming messages are all named the same (ID and LAST_MOD), I rename
> them on all incoming streams with
>
>     aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD").drop(*["ID", "LAST_MOD"])
>
> For those data frames, I then take the watermark with the A/B/C_LAST_MOD
> as event time, before joining. I know that the LAST_MOD timestamps are
> equal on the messages that I want to join together.
>
> The first join is an inner join, where a field on stream A links with the
> ID of stream B. So I have
>
>     (aDf
>         .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
>         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
>         .agg(
>             collect_list(struct("*")).alias("RESULTS"),
>             count("A_ID").alias("NUM_RESULTS"),
>             # just add a timestamp to watermark on, they are all the same
>             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
>         )
>         .withWatermark("RESULT_LAST_MOD", "30 seconds")
>     )
>
> This works perfectly and generates (on my current data set) some 10'000
> records. This is the expected result.
>
> When I add the leftOuter join of the third topic as follows
>
>     (aDf
>         .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
>         # here the additional left join
>         .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
>               "leftOuter")  # C_FK is the field in stream B
>         .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
>         .agg(
>             collect_list(struct("*")).alias("RESULTS"),
>             count("A_ID").alias("NUM_RESULTS"),
>             # just add a timestamp to watermark on, they are all the same
>             min("A_LAST_MOD").alias("RESULT_LAST_MOD")
>         )
>         .withWatermark("RESULT_LAST_MOD", "30 seconds")
>     )
>
> then what I would expect is that I get the same number of output records
> (~10'000), and some of them have the additional fields from the C stream.
>
> But what happens is that my output is reduced to ~1'500 records, exactly
> those which have a successful join on records on topic C. The others are
> not shown in the output.
>
> I already tried
>
>   * make sure that the optional FK on topic B is never null, by using an
>     NVL2(C_FK, C_FK, '')
>   * widen the time window join on the leftOuter to "B_LAST_MOD <
>     C_LAST_MOD - interval 5 seconds ..."
>   * use various combinations of joinWindows and watermarkLateThreshold
>
> The result is always the same: I'm "losing" the ~8'500 records for which
> the optional join FK is NULL on topic B.
>
> Did I totally misunderstand the concept of stream-stream left outer join?
> Or what could be wrong?
>
> --
> CU, Joe

--
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior
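A minimal sketch of the watermark setup suggested above, reusing the stream and column names from the thread (the 30-second delays are illustrative placeholders):

    # Watermark each input *before* the joins, so Spark can eventually
    # conclude that an unmatched left-side row will never match and emit
    # it null-extended instead of holding it in state indefinitely
    aDf = aDf.withWatermark("A_LAST_MOD", "30 seconds")
    bDf = bDf.withWatermark("B_LAST_MOD", "30 seconds")
    cDf = cDf.withWatermark("C_LAST_MOD", "30 seconds")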
Spark structured streaming leftOuter join not working as I expect
Hi all

sorry, tl;dr

I'm on my first Python Spark structured streaming app, in the end joining messages from ~10 different Kafka topics. I've recently upgraded to Spark 2.4.3, which has resolved all my issues with the time handling (watermarks, join windows) I had before with Spark 2.3.2.

My current problem happens during a leftOuter join, where messages from 3 topics are joined, the results are then aggregated with a groupBy and finally put onto a result Kafka topic. On the 3 input topics involved, all messages have ID and LAST_MOD fields. I use the ID for joining, and the LAST_MOD as event timestamp on all incoming streams. Since the fields on the incoming messages are all named the same (ID and LAST_MOD), I rename them on all incoming streams with

    aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD").drop(*["ID", "LAST_MOD"])

For those data frames, I then take the watermark with the A/B/C_LAST_MOD as event time, before joining. I know that the LAST_MOD timestamps are equal on the messages that I want to join together.

The first join is an inner join, where a field on stream A links with the ID of stream B. So I have

    (aDf
        .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
        .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
        .agg(
            collect_list(struct("*")).alias("RESULTS"),
            count("A_ID").alias("NUM_RESULTS"),
            # just add a timestamp to watermark on, they are all the same
            min("A_LAST_MOD").alias("RESULT_LAST_MOD")
        )
        .withWatermark("RESULT_LAST_MOD", "30 seconds")
    )

This works perfectly and generates (on my current data set) some 10'000 records. This is the expected result.

When I add the leftOuter join of the third topic as follows

    (aDf
        .join(bDf, expr("B_FK = B_ID"))  # B_FK is the field in stream A
        # here the additional left join
        .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
              "leftOuter")  # C_FK is the field in stream B
        .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
        .agg(
            collect_list(struct("*")).alias("RESULTS"),
            count("A_ID").alias("NUM_RESULTS"),
            # just add a timestamp to watermark on, they are all the same
            min("A_LAST_MOD").alias("RESULT_LAST_MOD")
        )
        .withWatermark("RESULT_LAST_MOD", "30 seconds")
    )

then what I would expect is that I get the same number of output records (~10'000), and some of them have the additional fields from the C stream.

But what happens is that my output is reduced to ~1'500 records, exactly those which have a successful join on records on topic C. The others are not shown in the output.

I already tried

  * make sure that the optional FK on topic B is never null, by using an
    NVL2(C_FK, C_FK, '')
  * widen the time window join on the leftOuter to "B_LAST_MOD <
    C_LAST_MOD - interval 5 seconds ..."
  * use various combinations of joinWindows and watermarkLateThreshold

The result is always the same: I'm "losing" the ~8'500 records for which the optional join FK is NULL on topic B.

Did I totally misunderstand the concept of stream-stream left outer join? Or what could be wrong?

--
CU, Joe
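For completeness: for stream-stream left outer joins, the Structured Streaming guide calls for a watermark on the inputs plus a time-range (or event-time window) condition in the join itself, so Spark knows when an unmatched row can be emitted. A sketch of what that could look like for the B/C join in this thread - the delays and the 5-second range are illustrative placeholders, not values from Joe's app:

    from pyspark.sql.functions import expr

    bDf = bDf.withWatermark("B_LAST_MOD", "30 seconds")
    cDf = cDf.withWatermark("C_LAST_MOD", "30 seconds")

    # C_FK lives on stream B and C_ID on stream C, as in the code above;
    # the time-range condition lets Spark bound the join state and emit
    # unmatched B rows once the watermark passes the range
    joined = bDf.join(
        cDf,
        expr("""
            C_FK = C_ID AND
            C_LAST_MOD >= B_LAST_MOD - interval 5 seconds AND
            C_LAST_MOD <= B_LAST_MOD + interval 5 seconds
        """),
        "leftOuter")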