Hi Reuven,

> How so? AFAIK stateful DoFns work just fine in batch runners.

Stateful ParDo works in batch only insofar as the logic inside the state can tolerate absolutely unbounded out-of-orderness of elements. In practice, that holds only for cases where the order of input elements doesn't matter. But "state" often means "state machine", and any time a state machine is involved, the ordering of elements matters.
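
To illustrate, here is a minimal sketch of such a state machine (the 0/1 transition detector from further down in this thread); the class and all names are mine, nothing from the SDK:

  import org.apache.beam.sdk.coders.VarIntCoder;
  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.state.ValueState;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // Emits 1 on a 0->1 transition, -1 on a 1->0 transition and 0
  // otherwise, i.e. 0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1.
  // Correct only if elements arrive ordered by event time.
  class TransitionFn extends DoFn<KV<String, Integer>, Integer> {

    @StateId("last")
    private final StateSpec<ValueState<Integer>> lastSpec =
        StateSpecs.value(VarIntCoder.of());

    @ProcessElement
    public void process(
        @Element KV<String, Integer> element,
        @StateId("last") ValueState<Integer> last,
        OutputReceiver<Integer> out) {
      Integer prev = last.read();
      int current = element.getValue();
      // The first element seen for a key emits 0 by definition.
      out.output(prev == null ? 0 : current - prev);
      last.write(current);
    }
  }

Feed this DoFn the same elements in two different orders and you get two different outputs - which is exactly what a batch runner that doesn't sort by timestamp may do.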

Jan

On 5/17/19 4:06 PM, Reuven Lax wrote:


*From: *Jozef Vilcek <jozo.vil...@gmail.com>
*Date: *Fri, May 17, 2019 at 2:31 AM
*To: * <dev@beam.apache.org>

    Interesting discussion. I think it is very important information
    that when a user uses a stateful ParDo, they can run into a
    situation where it will not behave correctly in "batch operating
    mode".


How so? AFAIK stateful DoFns work just fine in batch runners.

    But some transforms known to Beam, like fixed windows, will work
    fine? Is sorting applied to keyed elements before a window's key
    group is evaluated? If the answer is yes, then why not also do
    the same for stateful ParDo? It would feel consistent to me.

    Part of the SDK or not, I see the Dataflow runner is doing this
    optimisation, probably precisely to make stateful ParDo
    operations stable in batch mode:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L64


    On Thu, May 16, 2019 at 5:09 PM Jan Lukavský <je...@seznam.cz> wrote:

        Hi Max,
        answers inline.
        ---------- Original e-mail ----------
        From: Maximilian Michels <m...@apache.org>
        To: dev@beam.apache.org
        Date: 16. 5. 2019 15:59:59
        Subject: Re: Definition of Unified model (WAS: Semantics of
        PCollection.isBounded)

            Hi Jan,

            Thanks for the discussion. Aljoscha already gave great
            answers. Just a
            couple of remarks:

            > a) streaming semantics (i.e. what I can express using
            Transforms) are a subset of batch semantics

            I think you mean streaming is a superset of batch, or
            batch is a subset
            of streaming. This is the ideal. In practice, the two
            execution modes
            are sometimes accomplished by two different execution
            engines. Even in
            Flink, we have independent APIs for batch and streaming
            and the
            execution semantics are slightly different. For example,
            there are no
            watermarks in the batch API. Thus, batch rarely is simply
            an execution
            mode of streaming. However, I still think the unified Beam
            model works
            in both cases.

            > batch semantics and streaming semantics differ only in
            that I can have a GlobalWindow with the default trigger in
            batch and cannot in streaming
        Actually, I really thought that, regarding semantics, streaming
        should be a subset of batch. That is because in batch you can
        be sure that the watermark will eventually approach infinity.
        That gives you one additional feature that streaming generally
        doesn't have (unless you manually advance the watermark to
        infinity, as you suggest).



            You can have a GlobalWindow in streaming with a default
            trigger. You could define additional triggers that do
            early firings. And you could even trigger the global
            window by advancing the watermark to +inf.
        Yes, but then you have actually changed streaming to batch;
        you just execute a batch pipeline in a streaming way.
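
        (For reference, the early-firing setup you describe might look
        roughly like this - a sketch only, where "input" stands for
        some PCollection<String> and the one-minute processing-time
        delay is an arbitrary choice of mine:

          input.apply(Window.<String>into(new GlobalWindows())
              .triggering(Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane()
                      .plusDelayOf(Duration.standardMinutes(1))))
              .discardingFiredPanes()
              .withAllowedLateness(Duration.ZERO));

        Early firings give you output along the way, but batch
        semantics only come from the final firing at +inf.)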



            > On batch engines, this is generally not an issue,
            because the buffering is eliminated by sorting - when a
            group-by operation occurs, batch runners sort elements
            with the same key to be together and therefore eliminate
            the need for a potentially infinite cache.

            The batch engines you normally use might do that. However,
            I do not see
            how sorting is an inherent property of the streaming
            model. We do not
            guarantee a deterministic order of events in streaming
            with respect to
            event time. In that regard, batch is a true subset of
            streaming because
            we make no guarantees on the order. Actually, because we
            only advance
            the watermark from -inf to +inf once we have read all
            data, this nicely aligns with the streaming model.

        Sure, streaming doesn't have the time-ordering guarantees.
        Having them would be impractical. But there is no issue in
        having these guarantees in batch mode; moreover, it gives the
        pipelines that need "bounded out-of-orderness" the chance to
        ever finish.


        I think there are some issues in how we think about the
        properties of batch vs. stream. If we define streaming as the
        superset, then we cannot define properties for batch that
        streaming doesn't have. But if we split the model into its
        semantics on one side and its runtime properties and
        guarantees on the other, then it is possible to define
        properties of batch that streaming doesn't have.


        Jan




            -Max

            On 16.05.19 15:20, Aljoscha Krettek wrote:
            > Hi,
            >
            > I think it’s helpful to consider that events never truly
            arrive in order in the real world (you mentioned as much
            yourself). For streaming use cases, there might be some
            out-of-orderness (or a lot of it, depending on the use
            case) so your implementation has to be able to deal with
            that. On the other end of the spectrum we have batch use
            cases, where out-of-orderness is potentially even bigger
            because it allows for more efficient parallel execution.
            If your implementation can deal with out-of-orderness,
            that also shouldn’t be a problem.
            >
            > Another angle is completeness vs. latency: you usually
            cannot have both in a streaming world. If you want 100 %
            completeness, i.e. you want to ensure that you process all
            events and never drop anything, you can never advance the
            watermark from its initial -Inf if you want to also never
            have watermark violations. In typical use cases I would
            expect any sorting guarantees to be constantly violated,
            unless you are willing to drop late data.
            >
            > I think these are some reasons why there is no mention
            of ordering by timestamp anywhere (unless I’m mistaken and
            there is somewhere).
            >
            > You are right, of course, that batch-style runners can
            use grouping/sorting for a GroupByKey operation. Flink
            does that and even allows sorting by secondary key, so you
            could manually sort by timestamp as a secondary key with
            hardly any additional cost. However, exposing that in the
            model would make implementing Runners quite hard, or they
            would be prohibitively slow.
            >
            > You’re also right that user functions that do arbitrary
            stateful operations can be quite dangerous and lead to
            unexpected behaviour. Your example of reacting to changes
            in 0 and 1 would produce wrong results if events are not
            100% sorted by timestamp. In general, state changes that
            rely on processing order are problematic, while operations
            that move monotonically through some space are fine.
            Examples of such operations are adding elements to a set
            or summing numbers. If you “see” a given set of events you
            can apply them to state in any order and as long as you
            see the same set of events on different executions the
            result will be the same.
            >
            > As for the Beam execution model in relation to
            processing and time, I think the only “guarantees” are:
            > - you will eventually see all events
            > - the timestamp of those events is usually not less than
            the watermark (but not always)
            > - the watermark will advance when the system thinks you
            won’t see events with a smaller timestamp in the future
            (but you sometimes might)
            >
            > Those seem quite “poor”, but I think you can’t get
            better guarantees for general cases for the reasons
            mentioned above. Also, this is just off the top of my head
            and I might be wrong in my understanding of the Beam
            model. :-O
            >
            > Best,
            > Aljoscha
            >
            >> On 16. May 2019, at 13:53, Jan Lukavský
            <je...@seznam.cz> wrote:
            >>
            >> Hi,
            >>
            >> this is starting to be really exciting. It seems to me
            that there is either something wrong with my definition of
            "Unified model" or with how it is implemented inside (at
            least) Direct and Flink Runners.
            >>
            >> So, first what I see as properties of Unified model:
            >>
            >> a) streaming semantics (i.e. what I can express using
            Transforms) are a subset of batch semantics
            >>
            >> - this is true, batch semantics and streaming semantics
            differ only in that I can have a GlobalWindow with the
            default trigger in batch and cannot in streaming
            >>
            >> b) runtime conditions of batch have to be a subset of
            streaming conditions
            >>
            >> - this is because otherwise it might be intractable to
            run a streaming pipeline on a batch engine
            >>
            >> - generally this is also true - in batch mode the
            watermark advances only between two states (-inf and
            +inf), which makes it possible to turn (most) stateful
            operations into group-by-key operations and take advantage
            of many other optimizations (the ability to re-read inputs
            makes it possible to drop checkpointing, etc.)
            >>
            >> Now there is also one not so obvious runtime condition
            of streaming engines - that is, how skewed the watermark
            and the event time of elements being processed can be - if
            this gets too high (i.e. the watermark is not moving
            and/or elements are very out-of-order), then the
            processing might become intractable, because everything
            might have to be buffered.
            >>
            >> On batch engines, this is generally not an issue,
            because the buffering is eliminated by sorting - when a
            group-by operation occurs, batch runners sort elements
            with the same key to be together and therefore eliminate
            the need for a potentially infinite cache.
            >>
            >> This turns out to be an issue whenever there is a
            stateful ParDo operation, because then (without sorting)
            property b) is violated - on a streaming engine the
            difference between element timestamp and watermark will
            tend to be generally low (and late events will be dropped
            to restrict the size of buffers), but in batch it can be
            arbitrarily large, and therefore the size of the buffers
            that would be needed is potentially unbounded.
            >>
            >> This line of thinking leads me to the conclusion that
            if Beam doesn't (on purpose) sort elements by timestamp
            before a stateful ParDo, then it basically violates the
            Unified model, because pipelines with stateful ParDo will
            not function properly on batch engines. Which is what I
            observe - there is non-determinism in the batch pipeline
            although everything seems to be "well defined"; elements
            arrive arbitrarily out of order and are arbitrarily
            dropped as out of order. This leads to different results
            every time the batch pipeline is run.
            >>
            >> Looking forward to any comments on this.
            >>
            >> Jan
            >>
            >> On 5/16/19 10:53 AM, Aljoscha Krettek wrote:
            >>> Please take this with a grain of salt, because I might
            be a bit rusty on this.
            >>>
            >>> I think the Beam model does not prescribe any ordering
            (by time or otherwise) on inputs. Mostly because always
            requiring it would be prohibitively expensive on most
            Runners, especially global sorting.
            >>>
            >>> If you want to have sorting by key, you could do a
            GroupByKey and then sort the groups in memory. This only
            works, of course, if your groups are not too large.
            >>>
            >>>> On 15. May 2019, at 21:01, Jan Lukavský
            <je...@seznam.cz> wrote:
            >>>>
            >>>> Hmmm, looking into the code of the FlinkRunner (and
            also by observing results from the stateful ParDo), it
            seems that I got it wrong from the beginning. The data is
            not sorted before the stateful ParDo, and that surprises
            me a little. How should the operator work in this case? It
            would mean that in the batch case I have to hold an
            arbitrarily long allowedLateness inside the BagState,
            which seems kind of suboptimal. Or am I missing something
            obvious here? I'll describe the use case in more detail.
            Let's suppose I have a series of ones and zeros and I want
            to emit, at each point in time, a value of 1 if the value
            changes from 0 to 1, a value of -1 if it changes from 1 to
            0, and 0 otherwise. So:
            >>>>
            >>>> 0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
            >>>>
            >>>> Does anyone have a better idea how to solve it? And
            if not, how to make it run in batch without a possibly
            infinite buffer? Should the input to a stateful ParDo be
            sorted in the batch case? My intuition would be that it
            should be, because in my understanding of "batch as a
            special case of streaming", in the batch case there is (by
            default) a single window, time advances from -inf to +inf
            at the end, and the data contains no out-of-order data in
            places where this might matter (which therefore enables
            some optimizations). The order would be relevant only in
            the stateful ParDo, I'd say.
            >>>>
            >>>> Jan
            >>>>
            >>>> On 5/15/19 8:34 PM, Jan Lukavský wrote:
            >>>>> Just to clarify, I understand that changing the
            semantics of PCollection.isBounded is probably impossible
            now, because it would introduce a chicken-and-egg problem.
            Maybe I will state it more clearly - would it be better to
            be able to run bounded pipelines using batch semantics on
            the DirectRunner (including sorting before stateful
            ParDos), or would it be better to come up with some way to
            notify the pipeline that it will be run in a streaming way
            although it consists only of bounded inputs? And I'm not
            saying how to do it, just trying to find out if anyone
            else has ever had such a need.
            >>>>>
            >>>>> Jan
            >>>>>
            >>>>> On 5/15/19 5:20 PM, Jan Lukavský wrote:
            >>>>>> Hi,
            >>>>>>
            >>>>>> I have come across unexpected (at least for me)
            behavior: an apparent inconsistency between how a
            PCollection is processed in the DirectRunner and what
            PCollection.isBounded signals. Let me explain:
            >>>>>>
            >>>>>> - I have a stateful ParDo which needs to make sure
            that elements arrive in order - it accomplishes this by
            defining a BagState for buffering input elements and
            sorting them inside this buffer; it also keeps track of
            the element with the highest timestamp to somehow estimate
            a local watermark (minus some allowed lateness), so it
            knows when to remove elements from the buffer, sort them
            by time and pass them to some (time-ordered) processing (a
            sketch follows after this list)
            >>>>>>
            >>>>>> - this seems to work well for streaming (unbounded)
            data
            >>>>>>
            >>>>>> - for batch (bounded) data the semantics of
            stateful ParDo should be (please correct me if I'm wrong)
            that elements always arrive in order, because the runner
            can sort them by timestamp
            >>>>>>
            >>>>>> - this implies that for batch-processed (bounded)
            input the allowedLateness can be set to zero, so that the
            processing is a little more efficient, because it doesn't
            have to use the BagState at all
            >>>>>>
            >>>>>> - now, the trouble seems to be that the
            DirectRunner always uses streaming processing, even if the
            input is bounded (that is by definition possible), but
            there is currently no way to know when it is possible to
            change the allowed lateness to zero (because input will
            arrive ordered)
            >>>>>>
            >>>>>> - so - it seems to me that either the DirectRunner
            should apply sorting to stateful ParDo when it processes
            bounded data (the same way that other runners do), or it
            can apply streaming processing, but then it should change
            PCollection.isBounded to UNBOUNDED, even if the input is
            originally bounded
            >>>>>>
            >>>>>> - that way, the semantics of PCollection.isBounded
            would be not whether the data is known in advance to be
            finite, but *how* the data is going to be processed, which
            is much more valuable (IMO)
            >>>>>>
            >>>>>> Any thoughts?
            >>>>>>
            >>>>>> Jan
            >>>>>>
            >
