Hi,

the general pattern here is to map both PCollections to a common type, e.g. PCollection<KV<TupleTag, T>>, flatten them into one PCollection, and apply a stateful DoFn to it, see [1]. You hold the latest DataY value for each ID in state and match it against events coming from the DataX stream. This works fine as long as you do not need to ensure that each DataX event is matched against the *exactly preceding* DataY event (in event time).
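
To make the matching logic concrete, here is a rough sketch in plain Java that simulates it outside of Beam. It is not Beam API code: the Event class, its isDataY tag and the HashMap standing in for per-key state are all made up for illustration. In a real pipeline the map would be the state of the stateful DoFn and the list would be the flattened PCollection.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the pattern; not Beam API code.
class LatestValueJoin {

    /** Tagged element, standing in for the common type after flattening. */
    static final class Event {
        final String id;
        final boolean isDataY; // hypothetical tag: true = DataY, false = DataX
        final String payload;

        Event(String id, boolean isDataY, String payload) {
            this.id = id;
            this.isDataY = isDataY;
            this.payload = payload;
        }
    }

    // Stands in for per-key state: the last DataY payload seen for each ID.
    private final Map<String, String> latestY = new HashMap<>();

    /** Handles one element in arrival order; returns a joined record or null. */
    String process(Event e) {
        if (e.isDataY) {
            latestY.put(e.id, e.payload); // overwrite with the newer DataY
            return null;
        }
        String y = latestY.get(e.id);
        // IDs without any DataY so far produce no output.
        return y == null ? null : e.id + ":" + e.payload + "+" + y;
    }

    /** Runs the simulation over an already flattened stream. */
    static List<String> run(List<Event> flattened) {
        LatestValueJoin join = new LatestValueJoin();
        List<String> out = new ArrayList<>();
        for (Event e : flattened) {
            String joined = join.process(e);
            if (joined != null) {
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> in = new ArrayList<>();
        in.add(new Event("a", false, "x1")); // no DataY yet, produces nothing
        in.add(new Event("a", true, "y1"));
        in.add(new Event("a", false, "x2")); // joined with y1
        System.out.println(run(in));
    }
}
```

Note that the sketch matches in arrival order, not in event-time order, which is exactly the limitation mentioned above.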

If you need to be sure that each DataX event is matched against the latest DataY (and most of the time it is likely you don't have this requirement), then you can:

 a) buffer DataX in a BagState and use timers to flush the state after some timeout, or

 b) use @DoFn.RequiresTimeSortedInput [2] (if your runner supports it), which will do the buffering for you and pass the elements to the @ProcessElement method sorted by event timestamp.
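
The buffering idea behind (a) can be sketched in plain Java as well. Again this is only a simulation under my own assumptions, not Beam code: the list stands in for a BagState, the onTimer method for a timer callback, and the class holds the state of a single ID. For simplicity the sketch buffers both DataX and DataY elements and replays them in event-time order once the (simulated) watermark has passed them.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java simulation of buffer-then-sort for a single ID; not Beam API code.
class BufferedSortedJoin {

    /** Hypothetical element with an event timestamp and a DataX/DataY tag. */
    static final class Event {
        final long timestamp;
        final boolean isDataY;
        final String payload;

        Event(long timestamp, boolean isDataY, String payload) {
            this.timestamp = timestamp;
            this.isDataY = isDataY;
            this.payload = payload;
        }
    }

    private final List<Event> buffer = new ArrayList<>(); // stands in for a BagState
    private String latestY = null;                        // stands in for a ValueState

    /** Elements are only buffered on arrival; nothing is emitted yet. */
    void process(Event e) {
        buffer.add(e);
    }

    /** Simulated timer: replay everything at or before the watermark in event-time order. */
    List<String> onTimer(long watermark) {
        List<Event> ready = new ArrayList<>();
        List<Event> keep = new ArrayList<>();
        for (Event e : buffer) {
            if (e.timestamp <= watermark) {
                ready.add(e);
            } else {
                keep.add(e);
            }
        }
        buffer.clear();
        buffer.addAll(keep);
        ready.sort(Comparator.comparingLong((Event e) -> e.timestamp));

        List<String> out = new ArrayList<>();
        for (Event e : ready) {
            if (e.isDataY) {
                latestY = e.payload;            // newest DataY in event time so far
            } else if (latestY != null) {
                out.add(e.payload + "+" + latestY);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        BufferedSortedJoin join = new BufferedSortedJoin();
        join.process(new Event(10, false, "x1")); // arrives before its DataY
        join.process(new Event(5, true, "y1"));
        join.process(new Event(8, true, "y2"));
        System.out.println(join.onTimer(12));
    }
}
```

Anything that shows up with a timestamp at or before a watermark that was already flushed is late in this sketch, and it would be matched out of order.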

In both cases it is worth thinking about how you want to handle late data (i.e. data that arrives after the watermark, or after an element was already matched, but against the wrong element). Solution (b) simply drops the late element (which might not be what you want), or introduces latency defined by allowedLateness. Another option would be to implement retractions and process them downstream. I implemented something like that in [3].

Hope that helps,

 Jan

[1] https://beam.apache.org/blog/stateful-processing/

[2] https://beam.apache.org/releases/javadoc/2.43.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html

[3] https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter4/src/main/java/com/packtpub/beam/chapter4/StreamingInnerJoin.java

On 1/4/23 16:28, Ifat Afek (Nokia) wrote:

Thanks Sören,

I already saw your stack overflow question while trying to find a solution 😊

I prefer a solution that does not involve an external cache like Redis, if possible.

Best Regards,

Ifat

*From: *Sören Henning <soeren.henn...@email.uni-kiel.de>
*Reply-To: *"user@beam.apache.org" <user@beam.apache.org>
*Date: *Tuesday, 3 January 2023 at 14:56
*To: *"user@beam.apache.org" <user@beam.apache.org>
*Subject: *Re: Join streams with different frequencies

Hi,

while I cannot provide you with a definite answer to your question, maybe my Stack Overflow question is interesting for you: https://stackoverflow.com/questions/66067263/how-to-join-a-frequently-updating-stream-with-an-irregularly-updating-stream-i

Best regards,
Sören

On 03.01.23 at 09:15, Ifat Afek (Nokia) wrote:

    Hi,

    We are trying to implement the following use case:

    We have a stream of DataX events that arrive every 5 minutes and
    require some processing. Each event holds data for a specific
    non-unique ID (we keep getting updated data for each ID). There
    might be up to 1,000,000 IDs.

    In addition, there is a stream of DataY events for some of these
    IDs, which arrive at a variable frequency: an event could arrive
    after a minute and then the next one after 5 hours.

    We would like to join the current DataX and latest DataY events by
    ID (and process only IDs that have both DataX and DataY events).

    We thought of holding a state of DataY events per ID in a global
    window, and then use it as a side input for filtering the DataX
    events stream. The state should hold the latest (by timestamp)
    DataY event that arrived.

    The problem is: if we are using discardingFiredPanes(), then each
    DataY event is fired only once and cannot be reused later on for
    filtering. On the other hand, if we are using
    accumulatingFiredPanes(), then a list of all DataY events that
    arrived is fired.

    Are we missing something? What is the best practice for combining
    two streams, one of which has a variable frequency?

    Thanks,

    Ifat
