Hi Reza, thanks for the reaction, comments inline.

On 5/21/19 1:02 AM, Reza Rokni wrote:
Hi,

If I have understood the use case correctly, your output is an ordered counter of state changes.

One approach which might be worth exploring is outlined below. I haven't had a chance to test it, so it could be missing pieces or be plain old wrong (I will try to come up with a test example later on to try it out).

1 - Window into a small enough Duration such that the number of elements in a window per key can be read into an in-memory structure for sorting.
2 - GBK
3 - In a DoFn, do the ordering and output Timestamped<V> elements that contain the state changes for just that window and the value of the last element, e.g. {timestamp-00:00:00: (one: 1, zero: 0, lastElement: 0)}. This will cause memory pressure, so your step 1 is important.
This is just an optimization, right?
4 - Window these outputs into the Global Window with a Stateful DoFn

Because you finally have to do the stateful ParDo in the Global Window, you will end up with the same problem - the first three steps just might give you some extra time. But if you have enough data (a long enough history, or very frequent changes, or both), then you will run into the same issues as without the optimization here. The BagState simply would not be able to hold all the data in the batch case.

Jan

5 - Add elements to a BagState in the stateful DoFn
6 - In the Global Window set an EventTimer to fire at time boundaries that match the time window that you need. Note Timers do not have a read function for the time at which they were set. (Here is one way to set metadata to emulate a read function <https://stackoverflow.com/questions/55912522/setting-a-timer-to-the-minimum-timestamp-seen/55912542#55912542>) Again this can cause memory pressure.
7 - At each OnTimer,
7a-  read and sort the elements in the BagState,
7b - True up the state changes with the cross-window state changes from the list.
7c - Store the last accumulator into a different State

Sorry, that was off the top of my head, so it could be missing things. For example, late data would need to be dealt with outside of this flow... A rough sketch of steps 1-3 is below.
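
For illustration, here is a rough, untested sketch of steps 1-3 against the 0/1 example discussed further down in the thread. The window size and PartialCount are made up for the sketch; Reify.timestampsInValue() keeps the per-element timestamps across the GBK:

// Assumed imports: java.io.Serializable, java.util.*,
// org.apache.beam.sdk.transforms.*,
// org.apache.beam.sdk.transforms.windowing.*,
// org.apache.beam.sdk.values.*, org.joda.time.Duration.

// Hypothetical per-window summary; a real pipeline would need a Coder for it.
class PartialCount implements Serializable {
  final int changes;
  final int first;
  final int last;

  PartialCount(int changes, int first, int last) {
    this.changes = changes;
    this.first = first;
    this.last = last;
  }
}

PCollection<KV<String, Integer>> input = ...;  // keyed 0/1 values from upstream

input
    .apply(Reify.timestampsInValue())  // keep element timestamps across the GBK
    .apply(Window.<KV<String, TimestampedValue<Integer>>>into(
        FixedWindows.of(Duration.standardMinutes(1))))               // 1 - small windows
    .apply(GroupByKey.create())                                      // 2 - GBK
    .apply(ParDo.of(                                                 // 3 - sort in memory
        new DoFn<KV<String, Iterable<TimestampedValue<Integer>>>,
            KV<String, PartialCount>>() {
          @ProcessElement
          public void process(ProcessContext ctx) {
            List<TimestampedValue<Integer>> sorted = new ArrayList<>();
            ctx.element().getValue().forEach(sorted::add);
            sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
            int first = sorted.get(0).getValue();
            int last = first;
            int changes = 0;
            for (TimestampedValue<Integer> tv : sorted) {
              if (tv.getValue() != last) {
                changes++;
                last = tv.getValue();
              }
            }
            // Emit the per-window change count plus the first and last
            // value, so the stateful DoFn in steps 4-7 can true up the
            // count across window boundaries.
            ctx.output(KV.of(ctx.element().getKey(),
                new PartialCount(changes, first, last)));
          }
        }));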

Cheers
Reza

On Tue, 21 May 2019 at 07:00, Kenneth Knowles <k...@apache.org> wrote:

    Thanks for the nice small example of a calculation that depends on
    order. You are right that many state machines have this property.
    I agree w/ you and Luke that it is convenient for batch processing
    to sort by event timestamp before running a stateful ParDo. In
    streaming you could also implement "sort by event timestamp" by
    buffering until you know all earlier data will be dropped - a
    slack buffer up to allowed lateness.
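
    A minimal, untested sketch of such a slack buffer as a stateful
    DoFn (all names are illustrative; watermark holds and re-stamping
    of the flushed elements are glossed over - they are emitted at
    the timer's firing timestamp). Since timers cannot be read back,
    a ValueState remembers the earliest timestamp a timer was set for:

    // Assumed imports: java.util.*, org.apache.beam.sdk.coders.*,
    // org.apache.beam.sdk.state.*, org.apache.beam.sdk.transforms.DoFn,
    // org.apache.beam.sdk.values.*, org.joda.time.*.
    static class BufferUntilSafe extends DoFn<KV<String, Integer>, KV<String, Integer>> {

      // Illustrative slack: how long after an element's timestamp we wait
      // before it is safe to emit (earlier data would be dropped as late).
      private static final Duration SLACK = Duration.standardMinutes(10);

      @StateId("buffer")
      private final StateSpec<BagState<TimestampedValue<KV<String, Integer>>>> bufferSpec =
          StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(
              KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));

      @StateId("timerTs")  // emulates the missing "read" for timers
      private final StateSpec<ValueState<Long>> timerTsSpec =
          StateSpecs.value(VarLongCoder.of());

      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(ProcessContext ctx,
          @StateId("buffer") BagState<TimestampedValue<KV<String, Integer>>> buffer,
          @StateId("timerTs") ValueState<Long> timerTs,
          @TimerId("flush") Timer flush) {
        buffer.add(TimestampedValue.of(ctx.element(), ctx.timestamp()));
        long safeAt = ctx.timestamp().plus(SLACK).getMillis();
        Long current = timerTs.read();
        if (current == null || safeAt < current) {
          // Fire once the watermark passes the point where everything
          // earlier than this element would be dropped anyway.
          flush.set(new Instant(safeAt));
          timerTs.write(safeAt);
        }
      }

      @OnTimer("flush")
      public void onFlush(OnTimerContext ctx,
          @StateId("buffer") BagState<TimestampedValue<KV<String, Integer>>> buffer,
          @StateId("timerTs") ValueState<Long> timerTs,
          @TimerId("flush") Timer flush) {
        Instant safeHorizon = ctx.timestamp().minus(SLACK);
        List<TimestampedValue<KV<String, Integer>>> sorted = new ArrayList<>();
        buffer.read().forEach(sorted::add);
        sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
        buffer.clear();
        long nextTimer = Long.MAX_VALUE;
        for (TimestampedValue<KV<String, Integer>> tv : sorted) {
          if (!tv.getTimestamp().isAfter(safeHorizon)) {
            ctx.output(tv.getValue());  // emitted in timestamp order
          } else {
            buffer.add(tv);             // not safe yet, keep buffered
            nextTimer = Math.min(nextTimer, tv.getTimestamp().plus(SLACK).getMillis());
          }
        }
        if (nextTimer < Long.MAX_VALUE) {
          flush.set(new Instant(nextTimer));
          timerTs.write(nextTimer);
        } else {
          timerTs.clear();
        }
      }
    }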

    I do not think that it is OK to sort in batch and not in
    streaming. Many state machines diverge very rapidly when things
    are out of order. So each runner, if it sees the
    "@OrderByTimestamp" annotation (or whatever), needs to deliver
    sorted data (by some mix of buffering and dropping), or to
    reject the pipeline as unsupported.

    And I also want to say that this is not the default case - many
    uses of state & timers in ParDo yield different results at the
    element level, but the results are equivalent in the big
    picture. In examples like "assign a unique sequence number to
    each element" or "group into batches", it doesn't matter exactly
    what the result is, only that it meets the spec. And other cases
    like user funnels are monotonic enough that you also don't
    actually need sorting.

    Kenn

    On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz> wrote:

        Yes, the problem will arise probably mostly when you have
        not well-distributed keys (or too few keys). I'm really not
        sure if a pure GBK with a trigger can solve this - it might
        help to have a data-driven trigger. There would still be
        some doubts, though. The main question is still here -
        people say that sorting by timestamp before a stateful ParDo
        would be prohibitively slow, but I don't really see why -
        the sorting is very probably already there. And if not (hash
        grouping instead of sorted grouping), then the sorting would
        affect only user-defined StatefulParDos.

        This would suggest that the best way out of this would be
        really to add annotation, so that the author of the pipeline
        can decide.

        If that would be acceptable, I think I can try to prepare
        some basic functionality, but I'm not sure if I would be
        able to cover all runners / SDKs.

        On 5/20/19 11:36 PM, Lukasz Cwik wrote:
        It is "read all per key and window", not just "read all"
        (this still won't scale with hot keys in the global window). The
        GBK preceding the StatefulParDo will guarantee that you are
        processing all the values for a specific key and window at
        any given time. Is there a specific window/trigger that is
        missing that you feel would remove the need for you to use
        StatefulParDo?

        On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <je...@seznam.cz> wrote:

            Hi Lukasz,

            > Today, if you must have a strict order, you must
            guarantee that your StatefulParDo implements the
            necessary "buffering & sorting" into state.

            Yes, no problem with that. But this whole discussion
            started because *this doesn't work on batch*. You simply
            cannot first read everything from distributed storage and
            then buffer it all into memory, just to read it again,
            but sorted. That will not work. And even if it did, it
            would be a terrible waste of resources.

            Jan

            On 5/20/19 8:39 PM, Lukasz Cwik wrote:


            On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <je...@seznam.cz> wrote:

                This discussion brings many really interesting
                questions for me. :-)

                 > I don't see batch vs. streaming as part of the
                model. One can have
                microbatch, or even a runner that alternates between
                different modes.

                Although I understand the motivation of this
                statement, this project's name is
                "Apache Beam: An advanced unified programming
                model". What does the
                model unify, if "streaming vs. batch" is not part of
                the model?

                Using microbatching, chaining of batch jobs, or pure
                streaming are
                exactly the "runtime conditions/characteristics" I
                refer to. All these
                define several runtime parameters, which in turn
                define how well or badly the pipeline will perform
                and how many resources
                might be needed. From
                my point of view, pure streaming should be the most
                resource demanding
                (if not, why bother with batch? why not run
                everything in streaming
                only? what will there remain to "unify"?).

                 > Fortunately, for batch, only the state for a
                single key needs to be
                preserved at a time, rather than the state for all
                keys across the range
                of skew. Of course if you have few or hot keys, one
                can still have
                issues (and this is not specific to StatefulDoFns).

                Yes, but there is still the presumption that my
                stateful DoFn can
                tolerate arbitrary shuffling of inputs. Let me
                explain the use case in
                more detail.

                Suppose you have an input stream consisting of 1s
                and 0s (and some key for each element, which is
                irrelevant for the demonstration). Your task is to
                calculate, in a running global window, the actual
                number of changes between state 0 and state 1 and
                vice versa. When the state doesn't change, you don't
                calculate anything. If the input (for a given key)
                were (tN denotes timestamp N):

                  t1: 1

                  t2: 0

                  t3: 0

                  t4: 1

                  t5: 1

                  t6: 0

                then the output should yield (supposing that the
                default state is zero):

                  t1: (one: 1, zero: 0)

                  t2: (one: 1, zero: 1)

                  t3: (one: 1, zero: 1)

                  t4: (one: 2, zero: 1)

                  t5: (one: 2, zero: 1)

                  t6: (one: 2, zero: 2)

                How would you implement this in current Beam semantics?
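
                The per-key logic itself would be trivial - see the
                untested sketch below, all names mine - but only if
                elements arrive ordered by timestamp, which nothing
                guarantees today:

                // Assumed imports: org.apache.beam.sdk.coders.*,
                // org.apache.beam.sdk.state.*,
                // org.apache.beam.sdk.transforms.DoFn,
                // org.apache.beam.sdk.values.KV.
                static class CountTransitions
                    extends DoFn<KV<String, Integer>, KV<String, String>> {

                  @StateId("last")
                  private final StateSpec<ValueState<Integer>> lastSpec =
                      StateSpecs.value(VarIntCoder.of());
                  @StateId("ones")
                  private final StateSpec<ValueState<Long>> onesSpec =
                      StateSpecs.value(VarLongCoder.of());
                  @StateId("zeros")
                  private final StateSpec<ValueState<Long>> zerosSpec =
                      StateSpecs.value(VarLongCoder.of());

                  @ProcessElement
                  public void process(ProcessContext ctx,
                      @StateId("last") ValueState<Integer> last,
                      @StateId("ones") ValueState<Long> ones,
                      @StateId("zeros") ValueState<Long> zeros) {
                    int value = ctx.element().getValue();
                    Integer prev = last.read();
                    int previous = prev == null ? 0 : prev;  // default state is zero
                    Long o = ones.read();
                    long one = o == null ? 0 : o;
                    Long z = zeros.read();
                    long zero = z == null ? 0 : z;
                    if (value != previous) {  // count only state changes
                      if (value == 1) { one++; ones.write(one); }
                      else { zero++; zeros.write(zero); }
                      last.write(value);
                    }
                    ctx.output(KV.of(ctx.element().getKey(),
                        "(one: " + one + ", zero: " + zero + ")"));
                  }
                }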

            I think you're saying here that "I know that my input is
            ordered in a specific way and since I assume the order
            when writing my pipeline I can perform this
            optimization." But there is nothing preventing a runner
            from noticing that you're processing in the global window
            with a specific type of trigger and re-ordering your
            inputs/processing to get better performance (since you
            can't use an AfterWatermark trigger for your pipeline in
            streaming for the GlobalWindow).

            Today, if you must have a strict order, you must
            guarantee that your StatefulParDo implements the
            necessary "buffering & sorting" into state. I can see
            why you would want an annotation that says I must have
            timestamp-ordered elements, since it makes writing
            certain StatefulParDos much easier. StatefulParDo is a
            low-level function; it really is the "here you go and do
            whatever you need to but here be dragons" function while
            windowing and triggering is meant to keep many people
            from writing StatefulParDo in the first place.

                 > Pipelines that fail in the "worst case" batch
                scenario are likely to
                degrade poorly (possibly catastrophically) when the
                watermark falls
                behind in streaming mode as well.

                But the worst case is defined by an input of size
                (available resources + a single byte) -> pipeline
                failure, although it could have finished, given the
                right conditions.

                 > This might be reasonable, implemented by default
                by buffering
                everything and releasing elements as the watermark
                (+lateness) advances,
                but would likely lead to inefficient (though *maybe*
                easier to reason
                about) code.

                Sure, the pipeline will be less efficient, because
                it would have to
                buffer and sort the inputs. But at least it will
                produce correct results
                in cases where updates to state are order-sensitive.

                 > Would it be roughly equivalent to GBK +
                FlatMap(lambda (key, values):
                [(key, value) for value in values])?

                I'd say roughly yes, but the difference would be in
                the trigger. The trigger should ideally fire as soon
                as the watermark (+lateness) crosses the element with
                the lowest timestamp in the buffer. This could be
                somewhat emulated by a fixed trigger firing every X
                millis, though - see the sketch below.
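
                Something like this untested sketch (the trigger
                delay and lateness are placeholders) - essentially
                Robert's GBK + FlatMap, with a processing-time
                trigger standing in for the ideal one:

                // Assumed imports: org.apache.beam.sdk.transforms.*,
                // org.apache.beam.sdk.transforms.windowing.*,
                // org.apache.beam.sdk.values.*, org.joda.time.Duration.
                input  // PCollection<KV<String, Integer>> in the global window
                    .apply(Window.<KV<String, Integer>>configure()
                        .triggering(Repeatedly.forever(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.millis(100))))  // "each X millis"
                        .withAllowedLateness(Duration.standardMinutes(10))
                        .discardingFiredPanes())
                    .apply(GroupByKey.create())
                    .apply(ParDo.of(
                        new DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
                          @ProcessElement
                          public void process(ProcessContext ctx) {
                            // The FlatMap part: re-emit each buffered value.
                            // A real sorter would need the element timestamps
                            // (e.g. via Reify.timestampsInValue()) to order here.
                            for (Integer v : ctx.element().getValue()) {
                              ctx.output(KV.of(ctx.element().getKey(), v));
                            }
                          }
                        }));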

                 > Or is the underlying desire just to be able to
                hint to the runner
                that the code may perform better (e.g. require less
                resources) as skew
                is reduced (and hence to order by timestamp iff it's
                cheap)?

                No, the sorting would have to be done in the
                streaming case as well. That is an imperative of the
                unified model. I think it is possible to sort by
                timestamp only in the batch case (and do it for
                *all* batch stateful pardos without annotation), or
                to introduce an annotation, but then make the same
                guarantees for the streaming case as well.

                Jan

                On 5/20/19 4:41 PM, Robert Bradshaw wrote:
                > On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
                >> Hi Robert,
                >>
                >> yes, I think you rephrased my point - although no
                >> *explicit* guarantees of ordering are given in
                >> either mode, there is *implicit* ordering in the
                >> streaming case that is due to the nature of the
                >> processing - the difference between watermark and
                >> timestamp of elements flowing through the pipeline
                >> is generally low (too high a difference leads to
                >> the overbuffering problem), but there is no such
                >> bound on batch.
                > Fortunately, for batch, only the state for a
                single key needs to be
                > preserved at a time, rather than the state for all
                keys across the
                > range of skew. Of course if you have few or hot
                keys, one can still
                > have issues (and this is not specific to
                StatefulDoFns).
                >
                >> As a result, I see a few possible solutions:
                >>
                >>    - the best and most natural seems to be an
                extension of the model, so
                >> that it defines batch as not only "streaming
                pipeline executed in batch
                >> fashion", but "pipeline with at least as good
                runtime characteristics as
                >> in streaming case, executed in batch fashion", I
                really don't think that
                >> there are any conflicts with the current model,
                or that this could
                >> affect performance, because the required sorting
                (as pointed by
                >> Aljoscha) is very probably already done during
                translation of stateful
                >> pardos. Also note that this definition only
                affects user defined
                >> stateful pardos
                > I don't see batch vs. streaming as part of the
                model. One can have
                > microbatch, or even a runner that alternates
                between different modes.
                > The model describes what the valid outputs are
                given a (sometimes
                > partial) set of inputs. It becomes really hard to
                define things like
                > "as good runtime characteristics." Once you allow any
                > out-of-orderedness, it is not very feasible to try
                and define (and
                > more cheaply implement) an "upper bound" of acceptable
                > out-of-orderedness.
                >
                > Pipelines that fail in the "worst case" batch
                scenario are likely to
                > degrade poorly (possibly catastrophically) when
                the watermark falls
                > behind in streaming mode as well.
                >
                >>    - another option would be to introduce an
                >> annotation for DoFns (e.g.
                >> @RequiresStableTimeCharacteristics), which would
                >> result in the sorting in the batch case - but -
                >> this extension would have to ensure the sorting
                >> in streaming mode also - it would require a
                >> definition of allowed lateness, and trigger
                >> (essentially similar to window)
                > This might be reasonable, implemented by default
                by buffering
                > everything and releasing elements as the watermark
                (+lateness)
                > advances, but would likely lead to inefficient
                (though *maybe* easier
                > to reason about) code. Not sure about the
                semantics of triggering
                > here, especially data-driven triggers. Would it be
                roughly equivalent
                > to GBK + FlatMap(lambda (key, values): [(key,
                value) for value in
                > values])?
                >
                > Or is the underlying desire just to be able to
                hint to the runner that
                > the code may perform better (e.g. require less
                resources) as skew is
                > reduced (and hence to order by timestamp iff it's
                cheap)?
                >
                >>    - the last option would be to introduce these
                "higher order guarantees" in
                >> some extension DSL (e.g. Euphoria), but that
                seems to be the worst
                >> option to me
                >>
                >> I see the first two options as quite equally good,
                although the latter one
                >> is probably more time-consuming to implement. But
                it would bring
                >> additional feature to streaming case as well.
                >>
                >> Thanks for any thoughts.
                >>
                >>    Jan
                >>
                >> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
                >>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
                >>>> Hi Reuven,
                >>>>
                >>>>> How so? AFAIK stateful DoFns work just fine in
                batch runners.
                >>>> Stateful ParDo works in batch as far as the
                logic inside the state works for absolutely
                unbounded out-of-orderness of elements. That
                basically (practically) can work only for cases
                where the order of input elements doesn't matter.
                But, "state" can refer to "state machine", and any
                time you have a state machine involved, then the
                ordering of elements would matter.
                >>> No guarantees on order are provided in *either*
                streaming or batch
                >>> mode by the model. However, it is the case that
                >>> most streaming runners attempt to limit the
                >>> amount of out-of-orderedness of elements (in
                >>> terms of event time vs. processing time) to make
                >>> forward progress, which in turn could help cap
                >>> the amount of state that must be held
                >>> concurrently, whereas a batch runner may not
                >>> allow any state to be safely discarded until the
                >>> whole timeline from infinite past to infinite
                >>> future has been observed.
                >>>
                >>> Also, as pointed out, state is not preserved
                "batch to batch" in batch mode.
                >>>
                >>>
                >>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:
                >>>
                >>>>>    batch semantics and streaming semantics
                differ only in that I can have GlobalWindow with
                default trigger on batch and cannot on stream
                >>>> You can have a GlobalWindow in streaming with a
                default trigger. You
                >>>> could define additional triggers that do early
                firings. And you could
                >>>> even trigger the global window by advancing the
                watermark to +inf.
                >>> IIRC, as a pragmatic note, we prohibited global
                window with default
                >>> trigger on unbounded PCollections in the SDK
                because this is more
                >>> likely to be user error than an actual desire to
                have no output until
                >>> drain. But it's semantically valid in the model.



