I can write up a design document, but first I need to clarify a few things for myself, because I'm struggling to fit all of this into a bigger picture. Sorry if the arguments are going in circles, but I haven't seen any proposal for how to solve these points. If anyone can disprove any of this logic, I'd very much appreciate it, as it might get me out of a dead end:

 a) in the bi-temporal join you can either buffer until the watermark, or emit false data that has to be retracted

 b) until retractions are 100% functional (and that is sort of a holy grail for now), the only solution is a buffer holding data up to the watermark *and then sorting by event time*

 c) even if retractions were 100% functional, there would have to be a special implementation for the batch case, because otherwise this would simply blow up downstream processing with an enormous number of false additions and subsequent retractions

Property b) means that if we want this feature now, we must sort by event time and there is no way around it. Property c) shows that even in the future we must (in certain cases) distinguish between batch and streaming code paths, which seems weird to me, but it might be an option. Still, there is no way to express this join in the batch case, because it would require either buffering (up to) the whole input on a local worker (which doesn't look like a viable option) or providing a way for user code to signal the need for ordering of data inside GBK (and we are there again :)). Yes, we might shift this need from stateful DoFn to GBK, like

 input.apply(GroupByKey.sorted())

I cannot find a good reason why this would be better than giving these semantics to (stateful) ParDo.
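
To make point b) concrete, here is a minimal plain-Python sketch of a "buffer up to the watermark, then sort by event time" step. It is not Beam API: `SortBuffer`, `add` and `advance_watermark` are hypothetical names, and the sketch assumes that a watermark `W` means no more non-late input with timestamp below `W`.

```python
import heapq


class SortBuffer:
    """Holds elements until the watermark passes them, then releases them in event-time order."""

    def __init__(self):
        self._heap = []  # min-heap of (timestamp, arrival_seq, value)
        self._seq = 0    # tie-breaker: keeps arrival order for equal timestamps

    def add(self, timestamp, value):
        heapq.heappush(self._heap, (timestamp, self._seq, value))
        self._seq += 1

    def advance_watermark(self, watermark):
        """Return buffered elements with timestamp < watermark, sorted by event time."""
        ready = []
        while self._heap and self._heap[0][0] < watermark:
            ts, _, value = heapq.heappop(self._heap)
            ready.append((ts, value))
        return ready


buf = SortBuffer()
for ts, v in [(5, "e"), (1, "a"), (3, "c"), (8, "h")]:  # out-of-order arrival
    buf.add(ts, v)

first = buf.advance_watermark(4)    # elements 1 and 3 are released, in order
second = buf.advance_watermark(10)  # elements 5 and 8 follow
```

In the batch case the watermark jumps from the beginning to the end of time in one step, so a buffer like this would hold the whole input for a key, which is the problem described above.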

Maybe someone can help me out here?

Jan

On 11/24/19 5:05 AM, Kenneth Knowles wrote:
I don't actually see how event time sorting simplifies this case much. You still need to buffer elements until they can no longer be matched in the join, and you still need to query that buffer for elements that might match. The general "bi-temporal join" (without sorting) requires one new state type and then it has identical API, does not require any novel data structures or reasoning, yields better latency (no sort buffer delay), and discards less data (no sort buffer cutoff; watermark is better). Perhaps a design document about this specific case would clarify.

Kenn

On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský wrote:

    I didn't want to go too much into detail, but to describe the idea
    roughly (ignoring the problem of different window fns on both
    sides to keep it as simple as possible):

    rhs -----  \
                flatten (on global window) ---- stateful pardo (sorted by event time) ---- output
    lhs -----  /

    If we can guarantee event-time-ordered arrival of events into the
    stateful pardo, then the whole complexity reduces to keeping the
    current value of the left and right elements and just flushing them
    out each time there is an update. That is, the "knob" is actually
    when the watermark moves, because that is what tells the join
    operation that there will be no more (non-late) input. This is
    very, very simplified, but it depicts the solution. The "classical"
    windowed join reduces to this if all data in each window is
    projected onto the window end boundary. Then there will be a
    cartesian product, because all the elements have the same
    timestamp. I can put this into a design doc with all the details;
    I was trying to find out whether there is or was any effort around
    this.
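
A rough plain-Python sketch of the "keep the current left and right value and flush on each update" idea, assuming the input already arrives sorted by event time. The function name `latest_value_join` and the tie-breaking rule (lhs before rhs at equal timestamps) are assumptions made for illustration, not part of any Beam API.

```python
def latest_value_join(lhs, rhs):
    """lhs, rhs: lists of (timestamp, value). Returns (timestamp, left, right) per update."""
    # Flatten both sides and sort by event time; on ties this sketch takes lhs first.
    tagged = [(ts, 0, v) for ts, v in lhs] + [(ts, 1, v) for ts, v in rhs]
    tagged.sort(key=lambda e: (e[0], e[1]))

    left = right = None  # the only state needed once input is event-time ordered
    out = []
    for ts, side, value in tagged:
        if side == 0:
            left = value
        else:
            right = value
        out.append((ts, left, right))  # flush the current pair on every update
    return out


result = latest_value_join([(1, "l1"), (4, "l2")], [(2, "r1"), (3, "r2")])
```

Without the ordering guarantee, the same logic would need a buffer of past elements on both sides and a lookup into it for every arriving element.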

    I was in touch with Reza on PR #9032; I think it currently
    suffers from problems when running on batch.

    I think I can even (partly) resolve the retraction issue (for
    joins), as described in the thread [1]. In short, there can be two
    copies of the stateful DoFn, one running at the watermark and the
    other at (watermark - allowed lateness). One would produce ON_TIME
    (possibly wrong) results, the other would produce LATE but correct
    ones. By comparing the two, it would be possible to retract the
    wrong results.
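
The comparison step could look roughly like the following plain-Python sketch. It only illustrates the diffing of the two result streams; `diff_results` is a hypothetical helper, and how the two copies of the DoFn would actually be wired together is left open.

```python
def diff_results(on_time, corrected):
    """Compare ON_TIME output with the LATE-but-correct output of the second copy.

    Returns (retractions, additions): results to withdraw and results to emit instead.
    """
    on_time_set, corrected_set = set(on_time), set(corrected)
    retractions = [r for r in on_time if r not in corrected_set]
    additions = [r for r in corrected if r not in on_time_set]
    return retractions, additions


# The ON_TIME copy missed a late rhs element "r1" at timestamp 2.
on_time = [(2, "l1", None), (3, "l1", "r2")]
corrected = [(2, "l1", "r1"), (3, "l1", "r2")]
retractions, additions = diff_results(on_time, corrected)
```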

    Yes, this is also about providing more evidence of why I think
    event-time sorting should be (somehow) part of the model. :-)

    Jan

    [1] https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E

    On 11/23/19 5:54 AM, Kenneth Knowles wrote:
    +Mikhail Gryzykhin +Rui Wang +Reza Rokni, who have all done some
    investigations here.


    On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský wrote:


        On 11/22/19 7:54 PM, Reuven Lax wrote:


        On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský wrote:

            Hi Reuven,

            I didn't investigate that particular one, but looking
            into it now, it looks like it is (same as the "classic"
            join library) built around CoGBK. Is that correct? If
            yes, then it essentially means that it:

             - works only for cases where both sides have the same
            windowfn (that is a limitation of the Flatten that precedes CoGBK)

        Correct. Did you want to join different windows? If so what
        are the semantics? If the lhs has FixedWindows and the rhs
        has SessionWindows, what do you want the join semantics to
        be? The only thing I could imagine would be for the user to
        provide some function telling the join how to map the
        windows together, but that could be pretty complicated.

        I don't want to go too far into details, but generally both
        lhs and rhs can be put onto a timeline, and then a full join
        can be defined as each pair of (lhs, first preceding rhs) and
        (rhs, first preceding lhs). The end of a window then
        semantically just clears the joined value (sets it to null,
        so at the end of the window there will be a pair (lhs, null)
        or (null, rhs) in the case of a full outer join). This way any
        combination of windows is possible, because all a window does
        is "scope" the validity of the respective values (lhs, rhs).
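
As an illustration of "the end of window clears the value", here is a small plain-Python sketch that turns one side's elements in fixed windows into a timeline of value events plus a clearing event (`None`) at each window end. The function name and the choice of fixed windows are assumptions; other window types would need a different way of computing the window end.

```python
def with_window_end_deletes(elements, window_size):
    """elements: list of (timestamp, value), assigned to fixed windows of window_size.

    Returns the timeline with an extra (window_end, None) event per non-empty window.
    """
    window_ends = {(ts // window_size + 1) * window_size for ts, _ in elements}
    events = list(elements) + [(end, None) for end in window_ends]
    # Sort by timestamp. At equal timestamps the clearing event goes first, so a
    # value that starts the next window exactly on the boundary is not cleared.
    events.sort(key=lambda e: (e[0], e[1] is not None))
    return events


timeline = with_window_end_deletes([(1, "a"), (7, "b"), (12, "c")], window_size=10)
```

Each side can then use its own windowing, because after this step both sides are just timelines of "value valid from here" and "value no longer valid" events.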


    I think it is very valid to hope to do a join in the sense of a
    relational join where it is row-to-row. In this case, Beam's
    concept of windowing may or may not make sense. It is just a tool
    for the job. It is just a grouping key that provides a time when
    state can be deleted. So I would say your use case is more global
    window to global window join. That is what I think of as a true
    stream-to-stream join anyhow. You probably don't want to wait
    forever for output. So you'll need to use some knob other than
    Beam windows or triggers.

    Reza has prototyped a join like you describe here:
    https://github.com/apache/beam/pull/9032

    If your join condition explicitly includes the event time
    distance between elements, then it could "just work". If that
    isn't really part of your join condition, then you will have to
    see this restriction as a "knob" that you tweak on your results.

             - when using the global window, there has to be a trigger,
            and (afaik) there is no trigger that would guarantee firing
            after each data element (for early panes) (because
            triggers are there to express a cost-latency tradeoff, not
            semantics)


        Can you explain the use case where this matters? If you do
        trigger elementCountAtLeast(1) on the join, then the
        consumer will simply see a continuous stream of outputs. I'm
        not sure I understand why the consumer cares that some of
        those outputs were in a pane that really held 3 outputs
        instead of 1.

        What I'm trying to solve is basically this:

         - lhs is an event stream

         - rhs is a stream of "state updates"

        The purpose of the join is "take each event, pair it with the
        currently valid state and produce output and possibly a
        modified state". I cannot process two events at a time,
        because the first event can modify the state and the
        subsequent event should see this. It is not a "simple"
        stateful pardo either, because the state can be modified
        externally (not going into too much detail here, but e.g. by
        writing into a Kafka topic).
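
A minimal plain-Python sketch of this "event vs. state" processing, assuming a single time-ordered stream in which both events and external state updates appear. The names `process` and `handle` and the tuple layout are hypothetical.

```python
def process(stream, handle):
    """stream: time-ordered list of (timestamp, kind, payload), kind is "event" or "state".

    handle(event, state) returns (output, new_state).
    """
    state = None
    outputs = []
    for ts, kind, payload in stream:
        if kind == "state":
            state = payload  # external update replaces the current state
        else:
            # Each event sees the state left behind by everything before it.
            output, state = handle(payload, state)
            outputs.append((ts, output))
    return outputs


def count_events(event, state):
    """Example handler: output the event with the state it saw, then bump a counter."""
    return (event, state), (state or 0) + 1


stream = [(1, "event", "a"), (2, "event", "b"), (3, "state", 10), (4, "event", "c")]
outputs = process(stream, count_events)
```

Event "b" sees the state modified by "a", and "c" sees the externally written value 10, which is why two events cannot be handled in the same step and why arrival order matters.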

    Reuven's explanation is missing some detail. If the CoGBK is in
    discarding mode, then it will miss join results. If the CoGBK is
    in accumulating mode, it will duplicate join results. This is a
    known problem and the general solution is retractions.

    Basically, CoGBK-based joins just don't work with triggers until
    we have retractions.
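
The effect can be reproduced with a small plain-Python simulation. It assumes one trigger firing per arriving element and a join that takes the cartesian product of the lhs and rhs values in each fired pane; `cogbk_join_panes` is a hypothetical name and no Beam code is involved.

```python
from itertools import product


def cogbk_join_panes(arrivals, accumulating):
    """arrivals: list of (side, value) in arrival order, side is "L" or "R"."""
    lhs, rhs, out = [], [], []
    for side, value in arrivals:
        if not accumulating:
            lhs, rhs = [], []  # discarding mode: a pane holds only the new element
        (lhs if side == "L" else rhs).append(value)
        out.extend(product(lhs, rhs))  # join applied to the contents of this pane
    return out


arrivals = [("L", "l1"), ("R", "r1"), ("R", "r2")]
discarding = cogbk_join_panes(arrivals, accumulating=False)
accumulating = cogbk_join_panes(arrivals, accumulating=True)
```

The correct join result here is the two pairs (l1, r1) and (l1, r2). In this simulation discarding mode produces neither of them, and accumulating mode produces (l1, r1) twice.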

            Moreover, I'd like to define the join semantics so that
            when there are available elements from both sides, the
            fired pane should be ON_TIME, not EARLY. That
            essentially means that the fully general case would not
            be built around (Co)GBK, but stateful ParDo. There are
            specific options where this fully general case
            "degrades" into forms that can be efficiently expressed
            using (Co)GBK, that is true.


        BTW building this around stateful DoFn might be a better
        fit. The main reason I didn't is because we would need a
        good distributed MapState (something discussed fairly
        recently on the list), and that is not yet built. Once we
        had that, I might be inclined to rewrite this join on
        stateful DoFn.

        Yes, the sorted state helps for the streaming case. But I'd be
        careful about that for the batch case, where this might lead
        to high pressure on the state (and InMemoryStateInternals
        might OOME, for instance).

        However can you explain what you are expecting from the
        pane? An EARLY pane simply means that we are producing
        output before the end of the window. If you are in the
        global window triggering every element, then every output is
        EARLY. It might seem weird if you are interpreting EARLY as
        "outputting data that isn't ready," however that's not what
        EARLY is defined to be. Any change to the pane semantics
        would be a major breaking change to very fundamental semantics.

        I wonder if you are really objecting to the name EARLY and
        ON_TIME? Maybe we would've been better off tagging it
        BEFORE_WINDOW_END instead of EARLY, to make it clear what is
        meant?

        Essentially I don't object to anything here. I'm missing a
        solution to the "event vs. state" join described above. I was
        thinking about how to make these types of problems more user
        friendly, and it essentially leads to creating somewhat more
        generic join semantics, where end-of-window is converted
        into "value-delete" events and the join is then done by the
        "previous" or "valid" value (yes, this relates to the validity
        windows mentioned at Beam Summit Europe). It actually turns
        out that with some work we could quite "naturally" define a
        join on two streams with global window and no trigger. It
        would even function with the lowest latency possible (but yes,
        at the highest cost; it is actually the introduction of
        (same!) windows that enables certain optimizations). It then
        correctly defines semantics for different windows, although
        the result would be (probably unexpectedly) windowed using
        the global window. But that doesn't seem to be a breaking
        change, because it is currently not possible (any such
        pipeline will fail validation).

        Maybe for reference, the unwindowed join would be what is
        described here [1]

        [1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin

            Jan

            On 11/22/19 6:47 PM, Reuven Lax wrote:
            Have you seen the Join library that is part of schemas?
            I'm curious whether this fits your needs, or there's
            something lacking there.

            On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský wrote:

                Hi,

                based on the roadmap [1], we would like to define and
                implement a full set of (unified) stream-stream joins.
                That would include:

                  - joins (left, right, full outer) on the global
                window with an "immediate trigger"

                  - joins with different windowing functions on the
                left and right side

                The approach would be to define these operations in a
                natural way, so that the definition is aligned with
                how current joins work (same windows, cartesian
                product of values with the same keys, output timestamp
                projected to the end of the window, etc.). Because
                this should be a generic approach, this effort should
                probably be part of the join library, which can then
                be reused by other components, too (e.g. SQL).

                The question is: is (or was) there any effort that we
                can build upon? Or should this be designed from
                scratch?

                Jan

                [1] https://beam.apache.org/roadmap/euphoria/
