Hi Reuven,

> It also gets awkward with Flatten - the sequence number is no longer enough, you must also encode which side of the flatten each element came from.

That is a generic need. Even if you read data from Kafka, the offsets are comparable only within a single partition. So, for Kafka to work as a FIFO for ordering, elements with the same key have to be pushed to the same partition (otherwise Kafka cannot act as a FIFO, because different partitions can be handled by different brokers, which means different observers, and they therefore might not agree on the order of events). So if we want to emulate FIFO per key, then the sequence IDs also have to be per key.
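
For illustration, a minimal sketch of what such per-key sequence metadata could look like (the names here are hypothetical; this is not an existing Beam API):

import java.io.Serializable;

// Hypothetical wrapper carrying sequence metadata alongside the payload.
// 'sourceId' tags which input of a Flatten the element came from; 'sequence'
// is comparable only among elements with the same key AND the same sourceId
// (e.g. a Kafka partition offset).
public class Sequenced<T> implements Serializable {
  public final String sourceId;
  public final long sequence;
  public final T value;

  public Sequenced(String sourceId, long sequence, T value) {
    this.sourceId = sourceId;
    this.sequence = sequence;
    this.value = value;
  }
}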

On 5/28/19 2:33 PM, Reuven Lax wrote:
Sequence metadata does have the disadvantage that users can no longer use the types coming from the source. You must create a new type that contains a sequence number (unless Beam provides this). It also gets awkward with Flatten - the sequence number is no longer enough, you must also encode which side of the flatten each element came from.

On Tue, May 28, 2019 at 3:18 AM Jan Lukavský <[email protected]> wrote:

    As I understood it, Kenn was supporting the idea that sequence
    metadata is preferable over FIFO. I was trying to point out that it
    should even provide the same functionality as FIFO, plus one
    important extra - reproducibility and the ability to be persisted
    and reused the same way in batch and streaming.

    There is no doubt that sequence metadata can be stored in every
    storage. But regarding some implicit ordering that sources might
    have - yes, of course, data written into HDFS or Cloud Storage has
    an ordering, but only a partial one - within some bulk (e.g. a
    file); the ordering is not defined across the boundaries of these
    bulks (between files). That is why I'd say that the ordering of
    sources is relevant only for (partitioned!) streaming sources and
    generally always reduces to sequence metadata (e.g. offsets).

    Jan

    On 5/28/19 11:43 AM, Robert Bradshaw wrote:
    > Huge +1 to all Kenn said.
    >
    > Jan, batch sources can have orderings too, just like Kafka. I think
    > it's reasonable (for both batch and streaming) that if a source
    has an
    > ordering that is an important part of the data, it should preserve
    > this ordering into the data itself (e.g. as sequence numbers,
    offsets,
    > etc.)
    >
    > On Fri, May 24, 2019 at 10:35 PM Kenneth Knowles
    <[email protected]> wrote:
    >> I strongly prefer explicit sequence metadata over FIFO
    requirements, because:
    >>
    >>   - FIFO is complex to specify: for example Dataflow has "per
    stage key-to-key" FIFO today, but it is not guaranteed to remain
    so (plus "stage" is not a portable concept, nor even guaranteed to
    remain a Dataflow concept)
    >>   - complex specifications are by definition poor usability (if
    necessary, then it is what it is)
    >>   - overly restricts the runner, reduces parallelism, for
    example any non-stateful ParDo has per-element parallelism, not
    per "key"
    >>   - another perspective on that: FIFO makes everyone pay rather
    than just the transform that requires exact sequencing
    >>   - previous implementation details like reshuffles become part
    of the model
    >>   - I'm not even convinced the use cases involved are addressed
    by some careful FIFO restrictions; many sinks re-key and they
    would all have to become aware of how keying of a sequence of
    "stages" affects the end-to-end FIFO
    >>
    >> A noop becoming a non-noop is essentially the mathematical
    definition of moving from higher-level to lower-level abstraction.
    >>
    >> So this strikes at the core question of what level of
    abstraction Beam aims to represent. Lower-level means there are
    fewer possible implementations and it is more tied to the
    underlying architecture, and anything not near-exact match pays a
    huge penalty. Higher-level means there are more implementations
    possible with different tradeoffs, though they may all pay a minor
    penalty.
    >>
    >> I could be convinced to change my mind, but it needs some
    extensive design, examples, etc. I think it is probably about the
    most consequential design decision in the whole Beam model, around
    the same level as the decision to use ParDo and GBK as the
    primitives IMO.
    >>
    >> Kenn
    >>
    >> On Thu, May 23, 2019 at 10:17 AM Reuven Lax <[email protected]> wrote:
    >>> Not really. I'm suggesting that some variant of FIFO ordering
    is necessary, which requires that either runners natively support FIFO
    ordering or transforms add some extra sequence number to each
    record to sort by.
    >>>
    >>> I still think your proposal is very useful by the way. I'm
    merely pointing out that to solve the state-machine problem we
    probably need something more.
    >>>
    >>> Reuven
    >>>
    >>> On Thu, May 23, 2019 at 9:50 AM Jan Lukavský <[email protected]> wrote:
    >>>> Hi,
    >>>> yes. It seems that ordering by a user-supplied UDF makes sense
    and I will update the design proposal accordingly.
    >>>> Would that solve the issues you mention?
    >>>> Jan
    >>>> ---------- Original e-mail ----------
    >>>> From: Reuven Lax <[email protected]>
    >>>> To: dev <[email protected]>
    >>>> Date: 23. 5. 2019 18:44:38
    >>>> Subject: Re: Definition of Unified model
    >>>>
    >>>> I'm simply saying that timestamp ordering is insufficient for
    state machines. I wasn't proposing Kafka as a solution - that was
    simply an example of how people solve this problem in other scenarios.
    >>>>
    >>>> BTW another example of ordering: Imagine today that you have
    a triggered Sum aggregation writing out to a key-value sink. In
    theory we provide no ordering, so the sink might write the
    triggered sums in the wrong order, ending up with an incorrect
    value in the sink. In this case you probably want values ordered
    by trigger pane index.
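
For illustration, a sketch of a guard in front of a key-value sink that reads the pane index, assuming the Beam Java SDK (the DoFn and its state cell are hypothetical, not an existing utility):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Illustrative guard: forward a triggered sum only if its pane index is
// newer than the last one forwarded for this key, so a late-delivered
// earlier pane cannot overwrite a newer value in the sink.
class DropStalePanes extends DoFn<KV<String, Long>, KV<String, Long>> {
  @StateId("lastPane")
  private final StateSpec<ValueState<Long>> lastPaneSpec = StateSpecs.value();

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("lastPane") ValueState<Long> lastPane) {
    long paneIndex = c.pane().getIndex();  // index of the trigger firing
    Long last = lastPane.read();
    if (last == null || paneIndex > last) {
      lastPane.write(paneIndex);
      c.output(c.element());               // goes on to the actual sink
    }
  }
}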
    >>>>
    >>>> Reuven
    >>>>
    >>>> On Thu, May 23, 2019 at 8:59 AM Jan Lukavský <[email protected]> wrote:
    >>>>
    >>>> Hi Reuven,
    >>>> I share Robert's viewpoint. I think the issue you refer
    to is not really related to timestamps, but to the fact that
    the ordering of events in time is observer dependent (whether caused
    by relativity or by time skew, the consequences are essentially the
    same). And the resolution in fact isn't Kafka, but
    generally an authoritative observer that tells you "I saw the
    events in this order". Either you have one (and have the
    outcome of its observation persisted in the data - e.g. as an offset
    in a Kafka partition), and then you should be able to use it (maybe
    that suggests after all that sorting by some user-supplied UDF might
    make sense), or you do not have one, and then any interpretation of
    the data seems to be equally valid. Although determinism is fine, of
    course.
    >>>> Jan
    >>>> ---------- Original e-mail ----------
    >>>> From: Reuven Lax <[email protected]>
    >>>> To: dev <[email protected]>
    >>>> Date: 23. 5. 2019 17:39:12
    >>>> Subject: Re: Definition of Unified model
    >>>>
    >>>> So an example would be elements of type "startUserSession"
    and "endUserSession" (website sessions, not Beam sessions).
    Logically you may need to process them in the correct order if you
    have any sort of state-machine logic. However timestamp ordering
    is never guaranteed to match the logical ordering. Not only might
    you have several elements with the same timestamp, but in reality
    time skew across backend servers can cause the events to have
    timestamps in reverse order of the actual causality order.
    >>>>
    >>>> People do solve this problem today though. Publish the events
    to Kafka, making sure that events for the same user end up in the
    same Kafka partition. This ensures that the events appear in the
    Kafka partitions in causality order, even if the timestamp order
    doesn't match. Then your Kafka subscriber simply processes the
    elements in each partition in order.
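
For illustration, this pattern looks roughly as follows with the plain Kafka producer API (broker address, topic name and event payloads are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Publishing session events keyed by userId: Kafka's default partitioner
// hashes the key, so all events of one user land in one partition and are
// read back in the order they were appended, regardless of their
// (possibly skewed) timestamps.
public class SessionEventPublisher {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      String userId = "user-42";
      producer.send(new ProducerRecord<>("user-sessions", userId, "startUserSession"));
      producer.send(new ProducerRecord<>("user-sessions", userId, "endUserSession"));
    }
  }
}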
    >>>>
    >>>> I think the ability to impose FIFO causality ordering is
    what's needed for any state-machine work. Timestamp ordering has
    advantages (though often I think the advantage is in state), but
    does not solve this problem.
    >>>>
    >>>> Reuven
    >>>>
    >>>> On Thu, May 23, 2019 at 7:48 AM Robert Bradshaw
    <[email protected]> wrote:
    >>>>
    >>>> Good point.
    >>>>
    >>>> The "implementation-specific" way I would do this is
    >>>> window-by-instant, followed by a DoFn that gets all the
    elements with
    >>>> the same timestamp and sorts/acts accordingly, but this
    counts on the
    >>>> runner producing windows in timestamp order (likely?) and
    also the
    >>>> subsequent DoFn getting them in this order (also likely, due to
    >>>> fusion).
    >>>>
    >>>> One could make the argument that, though it does not provide
    >>>> deterministic behavior, getting elements of the same timestamp in
    >>>> different orders should produce equally valid interpretations
    of the
    >>>> data. (After all, due to relativity, timestamps are not
    technically
    >>>> well ordered across space.) I can see how data-dependent
    tiebreakers
    >>>> could be useful, or promises of preservation of order between
    >>>> operations.
    >>>>
    >>>> - Robert
    >>>>
    >>>> On Thu, May 23, 2019 at 4:18 PM Reuven Lax <[email protected]> wrote:
    >>>>> So Jan's example of state machines is quite a valid use case
    for ordering. However in my experience, timestamp ordering is
    insufficient for state machines. Elements that cause state
    transitions might come in with the exact same timestamp, yet still
    have a necessary ordering. Especially given Beam's decision to
    have millisecond timestamps this is possible, but even at
    microsecond or nanosecond precision this can happen at scale. To
    handle state machines you usually need some sort of FIFO ordering
    along with an ordered source, such as Kafka, not timestamp ordering.
    >>>>>
    >>>>> Reuven
    >>>>>
    >>>>> On Thu, May 23, 2019 at 12:32 AM Jan Lukavský
    <[email protected]> wrote:
    >>>>>> Hi all,
    >>>>>>
    >>>>>> thanks everyone for this discussion. I think I have
    gathered enough
    >>>>>> feedback to be able to put down a proposition for changes,
    which I will
    >>>>>> do and send to this list for further discussion. There are
    still doubts
    >>>>>> remaining about non-determinism and its relation to output
    stability vs.
    >>>>>> latency. But I will try to clarify all this in the design
    document.
    >>>>>>
    >>>>>> Thanks,
    >>>>>>
    >>>>>>    Jan
    >>>>>>
    >>>>>> On 5/22/19 3:49 PM, Maximilian Michels wrote:
    >>>>>>>> Someone from Flink might correct me if I'm wrong, but
    that's my
    >>>>>>>> current understanding.
    >>>>>>> In essence your description of how exactly-once works in
    Flink is
    >>>>>>> correct. The general assumption in Flink is that pipelines
    must be
    >>>>>>> deterministic and thus produce idempotent writes in the
    case of
    >>>>>>> failures. However, that doesn't mean Beam sinks can't
    guarantee a bit
    >>>>>>> more with what Flink has to offer.
    >>>>>>>
    >>>>>>> Luke already mentioned the design discussions for
    @RequiresStableInput
    >>>>>>> which ensures idempotent writes for non-deterministic
    pipelines. This
    >>>>>>> is not part of the model but an optional Beam feature.
    >>>>>>>
    >>>>>>> We recently implemented support for @RequiresStableInput
    in the Flink
    >>>>>>> Runner. Reuven mentioned the Flink checkpoint
    confirmation, which
    >>>>>>> allows us to buffer (and checkpoint) processed data and
    only emit it
    >>>>>>> once a Flink checkpoint has completed.
    >>>>>>>
    >>>>>>> Cheers,
    >>>>>>> Max
    >>>>>>>
    >>>>>>> On 21.05.19 16:49, Jan Lukavský wrote:
    >>>>>>>> Hi,
    >>>>>>>>
    >>>>>>>>   > Actually, I think it is a larger (open) question
    whether exactly
    >>>>>>>> once is guaranteed by the model or whether runners are
    allowed to
    >>>>>>>> relax that. I would think, however, that sources correctly
    >>>>>>>> implemented should be idempotent when run atop an exactly
    once
    >>>>>>>> infrastructure such as Flink or Dataflow.
    >>>>>>>>
    >>>>>>>> I would assume, that the model basically inherits
    guarantees of
    >>>>>>>> underlying infrastructure. Because Flink does not work as you
    >>>>>>>> described (atomic commit of inputs, state and outputs),
    but rather a
    >>>>>>>> checkpoint mark flows through the DAG much like a
    watermark, and on
    >>>>>>>> failures operators are restored and data reprocessed, which
    (IMHO)
    >>>>>>>> implies that you have exactly once everywhere in the DAG
    *but*
    >>>>>>>> sinks. That is because sinks cannot be restored to a
    previous state;
    >>>>>>>> instead sinks are supposed to be idempotent in order for
    the exactly
    >>>>>>>> once to really work (or at least be able to commit outputs on
    >>>>>>>> checkpoint in sink). That implies that if you don't have
    sink that is
    >>>>>>>> able to commit outputs atomically on checkpoint, the pipeline
    >>>>>>>> execution should be deterministic upon retries, otherwise
    shadow
    >>>>>>>> writes from failed paths of the pipeline might appear.
    >>>>>>>>
    >>>>>>>> Someone from Flink might correct me if I'm wrong, but
    that's my
    >>>>>>>> current understanding.
    >>>>>>>>
    >>>>>>>>   > Sounds like we should make this clearer.
    >>>>>>>>
    >>>>>>>> I meant that you are right that, in any thoughts we are
    having, we
    >>>>>>>> must not forget that streams are by definition
    out-of-order. That is
    >>>>>>>> a property that we cannot change. But - that doesn't limit
    us from
    >>>>>>>> creating an operator that presents the data to the UDF as
    if the
    >>>>>>>> stream were ideally sorted. It can do that by introducing
    latency, of
    >>>>>>>> course.
    >>>>>>>>
    >>>>>>>> On 5/21/19 4:01 PM, Robert Bradshaw wrote:
    >>>>>>>>> Reza: One could provide something like this as a utility
    class, but
    >>>>>>>>> one downside is that it is not scale invariant. It
    requires a tuning
    >>>>>>>>> parameter that, if too small, won't mitigate the problem,
    but if too
    >>>>>>>>> big, greatly increases latency. (Possibly one could
    define a dynamic
    >>>>>>>>> session-like window to solve this though...) It also
    might be harder
    >>>>>>>>> for runners that *can* cheaply present stuff in
    timestamp order to
    >>>>>>>>> optimize. (That and, in practice, our annotation-style
    process methods
    >>>>>>>>> don't lend themselves to easy composition.) I think it
    could work in
    >>>>>>>>> specific cases though.
    >>>>>>>>>
    >>>>>>>>> More inline below.
    >>>>>>>>>
    >>>>>>>>> On Tue, May 21, 2019 at 11:38 AM Jan Lukavský
    <[email protected]> wrote:
    >>>>>>>>>> Hi Robert,
    >>>>>>>>>>
    >>>>>>>>>>    > Beam has an exactly-once model. If the data was
    consumed, state
    >>>>>>>>>> mutated, and outputs written downstream (these three
    are committed
    >>>>>>>>>> together atomically) it will not be replayed. That does
    not, of
    >>>>>>>>>> course,
    >>>>>>>>>> solve the non-determinism due to ordering (including
    the fact that two
    >>>>>>>>>> operations reading the same PCollection may view
    different ordering).
    >>>>>>>>>>
    >>>>>>>>>> I think what you describe is a property of a runner,
    not of the model,
    >>>>>>>>>> right? I think if I run my pipeline on Flink I will not
    get this
    >>>>>>>>>> atomicity, because although Flink also uses an
    exactly-once model it
    >>>>>>>>>> might
    >>>>>>>>>> write outputs multiple times.
    >>>>>>>>> Actually, I think it is a larger (open) question whether
    exactly once
    >>>>>>>>> is guaranteed by the model or whether runners are
    allowed to relax
    >>>>>>>>> that. I would think, however, that sources correctly
    implemented
    >>>>>>>>> should be idempotent when run atop an exactly once
    infrastructure such
    >>>>>>>>> as Flink or Dataflow.
    >>>>>>>>>
    >>>>>>>>>>    > 1) Is it correct for a (Stateful)DoFn to assume
    elements are
    >>>>>>>>>> received
    >>>>>>>>>> in a specific order? In the current model, it is not.
    Being able to
    >>>>>>>>>> read, handle, and produce out-of-order data, including
    late data,
    >>>>>>>>>> is a
    >>>>>>>>>> pretty fundamental property of distributed systems.
    >>>>>>>>>>
    >>>>>>>>>> Yes, absolutely. The argument here is not that Stateful
    ParDo should
    >>>>>>>>>> presume to receive elements in any order, but that it
    should _present_ them as
    >>>>>>>>>> such to
    >>>>>>>>>> the user's @ProcessElement function.
    >>>>>>>>> Sounds like we should make this clearer.
    >>>>>>>>>
    >>>>>>>>>>    > 2) Given that some operations are easier (or
    possibly only
    >>>>>>>>>> possible)
    >>>>>>>>>> to write when operating on ordered data, and that
    different runners
    >>>>>>>>>> may
    >>>>>>>>>> have (significantly) cheaper ways to provide this
    ordering than can be
    >>>>>>>>>> done by the user themselves, should we elevate this to
    a property of
    >>>>>>>>>> (Stateful?)DoFns that the runner can provide? I think a
    compelling
    >>>>>>>>>> argument can be made here that we should.
    >>>>>>>>>>
    >>>>>>>>>> +1
    >>>>>>>>>>
    >>>>>>>>>> Jan
    >>>>>>>>>>
    >>>>>>>>>> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
    >>>>>>>>>>> On Mon, May 20, 2019 at 5:24 PM Jan Lukavský
    <[email protected]> wrote:
    >>>>>>>>>>>>     > I don't see batch vs. streaming as part of the
    model. One
    >>>>>>>>>>>> can have
    >>>>>>>>>>>> microbatch, or even a runner that alternates between
    different
    >>>>>>>>>>>> modes.
    >>>>>>>>>>>>
    >>>>>>>>>>>> Although I understand the motivation of this statement,
    this project
    >>>>>>>>>>>> name is
    >>>>>>>>>>>> "Apache Beam: An advanced unified programming model".
    What does the
    >>>>>>>>>>>> model unify, if "streaming vs. batch" is not part of
    the model?
    >>>>>>>>>>> What I mean is that streaming vs. batch is no longer
    part of the
    >>>>>>>>>>> model
    >>>>>>>>>>> (or ideally API), but pushed down to be a concern of
    the runner
    >>>>>>>>>>> (executor) of the pipeline.
    >>>>>>>>>>>
    >>>>>>>>>>>
    >>>>>>>>>>> On Tue, May 21, 2019 at 10:32 AM Jan Lukavský
    <[email protected]>
    >>>>>>>>>>> wrote:
    >>>>>>>>>>>> Hi Kenn,
    >>>>>>>>>>>>
    >>>>>>>>>>>> OK, so if we introduce annotation, we can have
    stateful ParDo
    >>>>>>>>>>>> with sorting, that would perfectly resolve my issues.
    I still
    >>>>>>>>>>>> have some doubts, though. Let me explain. The current
    behavior of
    >>>>>>>>>>>> stateful ParDo has the following properties:
    >>>>>>>>>>>>
    >>>>>>>>>>>>     a) might fail in batch, although it runs fine in
    streaming (that
    >>>>>>>>>>>> is due to the buffering, and unbounded lateness in
    batch, which
    >>>>>>>>>>>> was discussed back and forth in this thread)
    >>>>>>>>>>>>
    >>>>>>>>>>>>     b) might be non-deterministic (this is because
    the elements
    >>>>>>>>>>>> arrive in somewhat random order, and even if you do
    the operation
    >>>>>>>>>>>> "assign unique ID to elements" this might produce
    different
    >>>>>>>>>>>> results when run multiple times)
    >>>>>>>>>>> PCollections are *explicitly* unordered. Any
    operations that
    >>>>>>>>>>> assume or
    >>>>>>>>>>> depend on a specific ordering for correctness (or
    determinism) must
    >>>>>>>>>>> provide that ordering themselves (i.e. tolerate
    "arbitrary shuffling
    >>>>>>>>>>> of inputs"). As you point out, that may be very
    expensive if you have
    >>>>>>>>>>> very hot keys with very large (unbounded) timestamp skew.
    >>>>>>>>>>>
    >>>>>>>>>>> StatefulDoFns are low-level operations that should be
    used with care;
    >>>>>>>>>>> the simpler windowing model gives determinism in the
    face of
    >>>>>>>>>>> unordered
    >>>>>>>>>>> data (though late data and non-end-of-window
    triggering introduce
    >>>>>>>>>>> some of the non-determinism back in).
    >>>>>>>>>>>
    >>>>>>>>>>>> What worries me most is the property b), because it
    seems to me
    >>>>>>>>>>>> to have serious consequences - not only that if you
    run a batch
    >>>>>>>>>>>> pipeline twice you would get different results, but
    even on
    >>>>>>>>>>>> streaming, when pipeline fails and gets restarted from
    >>>>>>>>>>>> checkpoint, produced output might differ from the
    previous run
    >>>>>>>>>>>> and data from the first run might have already been
    persisted
    >>>>>>>>>>>> into sink. That would create somewhat messy outputs.
    >>>>>>>>>>> Beam has an exactly-once model. If the data was
    consumed, state
    >>>>>>>>>>> mutated, and outputs written downstream (these three
    are committed
    >>>>>>>>>>> together atomically) it will not be replayed. That
    does not, of
    >>>>>>>>>>> course, solve the non-determinism due to ordering
    (including the fact
    >>>>>>>>>>> that two operations reading the same PCollection may
    view different
    >>>>>>>>>>> ordering).
    >>>>>>>>>>>
    >>>>>>>>>>>> These two properties make me think that the current
    >>>>>>>>>>>> implementation is more of a _special case_ than the
    general one.
    >>>>>>>>>>>> The general one would be that your state doesn't have the
    >>>>>>>>>>>> properties to be able to tolerate buffering problems
    and/or
    >>>>>>>>>>>> non-determinism. Which is the case where you need
    sorting in both
    >>>>>>>>>>>> streaming and batch to be part of the model.
    >>>>>>>>>>>>
    >>>>>>>>>>>> Let me point out one more analogy - that is merging vs.
    >>>>>>>>>>>> non-merging windows. The general case (merging
    windows) implies
    >>>>>>>>>>>> sorting by timestamp in both batch case (explicit)
    and streaming
    >>>>>>>>>>>> (buffering). The special case (non-merging windows)
    doesn't rely
    >>>>>>>>>>>> on any timestamp ordering, so the sorting and
    buffering can be
    >>>>>>>>>>>> dropped. The underlying root cause of this is the
    same for both
    >>>>>>>>>>>> stateful ParDo and windowing (essentially, assigning
    window
    >>>>>>>>>>>> labels is a stateful operation when windowing
    function is merging).
    >>>>>>>>>>>>
    >>>>>>>>>>>> The reason for the current behavior of stateful ParDo
    seems to be
    >>>>>>>>>>>> performance, but is it right to abandon correctness
    in favor of
    >>>>>>>>>>>> performance? Wouldn't it be more consistent to have
    the default
    >>>>>>>>>>>> behavior prefer correctness and when you have the
    specific
    >>>>>>>>>>>> conditions of state function having special
    properties, then you
    >>>>>>>>>>>> can annotate your DoFn (with something like
    >>>>>>>>>>>> @TimeOrderingAgnostic), which would yield a better
    performance in
    >>>>>>>>>>>> that case?
    >>>>>>>>>>> There are two separable questions here.
    >>>>>>>>>>>
    >>>>>>>>>>> 1) Is it correct for a (Stateful)DoFn to assume
    elements are received
    >>>>>>>>>>> in a specific order? In the current model, it is not.
    Being able to
    >>>>>>>>>>> read, handle, and produce out-of-order data,
    including late data, is
    >>>>>>>>>>> a pretty fundamental property of distributed systems.
    >>>>>>>>>>>
    >>>>>>>>>>> 2) Given that some operations are easier (or possibly
    only possible)
    >>>>>>>>>>> to write when operating on ordered data, and that
    different runners
    >>>>>>>>>>> may have (significantly) cheaper ways to provide this
    ordering than
    >>>>>>>>>>> can be done by the user themselves, should we elevate
    this to a
    >>>>>>>>>>> property of (Stateful?)DoFns that the runner can
    provide? I think a
    >>>>>>>>>>> compelling argument can be made here that we should.
    >>>>>>>>>>>
    >>>>>>>>>>> - Robert
    >>>>>>>>>>>
    >>>>>>>>>>>
    >>>>>>>>>>>
    >>>>>>>>>>>> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
    >>>>>>>>>>>>
    >>>>>>>>>>>> Thanks for the nice small example of a calculation
    that depends
    >>>>>>>>>>>> on order. You are right that many state machines have
    this
    >>>>>>>>>>>> property. I agree w/ you and Luke that it is
    convenient for batch
    >>>>>>>>>>>> processing to sort by event timestamp before running
    a stateful
    >>>>>>>>>>>> ParDo. In streaming you could also implement "sort by
    event
    >>>>>>>>>>>> timestamp" by buffering until you know all earlier
    data will be
    >>>>>>>>>>>> dropped - a slack buffer up to allowed lateness.
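
For illustration, a rough sketch of that buffer-and-sort approach as a stateful DoFn, assuming the Beam Java SDK (the names are illustrative; a real implementation would also need to account for allowed lateness and bound the buffer):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// Buffers elements per key and releases them in event-time order once the
// watermark has passed their timestamps, i.e. once no earlier on-time
// element can still arrive.
class SortByEventTime extends DoFn<KV<String, Long>, Long> {
  @StateId("buffer")
  private final StateSpec<BagState<KV<Instant, Long>>> bufferSpec = StateSpecs.bag();
  @StateId("minTs")
  private final StateSpec<ValueState<Instant>> minTsSpec = StateSpecs.value();
  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("buffer") BagState<KV<Instant, Long>> buffer,
      @StateId("minTs") ValueState<Instant> minTs,
      @TimerId("flush") Timer flush) {
    buffer.add(KV.of(c.timestamp(), c.element().getValue()));
    Instant min = minTs.read();
    if (min == null || c.timestamp().isBefore(min)) {
      minTs.write(c.timestamp());
      flush.set(c.timestamp());  // fire when the watermark reaches it
    }
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext c,
      @StateId("buffer") BagState<KV<Instant, Long>> buffer,
      @StateId("minTs") ValueState<Instant> minTs,
      @TimerId("flush") Timer flush) {
    List<KV<Instant, Long>> pending = new ArrayList<>();
    buffer.read().forEach(pending::add);
    pending.sort((a, b) -> a.getKey().compareTo(b.getKey()));
    buffer.clear();
    Instant newMin = null;
    for (KV<Instant, Long> e : pending) {
      if (!e.getKey().isAfter(c.timestamp())) {
        c.output(e.getValue());  // ripe: emit in event-time order
      } else {
        buffer.add(e);           // not ripe yet: keep buffered
        if (newMin == null) newMin = e.getKey();
      }
    }
    if (newMin != null) {
      minTs.write(newMin);
      flush.set(newMin);         // wait for the next buffered timestamp
    } else {
      minTs.clear();
    }
  }
}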
    >>>>>>>>>>>>
    >>>>>>>>>>>> I do not think that it is OK to sort in batch and not in
    >>>>>>>>>>>> streaming. Many state machines diverge very rapidly
    when things
    >>>>>>>>>>>> are out of order. So each runner if they see the
    >>>>>>>>>>>> "@OrderByTimestamp" annotation (or whatever) needs to
    deliver
    >>>>>>>>>>>> sorted data (by some mix of buffering and dropping),
    or to reject
    >>>>>>>>>>>> the pipeline as unsupported.
    >>>>>>>>>>>>
    >>>>>>>>>>>> And also want to say that this is not the default
    case - many
    >>>>>>>>>>>> uses of state & timers in ParDo yield different
    results at the
    >>>>>>>>>>>> element level, but the results are equivalent in
    the big
    >>>>>>>>>>>> picture. In examples like "assign a unique
    sequence number
    >>>>>>>>>>>> to each element" or "group into batches", it doesn't
    matter
    >>>>>>>>>>>> exactly what the result is, only that it meets the
    spec. And
    >>>>>>>>>>>> other cases like user funnels are monotonic enough
    that you also
    >>>>>>>>>>>> don't actually need sorting.
    >>>>>>>>>>>>
    >>>>>>>>>>>> Kenn
    >>>>>>>>>>>>
    >>>>>>>>>>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský
    <[email protected]>
    >>>>>>>>>>>> wrote:
    >>>>>>>>>>>>> Yes, the problem will probably arise mostly when your
    keys are not
    >>>>>>>>>>>>> well distributed (or there are too few keys). I'm really
    not sure if
    >>>>>>>>>>>>> a pure GBK with a trigger can solve this - it might
    help to have
    >>>>>>>>>>>>> a data-driven trigger. There would still be some
    doubts, though.
    >>>>>>>>>>>>> The main question is still here - people say that
    sorting by
    >>>>>>>>>>>>> timestamp before stateful ParDo would be
    prohibitively slow, but
    >>>>>>>>>>>>> I don't really see why - the sorting is very
    probably already
    >>>>>>>>>>>>> there. And if not (hash grouping instead of sorted
    grouping),
    >>>>>>>>>>>>> then the sorting would affect only user defined
    StatefulParDos.
    >>>>>>>>>>>>>
    >>>>>>>>>>>>> This would suggest that the best way out of this
    would be really
    >>>>>>>>>>>>> to add an annotation, so that the author of the
    pipeline can decide.
    >>>>>>>>>>>>>
    >>>>>>>>>>>>> If that would be acceptable, I think I can try to
    prepare some
    >>>>>>>>>>>>> basic functionality, but I'm not sure if I would be
    able to
    >>>>>>>>>>>>> cover all runners / sdks.
    >>>>>>>>>>>>>
    >>>>>>>>>>>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
    >>>>>>>>>>>>>
    >>>>>>>>>>>>> It is read all per key and window and not just read
    all (this
    >>>>>>>>>>>>> still won't scale with hot keys in the global
    window). The GBK
    >>>>>>>>>>>>> preceding the StatefulParDo will guarantee that you are
    >>>>>>>>>>>>> processing all the values for a specific key and
    window at any
    >>>>>>>>>>>>> given time. Is there a specific window/trigger that
    is missing
    >>>>>>>>>>>>> that you feel would remove the need for you to use
    StatefulParDo?
    >>>>>>>>>>>>>
    >>>>>>>>>>>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský
    <[email protected]>
    >>>>>>>>>>>>> wrote:
    >>>>>>>>>>>>>> Hi Lukasz,
    >>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> Today, if you must have a strict order, you must
    guarantee
    >>>>>>>>>>>>>>> that your StatefulParDo implements the necessary
    "buffering &
    >>>>>>>>>>>>>>> sorting" into state.
    >>>>>>>>>>>>>> Yes, no problem with that. But this whole
    discussion started
    >>>>>>>>>>>>>> because *this doesn't work on batch*. You simply
    cannot first
    >>>>>>>>>>>>>> read everything from distributed storage and then
    buffer it all
    >>>>>>>>>>>>>> into memory, just to read it again, but sorted.
    That will not
    >>>>>>>>>>>>>> work. And even if it would, it would be a terrible
    waste of
    >>>>>>>>>>>>>> resources.
    >>>>>>>>>>>>>>
    >>>>>>>>>>>>>> Jan
    >>>>>>>>>>>>>>
    >>>>>>>>>>>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
    >>>>>>>>>>>>>>
    >>>>>>>>>>>>>>
    >>>>>>>>>>>>>>
    >>>>>>>>>>>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský
    <[email protected]>
    >>>>>>>>>>>>>> wrote:
    >>>>>>>>>>>>>>> This discussion brings many really interesting
    questions for
    >>>>>>>>>>>>>>> me. :-)
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>    > I don't see batch vs. streaming as part of
    the model. One
    >>>>>>>>>>>>>>> can have
    >>>>>>>>>>>>>>> microbatch, or even a runner that alternates
    between different
    >>>>>>>>>>>>>>> modes.
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> Although I understand the motivation of this
    statement, this
    >>>>>>>>>>>>>>> project name is
    >>>>>>>>>>>>>>> "Apache Beam: An advanced unified programming
    model". What
    >>>>>>>>>>>>>>> does the
    >>>>>>>>>>>>>>> model unify, if "streaming vs. batch" is not part
    of the model?
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> Using microbatching, chaining of batch jobs, or
    pure streaming
    >>>>>>>>>>>>>>> are
    >>>>>>>>>>>>>>> exactly the "runtime conditions/characteristics" I
    refer to.
    >>>>>>>>>>>>>>> All these
    >>>>>>>>>>>>>>> define several runtime parameters, which in turn
    define how
    >>>>>>>>>>>>>>> well or badly
    >>>>>>>>>>>>>>> the pipeline will perform and how many resources
    might be
    >>>>>>>>>>>>>>> needed. From
    >>>>>>>>>>>>>>> my point of view, pure streaming should be the
    most resource
    >>>>>>>>>>>>>>> demanding
    >>>>>>>>>>>>>>> (if not, why bother with batch? why not run
    everything in
    >>>>>>>>>>>>>>> streaming
    >>>>>>>>>>>>>>> only? what will there remain to "unify"?).
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>    > Fortunately, for batch, only the state for a
    single key
    >>>>>>>>>>>>>>> needs to be
    >>>>>>>>>>>>>>> preserved at a time, rather than the state for all
    keys across
    >>>>>>>>>>>>>>> the range
    >>>>>>>>>>>>>>> of skew. Of course if you have few or hot keys,
    one can still
    >>>>>>>>>>>>>>> have
    >>>>>>>>>>>>>>> issues (and this is not specific to StatefulDoFns).
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> Yes, but here is still the presumption that my
    stateful DoFn can
    >>>>>>>>>>>>>>> tolerate arbitrary shuffling of inputs. Let me
    explain the use
    >>>>>>>>>>>>>>> case in
    >>>>>>>>>>>>>>> more detail.
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> Suppose you have an input stream consisting of 1s and
    0s (and
    >>>>>>>>>>>>>>> some key for
    >>>>>>>>>>>>>>> each element, which is irrelevant for the
    demonstration). Your
    >>>>>>>>>>>>>>> task is
    >>>>>>>>>>>>>>> to calculate, in a running global window, the actual
    number of
    >>>>>>>>>>>>>>> changes
    >>>>>>>>>>>>>>> between state 0 and state 1 and vice versa. When
    the state
    >>>>>>>>>>>>>>> doesn't
    >>>>>>>>>>>>>>> change, you don't calculate anything. If input
    (for given key)
    >>>>>>>>>>>>>>> would be
    >>>>>>>>>>>>>>> (tN denotes timestamp N):
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t1: 1
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t2: 0
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t3: 0
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t4: 1
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t5: 1
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t6: 0
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> then the output should yield (supposing that
    the default state is
    >>>>>>>>>>>>>>> zero):
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t1: (one: 1, zero: 0)
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t2: (one: 1, zero: 1)
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t3: (one: 1, zero: 1)
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t4: (one: 2, zero: 1)
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t5: (one: 2, zero: 1)
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>     t6: (one: 2, zero: 2)
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> How would you implement this in current Beam
    semantics?
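
For reference, the straightforward stateful DoFn for this task (a sketch assuming the Beam Java SDK) encodes a state machine, and is therefore correct only if elements arrive in timestamp order - which the model does not guarantee:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Counts 0->1 and 1->0 transitions per key. This is a state machine, so the
// output is correct ONLY if elements arrive in event-time order; under the
// arbitrary ordering the model allows, it silently yields different counts.
class CountTransitions extends DoFn<KV<String, Integer>, KV<Long, Long>> {
  @StateId("last")
  private final StateSpec<ValueState<Integer>> lastSpec = StateSpecs.value();
  @StateId("ones")
  private final StateSpec<ValueState<Long>> onesSpec = StateSpecs.value();
  @StateId("zeros")
  private final StateSpec<ValueState<Long>> zerosSpec = StateSpecs.value();

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("last") ValueState<Integer> last,
      @StateId("ones") ValueState<Long> ones,
      @StateId("zeros") ValueState<Long> zeros) {
    int current = c.element().getValue();
    int previous = last.read() == null ? 0 : last.read();  // default state is 0
    long one = ones.read() == null ? 0L : ones.read();
    long zero = zeros.read() == null ? 0L : zeros.read();
    if (previous == 0 && current == 1) {
      ones.write(++one);       // 0 -> 1 transition
    } else if (previous == 1 && current == 0) {
      zeros.write(++zero);     // 1 -> 0 transition
    }
    last.write(current);
    c.output(KV.of(one, zero));  // running (one, zero) counts
  }
}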
    >>>>>>>>>>>>>> I think you're saying here "I know that my input
    is ordered
    >>>>>>>>>>>>>> in a specific way, and since I assume the order when
    writing my
    >>>>>>>>>>>>>> pipeline I can perform this optimization". But there
    is nothing
    >>>>>>>>>>>>>> preventing a runner from noticing that you're
    processing in the
    >>>>>>>>>>>>>> global window with a specific type of trigger and
    re-ordering
    >>>>>>>>>>>>>> your inputs/processing to get better performance
    (since you
    >>>>>>>>>>>>>> can't use an AfterWatermark trigger for your
    pipeline in
    >>>>>>>>>>>>>> streaming for the GlobalWindow).
    >>>>>>>>>>>>>>
    >>>>>>>>>>>>>> Today, if you must have a strict order, you must
    guarantee that
    >>>>>>>>>>>>>> your StatefulParDo implements the necessary
    "buffering &
    >>>>>>>>>>>>>> sorting" into state. I can see why you would want
    an annotation
    >>>>>>>>>>>>>> that says I must have timestamp ordered elements,
    since it
    >>>>>>>>>>>>>> makes writing certain StatefulParDos much easier.
    StatefulParDo
    >>>>>>>>>>>>>> is a low-level function, it really is the "here you
    go and do
    >>>>>>>>>>>>>> whatever you need to but here be dragons" function
    while
    >>>>>>>>>>>>>> windowing and triggering is meant to keep many
    people from
    >>>>>>>>>>>>>> writing StatefulParDo in the first place.
    >>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>    > Pipelines that fail in the "worst case" batch
    scenario
    >>>>>>>>>>>>>>> are likely to
    >>>>>>>>>>>>>>> degrade poorly (possibly catastrophically) when
    the watermark
    >>>>>>>>>>>>>>> falls
    >>>>>>>>>>>>>>> behind in streaming mode as well.
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> But the worst case is defined by an input of size
    (available
    >>>>>>>>>>>>>>> resources +
    >>>>>>>>>>>>>>> a single byte) -> pipeline failure. Although it could have
    >>>>>>>>>>>>>>> finished, given
    >>>>>>>>>>>>>>> the right conditions.
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>    > This might be reasonable, implemented by
    default by
    >>>>>>>>>>>>>>> buffering
    >>>>>>>>>>>>>>> everything and releasing elements as the watermark
    (+lateness)
    >>>>>>>>>>>>>>> advances,
    >>>>>>>>>>>>>>> but would likely lead to inefficient (though
    *maybe* easier to
    >>>>>>>>>>>>>>> reason
    >>>>>>>>>>>>>>> about) code.
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> Sure, the pipeline will be less efficient, because
    it would
    >>>>>>>>>>>>>>> have to
    >>>>>>>>>>>>>>> buffer and sort the inputs. But at least it will
    produce
    >>>>>>>>>>>>>>> correct results
    >>>>>>>>>>>>>>> in cases where updates to state are order-sensitive.
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>    > Would it be roughly equivalent to GBK +
    FlatMap(lambda
    >>>>>>>>>>>>>>> (key, values):
    >>>>>>>>>>>>>>> [(key, value) for value in values])?
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> I'd say roughly yes, but the difference would be in
    the trigger.
    >>>>>>>>>>>>>>> The trigger
    >>>>>>>>>>>>>>> should ideally fire as soon as the watermark
    (+lateness) crosses
    >>>>>>>>>>>>>>> the element
    >>>>>>>>>>>>>>> with the lowest timestamp in the buffer. Although this
    could be
    >>>>>>>>>>>>>>> somehow
    >>>>>>>>>>>>>>> emulated by a fixed trigger every X millis.
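
For illustration, such a periodic trigger could be expressed like this in the Beam Java SDK (the 100 ms period and element type are arbitrary placeholders); it would be applied to the input PCollection before the grouping:

import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Fire the global window repeatedly, at most every 100 ms of processing
// time after the first element of a pane, approximating "flush every X millis".
Window<KV<String, Long>> everyHundredMillis =
    Window.<KV<String, Long>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.millis(100))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();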
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>    > Or is the underlying desire just to be able
    to hint to
    >>>>>>>>>>>>>>> the runner
    >>>>>>>>>>>>>>> that the code may perform better (e.g. require
    less resources)
    >>>>>>>>>>>>>>> as skew
    >>>>>>>>>>>>>>> is reduced (and hence to order by timestamp iff
    it's cheap)?
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> No, the sorting would have to be done in the streaming
    case as
    >>>>>>>>>>>>>>> well. That is
    >>>>>>>>>>>>>>> an imperative of the unified model. I think it is
    possible to
    >>>>>>>>>>>>>>> sort by
    >>>>>>>>>>>>>>> timestamp only in the batch case (and do it for *all*
    batch
    >>>>>>>>>>>>>>> stateful pardos
    >>>>>>>>>>>>>>> without an annotation), or introduce an annotation, but
    then make
    >>>>>>>>>>>>>>> the same
    >>>>>>>>>>>>>>> guarantees for the streaming case as well.
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> Jan
    >>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
    >>>>>>>>>>>>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský
    >>>>>>>>>>>>>>>> <[email protected]> wrote:
    >>>>>>>>>>>>>>>>> Hi Robert,
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> yes, I think you rephrased my point - although
    no *explicit*
    >>>>>>>>>>>>>>>>> guarantees
    >>>>>>>>>>>>>>>>> of ordering are given in either mode, there is
    *implicit*
    >>>>>>>>>>>>>>>>> ordering in
    >>>>>>>>>>>>>>>>> the streaming case that is due to the nature of the
    processing - the
    >>>>>>>>>>>>>>>>> difference
    >>>>>>>>>>>>>>>>> between the watermark and the timestamps of elements
    flowing through
    >>>>>>>>>>>>>>>>> the pipeline
    >>>>>>>>>>>>>>>>> is generally low (too high a difference leads to the
    >>>>>>>>>>>>>>>>> overbuffering
    >>>>>>>>>>>>>>>>> problem), but there is no such bound in batch.
    >>>>>>>>>>>>>>>> Fortunately, for batch, only the state for a
    single key needs
    >>>>>>>>>>>>>>>> to be
    >>>>>>>>>>>>>>>> preserved at a time, rather than the state for
    all keys
    >>>>>>>>>>>>>>>> across the
    >>>>>>>>>>>>>>>> range of skew. Of course if you have few or hot
    keys, one can
    >>>>>>>>>>>>>>>> still
    >>>>>>>>>>>>>>>> have issues (and this is not specific to
    StatefulDoFns).
    >>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> As a result, I see a few possible solutions:
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>    - the best and most natural seems to be an
    extension of
    >>>>>>>>>>>>>>>>> the model, so
    >>>>>>>>>>>>>>>>> that it defines batch as not only "streaming
    pipeline
    >>>>>>>>>>>>>>>>> executed in batch
    >>>>>>>>>>>>>>>>> fashion", but "pipeline with at least as good
    runtime
    >>>>>>>>>>>>>>>>> characteristics as
    >>>>>>>>>>>>>>>>> in streaming case, executed in batch fashion", I
    really
    >>>>>>>>>>>>>>>>> don't think that
    >>>>>>>>>>>>>>>>> there are any conflicts with the current model,
    or that this
    >>>>>>>>>>>>>>>>> could
    >>>>>>>>>>>>>>>>> affect performance, because the required sorting
    (as pointed out by
    >>>>>>>>>>>>>>>>> Aljoscha) is very probably already done during
    translation
    >>>>>>>>>>>>>>>>> of stateful
    >>>>>>>>>>>>>>>>> pardos. Also note that this definition only
    affects user
    >>>>>>>>>>>>>>>>> defined
    >>>>>>>>>>>>>>>>> stateful pardos
    >>>>>>>>>>>>>>>> I don't see batch vs. streaming as part of the
    model. One can
    >>>>>>>>>>>>>>>> have
    >>>>>>>>>>>>>>>> microbatch, or even a runner that alternates between
    >>>>>>>>>>>>>>>> different modes.
    >>>>>>>>>>>>>>>> The model describes what the valid outputs are
    given a
    >>>>>>>>>>>>>>>> (sometimes
    >>>>>>>>>>>>>>>> partial) set of inputs. It becomes really hard to
    define
    >>>>>>>>>>>>>>>> things like
    >>>>>>>>>>>>>>>> "as good runtime characteristics." Once you allow any
    >>>>>>>>>>>>>>>> out-of-orderedness, it is not very feasible to
    try and define
    >>>>>>>>>>>>>>>> (and
    >>>>>>>>>>>>>>>> more cheaply implement) an "upper bound" of acceptable
    >>>>>>>>>>>>>>>> out-of-orderedness.
    >>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>> Pipelines that fail in the "worst case" batch
    scenario are
    >>>>>>>>>>>>>>>> likely to
    >>>>>>>>>>>>>>>> degrade poorly (possibly catastrophically) when
    the watermark
    >>>>>>>>>>>>>>>> falls
    >>>>>>>>>>>>>>>> behind in streaming mode as well.
    >>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>    - another option would be to introduce an
    annotation for
    >>>>>>>>>>>>>>>>> DoFns (e.g.
    >>>>>>>>>>>>>>>>> @RequiresStableTimeCharacteristics), which would
    result in
    >>>>>>>>>>>>>>>>> the sorting
    >>>>>>>>>>>>>>>>> in the batch case - but - this extension would have
    to ensure
    >>>>>>>>>>>>>>>>> the sorting in
    >>>>>>>>>>>>>>>>> streaming mode also - it would require
    definition of allowed
    >>>>>>>>>>>>>>>>> lateness,
    >>>>>>>>>>>>>>>>> and trigger (essentially similar to a window)
    >>>>>>>>>>>>>>>> This might be reasonable, implemented by default
    by buffering
    >>>>>>>>>>>>>>>> everything and releasing elements as the
    watermark (+lateness)
    >>>>>>>>>>>>>>>> advances, but would likely lead to inefficient
    (though
    >>>>>>>>>>>>>>>> *maybe* easier
    >>>>>>>>>>>>>>>> to reason about) code. Not sure about the
    semantics of
    >>>>>>>>>>>>>>>> triggering
    >>>>>>>>>>>>>>>> here, especially data-driven triggers. Would it
    be roughly
    >>>>>>>>>>>>>>>> equivalent
    >>>>>>>>>>>>>>>> to GBK + FlatMap(lambda (key, values): [(key,
    value) for
    >>>>>>>>>>>>>>>> value in
    >>>>>>>>>>>>>>>> values])?
    >>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>> Or is the underlying desire just to be able to
    hint to the
    >>>>>>>>>>>>>>>> runner that
    >>>>>>>>>>>>>>>> the code may perform better (e.g. require less
    resources) as
    >>>>>>>>>>>>>>>> skew is
    >>>>>>>>>>>>>>>> reduced (and hence to order by timestamp iff it's
    cheap)?
    >>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>    - last option would be to introduce these
    "higher order
    >>>>>>>>>>>>>>>>> guarantees" in
    >>>>>>>>>>>>>>>>> some extension DSL (e.g. Euphoria), but that
    seems to be the
    >>>>>>>>>>>>>>>>> worst
    >>>>>>>>>>>>>>>>> option to me
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> I see the first two options as roughly equally good,
    although the
    >>>>>>>>>>>>>>>>> latter one
    >>>>>>>>>>>>>>>>> is probably more time consuming to implement.
    But it would
    >>>>>>>>>>>>>>>>> bring an
    >>>>>>>>>>>>>>>>> additional feature to the streaming case as well.
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> Thanks for any thoughts.
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>    Jan
    >>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
    >>>>>>>>>>>>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský
    >>>>>>>>>>>>>>>>>> <[email protected]> wrote:
    >>>>>>>>>>>>>>>>>>> Hi Reuven,
    >>>>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>>>> How so? AFAIK stateful DoFns work just fine
    in batch
    >>>>>>>>>>>>>>>>>>>> runners.
    >>>>>>>>>>>>>>>>>>> Stateful ParDo works in batch as far as the
    logic inside
    >>>>>>>>>>>>>>>>>>> the state works for absolutely unbounded
    out-of-orderness
    >>>>>>>>>>>>>>>>>>> of elements. That basically (practically) can
    work only
    >>>>>>>>>>>>>>>>>>> for cases where the order of input elements
    doesn't
    >>>>>>>>>>>>>>>>>>> matter. But, "state" can refer to "state
    machine", and any
    >>>>>>>>>>>>>>>>>>> time you have a state machine involved, then
    the ordering
    >>>>>>>>>>>>>>>>>>> of elements would matter.
    >>>>>>>>>>>>>>>>>> No guarantees on order are provided in *either*
    streaming
    >>>>>>>>>>>>>>>>>> or batch
    >>>>>>>>>>>>>>>>>> mode by the model. However, it is the case that
    in order to
    >>>>>>>>>>>>>>>>>> make
    >>>>>>>>>>>>>>>>>> forward progress most streaming runners attempt
    to limit
    >>>>>>>>>>>>>>>>>> the amount of
    >>>>>>>>>>>>>>>>>> out-of-orderedness of elements (in terms of
    event time vs.
    >>>>>>>>>>>>>>>>>> processing
    >>>>>>>>>>>>>>>>>> time), which in turn
    could help
    >>>>>>>>>>>>>>>>>> cap the
    >>>>>>>>>>>>>>>>>> amount of state that must be held concurrently,
    whereas a
    >>>>>>>>>>>>>>>>>> batch runner
    >>>>>>>>>>>>>>>>>> may not allow any state to be safely discarded
    until the whole
    >>>>>>>>>>>>>>>>>> timeline from infinite past to infinite future
    has been
    >>>>>>>>>>>>>>>>>> observed.
    >>>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>> Also, as pointed out, state is not preserved
    "batch to
    >>>>>>>>>>>>>>>>>> batch" in batch mode.
    >>>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels
    >>>>>>>>>>>>>>>>>> <[email protected]> wrote:
    >>>>>>>>>>>>>>>>>>
    >>>>>>>>>>>>>>>>>>>>      batch semantics and streaming semantics
    differ only
    >>>>>>>>>>>>>>>>>>>> in that I can have GlobalWindow with default
    trigger on
    >>>>>>>>>>>>>>>>>>>> batch and cannot on stream
    >>>>>>>>>>>>>>>>>>> You can have a GlobalWindow in streaming with
    a default
    >>>>>>>>>>>>>>>>>>> trigger. You
    >>>>>>>>>>>>>>>>>>> could define additional triggers that do early
    firings.
    >>>>>>>>>>>>>>>>>>> And you could
    >>>>>>>>>>>>>>>>>>> even trigger the global window by advancing
    the watermark
    >>>>>>>>>>>>>>>>>>> to +inf.
    >>>>>>>>>>>>>>>>>> IIRC, as a pragmatic note, we prohibited global
    window with
    >>>>>>>>>>>>>>>>>> default
    >>>>>>>>>>>>>>>>>> trigger on unbounded PCollections in the SDK
    because this
    >>>>>>>>>>>>>>>>>> is more
    >>>>>>>>>>>>>>>>>> likely to be user error than an actual desire
    to have no
    >>>>>>>>>>>>>>>>>> output until
    >>>>>>>>>>>>>>>>>> drain. But it's semantically valid in the model.
