Functionally yes. But this straightforward solution does not work for me, for two main reasons:

 - it either blows up the state in the batch case, or the time complexity of the sort would be O(n^2) (and reprocessing several years of dense time-series data makes that a no-go)

 - it is not reusable for different time-ordering needs, because logic implemented purely in user space cannot be transferred to a different problem (two states are needed, one for the buffer, the other for the user state) and extending DoFns does not work (one cannot create an abstract SortedDoFn, because of the state annotation definitions)

Jan

On 11/26/19 12:56 PM, David Morávek wrote:
Hi,

I think what Jan has in mind would look something like this <https://gist.github.com/dmvk/3ea32eb36c6406fa72d70b9b1df1d878>, if implemented in user code. Am I right?

D.


On Tue, Nov 26, 2019 at 10:23 AM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:


    On 11/25/19 11:45 PM, Kenneth Knowles wrote:


    On Mon, Nov 25, 2019 at 1:56 PM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        Hi Rui,

        > Hi Kenn, you think stateful DoFn based join can emit joined
        rows that never need to be retracted because in the stateful
        DoFn case joined rows will be controlled by timers and will be
        emitted only once? If so I will agree with it. Generally
        speaking, emitting only once is the factor that decides
        whether retraction is needed or not.

        That would imply buffering elements up until the watermark
        and then sorting, and so it reduces to option a) again, is
        that true? This also has to deal with allowed lateness: with
        allowed lateness greater than zero there can still be
        multiple firings, and so retractions are needed.

    Specifically, when I say "bi-temporal join" I mean
    unbounded-to-unbounded join where one of the join conditions is
    that elements are within event time distance d of one another. An
    element at time t will be saved until time t + 2d and then
    garbage collected. Every matching pair can be emitted immediately.
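
A minimal sketch of the bi-temporal join described above, in plain Java without any Beam APIs. The class and method names (BiTemporalJoinSketch, onLeft, onRight, onWatermark) are hypothetical, values are plain strings, and late data is ignored. It only illustrates the three operations mentioned: immediate emission of every matching pair, a query by time range, and GC of a time prefix at t + 2d.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Plain-Java sketch (no Beam APIs) of a bi-temporal join buffer. */
class BiTemporalJoinSketch {
  private final long d; // maximal event-time distance of a matching pair
  private final TreeMap<Long, List<String>> lhs = new TreeMap<>();
  private final TreeMap<Long, List<String>> rhs = new TreeMap<>();

  BiTemporalJoinSketch(long d) {
    this.d = d;
  }

  /** Buffers a left element and returns the pairs it completes right away. */
  List<String> onLeft(long t, String value) {
    lhs.computeIfAbsent(t, k -> new ArrayList<>()).add(value);
    List<String> out = new ArrayList<>();
    for (String match : inRange(rhs, t)) {
      out.add(value + "+" + match);
    }
    return out;
  }

  /** Buffers a right element and returns the pairs it completes right away. */
  List<String> onRight(long t, String value) {
    rhs.computeIfAbsent(t, k -> new ArrayList<>()).add(value);
    List<String> out = new ArrayList<>();
    for (String match : inRange(lhs, t)) {
      out.add(match + "+" + value);
    }
    return out;
  }

  /** Range query: all buffered values with timestamp in [t - d, t + d]. */
  private List<String> inRange(TreeMap<Long, List<String>> side, long t) {
    List<String> found = new ArrayList<>();
    for (List<String> values : side.subMap(t - d, true, t + d, true).values()) {
      found.addAll(values);
    }
    return found;
  }

  /** GC of a time prefix: drops every element whose t + 2d is at or before the watermark. */
  void onWatermark(long watermark) {
    lhs.headMap(watermark - 2 * d, true).clear();
    rhs.headMap(watermark - 2 * d, true).clear();
  }

  /** Number of elements currently held on both sides. */
  int buffered() {
    int n = 0;
    for (List<String> v : lhs.values()) n += v.size();
    for (List<String> v : rhs.values()) n += v.size();
    return n;
  }
}
```

No buffer is ever sorted as a whole here; the only ordered structure is the per-side index by timestamp that serves the range query and the prefix GC.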

    OK, this might simplify things a little. Is there a design doc for
    that? If there are multiple LHS elements within the event time
    distance from an RHS element, which one should be joined? I suppose
    all of them, but that is not "(time-varying-)relational" join
    semantics. In those semantics only the last element must be joined,
    because that is how a (classical) relational database would see
    the relation at time T (the old record would have been overwritten
    and would not be part of the output). Because of the time distance
    constraint this is different from the join I have in mind, because
    that simply joins every LHS element(s) to the most recent RHS
    element(s) and vice versa, without any additional time constraints
    (that is, the RHS "update" can happen arbitrarily far in the past).

    Jan


    In the triggered CoGBK + join-product implementation, you do need
    retractions as a model concept. But you don't need full support,
    since they only need to be shipped as deltas and only from the
    CoGBK to the join-product transform where they are all consumed
    to create only positive elements. Again a delay is not required;
    this yields correct results with the "always" trigger.

    Neither case requires waiting or time sorting a whole buffer. The
    bi-temporal join requires something more, in a way, since you
    need to query by time range and GC time prefixes.

    Kenn

        Jan

        On 11/25/19 10:17 PM, Rui Wang wrote:


        On Mon, Nov 25, 2019 at 11:29 AM Jan Lukavský
        <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:


            On 11/25/19 7:47 PM, Kenneth Knowles wrote:


            On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský
            <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                I can put down a design document, but before that I
                need to clarify some things for myself. I'm struggling
                to put all of this into a bigger picture. Sorry if
                the arguments are going in circles, but I didn't notice
                any proposal of how to solve these. If anyone can
                disprove any of this logic it would be very much
                appreciated, as I might be able to get out of a dead end:

                 a) in the bi-temporal join you can either buffer
                until watermark, or emit false data that has to be
                retracted

            This is not the case. A stateful DoFn based join can
            emit immediately joined rows that will never need to be
            retracted. The need for retractions has to do with
            CoGBK-based implementation of a join.

            I fail to see how this could work. If I emit joined rows
            immediately without waiting for the watermark to pass, I can
            join two elements that don't belong to each other, because
            an element with a lower time distance can arrive later, and
            that element should have been joined in place of the
            previously emitted one. This is a wrong result that has to be
            retracted. Or what am I missing?


        Hi Kenn, you think stateful DoFn based join can emit joined
        rows that never need to be retracted because in the stateful
        DoFn case joined rows will be controlled by timers and will be
        emitted only once? If so I will agree with it. Generally
        speaking, emitting only once is the factor that decides
        whether retraction is needed or not.

        In past brainstorming, even with retractions ready,
        streaming joins with windowing are likely to be implemented
        in a CoGBK + stateful DoFn style.


            I suggest that you work out the definition of the join
            you are interested in, with a good amount of
            mathematical rigor, and then consider the ways you can
            implement it. That is where a design doc will probably
            clarify things.

            Kenn

                 b) until retractions are 100% functional (and that
                is sort of a holy grail for now), the only
                solution is using a buffer holding data up to the
                watermark *and then sorting by event time*

                 c) even if retractions were 100% functional, there
                would have to be a special implementation for the
                batch case, because otherwise this would simply blow
                up downstream processing with insanely many false
                additions and subsequent retractions

                Property b) means that if we want this feature now,
                we must sort by event time and there is no way
                around it. Property c) shows that even in the future
                we must (in certain cases) make a distinction between
                batch and streaming code paths, which seems weird
                to me, but it might be an option. But still, there
                is no way to express this join in the batch case,
                because it would require either buffering (up to)
                the whole input on a local worker (which doesn't look
                like a viable option) or providing a way in user code
                to signal the need for ordering of data inside GBK
                (and we are there again :)). Yes, we might shift
                this need from stateful DoFn to GBK like

                 input.apply(GroupByKey.sorted())

                I cannot find good reasoning why this would be
                better than giving these semantics to (stateful) ParDo.

                Maybe someone can help me out here?

                Jan

                On 11/24/19 5:05 AM, Kenneth Knowles wrote:
                I don't actually see how event time sorting
                simplifies this case much. You still need to
                buffer elements until they can no longer be
                matched in the join, and you still need to query
                that buffer for elements that might match. The
                general "bi-temporal join" (without sorting)
                requires one new state type and then it has
                identical API, does not require any novel data
                structures or reasoning, yields better latency (no
                sort buffer delay), and discards less data (no
                sort buffer cutoff; watermark is better). Perhaps
                a design document about this specific case would
                clarify.

                Kenn

                On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský
                <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                    I didn't want to go too much into detail, but
                    to describe the idea roughly (ignoring the
                    problem of different window fns on both sides
                    to keep it as simple as possible):

                    rhs -----  \

                    flatten (on global window) ---- stateful par
                    do (sorted by event time)  ---- output

                    lhs -----  /

                    If we can guarantee event-time-ordered arrival
                    of events into the stateful ParDo, then the
                    whole complexity reduces to keeping the current
                    value of the left and right element and just
                    flushing them out each time there is an update.
                    That is, the "knob" is actually when the watermark
                    moves, because that is what tells the join
                    operation that there will be no more (non-late)
                    input. This is very, very simplified, but it
                    depicts the solution. The "classical" windowed
                    join reduces to this if all data in each window is
                    projected onto the window end boundary. Then there
                    will be a cartesian product, because all the
                    elements have the same timestamp. I can put
                    this into a design doc with all the details; I
                    was trying to find out if there is or was any
                    effort around this.
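
A rough sketch of that reduction, in plain Java rather than Beam. The Event type and the join method are hypothetical names, and the explicit sort stands in for the ordering guarantee that the runner would have to provide once the watermark has passed; none of this is an existing Beam API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch: join of two flattened streams once event-time order is guaranteed. */
class SortedStreamJoinSketch {
  /** One input element; `left` tells which side of the join it came from (hypothetical type). */
  static final class Event {
    final long timestamp;
    final boolean left;
    final String value;

    Event(long timestamp, boolean left, String value) {
      this.timestamp = timestamp;
      this.left = left;
      this.value = value;
    }
  }

  /** Keeps only the current left and right value and flushes a pair on every update. */
  static List<String> join(List<Event> flattened) {
    List<Event> sorted = new ArrayList<>(flattened);
    // Assumption: in a real pipeline this ordering would come from the runner.
    sorted.sort(Comparator.comparingLong((Event e) -> e.timestamp));

    String currentLeft = null;
    String currentRight = null;
    List<String> out = new ArrayList<>();
    for (Event e : sorted) {
      if (e.left) {
        currentLeft = e.value;
      } else {
        currentRight = e.value;
      }
      out.add(e.timestamp + ":(" + currentLeft + "," + currentRight + ")");
    }
    return out;
  }
}
```

With ordered input the per-key state is just two values, which is the simplification argued for above; without it the same logic needs a buffer plus a sort.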

                    I was in touch with Reza in the PR #9032, I
                    think that it currently suffers from problems
                    with running this on batch.

                    I think I can even (partly) resolve the
                    retraction issue (for joins), as described in
                    the thread [1]. In short, there can be two
                    copies of the stateful DoFn, one running at the
                    watermark and the other at (watermark -
                    allowed lateness). One would produce ON_TIME
                    (maybe wrong) results, the other would produce
                    LATE but correct ones. By comparing them, it
                    would be possible to retract the wrong results.
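
The comparison step could look roughly like the following plain-Java sketch. It assumes both copies have already produced their outputs as lists of strings for the same key; the class and method names are hypothetical and no Beam retraction API is implied.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch: derive retractions by comparing ON_TIME and LATE outputs. */
class RetractionDiffSketch {
  /** Multiset difference a minus b: every element of b cancels one equal element of a. */
  static List<String> minus(List<String> a, List<String> b) {
    List<String> rest = new ArrayList<>(a);
    for (String s : b) {
      rest.remove(s); // List.remove(Object) removes the first equal element, if any
    }
    return rest;
  }

  /** ON_TIME results that the LATE (correct) copy did not confirm. */
  static List<String> retractions(List<String> onTime, List<String> late) {
    return minus(onTime, late);
  }

  /** Correct results that the ON_TIME copy never produced. */
  static List<String> additions(List<String> onTime, List<String> late) {
    return minus(late, onTime);
  }
}
```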

                    Yes, this is also about providing more
                    evidence of why I think event-time sorting
                    should be (somehow) part of the model. :-)

                    Jan

                    [1] https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E

                    On 11/23/19 5:54 AM, Kenneth Knowles wrote:
                    +Mikhail Gryzykhin <mailto:mig...@google.com>
                    +Rui Wang <mailto:ruw...@google.com> +Reza
                    Rokni <mailto:r...@google.com> who have all
                    done some investigations here.


                    On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský
                    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:


                        On 11/22/19 7:54 PM, Reuven Lax wrote:


                        On Fri, Nov 22, 2019 at 10:19 AM Jan
                        Lukavský <je...@seznam.cz
                        <mailto:je...@seznam.cz>> wrote:

                            Hi Reuven,

                            I didn't investigate that particular
                            one, but looking into it now, it
                            looks like it (same as the "classic"
                            join library) is built around CoGBK.
                            Is that correct? If yes, then it
                            essentially means that it:

                             - works only for cases where both
                            sides have the same windowfn (that
                            is a limitation of the Flatten that
                            precedes CoGBK)

                        Correct. Did you want to join different
                        windows? If so what are the semantics?
                        If the lhs has FixedWindows and the rhs
                        has SessionWindows, what do you want
                        the join semantics to be? The only thing
                        I could imagine would be for the user to
                        provide some function telling the join
                        how to map the windows together, but
                        that could be pretty complicated.

                        I don't want to go too far into details,
                        but generally both lhs and rhs can be put
                        onto a timeline and then a full join can be
                        defined as each pair of (lhs, first
                        preceding rhs) and (rhs, first preceding
                        lhs). Then the end of a window is
                        semantically just clearing the joined
                        value (setting it to null, thus at the
                        end of the window there will be a pair (lhs,
                        null) or (null, rhs) in the case of a full
                        outer join). This way any combination of
                        windows is possible, because all a window
                        does is "scope" the validity of the
                        respective values (lhs, rhs).


                    I think it is very valid to hope to do a join
                    in the sense of a relational join where it is
                    row-to-row. In this case, Beam's concept of
                    windowing may or may not make sense. It is
                    just a tool for the job. It is just a
                    grouping key that provides a time when state
                    can be deleted. So I would say your use case
                    is more global window to global window join.
                    That is what I think of as a true
                    stream-to-stream join anyhow. You probably
                    don't want to wait forever for output. So
                    you'll need to use some knob other than Beam
                    windows or triggers.

                    Reza has prototyped a join like you describe
                    here: https://github.com/apache/beam/pull/9032

                    If your join condition explicitly includes
                    the event time distance between elements,
                    then it could "just work". If that isn't
                    really part of your join condition, then you
                    will have to see this restriction as a "knob"
                    that you tweak on your results.

                             - when using the global window, there
                            has to be a trigger and (afaik) there
                            is no trigger that would guarantee
                            firing after each data element (for
                            early panes) (because triggers are
                            there to express a cost-latency
                            tradeoff, not semantics)


                        Can you explain the use case where this
                        matters? If you do trigger
                        elementCountAtLeast(1) on the join, then
                        the consumer will simply see a
                        continuous stream of outputs. I'm not
                        sure I understand why the consumer cares
                        that some of those outputs were in a
                        pane that really held 3 outputs instead
                        of 1.

                        What I'm trying to solve is basically this:

                         - lhs is an event stream

                         - rhs is a stream of "state updates"

                        The purpose of the join is "take each event,
                        pair it with the currently valid state and
                        produce output and possibly modified
                        state". I cannot process two events at a
                        time, because the first event can modify the
                        state and the subsequent event should see
                        this. It is not a "simple" stateful ParDo
                        either, because the state can be modified
                        externally (not going into too much
                        detail here, but e.g. by writing into
                        a Kafka topic).

                    Reuven's explanation is missing some detail.
                    If the CoGBK is in discarding mode, then it
                    will miss join results. If the CoGBK is in
                    accumulating mode, it will duplicate join
                    results. This is a known problem and the
                    general solution is retractions.

                    Basically, CoGBK-based joins just don't work
                    with triggers until we have retractions.
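
A toy illustration of the two failure modes, in plain Java with no Beam involved. Each firing of the trigger is modelled as one array of newly arrived elements per side; the names (TriggeredCoGbkSketch, run, product) are hypothetical and the sketch assumes both sides fire the same number of panes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of a join-product computed per fired pane of a CoGBK. */
class TriggeredCoGbkSketch {
  /** Cartesian product of the two sides of one pane. */
  static List<String> product(List<String> lhs, List<String> rhs) {
    List<String> out = new ArrayList<>();
    for (String l : lhs) {
      for (String r : rhs) {
        out.add(l + "+" + r);
      }
    }
    return out;
  }

  /**
   * lhsPanes[i] and rhsPanes[i] hold the elements that arrived before firing i.
   * Discarding mode hands over only the new elements of each pane,
   * accumulating mode hands over everything seen so far.
   */
  static List<String> run(String[][] lhsPanes, String[][] rhsPanes, boolean accumulating) {
    List<String> out = new ArrayList<>();
    List<String> lhs = new ArrayList<>();
    List<String> rhs = new ArrayList<>();
    for (int i = 0; i < lhsPanes.length; i++) {
      if (!accumulating) {
        lhs.clear();
        rhs.clear();
      }
      lhs.addAll(Arrays.asList(lhsPanes[i]));
      rhs.addAll(Arrays.asList(rhsPanes[i]));
      out.addAll(product(lhs, rhs));
    }
    return out;
  }
}
```

With panes {l1 | l2} and {r1 | r2}, the complete join has four pairs. In this sketch discarding mode yields only l1+r1 and l2+r2, while accumulating mode yields all four pairs but emits l1+r1 twice.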

                            Moreover, I'd like to define the
                            join semantics so that when there
                            are available elements from both
                            sides, the fired pane should be
                            ON_TIME, not EARLY. That essentially
                            means that the fully general case
                            would not be built around (Co)GBK,
                            but stateful ParDo. There are
                            specific options where this fully
                            general case "degrades" into forms
                            that can be efficiently expressed
                            using (Co)GBK, that is true.


                        BTW building this around stateful DoFn
                        might be a better fit. The main reason I
                        didn't is because we would need a good
                        distributed MapState (something
                        discussed fairly recently on the list),
                        and that is not yet built. Once we had
                        that, I might be inclined to rewrite
                        this join on stateful DoFn.

                        Yes, the sorted state helps for the streaming
                        case. But I'd be careful about that for the
                        batch case, where this might lead to high
                        pressure on the state (and
                        InMemoryStateInternals might throw an
                        OutOfMemoryError, for instance).

                        However can you explain what you are
                        expecting from the pane? An EARLY pane
                        simply means that we are producing
                        output before the end of the window. If
                        you are in the global window triggering
                        every element, then every output is
                        EARLY. It might seem weird if you are
                        interpreting EARLY as "outputting data
                        that isn't ready," however that's not
                        what EARLY is defined to be. Any change
                        to the pane semantics would be a major
                        breaking change to very fundamental
                        semantics.

                        I wonder if you are really objecting to
                        the name EARLY and ON_TIME? Maybe we
                        would've been better off tagging it
                        BEFORE_WINDOW_END instead of EARLY, to
                        make it clear what is meant?

                        Essentially I don't object to anything here.
                        I'm missing a solution to the "event vs.
                        state" join described above. I was
                        thinking about how to make these types of
                        problems more user friendly and it
                        essentially leads to creating somewhat
                        more generic join semantics, where
                        end-of-window is converted into
                        "value-delete" events and then just
                        joining by the "previous" or "valid"
                        value (yes, this relates to the validity
                        windows mentioned at Beam Summit Europe).
                        It actually turns out that with some work
                        we could quite "naturally" define a join
                        on two streams with global window and no
                        trigger. It would even function with the
                        lowest latency possible (but yes, with
                        the highest expenses; it is actually the
                        introduction of (same!) windows that
                        enables certain optimizations). It then
                        correctly defines semantics for different
                        windows, although the result would be
                        (probably unexpectedly) windowed using the
                        global window. But that doesn't seem to
                        be a breaking change, because it is
                        currently not possible (any such pipeline
                        will not pass validation).

                        Maybe for reference, the unwindowed join
                        would be what is described here [1]

                        [1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin

                            Jan

                            On 11/22/19 6:47 PM, Reuven Lax wrote:
                            Have you seen the Join library that
                            is part of schemas? I'm curious
                            whether this fits your needs, or
                            there's something lacking there.

                            On Fri, Nov 22, 2019 at 12:31 AM
                            Jan Lukavský <je...@seznam.cz
                            <mailto:je...@seznam.cz>> wrote:

                                Hi,

                                based on the roadmap [1], we would
                                like to define and implement a full
                                set of (unified) stream-stream
                                joins. That would include:

                                  - joins (left, right, full outer)
                                on global window with "immediate
                                trigger"

                                  - joins with different windowing
                                functions on left and right side

                                The approach would be to define
                                these operations in a natural way,
                                so that the definition is aligned
                                with how current joins work (same
                                windows, cartesian product of
                                values with same keys, output
                                timestamp projected to the end of
                                window, etc.). Because this should
                                be a generic approach, this effort
                                should probably be part of the join
                                library, so that it can then be
                                reused by other components, too
                                (e.g. SQL).

                                The question is: is (or was) there
                                any effort that we can build upon?
                                Or should this be designed from
                                scratch?

                                Jan

                                [1]
                                https://beam.apache.org/roadmap/euphoria/
