A slightly larger concern: it will also force users to create stateful
DoFns everywhere to generate these sequence numbers. If I have a ParDo that
is not a simple 1:1 transform (i.e. not MapElements), then the ParDo will
need to generate its own sequence numbers for ordering, and the only safe
way to do so is to use a stateful DoFn. This turns what used to be a simple
in-memory DoFn into one that has to access state. Also, I believe many
runners will not fuse stateful DoFns. While none of this poses a problem
for the model, it could make ordering extremely expensive to achieve.
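
For illustration, a minimal sketch (Beam Java SDK; the transform name and
key type are illustrative) of the kind of stateful DoFn this would force
users to write just to attach per-key sequence numbers:

  import org.apache.beam.sdk.coders.VarLongCoder;
  import org.apache.beam.sdk.state.StateSpec;
  import org.apache.beam.sdk.state.StateSpecs;
  import org.apache.beam.sdk.state.ValueState;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // A formerly simple in-memory ParDo now needs keyed state (and the
  // keyed partitioning that implies) solely to number its outputs.
  class AssignSequenceNumbers<T>
      extends DoFn<KV<String, T>, KV<String, KV<Long, T>>> {

    @StateId("seq")
    private final StateSpec<ValueState<Long>> seqSpec =
        StateSpecs.value(VarLongCoder.of());

    @ProcessElement
    public void process(
        @Element KV<String, T> element,
        @StateId("seq") ValueState<Long> seq,
        OutputReceiver<KV<String, KV<Long, T>>> out) {
      Long current = seq.read();
      long next = (current == null) ? 0L : current;
      seq.write(next + 1);
      // Attach the per-key sequence number downstream ordering relies on.
      out.output(KV.of(element.getKey(), KV.of(next, element.getValue())));
    }
  }

Even this minimal version turns a fusible, stateless stage into a keyed,
stateful one.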

Reuven

On Tue, May 28, 2019 at 6:09 AM Jan Lukavský <[email protected]> wrote:

> Hi Reuven,
>
> > It also gets awkward with Flatten - the sequence number is no longer
> enough, you must also encode which side of the flatten each element came
> from.
>
> That is a generic need. Even if you read data from Kafka, the offsets are
> comparable only inside a single partition. So, for Kafka to work as a FIFO
> for ordering, elements with the same key have to be pushed to the same
> partition (otherwise Kafka cannot act as a FIFO, because different partitions
> can be handled by different brokers, which means different observers, and
> they therefore might not agree on the order of events). So if we want to
> emulate FIFO per key, then the sequence IDs also have to be per key.
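>
> For illustration, a rough sketch with the plain Kafka producer API (topic
> name, config, and the userId/payload variables are hypothetical) of keying
> by user so that all of a user's events land in a single partition - the
> only scope in which Kafka offsets define an order:
>
>   import java.util.Properties;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>
>   Properties props = new Properties();
>   props.put("bootstrap.servers", "broker:9092");
>   props.put("key.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>   props.put("value.serializer",
>       "org.apache.kafka.common.serialization.StringSerializer");
>   try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>     // Same key => same partition => one authoritative order per user.
>     producer.send(new ProducerRecord<>("user-events", userId, payload));
>   }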
> On 5/28/19 2:33 PM, Reuven Lax wrote:
>
> Sequence metadata does have the disadvantage that users can no longer use
> the types coming from the source. You must create a new type that contains
> a sequence number (unless Beam provides this). It also gets awkward with
> Flatten - the sequence number is no longer enough, you must also encode
> which side of the flatten each element came from.
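>
> For concreteness, a sketch of what such a wrapper type might look like
> (hypothetical; not an existing Beam type):
>
>   // Carries the sequence number plus the flatten-side tag described above.
>   class Sequenced<T> implements java.io.Serializable {
>     final long sequence;     // per-key sequence number
>     final int flattenInput;  // which input of the Flatten it came from
>     final T value;
>
>     Sequenced(long sequence, int flattenInput, T value) {
>       this.sequence = sequence;
>       this.flattenInput = flattenInput;
>       this.value = value;
>     }
>   }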
>
> On Tue, May 28, 2019 at 3:18 AM Jan Lukavský <[email protected]> wrote:
>
>> As I understood it, Kenn was supporting the idea that sequence metadata
>> is preferable over FIFO. I was trying to point out that it should even
>> provide the same functionality as FIFO, plus one more important property -
>> reproducibility and the ability to be persisted and reused the same way
>> in batch and streaming.
>>
>> There is no doubt that sequence metadata can be stored in any
>> storage. But, regarding some implicit ordering that sources might have -
>> yes, of course, data written into HDFS or Cloud Storage has an ordering,
>> but only a partial one - within a single bulk (e.g. a file); the ordering
>> is not well defined on the boundaries of these bulks (between files). That
>> is why I'd say that the ordering of sources is relevant only for
>> (partitioned!) streaming sources and generally always reduces to
>> sequence metadata (e.g. offsets).
>>
>> Jan
>>
>> On 5/28/19 11:43 AM, Robert Bradshaw wrote:
>> > Huge +1 to all Kenn said.
>> >
>> > Jan, batch sources can have orderings too, just like Kafka. I think
>> > it's reasonable (for both batch and streaming) that if a source has an
>> > ordering that is an important part of the data, it should preserve
>> > this ordering into the data itself (e.g. as sequence numbers, offsets,
>> > etc.)
>> >
>> > On Fri, May 24, 2019 at 10:35 PM Kenneth Knowles <[email protected]>
>> wrote:
>> >> I strongly prefer explicit sequence metadata over FIFO requirements,
>> because:
>> >>
>> >>   - FIFO is complex to specify: for example Dataflow has "per stage
>> key-to-key" FIFO today, but it is not guaranteed to remain so (plus "stage"
>> is not a portable concept, nor even guaranteed to remain a Dataflow concept)
>> >>   - complex specifications are by definition poor usability (if
>> necessary, then it is what it is)
>> >>   - overly restricts the runner, reduces parallelism, for example any
>> non-stateful ParDo has per-element parallelism, not per "key"
>> >>   - another perspective on that: FIFO makes everyone pay rather than
>> just the transform that requires exact sequencing
>> >>   - previous implementation details like reshuffles become part of the
>> model
>> >>   - I'm not even convinced the use cases involved are addressed by
>> some careful FIFO restrictions; many sinks re-key and they would all have
>> to become aware of how keying of a sequence of "stages" affects the
>> end-to-end FIFO
>> >>
>> >> A noop becoming a non-noop is essentially the mathematical definition
>> of moving from higher-level to lower-level abstraction.
>> >>
>> >> So this strikes at the core question of what level of abstraction Beam
>> aims to represent. Lower-level means there are fewer possible
>> implementations and it is more tied to the underlying architecture, and
>> anything that is not a near-exact match pays a huge penalty. Higher-level means there
>> are more implementations possible with different tradeoffs, though they may
>> all pay a minor penalty.
>> >>
>> >> I could be convinced to change my mind, but it needs some extensive
>> design, examples, etc. I think it is probably about the most consequential
>> design decision in the whole Beam model, around the same level as the
>> decision to use ParDo and GBK as the primitives IMO.
>> >>
>> >> Kenn
>> >>
>> >> On Thu, May 23, 2019 at 10:17 AM Reuven Lax <[email protected]> wrote:
>> >>> Not really. I'm suggesting that some variant of FIFO ordering is
>> necessary, which requires either that runners natively support FIFO ordering
>> or that transforms add some extra sequence number to each record to sort by.
>> >>>
>> >>> I still think your proposal is very useful by the way. I'm merely
>> pointing out that to solve the state-machine problem we probably need
>> something more.
>> >>>
>> >>> Reuven
>> >>>
>> >>> On Thu, May 23, 2019 at 9:50 AM Jan Lukavský <[email protected]> wrote:
>> >>>> Hi,
>> >>>> yes. It seems that ordering by a user-supplied UDF makes sense and I
>> will update the design proposal accordingly.
>> >>>> Would that solve the issues you mention?
>> >>>> Jan
>> >>>> ---------- Původní e-mail ----------
>> >>>> Od: Reuven Lax <[email protected]>
>> >>>> Komu: dev <[email protected]>
>> >>>> Datum: 23. 5. 2019 18:44:38
>> >>>> Předmět: Re: Definition of Unified model
>> >>>>
>> >>>> I'm simply saying that timestamp ordering is insufficient for state
>> machines. I wasn't proposing Kafka as a solution - that was simply an
>> example of how people solve this problem in other scenarios.
>> >>>>
>> >>>> BTW another example of ordering: Imagine today that you have a
>> triggered Sum aggregation writing out to a key-value sink. In theory we
>> provide no ordering, so the sink might write the triggered sums in the
>> wrong order, ending up with an incorrect value in the sink. In this case
>> you probably want values ordered by trigger pane index.
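>> >>>>
>> >>>> For illustration, a hedged sketch of tagging each triggered value with
>> >>>> its pane index (PaneInfo.getIndex() is existing Beam API; the
>> >>>> sink-side "keep the largest index" write is an assumption about the
>> >>>> sink):
>> >>>>
>> >>>>   // Tags each triggered sum with its pane index so a key-value sink
>> >>>>   // can conditionally keep only the largest index it has seen.
>> >>>>   class TagWithPaneIndex<K, V> extends DoFn<KV<K, V>, KV<K, KV<Long, V>>> {
>> >>>>     @ProcessElement
>> >>>>     public void process(ProcessContext c) {
>> >>>>       long paneIndex = c.pane().getIndex(); // 0, 1, 2, ... per key+window
>> >>>>       c.output(KV.of(c.element().getKey(),
>> >>>>                      KV.of(paneIndex, c.element().getValue())));
>> >>>>     }
>> >>>>   }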
>> >>>>
>> >>>> Reuven
>> >>>>
>> >>>> On Thu, May 23, 2019 at 8:59 AM Jan Lukavský <[email protected]>
>> wrote:
>> >>>>
>> >>>> Hi Reuven,
>> >>>> I share Robert's viewpoint. I think the issue you refer to is
>> not in reality related to timestamps, but to the fact that the ordering of
>> events in time is observer dependent (whether caused by relativity or time
>> skew, essentially this has the same consequences). And the resolution in
>> fact isn't Kafka, but generally an authoritative observer that tells you
>> "I saw the events in this order". And you either have one (and have the
>> outcome of its observation persisted in the data - e.g. as an offset in a
>> Kafka partition), and then you should be able to use it (maybe that
>> suggests after all that sorting by some user-supplied UDF might make
>> sense), or you do not have it, and then any interpretation of the data
>> seems to be equally valid. Although determinism is fine, of course.
>> >>>> Jan
>> >>>> ---------- Původní e-mail ----------
>> >>>> Od: Reuven Lax <[email protected]>
>> >>>> Komu: dev <[email protected]>
>> >>>> Datum: 23. 5. 2019 17:39:12
>> >>>> Předmět: Re: Definition of Unified model
>> >>>>
>> >>>> So an example would be elements of type "startUserSession" and
>> "endUserSession" (website sessions, not Beam sessions). Logically you may
>> need to process them in the correct order if you have any sort of
>> state-machine logic. However timestamp ordering is never guaranteed to
>> match the logical ordering. Not only might you have several elements with
>> the same timestamp, but in reality time skew across backend servers can
>> cause the events to have timestamps in reverse order of the actual
>> causality order.
>> >>>>
>> >>>> People do solve this problem today though: publish the events to
>> Kafka, making sure that events for the same user end up in the same Kafka
>> partition. This ensures that the events appear in each Kafka partition in
>> causality order, even if the timestamp order doesn't match. Then your Kafka
>> subscriber simply processes the elements in each partition in order.
>> >>>>
>> >>>> I think the ability to impose FIFO causality ordering is what's
>> needed for any state-machine work. Timestamp ordering has advantages
>> (though often I think the advantage is in state), but does not solve this
>> problem.
>> >>>>
>> >>>> Reuven
>> >>>>
>> >>>> On Thu, May 23, 2019 at 7:48 AM Robert Bradshaw <[email protected]>
>> wrote:
>> >>>>
>> >>>> Good point.
>> >>>>
>> >>>> The "implementation-specific" way I would do this is
>> >>>> window-by-instant, followed by a DoFn that gets all the elements with
>> >>>> the same timestamp and sorts/acts accordingly, but this counts on the
>> >>>> runner producing windows in timestamp order (likely?) and also the
>> >>>> subsequent DoFn getting them in this order (also likely, due to
>> >>>> fusion).
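>> >>>>
>> >>>> For concreteness, a hedged sketch of that trick (types and names are
>> >>>> illustrative; it leans on exactly the unspecified runner behaviors -
>> >>>> window production order and fusion - noted above):
>> >>>>
>> >>>>   static PCollection<KV<String, Long>> sortWithinInstant(
>> >>>>       PCollection<KV<String, Long>> input) {
>> >>>>     return input
>> >>>>         .apply(Window.<KV<String, Long>>into(
>> >>>>             FixedWindows.of(Duration.millis(1))))
>> >>>>         .apply(GroupByKey.<String, Long>create())
>> >>>>         .apply(ParDo.of(
>> >>>>             new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
>> >>>>               @ProcessElement
>> >>>>               public void process(ProcessContext c) {
>> >>>>                 // All values here share (roughly) one timestamp; use
>> >>>>                 // a deterministic tiebreak before acting on them.
>> >>>>                 List<Long> sorted = new ArrayList<>();
>> >>>>                 c.element().getValue().forEach(sorted::add);
>> >>>>                 Collections.sort(sorted);
>> >>>>                 for (Long v : sorted) {
>> >>>>                   c.output(KV.of(c.element().getKey(), v));
>> >>>>                 }
>> >>>>               }
>> >>>>             }));
>> >>>>   }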
>> >>>>
>> >>>> One could make the argument that, though it does not provide
>> >>>> deterministic behavior, getting elements of the same timestamp in
>> >>>> different orders should produce equally valid interpretations of the
>> >>>> data. (After all, due to relativity, timestamps are not technically
>> >>>> well ordered across space.) I can see how data-dependent tiebreakers
>> >>>> could be useful, or promises of preservation of order between
>> >>>> operations.
>> >>>>
>> >>>> - Robert
>> >>>>
>> >>>> On Thu, May 23, 2019 at 4:18 PM Reuven Lax <[email protected]> wrote:
>> >>>>> So Jan's example of state machines is quite a valid use case for
>> ordering. However in my experience, timestamp ordering is insufficient for
>> state machines. Elements that cause state transitions might come in with
>> the exact same timestamp, yet still have a necessary ordering. Especially
>> given Beam's decision to have millisecond timestamps this is possible, but
>> even at microsecond or nanosecond precision this can happen at scale. To
>> handle state machines you usually need some sort of FIFO ordering along
>> with an ordered source, such as Kafka, not timestamp ordering.
>> >>>>>
>> >>>>> Reuven
>> >>>>>
>> >>>>> On Thu, May 23, 2019 at 12:32 AM Jan Lukavský <[email protected]>
>> wrote:
>> >>>>>> Hi all,
>> >>>>>>
>> >>>>>> thanks everyone for this discussion. I think I have gathered enough
>> >>>>>> feedback to be able to put down a proposition for changes, which I
>> will
>> >>>>>> do and send to this list for further discussion. There are still
>> doubts
>> >>>>>> remaining about non-determinism and its relation to output
>> stability vs.
>> >>>>>> latency. But I will try to clarify all this in the design document.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>>
>> >>>>>>    Jan
>> >>>>>>
>> >>>>>> On 5/22/19 3:49 PM, Maximilian Michels wrote:
>> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my
>> >>>>>>>> current understanding.
>> >>>>>>> In essence your description of how exactly-once works in Flink is
>> >>>>>>> correct. The general assumption in Flink is that pipelines must be
>> >>>>>>> deterministic and thus produce idempotent writes in the case of
>> >>>>>>> failures. However, that doesn't mean Beam sinks can't guarantee a
>> bit
>> >>>>>>> more with what Flink has to offer.
>> >>>>>>>
>> >>>>>>> Luke already mentioned the design discussions for
>> @RequiresStableInput
>> >>>>>>> which ensures idempotent writes for non-deterministic pipelines.
>> This
>> >>>>>>> is not part of the model but an optional Beam feature.
>> >>>>>>>
>> >>>>>>> We recently implemented support for @RequiresStableInput in the
>> Flink
>> >>>>>>> Runner. Reuven mentioned the Flink checkpoint confirmation, which
>> >>>>>>> allows us to buffer (and checkpoint) processed data and only emit
>> it
>> >>>>>>> once a Flink checkpoint has completed.
>> >>>>>>>
>> >>>>>>> Cheers,
>> >>>>>>> Max
>> >>>>>>>
>> >>>>>>> On 21.05.19 16:49, Jan Lukavský wrote:
>> >>>>>>>> Hi,
>> >>>>>>>>
>> >>>>>>>>   > Actually, I think it is a larger (open) question whether
>> exactly
>> >>>>>>>> once is guaranteed by the model or whether runners are allowed to
>> >>>>>>>> relax that. I would think, however, that sources correctly
>> >>>>>>>> implemented should be idempotent when run atop an exactly once
>> >>>>>>>> infrastructure such as Flink or Dataflow.
>> >>>>>>>>
>> >>>>>>>> I would assume that the model basically inherits the guarantees
>> >>>>>>>> of the underlying infrastructure. Flink does not work as you
>> >>>>>>>> described (atomic commit of inputs, state and outputs); rather, a
>> >>>>>>>> checkpoint mark flows through the DAG much like a watermark, and
>> >>>>>>>> on failures operators are restored and data is reprocessed. This
>> >>>>>>>> (IMHO) implies that you have exactly once everywhere in the DAG
>> >>>>>>>> *but* sinks. That is because sinks cannot be restored to a
>> >>>>>>>> previous state; instead, sinks are supposed to be idempotent in
>> >>>>>>>> order for exactly once to really work (or at least be able to
>> >>>>>>>> commit outputs on checkpoint in the sink). That implies that if
>> >>>>>>>> you don't have a sink that is able to commit outputs atomically
>> >>>>>>>> on checkpoint, the pipeline execution should be deterministic
>> >>>>>>>> upon retries; otherwise shadow writes from failed paths of the
>> >>>>>>>> pipeline might appear.
>> >>>>>>>>
>> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my
>> >>>>>>>> current understanding.
>> >>>>>>>>
>> >>>>>>>>   > Sounds like we should make this clearer.
>> >>>>>>>>
>> >>>>>>>> I meant that you are right that, in any thoughts we are having,
>> >>>>>>>> we must not forget that streams are by definition out-of-order.
>> >>>>>>>> That is a property that we cannot change. But - that doesn't
>> >>>>>>>> limit us from creating an operator that presents the data to the
>> >>>>>>>> UDF as if the stream were ideally sorted. It can do that by
>> >>>>>>>> introducing latency, of course.
>> >>>>>>>>
>> >>>>>>>> On 5/21/19 4:01 PM, Robert Bradshaw wrote:
>> >>>>>>>>> Reza: One could provide something like this as a utility class,
>> but
>> >>>>>>>>> one downside is that it is not scale invariant. It requires a
>> tuning
>> >>>>>>>>> parameter that, if too small, won't mitigate the problem, but if
>> too
>> >>>>>>>>> big, greatly increases latency. (Possibly one could define a
>> dynamic
>> >>>>>>>>> session-like window to solve this though...) It also might be
>> harder
>> >>>>>>>>> for runners that *can* cheaply present stuff in timestamp order
>> to
>> >>>>>>>>> optimize. (That and, in practice, our annotation-style process
>> methods
>> >>>>>>>>> don't lend themselves to easy composition.) I think it could
>> work in
>> >>>>>>>>> specific cases though.
>> >>>>>>>>>
>> >>>>>>>>> More inline below.
>> >>>>>>>>>
>> >>>>>>>>> On Tue, May 21, 2019 at 11:38 AM Jan Lukavský <[email protected]>
>> wrote:
>> >>>>>>>>>> Hi Robert,
>> >>>>>>>>>>
>> >>>>>>>>>>    > Beam has an exactly-once model. If the data was consumed,
>> state
>> >>>>>>>>>> mutated, and outputs written downstream (these three are
>> committed
>> >>>>>>>>>> together atomically) it will not be replayed. That does not, of
>> >>>>>>>>>> course,
>> >>>>>>>>>> solve the non-determinism due to ordering (including the fact
>> that two
>> >>>>>>>>>> operations reading the same PCollection may view different
>> ordering).
>> >>>>>>>>>>
>> >>>>>>>>>> I think what you describe is a property of a runner, not of
>> the model,
>> >>>>>>>>>> right? I think if I run my pipeline on Flink I will not get
>> this
>> >>>>>>>>>> atomicity, because although Flink also uses an exactly-once
>> >>>>>>>>>> model, it might write outputs multiple times.
>> >>>>>>>>> Actually, I think it is a larger (open) question whether
>> exactly once
>> >>>>>>>>> is guaranteed by the model or whether runners are allowed to
>> relax
>> >>>>>>>>> that. I would think, however, that sources correctly implemented
>> >>>>>>>>> should be idempotent when run atop an exactly once
>> infrastructure such
>> >>>>>>>>> as Flink or Dataflow.
>> >>>>>>>>>
>> >>>>>>>>>>    > 1) Is it correct for a (Stateful)DoFn to assume elements
>> are
>> >>>>>>>>>> received
>> >>>>>>>>>> in a specific order? In the current model, it is not. Being
>> able to
>> >>>>>>>>>> read, handle, and produce out-of-order data, including late
>> data,
>> >>>>>>>>>> is a
>> >>>>>>>>>> pretty fundamental property of distributed systems.
>> >>>>>>>>>>
>> >>>>>>>>>> Yes, absolutely. The argument here is not that Stateful ParDo
>> should
>> >>>>>>>>>> presume to receive elements in any order, but that it should
>> >>>>>>>>>> _present_ them as such to
>> >>>>>>>>>> the user's @ProcessElement function.
>> >>>>>>>>> Sounds like we should make this clearer.
>> >>>>>>>>>
>> >>>>>>>>>>    > 2) Given that some operations are easier (or possibly only
>> >>>>>>>>>> possible)
>> >>>>>>>>>> to write when operating on ordered data, and that different
>> runners
>> >>>>>>>>>> may
>> >>>>>>>>>> have (significantly) cheaper ways to provide this ordering
>> than can be
>> >>>>>>>>>> done by the user themselves, should we elevate this to a
>> property of
>> >>>>>>>>>> (Stateful?)DoFns that the runner can provide? I think a
>> compelling
>> >>>>>>>>>> argument can be made here that we should.
>> >>>>>>>>>>
>> >>>>>>>>>> +1
>> >>>>>>>>>>
>> >>>>>>>>>> Jan
>> >>>>>>>>>>
>> >>>>>>>>>> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
>> >>>>>>>>>>> On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <[email protected]>
>> wrote:
>> >>>>>>>>>>>>     > I don't see batch vs. streaming as part of the model.
>> One
>> >>>>>>>>>>>> can have
>> >>>>>>>>>>>> microbatch, or even a runner that alternates between
>> different
>> >>>>>>>>>>>> modes.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Although I understand motivation of this statement, this
>> project
>> >>>>>>>>>>>> name is
>> >>>>>>>>>>>> "Apache Beam: An advanced unified programming model". What
>> does the
>> >>>>>>>>>>>> model unify, if "streaming vs. batch" is not part of the
>> model?
>> >>>>>>>>>>> What I mean is that streaming vs. batch is no longer part of
>> the
>> >>>>>>>>>>> model
>> >>>>>>>>>>> (or ideally API), but pushed down to be a concern of the
>> runner
>> >>>>>>>>>>> (executor) of the pipeline.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <
>> [email protected]>
>> >>>>>>>>>>> wrote:
>> >>>>>>>>>>>> Hi Kenn,
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> OK, so if we introduce an annotation, we can have stateful ParDo
>> >>>>>>>>>>>> with sorting, that would perfectly resolve my issues. I still
>> >>>>>>>>>>>> have some doubts, though. Let me explain. The current
>> behavior of
>> >>>>>>>>>>>> stateful ParDo has the following properties:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>     a) might fail in batch, although runs fine in streaming
>> (that
>> >>>>>>>>>>>> is due to the buffering, and unbounded lateness in batch,
>> which
>> >>>>>>>>>>>> was discussed back and forth in this thread)
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>     b) might be non-deterministic (this is because the
>> elements
>> >>>>>>>>>>>> arrive at somewhat random order, and even if you do the
>> operation
>> >>>>>>>>>>>> "assign unique ID to elements" this might produce different
>> >>>>>>>>>>>> results when run multiple times)
>> >>>>>>>>>>> PCollections are *explicitly* unordered. Any operations that
>> >>>>>>>>>>> assume or
>> >>>>>>>>>>> depend on a specific ordering for correctness (or
>> determinism) must
>> >>>>>>>>>>> provide that ordering themselves (i.e. tolerate "arbitrary
>> shuffling
>> >>>>>>>>>>> of inputs"). As you point out, that may be very expensive if
>> you have
>> >>>>>>>>>>> very hot keys with very large (unbounded) timestamp skew.
>> >>>>>>>>>>>
>> >>>>>>>>>>> StatefulDoFns are low-level operations that should be used
>> with care;
>> >>>>>>>>>>> the simpler windowing model gives determinism in the face of
>> >>>>>>>>>>> unordered
>> >>>>>>>>>>> data (though late data and non-end-of-window triggering
>> introduces
>> >>>>>>>>>>> some of the non-determinism back in).
>> >>>>>>>>>>>
>> >>>>>>>>>>>> What worries me most is the property b), because it seems to
>> me
>> >>>>>>>>>>>> to have serious consequences - not only that if you run a
>> >>>>>>>>>>>> batch pipeline twice you would get different results, but even on
>> >>>>>>>>>>>> streaming, when pipeline fails and gets restarted from
>> >>>>>>>>>>>> checkpoint, produced output might differ from the previous
>> run
>> >>>>>>>>>>>> and data from the first run might have already been persisted
>> >>>>>>>>>>>> into sink. That would create somewhat messy outputs.
>> >>>>>>>>>>> Beam has an exactly-once model. If the data was consumed,
>> state
>> >>>>>>>>>>> mutated, and outputs written downstream (these three are
>> committed
>> >>>>>>>>>>> together atomically) it will not be replayed. That does not,
>> of
>> >>>>>>>>>>> course, solve the non-determinism due to ordering (including
>> the fact
>> >>>>>>>>>>> that two operations reading the same PCollection may view
>> different
>> >>>>>>>>>>> ordering).
>> >>>>>>>>>>>
>> >>>>>>>>>>>> These two properties make me think that the current
>> >>>>>>>>>>>> implementation is more of a _special case_ than the general
>> one.
>> >>>>>>>>>>>> The general one would be that your state doesn't have the
>> >>>>>>>>>>>> properties to be able to tolerate buffering problems and/or
>> >>>>>>>>>>>> non-determinism. Which is the case where you need sorting in
>> both
>> >>>>>>>>>>>> streaming and batch to be part of the model.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Let me point out one more analogy - that is merging vs.
>> >>>>>>>>>>>> non-merging windows. The general case (merging windows)
>> implies
>> >>>>>>>>>>>> sorting by timestamp in both batch case (explicit) and
>> streaming
>> >>>>>>>>>>>> (buffering). The special case (non-merging windows) doesn't
>> rely
>> >>>>>>>>>>>> on any timestamp ordering, so the sorting and buffering can
>> be
>> >>>>>>>>>>>> dropped. The underlying root cause of this is the same for
>> both
>> >>>>>>>>>>>> stateful ParDo and windowing (essentially, assigning window
>> >>>>>>>>>>>> labels is a stateful operation when the windowing function is
>> merging).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> The reason for the current behavior of stateful ParDo seems
>> to be
>> >>>>>>>>>>>> performance, but is it right to abandon correctness in favor
>> of
>> >>>>>>>>>>>> performance? Wouldn't it be more consistent to have the
>> default
>> >>>>>>>>>>>> behavior prefer correctness and when you have the specific
>> >>>>>>>>>>>> conditions of state function having special properties, then
>> you
>> >>>>>>>>>>>> can annotate your DoFn (with something like
>> >>>>>>>>>>>> @TimeOrderingAgnostic), which would yield a better
>> performance in
>> >>>>>>>>>>>> that case?
>> >>>>>>>>>>> There are two separable questions here.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 1) Is it correct for a (Stateful)DoFn to assume elements are
>> received
>> >>>>>>>>>>> in a specific order? In the current model, it is not. Being
>> able to
>> >>>>>>>>>>> read, handle, and produce out-of-order data, including late
>> data, is
>> >>>>>>>>>>> a pretty fundamental property of distributed systems.
>> >>>>>>>>>>>
>> >>>>>>>>>>> 2) Given that some operations are easier (or possibly only
>> possible)
>> >>>>>>>>>>> to write when operating on ordered data, and that different
>> runners
>> >>>>>>>>>>> may have (significantly) cheaper ways to provide this
>> ordering than
>> >>>>>>>>>>> can be done by the user themselves, should we elevate this to
>> a
>> >>>>>>>>>>> property of (Stateful?)DoFns that the runner can provide? I
>> think a
>> >>>>>>>>>>> compelling argument can be made here that we should.
>> >>>>>>>>>>>
>> >>>>>>>>>>> - Robert
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Thanks for the nice small example of a calculation that
>> depends
>> >>>>>>>>>>>> on order. You are right that many state machines have this
>> >>>>>>>>>>>> property. I agree w/ you and Luke that it is convenient for
>> batch
>> >>>>>>>>>>>> processing to sort by event timestamp before running a
>> stateful
>> >>>>>>>>>>>> ParDo. In streaming you could also implement "sort by event
>> >>>>>>>>>>>> timestamp" by buffering until you know all earlier data will
>> be
>> >>>>>>>>>>>> dropped - a slack buffer up to allowed lateness.
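>> >>>>>>>>>>>>
>> >>>>>>>>>>>> For illustration, a hedged sketch of that buffer-and-release
>> >>>>>>>>>>>> pattern using state plus an event-time timer (deliberately
>> >>>>>>>>>>>> simplified: it ignores late data and uses a single timer;
>> >>>>>>>>>>>> imports: org.apache.beam.sdk.state.*, .transforms.DoFn,
>> >>>>>>>>>>>> .values.*, org.joda.time.Instant, java.util.*):
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>   class SortByEventTime<T> extends DoFn<KV<String, T>, T> {
>> >>>>>>>>>>>>     @StateId("buf")
>> >>>>>>>>>>>>     private final StateSpec<BagState<TimestampedValue<T>>>
>> >>>>>>>>>>>>         bufSpec = StateSpecs.bag();
>> >>>>>>>>>>>>     @TimerId("flush")
>> >>>>>>>>>>>>     private final TimerSpec flushSpec =
>> >>>>>>>>>>>>         TimerSpecs.timer(TimeDomain.EVENT_TIME);
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>     @ProcessElement
>> >>>>>>>>>>>>     public void process(ProcessContext c,
>> >>>>>>>>>>>>         @StateId("buf") BagState<TimestampedValue<T>> buf,
>> >>>>>>>>>>>>         @TimerId("flush") Timer flush) {
>> >>>>>>>>>>>>       buf.add(TimestampedValue.of(c.element().getValue(),
>> >>>>>>>>>>>>                                   c.timestamp()));
>> >>>>>>>>>>>>       // Fires once the watermark passes this instant.
>> >>>>>>>>>>>>       flush.set(c.timestamp());
>> >>>>>>>>>>>>     }
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>     @OnTimer("flush")
>> >>>>>>>>>>>>     public void onFlush(OnTimerContext c,
>> >>>>>>>>>>>>         @StateId("buf") BagState<TimestampedValue<T>> buf,
>> >>>>>>>>>>>>         @TimerId("flush") Timer flush) {
>> >>>>>>>>>>>>       List<TimestampedValue<T>> ready = new ArrayList<>();
>> >>>>>>>>>>>>       List<TimestampedValue<T>> pending = new ArrayList<>();
>> >>>>>>>>>>>>       for (TimestampedValue<T> tv : buf.read()) {
>> >>>>>>>>>>>>         // Release only what the watermark has already passed.
>> >>>>>>>>>>>>         (tv.getTimestamp().isAfter(c.timestamp())
>> >>>>>>>>>>>>             ? pending : ready).add(tv);
>> >>>>>>>>>>>>       }
>> >>>>>>>>>>>>       ready.sort(
>> >>>>>>>>>>>>           Comparator.comparing(TimestampedValue::getTimestamp));
>> >>>>>>>>>>>>       ready.forEach(tv -> c.output(tv.getValue()));
>> >>>>>>>>>>>>       buf.clear();
>> >>>>>>>>>>>>       Instant min = null;
>> >>>>>>>>>>>>       for (TimestampedValue<T> tv : pending) {
>> >>>>>>>>>>>>         buf.add(tv);
>> >>>>>>>>>>>>         min = (min == null || tv.getTimestamp().isBefore(min))
>> >>>>>>>>>>>>             ? tv.getTimestamp() : min;
>> >>>>>>>>>>>>       }
>> >>>>>>>>>>>>       if (min != null) {
>> >>>>>>>>>>>>         flush.set(min); // re-arm for the earliest buffered one
>> >>>>>>>>>>>>       }
>> >>>>>>>>>>>>     }
>> >>>>>>>>>>>>   }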
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> I do not think that it is OK to sort in batch and not in
>> >>>>>>>>>>>> streaming. Many state machines diverge very rapidly when
>> things
>> >>>>>>>>>>>> are out of order. So each runner if they see the
>> >>>>>>>>>>>> "@OrderByTimestamp" annotation (or whatever) needs to deliver
>> >>>>>>>>>>>> sorted data (by some mix of buffering and dropping), or to
>> reject
>> >>>>>>>>>>>> the pipeline as unsupported.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> And I also want to say that this is not the default case - many
>> >>>>>>>>>>>> uses of state & timers in ParDo yield different results at the
>> >>>>>>>>>>>> element level, but the results are equivalent in the big
>> >>>>>>>>>>>> picture. In examples like "assign a unique sequence number
>> >>>>>>>>>>>> to each element" or "group into batches" it doesn't matter
>> >>>>>>>>>>>> exactly what the result is, only that it meets the spec. And
>> >>>>>>>>>>>> other cases like user funnels are monotonic enough that you also
>> >>>>>>>>>>>> don't actually need sorting.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Kenn
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <
>> [email protected]>
>> >>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>> Yes, the problem will arise probably mostly when you have
>> not
>> >>>>>>>>>>>>> well distributed keys (or too few keys). I'm really not
>> sure if
>> >>>>>>>>>>>>> a pure GBK with a trigger can solve this - it might help to
>> have
>> >>>>>>>>>>>>> data driven trigger. There would still be some doubts,
>> though.
>> >>>>>>>>>>>>> The main question is still here - people say, that sorting
>> by
>> >>>>>>>>>>>>> timestamp before stateful ParDo would be prohibitively
>> slow, but
>> >>>>>>>>>>>>> I don't really see why - the sorting is very probably
>> already
>> >>>>>>>>>>>>> there. And if not (hash grouping instead of sorted
>> grouping),
>> >>>>>>>>>>>>> then the sorting would affect only user-defined
>> StatefulParDos.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> This would suggest that the best way out of this would be
>> really
>> >>>>>>>>>>>>> to add annotation, so that the author of the pipeline can
>> decide.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> If that would be acceptable I think I can try to prepare
>> some
>> >>>>>>>>>>>>> basic functionality, but I'm not sure if I would be able to
>> >>>>>>>>>>>>> cover all runners / sdks.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> It is read all per key and window and not just read all
>> (this
>> >>>>>>>>>>>>> still won't scale with hot keys in the global window). The
>> GBK
>> >>>>>>>>>>>>> preceding the StatefulParDo will guarantee that you are
>> >>>>>>>>>>>>> processing all the values for a specific key and window at
>> any
>> >>>>>>>>>>>>> given time. Is there a specific window/trigger that is
>> missing
>> >>>>>>>>>>>>> that you feel would remove the need for you to use
>> StatefulParDo?
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <
>> [email protected]>
>> >>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>> Hi Lukasz,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Today, if you must have a strict order, you must guarantee
>> >>>>>>>>>>>>>>> that your StatefulParDo implements the necessary
>> "buffering &
>> >>>>>>>>>>>>>>> sorting" into state.
>> >>>>>>>>>>>>>> Yes, no problem with that. But this whole discussion
>> started,
>> >>>>>>>>>>>>>> because *this doesn't work on batch*. You simply cannot
>> first
>> >>>>>>>>>>>>>> read everything from distributed storage and then buffer
>> it all
>> >>>>>>>>>>>>>> into memory, just to read it again, but sorted. That will
>> not
>> >>>>>>>>>>>>>> work. And even if it would, it would be a terrible waste of
>> >>>>>>>>>>>>>> resources.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Jan
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <
>> [email protected]>
>> >>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>> This discussion brings many really interesting questions
>> for
>> >>>>>>>>>>>>>>> me. :-)
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>     > I don't see batch vs. streaming as part of the
>> model. One
>> >>>>>>>>>>>>>>> can have
>> >>>>>>>>>>>>>>> microbatch, or even a runner that alternates between
>> different
>> >>>>>>>>>>>>>>> modes.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Although I understand motivation of this statement, this
>> >>>>>>>>>>>>>>> project name is
>> >>>>>>>>>>>>>>> "Apache Beam: An advanced unified programming model". What
>> >>>>>>>>>>>>>>> does the
>> >>>>>>>>>>>>>>> model unify, if "streaming vs. batch" is not part of the
>> model?
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Using microbatching, chaining of batch jobs, or pure
>> streaming
>> >>>>>>>>>>>>>>> are
>> >>>>>>>>>>>>>>> exactly the "runtime conditions/characteristics" I refer
>> to.
>> >>>>>>>>>>>>>>> All these
>> >>>>>>>>>>>>>>> define several runtime parameters, which in turn define
>> how
>> >>>>>>>>>>>>>>> well/badly
>> >>>>>>>>>>>>>>> will the pipeline perform and how many resources might be
>> >>>>>>>>>>>>>>> needed. From
>> >>>>>>>>>>>>>>> my point of view, pure streaming should be the most
>> resource
>> >>>>>>>>>>>>>>> demanding
>> >>>>>>>>>>>>>>> (if not, why bother with batch? why not run everything in
>> >>>>>>>>>>>>>>> streaming
>> >>>>>>>>>>>>>>> only? what will there remain to "unify"?).
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>     > Fortunately, for batch, only the state for a single
>> key
>> >>>>>>>>>>>>>>> needs to be
>> >>>>>>>>>>>>>>> preserved at a time, rather than the state for all keys
>> across
>> >>>>>>>>>>>>>>> the range
>> >>>>>>>>>>>>>>> of skew. Of course if you have few or hot keys, one can
>> still
>> >>>>>>>>>>>>>>> have
>> >>>>>>>>>>>>>>> issues (and this is not specific to StatefulDoFns).
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Yes, but here is still the presumption that my stateful
>> DoFn can
>> >>>>>>>>>>>>>>> tolerate arbitrary shuffling of inputs. Let me explain
>> the use
>> >>>>>>>>>>>>>>> case in
>> >>>>>>>>>>>>>>> more detail.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Suppose you have an input stream consisting of 1s and 0s (and
>> >>>>>>>>>>>>>>> some key for
>> >>>>>>>>>>>>>>> each element, which is irrelevant for the demonstration).
>> Your
>> >>>>>>>>>>>>>>> task is
>> >>>>>>>>>>>>>>> to calculate in a running global window the actual number of
>> >>>>>>>>>>>>>>> changes
>> >>>>>>>>>>>>>>> between state 0 and state 1 and vice versa. When the state
>> >>>>>>>>>>>>>>> doesn't
>> >>>>>>>>>>>>>>> change, you don't calculate anything. If input (for given
>> key)
>> >>>>>>>>>>>>>>> would be
>> >>>>>>>>>>>>>>> (tN denotes timestamp N):
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>      t1: 1
>> >>>>>>>>>>>>>>>      t2: 0
>> >>>>>>>>>>>>>>>      t3: 0
>> >>>>>>>>>>>>>>>      t4: 1
>> >>>>>>>>>>>>>>>      t5: 1
>> >>>>>>>>>>>>>>>      t6: 0
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> then the output should yield (supposing that the default
>> >>>>>>>>>>>>>>> state is zero):
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>      t1: (one: 1, zero: 0)
>> >>>>>>>>>>>>>>>      t2: (one: 1, zero: 1)
>> >>>>>>>>>>>>>>>      t3: (one: 1, zero: 1)
>> >>>>>>>>>>>>>>>      t4: (one: 2, zero: 1)
>> >>>>>>>>>>>>>>>      t5: (one: 2, zero: 1)
>> >>>>>>>>>>>>>>>      t6: (one: 2, zero: 2)
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> How would you implement this in current Beam semantics?
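>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> For concreteness, a sketch of the natural stateful DoFn for
>> >>>>>>>>>>>>>>> this - which is correct only IF elements arrive in timestamp
>> >>>>>>>>>>>>>>> order per key, which is exactly what the current model does
>> >>>>>>>>>>>>>>> not guarantee (imports: org.apache.beam.sdk.state.*,
>> >>>>>>>>>>>>>>> .transforms.DoFn, .values.KV):
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>   class CountTransitions
>> >>>>>>>>>>>>>>>       extends DoFn<KV<String, Integer>, KV<Long, Long>> {
>> >>>>>>>>>>>>>>>     @StateId("last") private final
>> >>>>>>>>>>>>>>>         StateSpec<ValueState<Integer>> lastSpec = StateSpecs.value();
>> >>>>>>>>>>>>>>>     @StateId("ones") private final
>> >>>>>>>>>>>>>>>         StateSpec<ValueState<Long>> onesSpec = StateSpecs.value();
>> >>>>>>>>>>>>>>>     @StateId("zeros") private final
>> >>>>>>>>>>>>>>>         StateSpec<ValueState<Long>> zerosSpec = StateSpecs.value();
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>     @ProcessElement
>> >>>>>>>>>>>>>>>     public void process(ProcessContext c,
>> >>>>>>>>>>>>>>>         @StateId("last") ValueState<Integer> last,
>> >>>>>>>>>>>>>>>         @StateId("ones") ValueState<Long> ones,
>> >>>>>>>>>>>>>>>         @StateId("zeros") ValueState<Long> zeros) {
>> >>>>>>>>>>>>>>>       int bit = c.element().getValue();
>> >>>>>>>>>>>>>>>       // Default state is zero, as in the example above.
>> >>>>>>>>>>>>>>>       int prev = last.read() == null ? 0 : last.read();
>> >>>>>>>>>>>>>>>       long one = ones.read() == null ? 0 : ones.read();
>> >>>>>>>>>>>>>>>       long zero = zeros.read() == null ? 0 : zeros.read();
>> >>>>>>>>>>>>>>>       if (prev != bit) { // a 0->1 or 1->0 transition
>> >>>>>>>>>>>>>>>         if (bit == 1) { ones.write(++one); }
>> >>>>>>>>>>>>>>>         else { zeros.write(++zero); }
>> >>>>>>>>>>>>>>>         last.write(bit);
>> >>>>>>>>>>>>>>>       }
>> >>>>>>>>>>>>>>>       c.output(KV.of(one, zero)); // (one: X, zero: Y)
>> >>>>>>>>>>>>>>>     }
>> >>>>>>>>>>>>>>>   }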
>> >>>>>>>>>>>>>> I think you're saying here "I know that my input is ordered
>> >>>>>>>>>>>>>> in a specific way, and since I assume the order when writing
>> >>>>>>>>>>>>>> my pipeline I can perform this optimization." But there is
>> >>>>>>>>>>>>>> nothing preventing a runner from noticing that you're
>> >>>>>>>>>>>>>> processing in the global window with a specific type of
>> >>>>>>>>>>>>>> trigger and re-ordering your inputs/processing to get better
>> >>>>>>>>>>>>>> performance (since you can't use an AfterWatermark trigger
>> >>>>>>>>>>>>>> for your pipeline in streaming for the GlobalWindow).
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Today, if you must have a strict order, you must guarantee
>> that
>> >>>>>>>>>>>>>> your StatefulParDo implements the necessary "buffering &
>> >>>>>>>>>>>>>> sorting" into state. I can see why you would want an
>> annotation
>> >>>>>>>>>>>>>> that says I must have timestamp ordered elements, since it
>> >>>>>>>>>>>>>> makes writing certain StatefulParDos much easier.
>> StatefulParDo
>> >>>>>>>>>>>>>> is a low-level function, it really is the "here you go and
>> do
>> >>>>>>>>>>>>>> whatever you need to but here be dragons" function while
>> >>>>>>>>>>>>>> windowing and triggering is meant to keep many people from
>> >>>>>>>>>>>>>> writing StatefulParDo in the first place.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>     > Pipelines that fail in the "worst case" batch
>> scenario
>> >>>>>>>>>>>>>>> are likely to
>> >>>>>>>>>>>>>>> degrade poorly (possibly catastrophically) when the
>> watermark
>> >>>>>>>>>>>>>>> falls
>> >>>>>>>>>>>>>>> behind in streaming mode as well.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> But the worst case is defined by an input of size (available
>> >>>>>>>>>>>>>>> resources +
>> >>>>>>>>>>>>>>> a single byte) -> pipeline failure. Although it could have
>> >>>>>>>>>>>>>>> finished, given
>> >>>>>>>>>>>>>>> the right conditions.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>     > This might be reasonable, implemented by default by
>> >>>>>>>>>>>>>>> buffering
>> >>>>>>>>>>>>>>> everything and releasing elements as the watermark
>> (+lateness)
>> >>>>>>>>>>>>>>> advances,
>> >>>>>>>>>>>>>>> but would likely lead to inefficient (though *maybe*
>> easier to
>> >>>>>>>>>>>>>>> reason
>> >>>>>>>>>>>>>>> about) code.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Sure, the pipeline will be less efficient, because it
>> would
>> >>>>>>>>>>>>>>> have to
>> >>>>>>>>>>>>>>> buffer and sort the inputs. But at least it will produce
>> >>>>>>>>>>>>>>> correct results
>> >>>>>>>>>>>>>>> in cases where updates to state are order-sensitive.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>     > Would it be roughly equivalent to GBK +
>> FlatMap(lambda
>> >>>>>>>>>>>>>>> (key, values):
>> >>>>>>>>>>>>>>> [(key, value) for value in values])?
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> I'd say roughly yes, but the difference would be in the
>> trigger.
>> >>>>>>>>>>>>>>> The trigger
>> >>>>>>>>>>>>>>> should ideally fire as soon as the watermark (+lateness)
>> crosses the
>> >>>>>>>>>>>>>>> element
>> >>>>>>>>>>>>>>> with the lowest timestamp in the buffer. Although this could
>> be
>> >>>>>>>>>>>>>>> somehow
>> >>>>>>>>>>>>>>> emulated by a fixed trigger firing every X millis.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>     > Or is the underlying desire just to be able to hint
>> to
>> >>>>>>>>>>>>>>> the runner
>> >>>>>>>>>>>>>>> that the code may perform better (e.g. require less
>> resources)
>> >>>>>>>>>>>>>>> as skew
>> >>>>>>>>>>>>>>> is reduced (and hence to order by timestamp iff it's
>> cheap)?
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> No, the sorting would have to be done in streaming case as
>> >>>>>>>>>>>>>>> well. That is
>> >>>>>>>>>>>>>>> an imperative of the unified model. I think it is
>> possible to
>> >>>>>>>>>>>>>>> sort by
>> >>>>>>>>>>>>>>> timestamp only in the batch case (and do it for *all* batch
>> >>>>>>>>>>>>>>> stateful pardos
>> >>>>>>>>>>>>>>> without an annotation), or introduce an annotation, but then
>> make
>> >>>>>>>>>>>>>>> the same
>> >>>>>>>>>>>>>>> guarantees for the streaming case as well.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Jan
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
>> >>>>>>>>>>>>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský
>> >>>>>>>>>>>>>>>> <[email protected]> wrote:
>> >>>>>>>>>>>>>>>>> Hi Robert,
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> yes, I think you rephrased my point - although no
>> *explicit*
>> >>>>>>>>>>>>>>>>> guarantees
>> >>>>>>>>>>>>>>>>> of ordering are given in either mode, there is
>> *implicit*
>> >>>>>>>>>>>>>>>>> ordering in
>> >>>>>>>>>>>>>>>>> the streaming case that is due to the nature of the processing
>> - the
>> >>>>>>>>>>>>>>>>> difference
>> >>>>>>>>>>>>>>>>> between watermark and timestamp of elements flowing
>> through
>> >>>>>>>>>>>>>>>>> the pipeline
>> >>>>>>>>>>>>>>>>> are generally low (too high a difference leads to the
>> >>>>>>>>>>>>>>>>> overbuffering
>> >>>>>>>>>>>>>>>>> problem), but there is no such bound on batch.
>> >>>>>>>>>>>>>>>> Fortunately, for batch, only the state for a single key
>> needs
>> >>>>>>>>>>>>>>>> to be
>> >>>>>>>>>>>>>>>> preserved at a time, rather than the state for all keys
>> >>>>>>>>>>>>>>>> across the
>> >>>>>>>>>>>>>>>> range of skew. Of course if you have few or hot keys,
>> one can
>> >>>>>>>>>>>>>>>> still
>> >>>>>>>>>>>>>>>> have issues (and this is not specific to StatefulDoFns).
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> As a result, I see a few possible solutions:
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>       - the best and most natural seems to be an extension
>> of
>> >>>>>>>>>>>>>>>>> the model, so
>> >>>>>>>>>>>>>>>>> that it defines batch as not only "streaming pipeline
>> >>>>>>>>>>>>>>>>> executed in batch
>> >>>>>>>>>>>>>>>>> fashion", but "pipeline with at least as good runtime
>> >>>>>>>>>>>>>>>>> characteristics as
>> >>>>>>>>>>>>>>>>> in the streaming case, executed in batch fashion", I really
>> >>>>>>>>>>>>>>>>> don't think that
>> >>>>>>>>>>>>>>>>> there are any conflicts with the current model, or that
>> this
>> >>>>>>>>>>>>>>>>> could
>> >>>>>>>>>>>>>>>>> affect performance, because the required sorting (as
>> pointed by
>> >>>>>>>>>>>>>>>>> Aljoscha) is very probably already done during
>> translation
>> >>>>>>>>>>>>>>>>> of stateful
>> >>>>>>>>>>>>>>>>> pardos. Also note that this definition only affects user
>> >>>>>>>>>>>>>>>>> defined
>> >>>>>>>>>>>>>>>>> stateful pardos
>> >>>>>>>>>>>>>>>> I don't see batch vs. streaming as part of the model.
>> One can
>> >>>>>>>>>>>>>>>> have
>> >>>>>>>>>>>>>>>> microbatch, or even a runner that alternates between
>> >>>>>>>>>>>>>>>> different modes.
>> >>>>>>>>>>>>>>>> The model describes what the valid outputs are given a
>> >>>>>>>>>>>>>>>> (sometimes
>> >>>>>>>>>>>>>>>> partial) set of inputs. It becomes really hard to define
>> >>>>>>>>>>>>>>>> things like
>> >>>>>>>>>>>>>>>> "as good runtime characteristics." Once you allow any
>> >>>>>>>>>>>>>>>> out-of-orderedness, it is not very feasible to try and
>> define
>> >>>>>>>>>>>>>>>> (and
>> >>>>>>>>>>>>>>>> more cheaply implement) a "upper bound" of acceptable
>> >>>>>>>>>>>>>>>> out-of-orderedness.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Pipelines that fail in the "worst case" batch scenario
>> are
>> >>>>>>>>>>>>>>>> likely to
>> >>>>>>>>>>>>>>>> degrade poorly (possibly catastrophically) when the
>> watermark
>> >>>>>>>>>>>>>>>> falls
>> >>>>>>>>>>>>>>>> behind in streaming mode as well.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>       - another option would be to introduce annotation
>> for
>> >>>>>>>>>>>>>>>>> DoFns (e.g.
>> >>>>>>>>>>>>>>>>> @RequiresStableTimeCharacteristics), which would result
>> in
>> >>>>>>>>>>>>>>>>> the sorting
>> >>>>>>>>>>>>>>>>> in batch case - but - this extension would have to
>> ensure
>> >>>>>>>>>>>>>>>>> the sorting in
>> >>>>>>>>>>>>>>>>> streaming mode also - it would require definition of
>> allowed
>> >>>>>>>>>>>>>>>>> lateness,
>> >>>>>>>>>>>>>>>>> and trigger (essentially similar to a window)
>> >>>>>>>>>>>>>>>> This might be reasonable, implemented by default by
>> buffering
>> >>>>>>>>>>>>>>>> everything and releasing elements as the watermark
>> (+lateness)
>> >>>>>>>>>>>>>>>> advances, but would likely lead to inefficient (though
>> >>>>>>>>>>>>>>>> *maybe* easier
>> >>>>>>>>>>>>>>>> to reason about) code. Not sure about the semantics of
>> >>>>>>>>>>>>>>>> triggering
>> >>>>>>>>>>>>>>>> here, especially data-driven triggers. Would it be
>> roughly
>> >>>>>>>>>>>>>>>> equivalent
>> >>>>>>>>>>>>>>>> to GBK + FlatMap(lambda (key, values): [(key, value) for
>> >>>>>>>>>>>>>>>> value in
>> >>>>>>>>>>>>>>>> values])?
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Or is the underlying desire just to be able to hint to
>> the
>> >>>>>>>>>>>>>>>> runner that
>> >>>>>>>>>>>>>>>> the code may perform better (e.g. require less
>> resources) as
>> >>>>>>>>>>>>>>>> skew is
>> >>>>>>>>>>>>>>>> reduced (and hence to order by timestamp iff it's cheap)?
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>       - last option would be to introduce these "higher
>> order
>> >>>>>>>>>>>>>>>>> guarantees" in
>> >>>>>>>>>>>>>>>>> some extension DSL (e.g. Euphoria), but that seems to
>> be the
>> >>>>>>>>>>>>>>>>> worst
>> >>>>>>>>>>>>>>>>> option to me
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> I see the first two options quite equally good,
>> although the
>> >>>>>>>>>>>>>>>>> latter one
>> >>>>>>>>>>>>>>>>> is probably more time-consuming to implement. But it
>> would
>> >>>>>>>>>>>>>>>>> bring
>> >>>>>>>>>>>>>>>>> additional feature to streaming case as well.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Thanks for any thoughts.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>       Jan
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
>> >>>>>>>>>>>>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský
>> >>>>>>>>>>>>>>>>>> <[email protected]> wrote:
>> >>>>>>>>>>>>>>>>>>> Hi Reuven,
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> How so? AFAIK stateful DoFns work just fine in batch
>> >>>>>>>>>>>>>>>>>>>> runners.
>> >>>>>>>>>>>>>>>>>>> Stateful ParDo works in batch as far as the logic
>> inside
>> >>>>>>>>>>>>>>>>>>> the state works for absolutely unbounded
>> out-of-orderness
>> >>>>>>>>>>>>>>>>>>> of elements. That basically (practically) can work
>> only
>> >>>>>>>>>>>>>>>>>>> for cases where the order of input elements doesn't
>> >>>>>>>>>>>>>>>>>>> matter. But, "state" can refer to "state machine",
>> and any
>> >>>>>>>>>>>>>>>>>>> time you have a state machine involved, then the
>> ordering
>> >>>>>>>>>>>>>>>>>>> of elements would matter.
>> >>>>>>>>>>>>>>>>>> No guarantees on order are provided in *either*
>> streaming
>> >>>>>>>>>>>>>>>>>> or batch
>> >>>>>>>>>>>>>>>>>> mode by the model. However, it is the case that most
>> >>>>>>>>>>>>>>>>>> streaming runners attempt to
>> limit
>> >>>>>>>>>>>>>>>>>> the amount of
>> >>>>>>>>>>>>>>>>>> out-of-orderedness of elements (in terms of event time
>> vs.
>> >>>>>>>>>>>>>>>>>> processing
>> >>>>>>>>>>>>>>>>>> time) in order to make forward progress, which in turn could
>> help
>> >>>>>>>>>>>>>>>>>> cap the
>> >>>>>>>>>>>>>>>>>> amount of state that must be held concurrently,
>> whereas a
>> >>>>>>>>>>>>>>>>>> batch runner
>> >>>>>>>>>>>>>>>>>> may not allow any state to be safely discarded until
>> the whole
>> >>>>>>>>>>>>>>>>>> timeline from infinite past to infinite future has been
>> >>>>>>>>>>>>>>>>>> observed.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Also, as pointed out, state is not preserved "batch to
>> >>>>>>>>>>>>>>>>>> batch" in batch mode.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels
>> >>>>>>>>>>>>>>>>>> <[email protected]> wrote:
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>       batch semantics and streaming semantics
>> differs only
>> >>>>>>>>>>>>>>>>>>>> in that I can have GlobalWindow with default trigger
>> on
>> >>>>>>>>>>>>>>>>>>>> batch and cannot on stream
>> >>>>>>>>>>>>>>>>>>> You can have a GlobalWindow in streaming with a
>> default
>> >>>>>>>>>>>>>>>>>>> trigger. You
>> >>>>>>>>>>>>>>>>>>> could define additional triggers that do early
>> firings.
>> >>>>>>>>>>>>>>>>>>> And you could
>> >>>>>>>>>>>>>>>>>>> even trigger the global window by advancing the
>> watermark
>> >>>>>>>>>>>>>>>>>>> to +inf.
>> >>>>>>>>>>>>>>>>>> IIRC, as a pragmatic note, we prohibited global window
>> with
>> >>>>>>>>>>>>>>>>>> default
>> >>>>>>>>>>>>>>>>>> trigger on unbounded PCollections in the SDK because
>> this
>> >>>>>>>>>>>>>>>>>> is more
>> >>>>>>>>>>>>>>>>>> likely to be user error than an actual desire to have
>> no
>> >>>>>>>>>>>>>>>>>> output until
>> >>>>>>>>>>>>>>>>>> drain. But it's semantically valid in the model.