Hi,
the general pattern here would be to map both the PCollections to a
common type, e.g. PCollection<KV<TupleTag, T>> and then flatten them
into one PCollection, onto which you apply a stateful DoFn, see [1]. You
would hold the DataY value of your ID in the state and match it against
events coming from the DataX stream. Provided you do not need to
ensure that each DataX event is matched against the *exactly preceding*
DataY event (in event time), this works fine.
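To make the matching logic concrete, here is a minimal sketch in plain Java. It only models what the stateful DoFn would do per ID and does not call the Beam API. The class name `LatestYJoin`, the methods `onDataY`/`onDataX` and the string payloads are made up for illustration, and the `HashMap` stands in for the per-key state (e.g. a ValueState) that Beam would manage for you.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the per-key join logic; this is NOT Beam API.
// In a real pipeline the map below would be per-key state inside a stateful DoFn.
class LatestYJoin {
  // Hypothetical stand-in for state: the most recently *arrived* DataY per ID.
  private final Map<String, String> latestY = new HashMap<>();

  // Called for every DataY element: remember it and emit nothing.
  void onDataY(String id, String dataY) {
    latestY.put(id, dataY);
  }

  // Called for every DataX element: emit a joined record only if a DataY is known for the ID.
  List<String> onDataX(String id, String dataX) {
    List<String> out = new ArrayList<>();
    String y = latestY.get(id);
    if (y != null) {
      out.add(id + ":" + dataX + "+" + y);
    }
    return out;
  }
}
```

Note that this matches on arrival order, not event time, which is exactly the caveat above.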
If you need to be sure that each DataX event is matched against the
latest DataY (and most of the time it is likely you don't have this
requirement), then you can:
a) buffer DataX in a BagState and use timers to flush the state after
some timeout, or
b) use @DoFn.RequiresTimeSortedInput [2] (if your runner supports
it), which will do the buffering for you and pass the elements to the
@ProcessElement method sorted by event timestamp.
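Option (a) could look roughly like the following for a single ID. Again this is a plain-Java model under assumptions, not Beam code: `SortedBufferJoin`, `Event`, `onElement` and `onTimer` are hypothetical names, the list stands in for a BagState, and `onTimer(watermark)` stands in for an event-time timer callback.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of option (a) for one ID; this is NOT Beam API.
class SortedBufferJoin {
  // Hypothetical event type: isY distinguishes DataY from DataX.
  static final class Event {
    final long timestamp;
    final boolean isY;
    final String payload;

    Event(long timestamp, boolean isY, String payload) {
      this.timestamp = timestamp;
      this.isY = isY;
      this.payload = payload;
    }
  }

  private final List<Event> buffer = new ArrayList<>(); // stands in for BagState
  private String latestY = null;                        // stands in for ValueState

  // Every incoming element (DataX or DataY) is only buffered.
  void onElement(Event e) {
    buffer.add(e);
  }

  // Stands in for a timer firing: process everything at or before `watermark`
  // in event-time order, keep the rest buffered.
  List<String> onTimer(long watermark) {
    List<Event> ready = new ArrayList<>();
    List<Event> keep = new ArrayList<>();
    for (Event e : buffer) {
      if (e.timestamp <= watermark) {
        ready.add(e);
      } else {
        keep.add(e);
      }
    }
    ready.sort(Comparator.comparingLong((Event e) -> e.timestamp));

    List<String> out = new ArrayList<>();
    for (Event e : ready) {
      if (e.isY) {
        latestY = e.payload;              // newer DataY replaces the old one
      } else if (latestY != null) {
        out.add(e.payload + "+" + latestY); // DataX joins the preceding DataY
      }
    }
    buffer.clear();
    buffer.addAll(keep);
    return out;
  }
}
```

With this, a DataX at time 20 that arrives before a DataY at time 10 is still matched against that DataY once the timer fires, at the cost of the added latency.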
In both cases it is worth deciding how you want to handle late data
(i.e. data that arrives behind the watermark, or after an element was
already matched, but against the wrong element). Solution (b) simply
drops late elements (which might not be what you want), or introduces
latency defined by allowedLateness. Another option would be to implement
retractions and process them downstream. I implemented something like
that in [3].
Hope that helps,
Jan
[1] https://beam.apache.org/blog/stateful-processing/
[2] https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
[3] https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter4/src/main/java/com/packtpub/beam/chapter4/StreamingInnerJoin.java
On 1/4/23 16:28, Ifat Afek (Nokia) wrote:
Thanks Sören,
I already saw your stack overflow question while trying to find a
solution 😊
I prefer a solution that does not involve an external cache like
Redis, if possible.
Best Regards,
Ifat
From: Sören Henning <soeren.henn...@email.uni-kiel.de>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Tuesday, 3 January 2023 at 14:56
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Join streams with different frequencies
Hi,
while I cannot provide you with a definite answer to your question,
maybe my Stack Overflow question is interesting for you:
https://stackoverflow.com/questions/66067263/how-to-join-a-frequently-updating-stream-with-an-irregularly-updating-stream-i
Best regards,
Sören
On 03.01.23 at 09:15, Ifat Afek (Nokia) wrote:
Hi,
We are trying to implement the following use case:
We have a stream of DataX events that arrive every 5 minutes and
require some processing. Each event holds data for a specific
non-unique ID (we keep getting updated data for each ID). There
might be up to 1,000,000 IDs.
In addition, there is a stream of DataY events for some of these
IDs, which arrive at a variable frequency, e.g. after a minute
and then again after 5 hours.
We would like to join the current DataX and latest DataY events by
ID (and process only IDs that have both DataX and DataY events).
We thought of holding a state of DataY events per ID in a global
window, and then use it as a side input for filtering the DataX
events stream. The state should hold the latest (by timestamp)
DataY event that arrived.
The problem is: if we are using discardingFiredPanes(), then each
DataY event is fired only once and cannot be reused later on for
filtering. On the other hand, if we are using
accumulatingFiredPanes(), then a list of all DataY events that
arrived is fired.
Are we missing something? What is the best practice for combining
two streams, one of which has a variable frequency?
Thanks,
Ifat