@Kenneth - thanks for your response. I was certainly inspired a lot by
earlier discussions in the group and by the latest documentation updates
about timers:
https://beam.apache.org/documentation/programming-guide/#timers

In the list of limitations I forgot to mention side inputs. They work quite
well for scenarios where one side of the join is updated slowly, very
slowly. But for scenarios where the main stream gets 50k+ events per
second and the joined stream ~100 events per second, they simply do not
work, especially because a Map side input cannot be updated incrementally
and has to be updated/broadcast as a whole.

@Jan - very interesting. As I understand it, the joins themselves are
already implemented (plenty of them in Scio: classic ones, sparse versions,
etc.); the problem is the limited window semantics, the triggering policy
and the time of emitted events.

Please look at LookupCacheDoFn. It looks like a left outer join, but it
isn't. Only the latest lookup value (the right side of the join) is cached,
and the left side of the join is cached only until the first matching
lookup is observed. Not so generic, but quite efficient.

https://github.com/mkuthan/beam-examples/blob/master/src/main/scala/org/mkuthan/beam/examples/LookupCacheDoFn.scala
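
A minimal, Beam-free sketch of the caching behaviour described above might
look like the following. It is not the actual LookupCacheDoFn. It assumes a
single key-partitioned instance and keeps the per-key state in plain mutable
maps instead of Beam state cells. It also ignores timers, watermarks and
state expiry. MainEvent, LookupEvent and LookupCacheSketch are hypothetical
names.

```scala
// Hypothetical, Beam-free simulation of the per-key state described above.
import scala.collection.mutable

sealed trait Event { def key: String }
// Left (main) side of the join.
final case class MainEvent(key: String, value: String) extends Event
// Right (lookup) side of the join.
final case class LookupEvent(key: String, value: String) extends Event

final class LookupCacheSketch {
  // Only the latest lookup value per key is kept; older values are overwritten.
  private val lookupCache = mutable.Map.empty[String, String]
  // Main events buffered until the first matching lookup arrives.
  private val pending = mutable.Map.empty[String, Vector[String]]

  /** Handles one event and returns the (main, lookup) pairs emitted for it. */
  def process(event: Event): Seq[(String, String)] = event match {
    case MainEvent(key, value) =>
      lookupCache.get(key) match {
        case Some(lookup) => Seq(value -> lookup) // lookup already known: emit at once
        case None =>
          pending.update(key, pending.getOrElse(key, Vector.empty) :+ value)
          Seq.empty // wait for a lookup for this key
      }
    case LookupEvent(key, lookup) =>
      lookupCache.update(key, lookup)
      // flush buffered main events once, then forget them
      pending.remove(key).getOrElse(Vector.empty).map(value => value -> lookup)
  }
}
```

Feeding it MainEvent("k1", "a"), LookupEvent("k1", "v1") and
MainEvent("k1", "b") emits nothing, then ("a", "v1"), then ("b", "v1"). A
later LookupEvent("k1", "v2") emits nothing, because the buffered main events
were already flushed and forgotten. This is why it is not a left outer join.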

Marcin

On Fri, 1 May 2020 at 22:22, Jan Lukavský <je...@seznam.cz> wrote:

> Interestingly, I'm currently also working on a proposal for generic join
> semantics. I plan to send it for review, but unfortunately other things
> are still keeping me busy. I'll take this opportunity to share the
> high-level thoughts; maybe someone can give some pointers.
>
> The general idea is to define a join that can incorporate all other types
> as special cases, where the generic implementation can be simplified or
> optimized, but the semantics remain the same. As I plan to write this up
> as a full design document, I will only roughly outline the ideas:
>
>  a) the generic semantics should be equivalent to running a relational
> join against a set of tables _after each individual modification of the
> relation_ and producing results with the timestamp of the last modification
>
>  b) windows "scope" the state of each "table", i.e. when time reaches
> window.maxTimestamp(), the corresponding "table" is cleared
>
>  c) it should be possible to derive other types of joins from this
> definition by certain manipulations (e.g. buffering multiple updates in a
> single window and assigning all elements the timestamp
> window.maxTimestamp() will yield the classical "windowed join", with the
> requirement to have the same windows on both (all) sides, as otherwise the
> result will be empty); the goal of these modifications is typically to
> enable some optimization (e.g. the fully generic implementation must
> include time sorting, either implicitly or explicitly, while optimized
> variants can drop this requirement).
>
> It would be great if someone has any comments on this bottom-up approach.
>
> Jan
> On 5/1/20 5:30 PM, Kenneth Knowles wrote:
>
> +dev <dev@beam.apache.org> and some people I talk about joins with
>
> Interesting! It is a lot to take in and fully grok the code, so calling in
> reinforcements...
>
> Generally, I think there's agreement that for a lot of real use cases, you
> have to roll your own join using the lower level Beam primitives. So I
> think it would be great to get some of these other approaches to joins into
> Beam, perhaps as an extension of the Java SDK or even in the core (since
> schema joins are in the core). In particular:
>
>  - "join in fixed window with repeater" sounds similar (but not identical)
> to work by Mikhail
>  - "join in global window with cache" sounds similar (but not identical)
> to work and discussions w/ Reza and Tyson
>
> I want to be clear that I am *not* saying there's any duplication. I'm
> guessing these all fit into a collection of different ways to accomplish
> joins, and if everything comes to fruition we will have the great
> opportunity to document how a user should choose between them.
>
> Kenn
>
> On Fri, May 1, 2020 at 7:56 AM Marcin Kuthan <marcin.kut...@gmail.com>
> wrote:
>
>> Hi,
>>
>> it's my first post here, but I've been reading the group for a while, so
>> thank you for sharing your knowledge!
>>
>> I've been using Beam/Scio on Dataflow for about a year, mostly for stream
>> processing from unbounded sources like PubSub. In my daily work I found
>> that built-in windowing is very generic and provides rich watermark/late
>> event semantics, but there are a few very annoying limitations, e.g.:
>> - both sides of the join must be defined within compatible windows
>> - for fixed windows, elements close to window boundaries (but in
>> different windows) won't be joined
>> - for sliding windows there is a huge overhead if the window duration is
>> much longer than the slide period
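
To put numbers on the last two points, here is a small plain-Scala sketch of
window assignment arithmetic. It assumes zero offset and millisecond
timestamps. WindowMath and its functions are hypothetical helpers that only
mimic the assignment rule; they are not Beam's FixedWindows/SlidingWindows
classes.

```scala
// Hypothetical helpers that mimic fixed/sliding window assignment (zero offset, ms timestamps).
object WindowMath {
  /** Start of the fixed window of length `size` that contains `ts`. */
  def fixedWindowStart(ts: Long, size: Long): Long =
    ts - Math.floorMod(ts, size)

  /** Starts of all sliding windows of length `size`, opened every `period`, that contain `ts`. */
  def slidingWindowStarts(ts: Long, size: Long, period: Long): Seq[Long] = {
    val lastStart = ts - Math.floorMod(ts, period)
    // walk back one period at a time while the window [start, start + size) still covers ts
    Iterator.iterate(lastStart)(_ - period).takeWhile(_ > ts - size).toList
  }
}
```

With 1-minute fixed windows, events at 59999 ms and 60000 ms land in the
windows starting at 0 and 60000. A per-window join therefore never sees them
together, even though they are 1 ms apart. With 1-hour windows sliding every
minute, each element is copied into 3600000 / 60000 = 60 windows.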
>>
>> I would like to ask you to review a few "join/windowing patterns" built
>> with custom stateful ParDos, not as generic as the Beam built-ins but
>> perhaps better crafted for more specific needs. I published the code with
>> tests; feel free to comment as GitHub issues or on the mailing list. Event
>> time processing with watermarks is so demanding that I'm almost sure I
>> overlooked many important corner cases.
>> https://github.com/mkuthan/beam-examples
>>
>> If you think that the examples are somehow useful, I'll be glad to write
>> a blog post with more details :)
>>
>> Marcin
>>
>
