Re: Spark structured streaming leftOuter join not working as I expect

2019-06-10 Thread Joe Ammann
Hi all

it took me some time to get the issues extracted into a piece of standalone 
code. I created the following gist

https://gist.github.com/jammann/b58bfbe0f4374b89ecea63c1e32c8f17

It has messages for 4 topics A/B/C/D and a simple Python program which shows 6 
use cases, with my expectations and observations under Spark 2.4.3.

It would be great if you could have a look and check whether I'm doing something 
wrong, or whether this is indeed a limitation of Spark.

On 6/5/19 5:35 PM, Jungtaek Lim wrote:
> Nice to hear you're investigating the issue deeply.
> 
> Btw, if attaching code is not easy, maybe you could share the logical/physical 
> plan of any batch: "detail" in the SQL tab shows the plan as a string. 
> Plans from sequential batches would be very helpful - and the streaming query 
> status in these batches (especially the watermark) would be helpful too.
> 


-- 
CU, Joe




Re: Spark structured streaming leftOuter join not working as I expect

2019-06-05 Thread Joe Ammann
Hi Jungtaek

Thanks for your response!

I have actually set watermarks on all the streams A/B/C, with the respective 
event time column A/B/C_LAST_MOD, so I think this should not be the reason.

Of course, the event time on the C stream (the "optional one") progresses much 
slower
than on the other 2. I try to adjust for this by setting 

   spark.sql.streaming.multipleWatermarkPolicy=max

and judging from the microbatch results, this also works. The global watermark 
seems
to progress as expected with the event time from A/B stream.
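For completeness, this is roughly how I set that option when building the session (a sketch; the app name is just illustrative):

    from pyspark.sql import SparkSession

    # Use the maximum of the per-stream watermarks as the global watermark,
    # instead of the default "min" policy.
    spark = (
        SparkSession.builder
        .appName("stream-join-test")   # illustrative
        .config("spark.sql.streaming.multipleWatermarkPolicy", "max")
        .getOrCreate()
    )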

I will try to put together an isolated test case to reproduce the issue; the 
whole code is embedded in a larger app and hence not easy to rip out.

I did some more testing, and for now these are my observations:
 - inner join followed by aggregation works as expected
 - inner join with 1 left outer (and no aggregation) works as expected
 - inner join with 2 left outer only produces results where both outer sides have a match
 - inner join with 1 left outer followed by aggregation only produces the messages with a match

Of course, all of these are stream-stream joins.
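To make the third observation concrete, the shape of that "inner join with 2 left outer" case is roughly the following (a sketch, not the real app; aDf/bDf/cDf/dDf are the watermarked streams for topics A/B/C/D, and the join keys are placeholders):

    from pyspark.sql.functions import expr

    # Only rows where BOTH optional sides find a match show up in the output,
    # although I expect the unmatched rows to appear with NULLs.
    joined = (
        aDf
        .join(bDf, expr("B_FK = B_ID"))                                           # inner
        .join(cDf, expr("C_FK = C_ID AND B_LAST_MOD = C_LAST_MOD"), "leftOuter")  # optional 1
        .join(dDf, expr("D_FK = D_ID AND B_LAST_MOD = D_LAST_MOD"), "leftOuter")  # optional 2
    )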

CU, Joe
 
On Wednesday, June 5, 2019 09:17 CEST, Jungtaek Lim  wrote: 
> I would suspect that rows are never evicted from state in the second join. To
> determine whether a row is NOT matched on the other side, Spark has to check
> whether the row was ever matched before it is evicted. You need to set a watermark
> on either B_LAST_MOD or C_LAST_MOD.
> 
> If you already did but it is not shown here, please paste all the code
> (assuming you've already redacted it) to a gist or attach a zipped file of the
> project.
> 
> Btw, there's a known "correctness" issue on streaming-streaming left/right
> outer join. Please refer to SPARK-26154 [1] for details. That's not the same
> case, but it is good to know about when you're dealing with
> streaming-streaming joins.
> 
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 
> 1. https://issues.apache.org/jira/browse/SPARK-26154
> 
> On Tue, Jun 4, 2019 at 9:31 PM Joe Ammann  wrote:
> 
> > Hi all
> >
> > sorry, tl;dr
> >
> > I'm on my first Python Spark structured streaming app, in the end joining
> > messages from ~10 different Kafka topics. I've recently upgraded to Spark
> > 2.4.3, which has resolved all my issues with the time handling (watermarks,
> > join windows) I had before with Spark 2.3.2.
> >
> > My current problem happens during a leftOuter join, where messages from 3
> > topics are joined, the results are then aggregated with a groupBy and
> > finally put onto a result Kafka topic. On the 3 input topics involved, all
> > messages have ID and LAST_MOD fields. I use the ID for joining, and the
> > LAST_MOD as event timestamp on all incoming streams. Since the fields on
> > the incoming messages are all named the same (ID and LAST_MOD), I rename
> > them on all incoming streams with
> >
> >  aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD").drop(*["ID", "LAST_MOD"])
> >
> > For those data frames, I then take the watermark with the A/B/C_LAST_MOD
> > as event time, before joining. I know that the LAST_MOD timestamps are
> > equal on the messages that I want to join together.
> >
> > The first join is an inner join, where a field on stream A links with the
> > ID of stream B. So I have
> >
> >  (aDf
> >   .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> >   .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> >   .agg(
> >       collect_list(struct("*")).alias("RESULTS"),
> >       count("A_ID").alias("NUM_RESULTS"),
> >       # just add a timestamp to watermark on, they are all the same
> >       min("A_LAST_MOD").alias("RESULT_LAST_MOD")
> >   )
> >   .withWatermark("RESULT_LAST_MOD", "30 seconds")
> >  )
> >
> > This works perfectly and generates (on my current data set) some 10'000
> > records. This is the expected result.
> >
> > When I add the leftOuter join of the third topic as follows
> >
> >  (aDf
> >   .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
> >   # here the additional left join; C_FK is the field in stream B
> >   .join(cDf, expr("C_FK = C_ID AND B_LAST_MOD = C_LAST_MOD"), "leftOuter")
> >   .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
> >   .agg(
> >       collect_list(struct("*")).alias("RESULTS"),

Spark structured streaming leftOuter join not working as I expect

2019-06-04 Thread Joe Ammann
Hi all

sorry, tl;dr

I'm on my first Python Spark structured streaming app, in the end joining 
messages from ~10 different Kafka topics. I've recently upgraded to Spark 
2.4.3, which has resolved all my issues with the time handling (watermarks, 
join windows) I had before with Spark 2.3.2.

My current problem happens during a leftOuter join, where messages from 3 
topics are joined, the results are then aggregated with a groupBy and finally 
put onto a result Kafka topic. On the 3 input topics involved, all messages 
have ID and LAST_MOD fields. I use the ID for joining, and the LAST_MOD as 
event timestamp on all incoming streams. Since the fields on the incoming 
messages are all named the same (ID and LAST_MOD), I rename them on all 
incoming streams with

 aDf = aStream.selectExpr("*", "ID as A_ID", "LAST_MOD as A_LAST_MOD").drop(*["ID", "LAST_MOD"])

For those data frames, I then take the watermark with the A/B/C_LAST_MOD as 
event time, before joining. I know that the LAST_MOD timestamps are equal on 
the messages that I want to join together.
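A sketch of that per-stream preparation (the helper function, the stream variables and the 30-second delay are only illustrative; the real app reads and parses the Kafka topics first):

    def prepare(stream, prefix, delay="30 seconds"):
        # Prefix the common ID/LAST_MOD fields per topic and declare the
        # prefixed LAST_MOD column as the event time for watermarking.
        return (
            stream
            .selectExpr("*",
                        "ID as {p}_ID".format(p=prefix),
                        "LAST_MOD as {p}_LAST_MOD".format(p=prefix))
            .drop("ID", "LAST_MOD")
            .withWatermark("{p}_LAST_MOD".format(p=prefix), delay)
        )

    aDf = prepare(aStream, "A")
    bDf = prepare(bStream, "B")
    cDf = prepare(cStream, "C")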

The first join is an inner join, where a field on stream A links with the ID of 
stream B. So I have

 (aDf
  .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
  .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
  .agg(
      collect_list(struct("*")).alias("RESULTS"),
      count("A_ID").alias("NUM_RESULTS"),
      # just add a timestamp to watermark on, they are all the same
      min("A_LAST_MOD").alias("RESULT_LAST_MOD")
  )
  .withWatermark("RESULT_LAST_MOD", "30 seconds")
 )

This works perfectly and generates (on my current data set) some 10'000 
records. This is the expected result.

When I add the leftOuter join of the third topic as follows

 (aDf
  .join(bDf, expr("B_FK = B_ID"))   # B_FK is the field in stream A
  # here the additional left join; C_FK is the field in stream B
  .join(cDf, expr("C_FK = C_ID AND B_LAST_MOD = C_LAST_MOD"), "leftOuter")
  .groupBy("SOME_FIELD", window("A_LAST_MOD", "10 seconds"))
  .agg(
      collect_list(struct("*")).alias("RESULTS"),
      count("A_ID").alias("NUM_RESULTS"),
      # just add a timestamp to watermark on, they are all the same
      min("A_LAST_MOD").alias("RESULT_LAST_MOD")
  )
  .withWatermark("RESULT_LAST_MOD", "30 seconds")
 )

then what I would expect is that I get the same number of output records 
(~10'000), and some of them have the additional fields from the C stream.

But what happens is that my output is reduced to ~1'500 records, exactly those 
which have a successful join with records from topic C. The others are not shown 
in the output.

I already tried

   * make sure that the optional FK on topic B is never null, by using an NVL2(C_FK, C_FK, '')
   * widen the join time window on the leftOuter to "B_LAST_MOD < C_LAST_MOD - interval 5 seconds ..."
   * use various combinations of joinWindows and watermarkLateThreshold

The result is always the same: I'm "losing" the ~8'500 records for which the 
optional join FK is NULL on topic B.
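For reference, the pattern I understand the programming guide to recommend for the optional side is an explicit event-time range in the join condition, roughly like this (a sketch; "joined" stands for the result of the inner A/B join and the 5-second bounds are illustrative):

    from pyspark.sql.functions import expr

    # Event-time range on the optional side, so that Spark can eventually
    # decide that a row will never find a C match and emit it with NULLs.
    withC = joined.join(
        cDf,
        expr("""
            C_FK = C_ID AND
            C_LAST_MOD >= B_LAST_MOD - interval 5 seconds AND
            C_LAST_MOD <= B_LAST_MOD + interval 5 seconds
        """),
        "leftOuter"
    )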

Did I totally misunderstand the concept of stream-stream left outer joins? Or 
what else could be wrong?

-- 
CU, Joe




Watermark handling on initial query start (Structured Streaming)

2019-05-20 Thread Joe Ammann
Hi all

I'm currently developing a Spark structured streaming application which 
joins/aggregates messages from ~7 Kafka topics and produces messages onto 
another Kafka topic.

Quite often in my development cycle, I want to "reprocess from scratch": I stop 
the program, delete the target topic and associated checkpoint information, and 
restart the application with the query.

My assumption would be that the newly started query then processes all messages 
that are on the input topics, sets the watermark according to the freshest 
messages on the topic, and produces the output messages which have moved past 
the watermark and can thus safely be produced. As an example: if the freshest 
message on the topic has an event time of "2019-05-20 10:13", I restart the 
query at "2019-05-20 11:30", and I have a watermark duration of 10 minutes, I 
would expect the query to have an eventTime watermark of "2019-05-20 10:03" and 
all earlier results to be produced.

But my observations indicate that after the initial query startup and reading of 
all input topics, the watermark stays at the Unix epoch (1970-01-01) and no 
messages are produced. Only once a new message comes in after the start of the 
query is the watermark moved ahead, and all the messages are produced.

Is this the expected behaviour, and is my assumption wrong? Or am I doing 
something wrong during query setup?
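To illustrate what I am looking at: after the restart I poll the streaming progress roughly like this (a sketch; "query" is the handle returned by writeStream.start(), and the field names follow the StreamingQueryProgress JSON):

    import time

    # After the restart, eventTime.watermark stays at 1970-01-01T00:00:00.000Z
    # until a fresh message arrives on one of the input topics.
    while query.isActive:
        progress = query.lastProgress
        if progress:
            print(progress["batchId"], progress["eventTime"].get("watermark"))
        time.sleep(10)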

-- 
CU, Joe




Re: Handling of watermark in structured streaming

2019-05-14 Thread Joe Ammann
Hi Suket, Anastasios

Many thanks for your time and your suggestions!

I tried again with various settings for the watermarks and the trigger time

- watermark 20sec, trigger 2sec
- watermark 10sec, trigger 1sec
- watermark 20sec, trigger 0sec

I also tried continuous processing mode, but since I want to do aggregations, 
this did not work at all.

With all the combinations above, my observations (using 'append' output mode) 
are the same: the latest group/aggregation of events is not output/published to 
the target Kafka topic, until another event arrives with a later event 
timestamp that moves the watermark ahead far enough, so that this waiting group 
of events can safely be published. Neither the processing time (wall clock 
time) nor the trigger time play any role in that decision. Only a new event can 
move the watermark ahead, and cause the publishing/output. As long as no new 
events arrive, new mini-batches will be triggered very frequently, but will not 
produce new results.

In the meantime, I read a lot about the semantics of such event time handling 
in various streaming systems. And I think the Spark behaviour that I'm observing 
actually makes sense and is fully in line with the documentation. It just does 
not match my naive intuition.

Using 'update' mode instead of 'append' solves this and aggregates are 
immediately published (may be amended later). But 'update' mode is not very 
useful for my application, because I need to join these aggregates with other 
streams. Using 'update' would force me to persist those intermediate 
aggregation results. But I'm getting the impression this is what I will have to 
do.
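For context, the only difference between the two variants in my pipeline is the output mode on the sink, roughly like this (a sketch; "aggregatedDf" stands for the windowed aggregation, and the Kafka options and paths are illustrative):

    # 'append' holds a finished window back until the watermark passes its end;
    # 'update' emits the current state of each aggregate on every trigger.
    query = (
        aggregatedDf
        .selectExpr("to_json(struct(*)) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")        # illustrative
        .option("topic", "results")                               # illustrative
        .option("checkpointLocation", "/tmp/checkpoints/results")
        .outputMode("update")    # vs. "append"
        .start()
    )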

On 5/14/19 6:49 PM, Suket Arora wrote:
>   df = inputStream.withWatermark("eventtime", "20 seconds").groupBy("sharedId", window("20 seconds", "10 seconds")
> 
> // ProcessingTime trigger with two-second micro-batch interval
> df.writeStream
>   .format("console")
>   .trigger(Trigger.ProcessingTime("2 seconds"))
>   .start()
> 
> 
> On Tue, 14 May 2019 at 20:40, Joe Ammann <j...@pyx.ch> wrote:
> 
> Hi Anastasios
> 
> On 5/14/19 4:15 PM, Anastasios Zouzias wrote:
> > Hi Joe,
> >
> > How often do you trigger your mini-batch? Maybe you can specify the 
> trigger time explicitly to a low value or even better set it off.
> >
> > See: 
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
> 
> I tried different values for the trigger, and settled on 10 seconds. I 
> can see in the logs that this actually works (it outputs a mini-batch summary 
> in the log every 10 seconds).
> 
> There in these log entries I also see that the watermark does not 
> progress if no new data is coming in. This is how I came to my suspicion about 
> how it works internally.
> 
> I understand that it is quite uncommon to have such "slowly moving 
> topics", but unfortunately in my use case I have them.
> 
> > On Tue, May 14, 2019 at 3:49 PM Joe Ammann <j...@pyx.ch> wrote:
> >
> >     Hi all
> >
> >     I'm fairly new to Spark structured streaming and I'm only starting 
> to develop an understanding for the watermark handling.
> >
> >     Our application reads data from a Kafka input topic and as one of 
> the first steps, it has to group incoming messages. Those messages come in 
> bulks, e.g. 5 messages which belong to the same "business event" (share a 
> common key), with event timestamps differing in only a few millisecs. And 
> then no messages for say 3 minutes. And after that another bulk of 3 messages 
> with very close event timestamps.
> >
> >     I have set a watermark of 20 seconds on my streaming query, and a 
> groupBy on the shared common key, and a window of 20 seconds (10 seconds 
> sliding). So something like
> >
> >         df = inputStream.withWatermark("eventtime", "20 
> seconds").groupBy("sharedId", window("20 seconds", "10 seconds")
> >
> >     The output mode is set to append, since I intend to join these 
> streams with other streams later in the application.
> >
> >     Naively, I would have expected to see any incoming bulk of messages 
> as an aggregated message ~20 seconds after its eventtime on the output 
> stream. But my observations indicate that the "latest bulk of events" always 
> stays queued inside the query, until a new bulk of events arrives and bumps up 
> the watermark. In my example above, this means that I see the first bulk of 
> events only after 3 minutes, when the second bulk comes in.

Re: Handling of watermark in structured streaming

2019-05-14 Thread Joe Ammann
Hi Anastasios

On 5/14/19 4:15 PM, Anastasios Zouzias wrote:
> Hi Joe,
> 
> How often do you trigger your mini-batch? Maybe you can specify the trigger 
> time explicitly to a low value or even better set it off.
> 
> See: 
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

I tried different values for the trigger, and settled on 10 seconds. I can see 
in the logs that this actually works (it outputs a mini-batch summary in the 
log every 10 seconds).

There in these log entries I also see that the watermark does not progress if 
no new data is coming in. This is how I came to my suspicion about how it works 
internally.

I understand that it is quite uncommon to have such "slowly moving topics", but 
unfortunately in my use case I have them.
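Concretely, the trigger is set on the writeStream, roughly like this (a sketch; "resultDf" stands for the joined/aggregated stream and the console sink is only for testing):

    query = (
        resultDf.writeStream
        .format("console")
        .outputMode("append")
        .trigger(processingTime="10 seconds")   # micro-batch every 10 seconds
        .start()
    )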

> On Tue, May 14, 2019 at 3:49 PM Joe Ammann <j...@pyx.ch> wrote:
> 
> Hi all
> 
> I'm fairly new to Spark structured streaming and I'm only starting to 
> develop an understanding for the watermark handling.
> 
> Our application reads data from a Kafka input topic and as one of the 
> first steps, it has to group incoming messages. Those messages come in bulks, 
> e.g. 5 messages which belong to the same "business event" (share a common 
> key), with event timestamps differing in only a few millisecs. And then no 
> messages for say 3 minutes. And after that another bulk of 3 messages with 
> very close event timestamps.
> 
> I have set a watermark of 20 seconds on my streaming query, and a groupBy 
> on the shared common key, and a window of 20 seconds (10 seconds sliding). So 
> something like
> 
>     df = inputStream.withWatermark("eventtime", "20 
> seconds").groupBy("sharedId", window("20 seconds", "10 seconds")
> 
> The output mode is set to append, since I intend to join these streams 
> with other streams later in the application.
> 
> Naively, I would have expected to see any incoming bulk of messages as an 
> aggregated message ~20 seconds after its eventtime on the output stream. But 
> my observations indicate that the "latest bulk of events" always stays queued 
> inside the query, until a new bulk of events arrives and bumps up the 
> watermark. In my example above, this means that I see the first bulk of 
> events only after 3 minutes, when the second bulk comes in.
> 
> This does indeed make some sense, and if I understand the documentation 
> correctly the watermark is only ever updated upon arrival of new inputs. The 
> "real time" does not play a role in the setting of watermarks.
> 
> But to me this means that any bulk of events is prohibited from being 
> sent downstreams until a new bulk comes in. This is not what I intended.
> 
> Is my understanding more or less correct? And is there any way of 
> bringing "the real time" into the calculation of the watermark (short of 
> producing regular dummy messages which are then again filtered out)?
> -- 
> Anastasios Zouzias <a...@zurich.ibm.com>


-- 
CU, Joe




Re: Handling of watermark in structured streaming

2019-05-14 Thread Joe Ammann
Hi Suket

Sorry, this was a typo in the pseudo-code I sent. Of course, what you suggested 
(using the same eventtime attribute for the watermark and the window) is what my 
code does in reality. Sorry to confuse people.
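For clarity, the real code groups on the same eventtime column that carries the watermark, roughly like this (a sketch; count() just stands in for the real aggregation):

    from pyspark.sql.functions import col, window

    grouped = (
        inputStream
        .withWatermark("eventtime", "20 seconds")
        .groupBy(col("sharedId"),
                 window(col("eventtime"), "20 seconds", "10 seconds"))
        .count()
    )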

On 5/14/19 4:14 PM, suket arora wrote:
> Hi Joe,
> As per the Spark structured streaming documentation, and I quote:
> "withWatermark must be called on the same column as the timestamp column 
> used in the aggregate. For example, df.withWatermark("time", "1 
> min").groupBy("time2").count() is invalid in Append output mode, as the 
> watermark is defined on a different column from the aggregation column."
> 
> And after referring to the following code
> 
> // Group the data by window and word and compute the count of each group
> val windowedCounts = words
>   .withWatermark("timestamp", "10 minutes")
>   .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word")
>   .count()
> 
> 
> I would suggest you try the following code
> 
> df = inputStream.withWatermark("eventtime", "20 seconds").groupBy($"sharedId", window($"eventtime", "20 seconds", "10 seconds"))
> 
> And if this doesn't work, you can try trigger on query.

Can you maybe explain what you mean by "try trigger on query"? I don't 
understand that.

-- 
CU, Joe




Handling of watermark in structured streaming

2019-05-14 Thread Joe Ammann
Hi all

I'm fairly new to Spark structured streaming and I'm only starting to develop 
an understanding for the watermark handling.

Our application reads data from a Kafka input topic and as one of the first 
steps, it has to group incoming messages. Those messages come in bulks, e.g. 5 
messages which belong to the same "business event" (share a common key), with 
event timestamps differing in only a few millisecs. And then no messages for 
say 3 minutes. And after that another bulk of 3 messages with very close event 
timestamps.

I have set a watermark of 20 seconds on my streaming query, and a groupBy on 
the shared common key, and a window of 20 seconds (10 seconds sliding). So 
something like

df = inputStream.withWatermark("eventtime", "20 seconds").groupBy("sharedId", window("20 seconds", "10 seconds"))

The output mode is set to append, since I intend to join these streams with 
other streams later in the application.

Naively, I would have expected to see any incoming bulk of messages as an 
aggregated message ~20 seconds after its eventtime on the output stream. But 
my observations indicate that the "latest bulk of events" always stays queued 
inside the query, until a new bulk of events arrives and bumps up the watermark. 
In my example above, this means that I see the first bulk of events only after 
3 minutes, when the second bulk comes in.

This does indeed make some sense, and if I understand the documentation 
correctly the watermark is only ever updated upon arrival of new inputs. The 
"real time" does not play a role in the setting of watermarks.

But to me this means that any bulk of events is prohibited from being sent 
downstream until a new bulk comes in. This is not what I intended.

Is my understanding more or less correct? And is there any way of bringing "the 
real time" into the calculation of the watermark (short of producing regular 
dummy messages which are then again filtered out)?

-- 
CU, Joe




Re: Spark structured streaming watermarks on nested attributes

2019-05-07 Thread Joe Ammann
Hi Yuanjian

On 5/7/19 4:55 AM, Yuanjian Li wrote:
> Hi Joe
> 
> I think you met this issue: https://issues.apache.org/jira/browse/SPARK-27340
> You can check the description in the Jira and the PR. We also met this in our 
> production env and fixed it with the provided PR.
> 
> The PR is still in review. cc Langchang Zhu (zhuliangch...@baidu.com), 
> who's the author of the fix.

Yes, this very much looks like the issue I'm having. As an exercise for me (I 
have never built Spark locally) I will try to build your branch and see if it 
fixes my issue.

-- 
CU, Joe




Re: Spark structured streaming watermarks on nested attributes

2019-05-06 Thread Joe Ammann
On 5/6/19 6:23 PM, Pat Ferrel wrote:
> Streams have no end until watermarked or closed. Joins need bounded datasets, 
> et voila. Something tells me you should consider the streaming nature of your 
> data and whether your joins need to use increments/snippets of infinite 
> streams or to re-join the entire contents of the streams accumulated at 
> checkpoints.

I certainly don't question the need for watermarks. 

What I was wondering is that when I use fields called 
"entityX_LAST_MODIFICATION" for watermarks/conditions, my joins work as 
expected.

Whereas when I nest the attributes and use "entityX.LAST_MODIFICATION" (notice 
the dot for the nesting) the joins fail.

I have a feeling that the Spark execution plan gets somewhat confused, because 
in the latter case there are multiple fields called "LAST_MODIFICATION" with 
differing nesting prefixes.
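My current workaround is to keep the payload nested but pull the event time up as a uniquely named top-level column before watermarking, roughly like this (a sketch; "entity1Raw" and the 20-second delay are illustrative):

    from pyspark.sql.functions import col, struct

    entity1DF = (
        entity1Raw   # the parsed stream for entity1 (illustrative name)
        .select(struct("*").alias("entity1"),
                col("LAST_MODIFICATION").alias("entity1_LAST_MODIFICATION"))
        .withWatermark("entity1_LAST_MODIFICATION", "20 seconds")
    )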

> From: Joe Ammann <j...@pyx.ch>
> Reply: Joe Ammann <j...@pyx.ch>
> Date: May 6, 2019 at 6:45:13 AM
> To: user@spark.apache.org
> Subject: Spark structured streaming watermarks on nested attributes
> Subject: Spark structured streaming watermarks on nested attributes
> 
>> Hi all
>>
>> I'm pretty new to Spark and implementing my first non-trivial structured 
>> streaming job with outer joins. My environment is a Hortonworks HDP 3.1 
>> cluster with Spark 2.3.2, working with Python.
>>
>> I understood that I need to provide watermarks and join conditions for left 
>> outer joins to work. All my incoming Kafka streams have an attribute 
>> "LAST_MODIFICATION" which is well suited to indicate the event time, so I 
>> chose that for watermarking. Since I'm joining from multiple topics where 
> >> the incoming messages have common attributes, I thought I'd prefix/nest all 
>> incoming messages. Something like
>>
>> entity1DF.select(struct("*").alias("entity1")).withWatermark("entity1.LAST_MODIFICATION")
>> entity2DF.select(struct("*").alias("entity2")).withWatermark("entity2.LAST_MODIFICATION")
>>
>> Now when I try to join such 2 streams, it would fail and tell me that I need 
>> to use watermarks
>>
>> When I leave the watermarking attribute "at the top level", everything works 
>> as expected, e.g.
>>
>> entity1DF.select(struct("*").alias("entity1"), 
>> col("LAST_MODIFICATION").alias("entity1_LAST_MODIFICATION")).withWatermark("entity1_LAST_MODIFICATION")
>>
>> Before I hunt this down any further, is this kind of a known limitation? Or 
>> am I doing something fundamentally wrong?


-- 
CU, Joe




Spark structured streaming watermarks on nested attributes

2019-05-06 Thread Joe Ammann
Hi all

I'm pretty new to Spark and implementing my first non-trivial structured 
streaming job with outer joins. My environment is a Hortonworks HDP 3.1 cluster 
with Spark 2.3.2, working with Python.

I understood that I need to provide watermarks and join conditions for left 
outer joins to work. All my incoming Kafka streams have an attribute 
"LAST_MODIFICATION" which is well suited to indicate the event time, so I chose 
that for watermarking. Since I'm joining from multiple topics where the 
incoming messages have common attributes, I thought I'd prefix/nest all incoming 
messages. Something like


entity1DF.select(struct("*").alias("entity1")).withWatermark("entity1.LAST_MODIFICATION")

entity2DF.select(struct("*").alias("entity2")).withWatermark("entity2.LAST_MODIFICATION")

Now when I try to join such 2 streams, it fails and tells me that I need to 
use watermarks.
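The failing join attempt looks roughly like this (a sketch; the key names are illustrative):

    from pyspark.sql.functions import expr

    # With the watermarks declared on the nested entity1.LAST_MODIFICATION /
    # entity2.LAST_MODIFICATION columns, this is rejected with the error that
    # watermarks are required; with top-level watermark columns it runs.
    joined = entity1DF.join(
        entity2DF,
        expr("entity2.ENTITY1_ID = entity1.ID"),
        "leftOuter"
    )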

When I leave the watermarking attribute "at the top level", everything works as 
expected, e.g.

entity1DF.select(struct("*").alias("entity1"), 
col("LAST_MODIFICATION").alias("entity1_LAST_MODIFICATION")).withWatermark("entity1_LAST_MODIFICATION")

Before I hunt this down any further, is this kind of a known limitation? Or am 
I doing something fundamentally wrong?

-- 
CU, Joe
