On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Reuven,
>
> I hadn't looked at that particular one, but looking into it now, it
> looks like it is (same as the "classic" join library) built around CoGBK. Is
> that correct? If so, then it essentially means that it:
>
>  - works only for cases where both sides have the same windowfn (that is
> a limitation of the Flatten that precedes CoGBK)
>
Correct. Did you want to join different windows? If so, what are the
semantics? If the lhs has FixedWindows and the rhs has SessionWindows, what
do you want the join semantics to be? The only thing I could imagine would
be for the user to provide some function telling the join how to map the
windows together, but that could be pretty complicated.
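One way to picture the "user-provided mapping function" idea is the plain-Python sketch below. It models the semantics only and does not use the Beam API. Windows are half-open `[start, end)` intervals for a single key, and `overlaps` is a hypothetical mapping function that pairs each lhs fixed window with every rhs session window it intersects.

```python
from typing import Callable, Dict, List, Tuple

Window = Tuple[int, int]  # half-open interval [start, end), e.g. in seconds


def overlaps(lhs: Window, rhs: Window) -> bool:
    """Hypothetical mapping function: join windows whose intervals intersect."""
    return lhs[0] < rhs[1] and rhs[0] < lhs[1]


def join_across_windows(
    lhs: Dict[Window, List[str]],
    rhs: Dict[Window, List[str]],
    maps_to: Callable[[Window, Window], bool],
) -> List[Tuple[Window, Window, str, str]]:
    """Pair values of one key from lhs/rhs windows that the user function matches."""
    out: List[Tuple[Window, Window, str, str]] = []
    for lw, lvals in lhs.items():
        for rw, rvals in rhs.items():
            if maps_to(lw, rw):
                out.extend((lw, rw, lv, rv) for lv in lvals for rv in rvals)
    return out


# Single key; lhs uses 10s fixed windows, rhs has two session windows.
lhs = {(0, 10): ["a"], (10, 20): ["b"]}
rhs = {(5, 12): ["x"], (25, 31): ["y"]}
result = join_across_windows(lhs, rhs, overlaps)
```

Note that `x` is paired into two different lhs windows. This shows why the approach gets complicated: the join would also have to decide which window, and which output timestamp, each joined pair belongs to.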


>  - when using the global window, there has to be a trigger, and (afaik) there is
> no trigger that would guarantee firing after each data element (for early
> panes) (because triggers are there to express a cost-latency tradeoff, not
> semantics)
>

Can you explain the use case where this matters? If you do trigger
elementCountAtLeast(1) on the join, then the consumer will simply see a
continuous stream of outputs. I'm not sure I understand why the consumer
cares that some of those outputs were in a pane that really held 3 outputs
instead of 1.
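The sketch below models this argument in plain Python; it is not Beam code. With `elementCountAtLeast(1)`, a firing may carry one or several new outputs per pane, depending on how the runner batches. The sketch assumes fired panes are discarded, so each pane holds only new outputs. Under that assumption, a consumer that just reads the elements sees the same stream either way. `split_into_panes` and `flatten` are hypothetical helpers.

```python
from itertools import chain
from typing import List, Sequence


def split_into_panes(outputs: Sequence[str], sizes: Sequence[int]) -> List[List[str]]:
    """Group join outputs into consecutive panes of the given sizes."""
    panes: List[List[str]] = []
    i = 0
    for n in sizes:
        panes.append(list(outputs[i:i + n]))
        i += n
    if i != len(outputs):
        raise ValueError("pane sizes must cover every output exactly once")
    return panes


def flatten(panes: List[List[str]]) -> List[str]:
    """What a downstream consumer iterating over each pane's elements sees."""
    return list(chain.from_iterable(panes))


outputs = ["k:a-x", "k:a-y", "k:b-x", "k:b-y"]
one_per_pane = split_into_panes(outputs, [1, 1, 1, 1])  # one firing per output
batched = split_into_panes(outputs, [3, 1])            # runner batched three outputs
```

In accumulating mode, later panes would repeat earlier outputs, so this equivalence holds only for discarding panes.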


> Moreover, I'd like to define the join semantics so that when elements are
> available from both sides, the fired pane should be ON_TIME, not
> EARLY. That essentially means that the fully general case would not be
> built around (Co)GBK, but around a stateful ParDo. There are specific cases where
> this fully general case "degrades" into forms that can be efficiently
> expressed using (Co)GBK, that is true.
>

BTW, building this around a stateful DoFn might be a better fit. The main
reason I didn't is that we would need a good distributed MapState
(something discussed fairly recently on the list), and that is not yet
built. Once we had that, I might be inclined to rewrite this join on
stateful DoFn.
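Here is a rough sketch of the stateful approach. It is written in plain Python rather than against Beam's `DoFn` API. A per-key dictionary stands in for the (Map)State a stateful DoFn would keep, and each arriving element is joined immediately against everything buffered for the other side. `StreamingInnerJoin` is a hypothetical name, and only an inner join is modeled.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class StreamingInnerJoin:
    """Models a stateful DoFn that joins each element as soon as it arrives."""

    def __init__(self) -> None:
        # key -> side ("L" or "R") -> buffered values; stands in for MapState.
        self.state: Dict[str, Dict[str, List[str]]] = defaultdict(
            lambda: {"L": [], "R": []}
        )

    def process(self, side: str, key: str, value: str) -> List[Tuple[str, str, str]]:
        """Emit (key, left, right) pairs with everything buffered on the other side."""
        buffers = self.state[key]
        if side == "L":
            out = [(key, value, rv) for rv in buffers["R"]]
        else:
            out = [(key, lv, value) for lv in buffers["L"]]
        buffers[side].append(value)  # keep for future matches; never expired here
        return out


join = StreamingInnerJoin()
arrivals = [("L", "k", "a"), ("R", "k", "x"), ("R", "k", "y"), ("L", "k", "b"), ("R", "j", "z")]
emitted = []
for side, key, value in arrivals:
    emitted.extend(join.process(side, key, value))
```

In the global window this state grows without bound. A real implementation would need timers or some expiry policy to clear it. Left, right, and full outer joins would also need a rule for when to emit unmatched rows.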

However, can you explain what you are expecting from the pane? An EARLY pane
simply means that we are producing output before the end of the window. If
you are in the global window triggering every element, then every output is
EARLY. It might seem weird if you are interpreting EARLY as "outputting
data that isn't ready," however that's not what EARLY is defined to be. Any
change to the pane semantics would be a major breaking change to very
fundamental semantics.

I wonder if you are really objecting to the names EARLY and ON_TIME? Maybe
we would've been better off tagging it BEFORE_WINDOW_END instead of EARLY,
to make it clear what is meant?
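Below is a simplified model, in plain Python, of how a pane's timing tag relates to the end of its window. It is not Beam's `PaneInfo` implementation.

- A pane fired while the watermark is still before the window end is EARLY, i.e. what could have been called BEFORE_WINDOW_END.
- The first firing after the watermark passes the end is ON_TIME.
- Later firings are LATE.

The global window's end is modeled here as infinity as a simplification. In Beam it is a very large finite timestamp, so for a never-ending stream every pane fired in the global window is EARLY.

```python
from enum import Enum


class Timing(Enum):
    EARLY = "EARLY"      # fired before the watermark reached the window end
    ON_TIME = "ON_TIME"  # first firing after the watermark passed the window end
    LATE = "LATE"        # any later firing for the same window


GLOBAL_WINDOW_END = float("inf")  # simplification; Beam uses a finite max timestamp


def pane_timing(watermark: float, window_end: float, on_time_fired: bool) -> Timing:
    """Simplified model of the timing tag attached to a fired pane."""
    if watermark < window_end:
        return Timing.EARLY
    if not on_time_fired:
        return Timing.ON_TIME
    return Timing.LATE
```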


> Jan
> On 11/22/19 6:47 PM, Reuven Lax wrote:
>
> Have you seen the Join library that is part of schemas? I'm curious
> whether this fits your needs, or there's something lacking there.
>
> On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> based on the roadmap [1], we would like to define and implement a full set
>> of (unified) stream-stream joins. That would include:
>>
>>   - joins (left, right, full outer) in the global window with an "immediate
>> trigger"
>>
>>   - joins with different windowing functions on the left and right sides
>>
>> The approach would be to define these operations in a natural way, so
>> that the definition is aligned with how current joins work (same
>> windows, cartesian product of values with same keys, output timestamp
>> projected to the end of window, etc.). Because this should be a generic
>> approach, this effort should probably be part of the join library, so that it can
>> then be reused by other components, too (e.g. SQL).
>>
>> The question is - is (or was) there any effort that we can build upon?
>> Or should this be designed from scratch?
>>
>> Jan
>>
>> [1] https://beam.apache.org/roadmap/euphoria/
>>
>>
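The join semantics described in the proposal can be modeled as a plain-Python sketch: same windows, a cartesian product of values with the same key, and the output timestamp projected to the end of the window. This is not the Beam join library's API. The sketch assumes fixed windows, integer millisecond timestamps, and an inner join; `windowed_join` is a hypothetical name.

```python
from collections import defaultdict
from itertools import product
from typing import Dict, List, Tuple

Event = Tuple[str, str, int]  # (key, value, event timestamp in ms)
Win = Tuple[int, int]         # fixed window [start, end)


def fixed_window(ts: int, size: int) -> Win:
    """Assign a timestamp to its fixed window [start, start + size)."""
    start = ts - ts % size
    return (start, start + size)


def windowed_join(left: List[Event], right: List[Event], size: int) -> List[Tuple[str, str, str, int]]:
    """Inner join: cartesian product of same-key values in the same window."""
    groups: Dict[Tuple[str, Win], Tuple[List[str], List[str]]] = defaultdict(lambda: ([], []))
    for key, value, ts in left:
        groups[(key, fixed_window(ts, size))][0].append(value)
    for key, value, ts in right:
        groups[(key, fixed_window(ts, size))][1].append(value)
    out = []
    for (key, (_start, end)), (lvals, rvals) in sorted(groups.items()):
        for lv, rv in product(lvals, rvals):
            # Timestamp projected to the window end; end - 1 mirrors how Beam's
            # IntervalWindow.maxTimestamp() is one millisecond before the end.
            out.append((key, lv, rv, end - 1))
    return out


left = [("k", "a", 1000), ("k", "b", 6000), ("j", "c", 2000)]
right = [("k", "x", 4000), ("k", "y", 12000), ("j", "z", 3000)]
result = windowed_join(left, right, size=5000)
```

Here `b` (at 6000) and `x` (at 4000) do not join, even though they are only two seconds apart, because they fall into different windows. Both sides share one window size, which matches the CoGBK-based joins' requirement that both inputs use the same windowfn.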
