Re: Join streams with different frequencies

2023-01-05 Thread Ifat Afek (Nokia)
Hi Jan,

Thanks, I will check the options you suggested.

Best Regards,
Ifat




Re: Join streams with different frequencies

2023-01-04 Thread Jan Lukavský

Hi,

the general pattern here would be to map both PCollections to a 
common type, e.g. a PCollection of key-value pairs keyed by ID whose 
value holds either a DataX or a DataY, and then flatten them into one 
PCollection, onto which you apply a stateful DoFn, see [1]. You would 
hold the DataY value of your ID in the state and match it against 
events coming from the DataX stream. Under the assumption that you do 
not need to ensure that each DataX event is matched against the 
*exactly preceding* DataY event (in event time), this works fine.
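
As a rough illustration of the pattern described above, here is a minimal plain-Java model of the per-ID logic. It is not Beam code: the HashMap only stands in for the per-key state a stateful DoFn would hold, and all names (LatestValueJoin, Event, Kind) are hypothetical. Elements are handled in arrival order, so it carries the same assumption that exact event-time matching is not required.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the "flatten, then stateful DoFn" pattern. NOT Beam code.
// All names here (LatestValueJoin, Event, Kind) are hypothetical.
class LatestValueJoin {
    enum Kind { X, Y }

    // Common element type that both streams are mapped to before flattening.
    static final class Event {
        final String id;
        final Kind kind;
        final String payload;

        Event(String id, Kind kind, String payload) {
            this.id = id;
            this.kind = kind;
            this.payload = payload;
        }
    }

    // Stands in for per-key state: the last DataY payload seen for each ID.
    private final Map<String, String> latestY = new HashMap<>();

    // Handles one element of the flattened stream, in arrival order, and
    // returns the joined results it produces (zero or one).
    List<String> process(Event e) {
        List<String> out = new ArrayList<>();
        if (e.kind == Kind.Y) {
            // Remember the DataY value; nothing is emitted for it.
            latestY.put(e.id, e.payload);
        } else {
            String y = latestY.get(e.id);
            if (y != null) {
                // IDs that have no DataY yet are skipped.
                out.add(e.id + ":" + e.payload + "+" + y);
            }
        }
        return out;
    }
}
```

Feeding a DataX element for an ID before any DataY for that ID produces no output; once a DataY has been stored, every later DataX for that ID is joined with the most recently stored value.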


If you need to be sure that each DataX event is matched against the 
latest DataY (and most of the time it is likely you don't have this 
requirement), then you can:


 a) buffer DataX in a BagState and use timers to flush the state after 
some timeout, or


 b) use @DoFn.RequiresTimeSortedInput [2] (if your runner supports 
it), which will do the buffering for you and pass the elements to the 
@ProcessElement method sorted by event timestamp.
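
To make the buffering idea concrete, the following is a minimal plain-Java model for a single ID. It is a sketch only, not Beam code: the list stands in for a BagState, onTimer stands in for a timer callback, and the names (BufferedJoin, Event, onTimer) are hypothetical. As a simplification it buffers both kinds of element and replays everything at or before the given watermark in event-time order, which is roughly the effect that time-sorted input would give.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of "buffer, then flush on a timer" for one ID. NOT Beam code.
// All names here (BufferedJoin, Event, onTimer) are hypothetical.
class BufferedJoin {
    static final class Event {
        final boolean isY;
        final long timestamp;   // event time
        final String payload;

        Event(boolean isY, long timestamp, String payload) {
            this.isY = isY;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Stands in for a bag-style state holding not-yet-processed elements.
    private final List<Event> buffer = new ArrayList<>();
    // Stands in for a value state holding the latest DataY replayed so far.
    private String latestY = null;

    // Elements are only buffered on arrival; nothing is emitted here.
    void process(Event e) {
        buffer.add(e);
    }

    // Replays all buffered elements with timestamp <= watermark in event-time
    // order and returns the joined results.
    List<String> onTimer(long watermark) {
        List<Event> ready = new ArrayList<>();
        List<Event> remaining = new ArrayList<>();
        for (Event e : buffer) {
            if (e.timestamp <= watermark) {
                ready.add(e);
            } else {
                remaining.add(e);
            }
        }
        buffer.clear();
        buffer.addAll(remaining);

        // List.sort is stable, so elements with equal timestamps keep arrival order.
        ready.sort((a, b) -> Long.compare(a.timestamp, b.timestamp));

        List<String> out = new ArrayList<>();
        for (Event e : ready) {
            if (e.isY) {
                latestY = e.payload;
            } else if (latestY != null) {
                out.add(e.payload + "+" + latestY);
            }
        }
        return out;
    }
}
```

The price of this approach is latency: a DataX element is not joined until the timer fires, so the flush timeout bounds how far out of order a DataY may arrive and still be matched correctly.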


In both cases it is worth deciding how you want to handle late data 
(i.e. data that arrives after the watermark, or after an element was 
already matched, but against the wrong element). Solution (b) simply 
drops the late element (which might not be what you want), or 
introduces latency defined by allowedLateness. Another option would be 
to implement retractions and process them downstream. I implemented 
something like that in [3].
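
To show what retractions could look like, here is a minimal plain-Java model for a single ID. It is only a sketch under simplifying assumptions and is not the code from [3]: it keeps every element forever (a real pipeline would need to clean up state), and the names (RetractingJoin, onX, onY) and the "+"/"-" output convention are hypothetical. A result prefixed with "+" is a new join result; one prefixed with "-" retracts a result emitted earlier.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a join that corrects earlier results. NOT Beam code.
// All names here (RetractingJoin, onX, onY) are hypothetical.
class RetractingJoin {
    private static final class Y {
        final long ts;
        final String payload;

        Y(long ts, String payload) {
            this.ts = ts;
            this.payload = payload;
        }
    }

    private static final class X {
        final long ts;
        final String payload;
        Y matched;   // the DataY this DataX is currently joined with, if any

        X(long ts, String payload) {
            this.ts = ts;
            this.payload = payload;
        }
    }

    private final List<Y> ys = new ArrayList<>();
    private final List<X> xs = new ArrayList<>();

    // A DataX is joined with the latest DataY at or before its timestamp.
    List<String> onX(long ts, String payload) {
        X x = new X(ts, payload);
        for (Y y : ys) {
            if (y.ts <= ts && (x.matched == null || y.ts > x.matched.ts)) {
                x.matched = y;
            }
        }
        xs.add(x);
        List<String> out = new ArrayList<>();
        if (x.matched != null) {
            out.add("+" + payload + "/" + x.matched.payload);
        }
        return out;
    }

    // A DataY may arrive late: any DataX it should have matched is re-emitted,
    // preceded by a retraction of the result emitted earlier.
    List<String> onY(long ts, String payload) {
        Y y = new Y(ts, payload);
        ys.add(y);
        List<String> out = new ArrayList<>();
        for (X x : xs) {
            boolean applies = x.ts >= ts;
            boolean better = x.matched == null || x.matched.ts < ts;
            if (applies && better) {
                if (x.matched != null) {
                    out.add("-" + x.payload + "/" + x.matched.payload);
                }
                out.add("+" + x.payload + "/" + payload);
                x.matched = y;
            }
        }
        return out;
    }
}
```

Downstream consumers then have to understand both kinds of record, which is the extra cost of getting low latency and eventually correct results at the same time.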


Hope that helps,

 Jan

[1] https://beam.apache.org/blog/stateful-processing/

[2] 
https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html


[3] 
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter4/src/main/java/com/packtpub/beam/chapter4/StreamingInnerJoin.java




Re: Join streams with different frequencies

2023-01-04 Thread Ifat Afek (Nokia)
Thanks Sören,

I already saw your Stack Overflow question while trying to find a solution 😊
I prefer a solution that does not involve an external cache like Redis, if 
possible.

Best Regards,
Ifat




Re: Join streams with different frequencies

2023-01-03 Thread Sören Henning

Hi,

while I cannot provide you with a definite answer to your question, 
my Stack Overflow question may be of interest to you: 
https://stackoverflow.com/questions/66067263/how-to-join-a-frequently-updating-stream-with-an-irregularly-updating-stream-i


Best regards,
Sören

On 03.01.23 at 09:15, Ifat Afek (Nokia) wrote:


Hi,

We are trying to implement the following use case:

We have a stream of DataX events that arrive every 5 minutes and 
require some processing. Each event holds data for a specific 
non-unique ID (we keep getting updated data for each ID). There might 
be up to 1,000,000 IDs.


In addition, there is a stream of DataY events for some of these IDs, 
which arrive at a variable frequency: one could arrive after a minute 
and the next only after 5 hours.


We would like to join the current DataX and latest DataY events by ID 
(and process only IDs that have both DataX and DataY events).


We thought of holding a state of DataY events per ID in a global 
window, and then using it as a side input for filtering the DataX 
events stream. The state should hold the latest (by timestamp) DataY 
event that arrived.


The problem is: if we are using discardingFiredPanes(), then each 
DataY event is fired only once and cannot be reused later on for 
filtering. On the other hand, if we are using 
accumulatingFiredPanes(), then a list of all DataY events that arrived 
is fired.
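
The difference between the two accumulation modes described above can be sketched with a small plain-Java model. It is not Beam code, and the names (PaneModel, add, fire) are hypothetical; it only mimics which elements each fired pane contains.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of discarding vs. accumulating panes. NOT Beam code.
// All names here (PaneModel, add, fire) are hypothetical.
class PaneModel {
    private final boolean accumulating;
    private final List<String> sinceLastFiring = new ArrayList<>();
    private final List<String> allSoFar = new ArrayList<>();

    PaneModel(boolean accumulating) {
        this.accumulating = accumulating;
    }

    void add(String element) {
        sinceLastFiring.add(element);
        allSoFar.add(element);
    }

    // Returns the contents of the pane produced by one trigger firing.
    List<String> fire() {
        List<String> pane = new ArrayList<>(accumulating ? allSoFar : sinceLastFiring);
        sinceLastFiring.clear();
        return pane;
    }
}
```

In discarding mode the second pane holds only the newest DataY, and a DataY that has not changed is never fired again; in accumulating mode every pane holds the full history. Neither is "just the latest value per ID".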


Are we missing something? What is the best practice for combining two 
streams, one with a variable frequency?


Thanks,

Ifat