Hi Reza,

I think it probably would provide enough compression. But it would introduce complications and latency for the streaming case. Although I see your point, I was trying to figure out if the Beam model should support these use cases more "natively".

Cheers,

 Jan

On 5/21/19 11:03 AM, Reza Rokni wrote:
In a lot of cases the initial combiner can dramatically reduce the amount of data in this last phase making it tractable for a lot of use cases.

 I assume in your example the first phase would not provide enough compression?

Cheers

Reza

On Tue, 21 May 2019, 16:47 Jan Lukavský, <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    Hi Reza, thanks for the reaction, comments inline.

    On 5/21/19 1:02 AM, Reza Rokni wrote:
    Hi,

    If I have understood the use case correctly, your output is an
    ordered counter of state changes.

    One approach which might be worth exploring is outlined below. I
    haven't had a chance to test it, so it could be missing pieces or be
    plain old wrong (I will try to come up with a test example later
    on to try it out).

    1 - Window into a small enough Duration such that the number of
    elements in a window per key can be read into memory structure
    for sorting.
    2 - GBK
    3 - In a DoFn do the ordering and output Timestamped<V>
    elements that contain the state changes for just that window and
    the value of the last element {timestamp-00:00:00: (one: 1,
    zero: 0, lastElement : 0)}. This will cause memory pressure so
    your step 1 is important.
    This is just an optimization, right?
    4- Window these outputs into the Global Window with a Stateful DoFn

    Because you finally have to do the stateful ParDo in Global
    window, you will end up with the same problem - the first three
    steps just might give you some extra time. But if you have enough
    data (long enough history, or very frequent changes, or both),
    then you will run into the same issues as without the optimization
    here. The BagState simply would not be able to hold all the data
    in batch case.

    Jan

    5-  Add elements to a BagState in the stateful dofn
    6 - In the Global Window set an EventTimer to fire at time
    boundaries that match the time window that you need. Note that Timers
    do not have a read function for the time that they are set. (Here
    is one way to set metadata to emulate a read function:
    <https://stackoverflow.com/questions/55912522/setting-a-timer-to-the-minimum-timestamp-seen/55912542#55912542>)
    Again this can cause memory pressure.
    7 - At each OnTimer,
    7a-  read and sort the elements in the BagState,
    7b - True up the state changes with the cross-window state
    changes from the list.
    7c - Store the last accumulator into a different State

    Sorry that was off the top of my head so could be missing things.
    For example LateData would need to be dealt with outside of this
    flow...
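
A minimal sketch of steps 1-3 and 7b in plain Python (not Beam API code). All function names are hypothetical. The `first` field is an assumption added here so that transitions across window boundaries can be trued up; the summary in step 3 only lists the counts and the last element. Each window counts only the transitions inside it, and the merge step walks the summaries in window order.

```python
from collections import defaultdict

def summarize_window(values):
    """values: the 0/1 values of one window, already sorted by timestamp.
    Counts only the transitions that happen inside the window."""
    one = zero = 0
    for prev, cur in zip(values, values[1:]):
        if cur != prev:
            if cur == 1:
                one += 1
            else:
                zero += 1
    return {"first": values[0], "last": values[-1], "one": one, "zero": zero}

def window_and_summarize(events, window_size):
    """Steps 1-3: assign (timestamp, value) pairs to fixed windows,
    sort each window in memory, and emit one small summary per window."""
    windows = defaultdict(list)
    for ts, value in events:
        windows[ts - ts % window_size].append((ts, value))
    return [(start, summarize_window([v for _, v in sorted(items)]))
            for start, items in windows.items()]

def merge_summaries(summaries, default_state=0):
    """Step 7b: merge the per-window summaries in window order and add
    the transitions that cross a window boundary."""
    state = default_state
    total = {"one": 0, "zero": 0}
    for _, s in sorted(summaries, key=lambda item: item[0]):
        if s["first"] != state:
            total["one" if s["first"] == 1 else "zero"] += 1
        total["one"] += s["one"]
        total["zero"] += s["zero"]
        state = s["last"]
    return total

# Input from the example further down the thread, deliberately shuffled.
events = [(4, 1), (1, 1), (6, 0), (3, 0), (2, 0), (5, 1)]
total = merge_summaries(window_and_summarize(events, window_size=3))
```

The global stage only has to hold one summary per window rather than every element, which is the compression being discussed. It still grows with the length of the history.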

    Cheers
    Reza

    On Tue, 21 May 2019 at 07:00, Kenneth Knowles <k...@apache.org
    <mailto:k...@apache.org>> wrote:

        Thanks for the nice small example of a calculation that
        depends on order. You are right that many state machines have
        this property. I agree w/ you and Luke that it is convenient
        for batch processing to sort by event timestamp before
        running a stateful ParDo. In streaming you could also
        implement "sort by event timestamp" by buffering until you
        know all earlier data will be dropped - a slack buffer up to
        allowed lateness.
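
A rough sketch of such a slack buffer in plain Python, assuming integer timestamps and a single key. `SortingBuffer` and its methods are hypothetical names, not part of any Beam SDK. Elements are held in a min-heap and released in timestamp order once the watermark minus the allowed lateness has passed them; anything arriving behind that horizon is dropped.

```python
import heapq

class SortingBuffer:
    """Hypothetical helper: buffers elements and releases them in
    event-time order as the watermark advances."""

    def __init__(self, allowed_lateness):
        self.allowed_lateness = allowed_lateness
        self.heap = []                      # min-heap of (timestamp, seq, value)
        self.seq = 0                        # tie-breaker so values are never compared
        self.released_up_to = float("-inf")

    def add(self, timestamp, value):
        """Buffer an element. Returns False if it is too late and was dropped."""
        if timestamp < self.released_up_to:
            return False
        heapq.heappush(self.heap, (timestamp, self.seq, value))
        self.seq += 1
        return True

    def advance_watermark(self, watermark):
        """Return, in timestamp order, every buffered element whose timestamp
        lies strictly before watermark - allowed_lateness."""
        horizon = watermark - self.allowed_lateness
        ready = []
        while self.heap and self.heap[0][0] < horizon:
            ts, _, value = heapq.heappop(self.heap)
            ready.append((ts, value))
        self.released_up_to = max(self.released_up_to, horizon)
        return ready

buf = SortingBuffer(allowed_lateness=2)
buf.add(3, "c")
buf.add(1, "a")
buf.add(2, "b")
first = buf.advance_watermark(4)    # horizon is 2, so only timestamp 1 is released
late = buf.add(1, "x")              # behind the horizon, so it is dropped
second = buf.advance_watermark(10)  # horizon is 8, the rest is released in order
```

The buffer size is bounded by how far the watermark lags behind the newest data plus the allowed lateness, which is the overbuffering concern raised elsewhere in this thread.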

        I do not think that it is OK to sort in batch and not in
        streaming. Many state machines diverge very rapidly when
        things are out of order. So each runner if they see the
        "@OrderByTimestamp" annotation (or whatever) needs to deliver
        sorted data (by some mix of buffering and dropping), or to
        reject the pipeline as unsupported.

        I also want to say that this is not the default case - many
        uses of state & timers in ParDo yield different results at
        the element level, but the results are equivalent in the
        big picture. For examples such as "assign a unique sequence
        number to each element" or "group into batches", it doesn't
        matter exactly what the result is, only that it meets the
        spec. And other cases like user funnels are monotonic enough
        that you also don't actually need sorting.

        Kenn

        On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz
        <mailto:je...@seznam.cz>> wrote:

            Yes, the problem will probably arise mostly when you have
            poorly distributed keys (or too few keys). I'm really
            not sure if a pure GBK with a trigger can solve this - it
            might help to have a data-driven trigger. There would still
            be some doubts, though. The main question is still here -
            people say that sorting by timestamp before stateful
            ParDo would be prohibitively slow, but I don't really see
            why - the sorting is very probably already there. And if
            not (hash grouping instead of sorted grouping), then the
            sorting would affect only user-defined StatefulParDos.

            This would suggest that the best way out of this would
            really be to add an annotation, so that the author of the
            pipeline can decide.

            If that would be acceptable I think I can try to prepare
            some basic functionality, but I'm not sure if I would be
            able to cover all runners / SDKs.

            On 5/20/19 11:36 PM, Lukasz Cwik wrote:
            It is read all per key and window and not just read all
            (this still won't scale with hot keys in the global
            window). The GBK preceding the StatefulParDo will
            guarantee that you are processing all the values for a
            specific key and window at any given time. Is there a
            specific window/trigger that is missing that you feel
            would remove the need for you to use StatefulParDo?

            On Mon, May 20, 2019 at 12:54 PM Jan Lukavský
            <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                Hi Lukasz,

                > Today, if you must have a strict order, you must
                guarantee that your StatefulParDo implements the
                necessary "buffering & sorting" into state.

                Yes, no problem with that. But this whole discussion
                started, because *this doesn't work on batch*. You
                simply cannot first read everything from distributed
                storage and then buffer it all into memory, just to
                read it again, but sorted. That will not work. And
                even if it did, it would be a terrible waste of
                resources.

                Jan

                On 5/20/19 8:39 PM, Lukasz Cwik wrote:


                On Mon, May 20, 2019 at 8:24 AM Jan Lukavský
                <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

                    This discussion brings many really interesting
                    questions for me. :-)

                     > I don't see batch vs. streaming as part of
                    the model. One can have
                    microbatch, or even a runner that alternates
                    between different modes.

                    Although I understand motivation of this
                    statement, this project name is
                    "Apache Beam: An advanced unified programming
                    model". What does the
                    model unify, if "streaming vs. batch" is not
                    part of the model?

                    Using microbatching, chaining of batch jobs, or
                    pure streaming are
                    exactly the "runtime
                    conditions/characteristics" I refer to. All these
                    define several runtime parameters, which in
                    turn define how well/badly
                    will the pipeline perform and how many
                    resources might be needed. From
                    my point of view, pure streaming should be the
                    most resource demanding
                    (if not, why bother with batch? why not run
                    everything in streaming
                    only? what will there remain to "unify"?).

                     > Fortunately, for batch, only the state for a
                    single key needs to be
                    preserved at a time, rather than the state for
                    all keys across the range
                    of skew. Of course if you have few or hot keys,
                    one can still have
                    issues (and this is not specific to StatefulDoFns).

                    Yes, but here is still the presumption that my
                    stateful DoFn can
                    tolerate arbitrary shuffling of inputs. Let me
                    explain the use case in
                    more detail.

                    Suppose you have input stream consisting of 1s
                    and 0s (and some key for
                    each element, which is irrelevant for the
                    demonstration). Your task is
                    to calculate in running global window the
                    actual number of changes
                    between state 0 and state 1 and vice versa.
                    When the state doesn't
                    change, you don't calculate anything. If input
                    (for given key) would be
                    (tN denotes timestamp N):

                      t1: 1

                      t2: 0

                      t3: 0

                      t4: 1

                      t5: 1

                      t6: 0

                    then the output should yield (supposing that
                    default state is zero):

                      t1: (one: 1, zero: 0)

                      t2: (one: 1, zero: 1)

                      t3: (one: 1, zero: 1)

                      t4: (one: 2, zero: 1)

                      t5: (one: 2, zero: 1)

                      t6: (one: 2, zero: 2)

                    How would you implement this in current Beam
                    semantics?
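
For reference, the calculation in the example above can be sketched in plain Python (not Beam API code). The function name `count_state_changes` is made up for illustration. The function processes elements in whatever order it receives them, which makes the order sensitivity visible: the same six elements give a different final count when they arrive out of order.

```python
def count_state_changes(events, default_state=0):
    """events: iterable of (timestamp, value) pairs with value in {0, 1},
    processed in the order given. Returns one (timestamp, counts) pair
    per input element."""
    state = default_state
    counts = {"one": 0, "zero": 0}
    out = []
    for ts, value in events:
        if value != state:
            # Count a transition into the new state.
            counts["one" if value == 1 else "zero"] += 1
            state = value
        out.append((ts, dict(counts)))
    return out

events = [(1, 1), (2, 0), (3, 0), (4, 1), (5, 1), (6, 0)]

# Sorted by timestamp: matches the expected output listed above.
in_order = count_state_changes(sorted(events))

# Same elements, different arrival order: two transitions are lost.
arrival = [(1, 1), (4, 1), (5, 1), (2, 0), (3, 0), (6, 0)]
out_of_order = count_state_changes(arrival)
```

With sorted input the last entry is `(6, {"one": 2, "zero": 2})`; with the shuffled arrival order the final counts are `{"one": 1, "zero": 1}`.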

                I think you're saying here that I know that my input
                is ordered in a specific way and since I assume the
                order when writing my pipeline I can perform this
                optimization. But there is nothing preventing a
                runner from noticing that you're processing in the
                global window with a specific type of trigger and
                re-ordering your inputs/processing to get better
                performance (since you can't use an AfterWatermark
                trigger for your pipeline in streaming for the
                GlobalWindow).

                Today, if you must have a strict order, you must
                guarantee that your StatefulParDo implements the
                necessary "buffering & sorting" into state. I can
                see why you would want an annotation that says I
                must have timestamp ordered elements, since it
                makes writing certain StatefulParDos much easier.
                StatefulParDo is a low-level function, it really is
                the "here you go and do whatever you need to but
                here be dragons" function while windowing and
                triggering is meant to keep many people from
                writing StatefulParDo in the first place.

                     > Pipelines that fail in the "worst case"
                    batch scenario are likely to
                    degrade poorly (possibly catastrophically) when
                    the watermark falls
                    behind in streaming mode as well.

                    But the worst case is defined by input of size
                    (available resources +
                    single byte) -> pipeline fail. Although it
                    could have finished, given
                    the right conditions.

                     > This might be reasonable, implemented by
                    default by buffering
                    everything and releasing elements as the
                    watermark (+lateness) advances,
                    but would likely lead to inefficient (though
                    *maybe* easier to reason
                    about) code.

                    Sure, the pipeline will be less efficient,
                    because it would have to
                    buffer and sort the inputs. But at least it
                    will produce correct results
                    in cases where updates to state are
                    order-sensitive.

                     > Would it be roughly equivalent to GBK +
                    FlatMap(lambda (key, values):
                    [(key, value) for value in values])?

                    I'd say roughly yes, but difference would be in
                    the trigger. The trigger
                    should ideally fire as soon as watermark
                    (+lateness) crosses element
                    with lowest timestamp in the buffer. Although
                    this could be somehow
                    emulated by fixed trigger each X millis.
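
As an illustration of the "GBK + FlatMap" reading, here is a plain-Python sketch (not Beam API code; `group_by_key` and `sort_and_flatten` are hypothetical names). The per-key sort by timestamp is an assumption added to show where the ordering would happen; the quoted expression itself only re-emits the grouped values. Triggering is not modeled at all: everything is grouped first and emitted once.

```python
from collections import defaultdict

def group_by_key(elements):
    """Stand-in for a GBK: collects (timestamp, value) pairs per key."""
    grouped = defaultdict(list)
    for key, ts, value in elements:
        grouped[key].append((ts, value))
    return grouped.items()

def sort_and_flatten(elements):
    """Group by key, sort each key's values by timestamp, and re-emit
    them as individual (key, timestamp, value) elements."""
    return [(key, ts, value)
            for key, values in group_by_key(elements)
            for ts, value in sorted(values)]

elements = [("a", 2, 0), ("b", 1, 1), ("a", 1, 1)]
ordered = sort_and_flatten(elements)
```

Here `ordered` is `[("a", 1, 1), ("a", 2, 0), ("b", 1, 1)]`: ordering is guaranteed only within a key, not across keys.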

                     > Or is the underlying desire just to be able
                    to hint to the runner
                    that the code may perform better (e.g. require
                    less resources) as skew
                    is reduced (and hence to order by timestamp iff
                    it's cheap)?

                    No, the sorting would have to be done in
                    streaming case as well. That is
                    an imperative of the unified model. I think it
                    is possible to sort by
                    timestamp only in batch case (and do it for
                    *all* batch stateful pardos
                    without annotation), or introduce annotation,
                    but then make the same
                    guarantees for streaming case as well.

                    Jan

                    On 5/20/19 4:41 PM, Robert Bradshaw wrote:
                    > On Mon, May 20, 2019 at 1:19 PM Jan Lukavský
                    <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:
                    >> Hi Robert,
                    >>
                    >> yes, I think you rephrased my point -
                    although no *explicit* guarantees
                    >> of ordering are given in either mode, there
                    is *implicit* ordering in
                    >> streaming case that is due to nature of the
                    processing - the difference
                    >> between watermark and timestamp of elements
                    flowing through the pipeline
                    >> are generally low (too high difference leads
                    to the overbuffering
                    >> problem), but there is no such bound on batch.
                    > Fortunately, for batch, only the state for a
                    single key needs to be
                    > preserved at a time, rather than the state
                    for all keys across the
                    > range of skew. Of course if you have few or
                    hot keys, one can still
                    > have issues (and this is not specific to
                    StatefulDoFns).
                    >
                    >> As a result, I see a few possible solutions:
                    >>
                    >>    - the best and most natural seems to be
                    extension of the model, so
                    >> that it defines batch as not only "streaming
                    pipeline executed in batch
                    >> fashion", but "pipeline with at least as
                    good runtime characteristics as
                    >> in streaming case, executed in batch
                    fashion", I really don't think that
                    >> there are any conflicts with the current
                    model, or that this could
                    >> affect performance, because the required
                    sorting (as pointed by
                    >> Aljoscha) is very probably already done
                    during translation of stateful
                    >> pardos. Also note that this definition only
                    affects user defined
                    >> stateful pardos
                    > I don't see batch vs. streaming as part of
                    the model. One can have
                    > microbatch, or even a runner that alternates
                    between different modes.
                    > The model describes what the valid outputs
                    are given a (sometimes
                    > partial) set of inputs. It becomes really
                    hard to define things like
                    > "as good runtime characteristics." Once you
                    allow any
                    > out-of-orderedness, it is not very feasible
                    to try and define (and
                    > more cheaply implement) an "upper bound" of
                    acceptable
                    > out-of-orderedness.
                    >
                    > Pipelines that fail in the "worst case" batch
                    scenario are likely to
                    > degrade poorly (possibly catastrophically)
                    when the watermark falls
                    > behind in streaming mode as well.
                    >
                    >>    - another option would be to introduce
                    annotation for DoFns (e.g.
                    >> @RequiresStableTimeCharacteristics), which
                    would result in the sorting
                    >> in batch case - but - this extension would
                    have to ensure the sorting in
                    >> streaming mode also - it would require
                    definition of allowed lateness,
                    >> and trigger (essentially similar to window)
                    > This might be reasonable, implemented by
                    default by buffering
                    > everything and releasing elements as the
                    watermark (+lateness)
                    > advances, but would likely lead to
                    inefficient (though *maybe* easier
                    > to reason about) code. Not sure about the
                    semantics of triggering
                    > here, especially data-driven triggers. Would
                    it be roughly equivalent
                    > to GBK + FlatMap(lambda (key, values): [(key,
                    value) for value in
                    > values])?
                    >
                    > Or is the underlying desire just to be able
                    to hint to the runner that
                    > the code may perform better (e.g. require
                    less resources) as skew is
                    > reduced (and hence to order by timestamp iff
                    it's cheap)?
                    >
                    >>    - last option would be to introduce these
                    "higher order guarantees" in
                    >> some extension DSL (e.g. Euphoria), but that
                    seems to be the worst
                    >> option to me
                    >>
                    >> I see the first two options quite equally
                    good, although the latter one
                    >> is probably more time consuming to
                    implement. But it would bring
                    >> additional feature to streaming case as well.
                    >>
                    >> Thanks for any thoughts.
                    >>
                    >>    Jan
                    >>
                    >> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
                    >>> On Fri, May 17, 2019 at 4:48 PM Jan
                    Lukavský <je...@seznam.cz
                    <mailto:je...@seznam.cz>> wrote:
                    >>>> Hi Reuven,
                    >>>>
                    >>>>> How so? AFAIK stateful DoFns work just
                    fine in batch runners.
                    >>>> Stateful ParDo works in batch as long as
                    the logic inside the state works for absolutely
                    unbounded out-of-orderedness of elements. That
                    basically (practically) can work only for
                    cases where the order of input elements
                    doesn't matter. But "state" can refer to
                    "state machine", and any time you have a state
                    machine involved, the ordering of elements
                    would matter.
                    >>> No guarantees on order are provided in
                    *either* streaming or batch
                    >>> mode by the model. However, it is the case
                    that in order to make
                    >>> forward progress most streaming runners
                    attempt to limit the amount of
                    >>> out-of-orderedness of elements (in terms of
                    event time vs. processing
                    >>> time), which in
                    turn could help cap the
                    >>> amount of state that must be held
                    concurrently, whereas a batch runner
                    >>> may not allow any state to be safely
                    discarded until the whole
                    >>> timeline from infinite past to infinite
                    future has been observed.
                    >>>
                    >>> Also, as pointed out, state is not
                    preserved "batch to batch" in batch mode.
                    >>>
                    >>>
                    >>> On Thu, May 16, 2019 at 3:59 PM Maximilian
                    Michels <m...@apache.org
                    <mailto:m...@apache.org>> wrote:
                    >>>
                    >>>>>    batch semantics and streaming
                    semantics differs only in that I can have
                    GlobalWindow with default trigger on batch and
                    cannot on stream
                    >>>> You can have a GlobalWindow in streaming
                    with a default trigger. You
                    >>>> could define additional triggers that do
                    early firings. And you could
                    >>>> even trigger the global window by
                    advancing the watermark to +inf.
                    >>> IIRC, as a pragmatic note, we prohibited
                    global window with default
                    >>> trigger on unbounded PCollections in the
                    SDK because this is more
                    >>> likely to be user error than an actual
                    desire to have no output until
                    >>> drain. But it's semantically valid in the
                    model.



