Re: Spark structured streaming leftOuter join not working as I expect

2019-06-11 Thread Jungtaek Lim
Got the point. If you would like to get "correct" output, you may need to
set the global watermark policy to "min", because the watermark is not only
used for evicting rows from state, but also for discarding input rows that
arrive later than the watermark. Be aware that there are two stateful
operators here, each of which receives input from the previous stage and
discards rows via the watermark before processing.
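
For example, a minimal PySpark sketch (the session setup here is
illustrative):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("stream-joins").getOrCreate()

    # "min" (the default) makes the global watermark follow the slowest
    # stream, so the downstream stateful operators do not discard inputs
    # prematurely; "max" trades that correctness for lower latency.
    spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "min")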

Btw, you may also want to consider how the concept of a watermark in Spark
differs from other frameworks:

1. Spark uses a high watermark (it picks the highest event timestamp among
the input rows) even for a single watermark, whereas other frameworks use a
low watermark (the lowest event timestamp among the input rows). So you may
always need to set enough delay on the watermark.
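
For instance, assuming a stream aDf with the event-time column A_LAST_MOD as
in the code further down this thread (the delay value is illustrative):

    # The watermark trails the highest event timestamp seen so far, so give
    # it enough delay to cover how far out of order your events can arrive.
    aDf = aDf.withWatermark("A_LAST_MOD", "10 minutes")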

2. Spark uses a global watermark whereas other frameworks normally use
operator-wise watermarks. This is a limitation of Spark (given that the
outputs of a previous stateful operator become the inputs of the next
stateful operator, the operators should have different watermarks), and one
contributor has proposed an approach [1] which would fit Spark
(unfortunately it hasn't been reviewed by committers for a long time).

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://github.com/apache/spark/pull/23576

On Tue, Jun 11, 2019 at 7:06 AM Joe Ammann  wrote:

> Hi all
>
> it took me some time to get the issues extracted into a piece of
> standalone code. I created the following gist
>
> https://gist.github.com/jammann/b58bfbe0f4374b89ecea63c1e32c8f17
>
> It has messages for 4 topics A/B/C/D and a simple Python program which
> shows 6 use cases, with my expectations and observations with Spark 2.4.3.
>
> It would be great if you could have a look and check whether I'm doing
> something wrong, or whether this is indeed a limitation of Spark.
>
> On 6/5/19 5:35 PM, Jungtaek Lim wrote:
> > Nice to hear you're investigating the issue deeply.
> >
> > Btw, if attaching code is not easy, maybe you could share the
> > logical/physical plan of any batch: "detail" in the SQL tab shows the
> > plan as a string. Plans from sequential batches would be very helpful -
> > and the streaming query status in these batches (especially the
> > watermark) should be helpful too.
> >
>
>
> --
> CU, Joe
>


-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior


Re: Spark structured streaming leftOuter join not working as I expect

2019-06-10 Thread Joe Ammann
Hi all

it took me some time to get the issues extracted into a piece of standalone 
code. I created the following gist

https://gist.github.com/jammann/b58bfbe0f4374b89ecea63c1e32c8f17

It has messages for 4 topics A/B/C/D and a simple Python program which shows
6 use cases, with my expectations and observations with Spark 2.4.3.

It would be great if you could have a look and check whether I'm doing
something wrong, or whether this is indeed a limitation of Spark.

On 6/5/19 5:35 PM, Jungtaek Lim wrote:
> Nice to hear you're investigating the issue deeply.
> 
> Btw, if attaching code is not easy, maybe you could share the
> logical/physical plan of any batch: "detail" in the SQL tab shows the plan
> as a string. Plans from sequential batches would be very helpful - and the
> streaming query status in these batches (especially the watermark) should
> be helpful too.
> 


-- 
CU, Joe




Re: Spark structured streaming leftOuter join not working as I expect

2019-06-05 Thread Jungtaek Lim
Nice to hear you're investigating the issue deeply.

Btw, if attaching code is not easy, maybe you could share the
logical/physical plan of any batch: "detail" in the SQL tab shows the plan
as a string. Plans from sequential batches would be very helpful - and the
streaming query status in these batches (especially the watermark) should be
helpful too.
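
A sketch of how to capture both, assuming query is the handle returned by
writeStream...start() on your result frame:

    # Prints the logical and physical plans of the last executed batch.
    query.explain(extended=True)

    # Most recent batch progress as a dict; "eventTime" contains the
    # current global watermark (lastProgress is None before the first batch).
    progress = query.lastProgress
    if progress is not None:
        print(progress["eventTime"].get("watermark"))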


On Wed, Jun 5, 2019 at 11:57 PM Joe Ammann  wrote:

> Hi Jungtaek
>
> Thanks for your response!
>
> I actually have set watermarks on all the streams A/B/C with the
> respective event time column A/B/C_LAST_MOD. So I think this should not be
> the reason.
>
> Of course, the event time on the C stream (the "optional one") progresses
> much slower than on the other 2. I try to adjust for this by setting
>
>    spark.sql.streaming.multipleWatermarkPolicy=max
>
> and judging from the microbatch results, this also works. The global
> watermark seems to progress as expected with the event time from A/B
> stream.
>
> I will try to put together an isolated test case to reproduce the issue;
> the whole code is embedded in a larger app and hence not easy to rip out.
>
> I did some more testing, and for now these are my observations:
>  - inner join followed by aggregation works as expected
>  - inner join with 1 left outer (and no aggregation) works as expected
>  - inner join with 2 left outers only produces results where both outer
> sides have a match
>  - inner join with 1 left outer followed by aggregation only produces the
> messages with a match
>
> Of course, all are stream-stream joins
>
> CU, Joe
>
> On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim 
> wrote:
> > I would suspect that rows are never evicted from state in the second
> > join. To determine whether a row is NOT matched on the other side, Spark
> > must check whether the row was ever matched before it is evicted. You
> > need to set a watermark on either B_LAST_MOD or C_LAST_MOD.
> >
> > If you already did but it's not shown here, please paste all the code
> > (redacted as needed) to a gist or attach a zipped project.
> >
> > Btw, there's a known "correctness" issue with streaming-streaming
> > left/right outer joins. Please refer to SPARK-26154 [1] for details.
> > It's not the same case, but it's good to know about when you're dealing
> > with streaming-streaming joins.
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
> > 1. https://issues.apache.org/jira/browse/SPARK-26154
> >
> > On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann  wrote:
> >
> > > Hi all
> > >
> > > sorry, tl;dr
> > >
> > > I'm on my first Python Spark structured streaming app, in the end
> > > joining messages from ~10 different Kafka topics. I've recently
> > > upgraded to Spark 2.4.3, which has resolved all my issues with the time
> > > handling (watermarks, join windows) I had before with Spark 2.3.2.
> > >
> > > My current problem happens during a leftOuter join, where messages from
> > > 3 topics are joined, the results are then aggregated with a groupBy and
> > > finally put onto a result Kafka topic. On the 3 input topics involved,
> > > all messages have ID and LAST_MOD fields. I use the ID for joining, and
> > > the LAST_MOD as event timestamp on all incoming streams. Since the
> > > fields on the incoming messages are all named the same (ID and
> > > LAST_MOD), I rename them on all incoming streams with
> > >
> > >  aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as
> > > A_LAST_MOD").drop(*["ID", "LAST_MOD"])
> > >
> > > For those data frames, I then take the watermark with the
> > > A/B/C_LAST_MOD as event time, before joining. I know that the LAST_MOD
> > > timestamps are equal on the messages that I want to join together.
> > >
> > > The first join is an inner join, where a field on stream A links with
> > > the ID of stream B. So I have
> > >
> > >  (aDf
> > >      .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> > >      .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> > >      .agg(
> > >          collect_list(struct("*")).alias("RESULTS"),
> > >          count("A_ID").alias("NUM_RESULTS"),
> > >          # just add a timestamp to watermark on, they are all the same
> > >          min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> > >      )
> > >      .withWatermark("RESULT_LAST_MOD", "30 seconds")
> > >  )
> > >
> > > This works perfectly and generates (on my current data set) some 10'000
> > > records. This is the expected result.
> > >
> > > When I add the leftOuter join of the third topic as follows
> > >
> > >  (aDf
> > >      .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> > >      # here the additional left join; C_FK is the field in stream B
> > >      .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
> > >            "leftOuter")
> > >      .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> > >      .agg(
> > >          collect_list(struct("*")).alias("RESULTS"),
> > >          count("A_ID").alias("NUM_RESULTS"),
> > >          # just add a timestamp to watermark on, they are all the same
> > >          min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> > >      )
> > >      .withWatermark("RESULT_LAST_MOD", "30 seconds")
> > >  )

Re: Spark structured streaming leftOuter join not working as I expect

2019-06-05 Thread Joe Ammann
Hi Jungtaek

Thanks for your response!

I actually have set watermarks on all the streams A/B/C with the respective
event time column A/B/C_LAST_MOD. So I think this should not be the reason.

Of course, the event time on the C stream (the "optional one") progresses
much slower than on the other 2. I try to adjust for this by setting

   spark.sql.streaming.multipleWatermarkPolicy=max

and judging from the microbatch results, this also works. The global
watermark seems to progress as expected with the event time from A/B stream.

I will try to put together an isolated test case to reproduce the issue;
the whole code is embedded in a larger app and hence not easy to rip out.

I did some more testing, and for now these are my observations:
 - inner join followed by aggregation works as expected
 - inner join with 1 left outer (and no aggregation) works as expected
 - inner join with 2 left outers only produces results where both outer
   sides have a match
 - inner join with 1 left outer followed by aggregation only produces the
   messages with a match

Of course, all are stream-stream joins

CU, Joe
 
On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim  wrote: 
> I would suspect that rows are never evicted from state in the second join.
> To determine whether a row is NOT matched on the other side, Spark must
> check whether the row was ever matched before it is evicted. You need to
> set a watermark on either B_LAST_MOD or C_LAST_MOD.
> 
> If you already did but it's not shown here, please paste all the code
> (redacted as needed) to a gist or attach a zipped project.
> 
> Btw, there's a known "correctness" issue with streaming-streaming
> left/right outer joins. Please refer to SPARK-26154 [1] for details. It's
> not the same case, but it's good to know about when you're dealing with
> streaming-streaming joins.
> 
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 
> 1. https://issues.apache.org/jira/browse/SPARK-26154
> 
> On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann  wrote:
> 
> > Hi all
> >
> > sorry, tl;dr
> >
> > I'm on my first Python Spark structured streaming app, in the end joining
> > messages from ~10 different Kafka topics. I've recently upgraded to Spark
> > 2.4.3, which has resolved all my issues with the time handling (watermarks,
> > join windows) I had before with Spark 2.3.2.
> >
> > My current problem happens during a leftOuter join, where messages from 3
> > topics are joined, the results are then aggregated with a groupBy and
> > finally put onto a result Kafka topic. On the 3 input topics involved, all
> > messages have ID and LAST_MOD fields. I use the ID for joining, and the
> > LAST_MOD as event timestamp on all incoming streams. Since the fields on
> > the incoming messages are all named the same (ID and LAST_MOD), I rename
> > them on all incoming streams with
> >
> >  aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as
> > A_LAST_MOD").drop(*["ID", "LAST_MOD"])
> >
> > For those data frames, I then take the watermark with the A/B/C_LAST_MOD
> > as event time, before joining. I know that the LAST_MOD timestamps are
> > equal on the messages that I want to join together.
> >
> > The first join is an inner join, where a field on stream A links with the
> > ID of stream B. So I have
> >
> >  (aDf
> >      .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> >      .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> >      .agg(
> >          collect_list(struct("*")).alias("RESULTS"),
> >          count("A_ID").alias("NUM_RESULTS"),
> >          # just add a timestamp to watermark on, they are all the same
> >          min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> >      )
> >      .withWatermark("RESULT_LAST_MOD", "30 seconds")
> >  )
> >
> > This works perfectly and generates (on my current data set) some 10'000
> > records. This is the expected result.
> >
> > When I add the leftOuter join of the third topic as follows
> >
> >  (aDf
> >      .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> >      # here the additional left join; C_FK is the field in stream B
> >      .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
> >            "leftOuter")
> >      .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> >      .agg(
> >          collect_list(struct("*")).alias("RESULTS"),
> >          count("A_ID").alias("NUM_RESULTS"),
> >          # just add a timestamp to watermark on, they are all the same
> >          min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> >      )
> >      .withWatermark("RESULT_LAST_MOD", "30 seconds")
> >  )
> >
> > then what I would expect is that I get the same number of output records
> > (~10'000), and some of them have the additional fields from the C stream.
> >
> > But what happens is that my output is reduced to ~1'500 records, exactly
> > those which have a successful join on records on topic C. The others are
> > not shown in the output.
> >
> > I already tried
> >
> >    * make sure that the optional FK on topic B is never null, by using an
> > NVL2(C_FK, C_FK, '')
> >    * widen the time window join on the leftOuter to "B_LAST_MOD <
> > C_LAST_MOD - interval 5 seconds ..."
> >    * use various combinations of joinWindows and watermarkLateThreshold
> >
> > The result is always the same: I'm "losing" the ~8'500 records for which
> > the optional join FK is NULL on topic B.

Re: Spark structured streaming leftOuter join not working as I expect

2019-06-05 Thread Jungtaek Lim
I would suspect that rows are never evicted from state in the second join.
To determine whether a row is NOT matched on the other side, Spark must
check whether the row was ever matched before it is evicted. You need to set
a watermark on either B_LAST_MOD or C_LAST_MOD.
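
Something like this (column names taken from your code below; the delay
values are illustrative):

    # A watermark on both join sides lets Spark bound the join state and
    # eventually emit left-side rows that never found a match, with NULLs.
    bDf = bDf.withWatermark("B_LAST_MOD", "30 seconds")
    cDf = cDf.withWatermark("C_LAST_MOD", "30 seconds")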

If you already did but it's not shown here, please paste all the code
(redacted as needed) to a gist or attach a zipped project.

Btw, there's a known "correctness" issue with streaming-streaming left/right
outer joins. Please refer to SPARK-26154 [1] for details. It's not the same
case, but it's good to know about when you're dealing with
streaming-streaming joins.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-26154

On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann  wrote:

> Hi all
>
> sorry, tl;dr
>
> I'm on my first Python Spark structured streaming app, in the end joining
> messages from ~10 different Kafka topics. I've recently upgraded to Spark
> 2.4.3, which has resolved all my issues with the time handling (watermarks,
> join windows) I had before with Spark 2.3.2.
>
> My current problem happens during a leftOuter join, where messages from 3
> topics are joined, the results are then aggregated with a groupBy and
> finally put onto a result Kafka topic. On the 3 input topics involved, all
> messages have ID and LAST_MOD fields. I use the ID for joining, and the
> LAST_MOD as event timestamp on all incoming streams. Since the fields on
> the incoming messages are all named the same (ID and LAST_MOD), I rename
> them on all incoming streams with
>
>  aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as
> A_LAST_MOD").drop(*["ID", "LAST_MOD"])
>
> For those data frames, I then take the watermark with the A/B/C_LAST_MOD
> as event time, before joining. I know that the LAST_MOD timestamps are
> equal on the messages that I want to join together.
>
> The first join is an inner join, where a field on stream A links with the
> ID of stream B. So I have
>
>  (aDf
>      .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
>      .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
>      .agg(
>          collect_list(struct("*")).alias("RESULTS"),
>          count("A_ID").alias("NUM_RESULTS"),
>          # just add a timestamp to watermark on, they are all the same
>          min("A_LAST_MOD").alias("RESULT_LAST_MOD")
>      )
>      .withWatermark("RESULT_LAST_MOD", "30 seconds")
>  )
>
> This works perfectly and generates (on my current data set) some 10'000
> records. This is the expected result.
>
> When I add the leftOuter join of the third topic as follows
>
>  (aDf
>      .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
>      # here the additional left join; C_FK is the field in stream B
>      .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"),
>            "leftOuter")
>      .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
>      .agg(
>          collect_list(struct("*")).alias("RESULTS"),
>          count("A_ID").alias("NUM_RESULTS"),
>          # just add a timestamp to watermark on, they are all the same
>          min("A_LAST_MOD").alias("RESULT_LAST_MOD")
>      )
>      .withWatermark("RESULT_LAST_MOD", "30 seconds")
>  )
>
> then what I would expect is that I get the same number of output records
> (~10'000), and some of them have the additional fields from the C stream.
>
> But what happens is that my output is reduced to ~1'500 records, exactly
> those which have a successful join on records on topic C. The others are
> not shown in the output.
>
> I already tried
>
>* make sure that the optional FK on topic B is never null, by using an
> NVL2(C_FK, C_FK, '')
>    * widen the time window join on the leftOuter to "B_LAST_MOD <
> C_LAST_MOD - interval 5 seconds ..."
>* use various combinations of joinWindows and watermarkLateThreshold
>
> The result is always the same: I'm "losing" the ~8'500 records for which
> the optional join FK is NULL on topic B.
>
> Did I totally misunderstand the concept of stream-stream left outer joins?
> Or what could be wrong?
>
> --
> CU, Joe
>
>
>

-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior


Spark structured streaming leftOuter join not working as I expect

2019-06-04 Thread Joe Ammann
Hi all

sorry, tl;dr

I'm on my first Python Spark structured streaming app, in the end joining 
messages from ~10 different Kafka topics. I've recently upgraded to Spark 
2.4.3, which has resolved all my issues with the time handling (watermarks, 
join windows) I had before with Spark 2.3.2.

My current problem happens during a leftOuter join, where messages from 3 
topics are joined, the results are then aggregated with a groupBy and finally 
put onto a result Kafka topic. On the 3 input topics involved, all messages 
have ID and LAST_MOD fields. I use the ID for joining, and the LAST_MOD as 
event timestamp on all incoming streams. Since the fields on the incoming 
messages are all named the same (ID and LAST_MOD), I rename them on all 
incoming streams with

 aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as 
A_LAST_MOD").drop(*["ID", "LAST_MOD"])

For those data frames, I then take the watermark with the A/B/C_LAST_MOD as 
event time, before joining. I know that the LAST_MOD timestamps are equal on 
the messages that I want to join together.
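
Concretely that looks like the following (the delay values here are
placeholders, not my real settings):

    aDf = aDf.withWatermark("A_LAST_MOD", "30 seconds")
    bDf = bDf.withWatermark("B_LAST_MOD", "30 seconds")
    cDf = cDf.withWatermark("C_LAST_MOD", "30 seconds")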

The first join is an inner join, where a field on stream A links with the ID of 
stream B. So I have

 (aDf
     .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
     .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
     .agg(
         collect_list(struct("*")).alias("RESULTS"),
         count("A_ID").alias("NUM_RESULTS"),
         # just add a timestamp to watermark on, they are all the same
         min("A_LAST_MOD").alias("RESULT_LAST_MOD")
     )
     .withWatermark("RESULT_LAST_MOD", "30 seconds")
 )

This works perfectly and generates (on my current data set) some 10'000 
records. This is the expected result.

When I add the leftOuter join of the third topic as follows

 (aDf
     .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
     # here the additional left join; C_FK is the field in stream B
     .join(cDf, expr("C_FK = C_ID and B_LAST_MOD = C_LAST_MOD"), "leftOuter")
     .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
     .agg(
         collect_list(struct("*")).alias("RESULTS"),
         count("A_ID").alias("NUM_RESULTS"),
         # just add a timestamp to watermark on, they are all the same
         min("A_LAST_MOD").alias("RESULT_LAST_MOD")
     )
     .withWatermark("RESULT_LAST_MOD", "30 seconds")
 )

then what I would expect is that I get the same number of output records 
(~10'000), and some of them have the additional fields from the C stream.

But what happens is that my output is reduced to ~1'500 records, exactly
those which have a successful join on records on topic C. The others are not
shown in the output.

I already tried

   * make sure that the optional FK on topic B is never null, by using an 
NVL2(C_FK, C_FK, '')
   * widen the time window join on the leftOuter to "B_LAST_MOD <
C_LAST_MOD - interval 5 seconds ..."
   * use various combinations of joinWindows and watermarkLateThreshold

The result is always the same: I'm "losing" the ~8'500 records for which the 
optional join FK is NULL on topic B.

Did I totally misunderstand the concept of stream-stream left outer joins?
Or what could be wrong?
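
For reference, my understanding of the shape the documentation prescribes
for a stream-stream left outer join - watermarks on both sides plus an
event-time range in the join condition - is roughly this (the 5-second
tolerance is just an example):

    from pyspark.sql.functions import expr

    bWm = bDf.withWatermark("B_LAST_MOD", "30 seconds")
    cWm = cDf.withWatermark("C_LAST_MOD", "30 seconds")

    # The time-range condition lets Spark decide when a left-side row can no
    # longer match, so it can be emitted with NULLs and evicted from state.
    joined = bWm.join(
        cWm,
        expr("""
            C_FK = C_ID AND
            C_LAST_MOD >= B_LAST_MOD - interval 5 seconds AND
            C_LAST_MOD <= B_LAST_MOD + interval 5 seconds
        """),
        "leftOuter",
    )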

-- 
CU, Joe
