Expanding the dimensionality could be the basis for loops within the graph
since loops could be modeled as (time, loop iteration #, nested loop
iteration #, nested nested loop iteration #, ...)
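
For illustration, a minimal sketch of what such a multi-dimensional sequence
"address" could look like - the Seq name and the long[] representation are
assumptions, not an existing Beam API; ordering is plain lexicographic
comparison of the components (Arrays.compare needs Java 9+):

import java.util.Arrays;

/** Hypothetical sequence address: (time, loop #, nested loop #, ...). */
final class Seq implements Comparable<Seq> {
  private final long[] components;

  Seq(long... components) {
    this.components = components;
  }

  /** Enter one nesting level, starting its iteration counter at 0. */
  Seq push() {
    return new Seq(Arrays.copyOf(components, components.length + 1));
  }

  /** Advance the innermost iteration counter by one. */
  Seq next() {
    long[] c = components.clone();
    c[c.length - 1]++;
    return new Seq(c);
  }

  /** Lexicographic order: compare dimension by dimension. */
  @Override
  public int compareTo(Seq other) {
    return Arrays.compare(components, other.components);
  }
}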

On Tue, May 28, 2019 at 12:10 PM Jan Lukavský <je...@seznam.cz> wrote:

> Could this be solved by "expanding the dimensionality" of the time field?
> What I mean by that: if an input element to FlatMap has sequence number T,
> then the (stateless) FlatMap knows the ordering of its output elements,
> right? If it expands the field by which it will next sort the elements to
> (X, 1), (X, 2), ... (X, N), then it would be possible to sort the elements
> back later. There seems to be no need for state to achieve that, does there?
>
> Jan
> On 5/28/19 6:52 PM, Reuven Lax wrote:
>
> A slightly larger concern: it also will force users to create stateful
> DoFns everywhere to generate these sequence numbers. If I have a ParDo that
> is not a simple 1:1 transform (i.e. not MapElements), then the ParDo will
> need to generate its own sequence numbers for ordering, and the only safe
> way to do so is to use a stateful DoFn. This turns what used to be a simple
> in-memory DoFn into one that has to access state. Also I believe many
> runners will not fuse stateful DoFns. While none of this poses a problem
> for the model, it could make ordering extremely expensive to achieve.
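>
> (For concreteness, a minimal sketch of the boilerplate this would force on
> every such ParDo - a stateful DoFn keeping a per-key counter in ValueState;
> the AssignSeq name and types are illustrative:)
>
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
> import org.apache.beam.sdk.transforms.DoFn.StateId;
> import org.apache.beam.sdk.values.KV;
>
> class AssignSeq extends DoFn<KV<String, String>, KV<String, KV<Long, String>>> {
>   @StateId("seq")
>   private final StateSpec<ValueState<Long>> seqSpec = StateSpecs.value();
>
>   @ProcessElement
>   public void process(ProcessContext c, @StateId("seq") ValueState<Long> seq) {
>     Long current = seq.read();
>     long next = current == null ? 0L : current;  // first element of this key
>     seq.write(next + 1);
>     c.output(KV.of(c.element().getKey(), KV.of(next, c.element().getValue())));
>   }
> }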
>
> Reuven
>
> On Tue, May 28, 2019 at 6:09 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Reuven,
>>
>> > It also gets awkward with Flatten - the sequence number is no longer
>> enough, you must also encode which side of the flatten each element came
>> from.
>>
>> That is a generic need. Even if you read data from Kafka, the offsets are
>> comparable only inside a single partition. So, for Kafka to work as a FIFO
>> for ordering, elements with the same key have to be pushed to the same
>> partition (otherwise Kafka cannot act as a FIFO, because different partitions
>> can be handled by different brokers, which means different observers that
>> might therefore not agree on the order of events). So if we want to
>> emulate FIFO per key, then the sequence IDs also have to be per key.
>> On 5/28/19 2:33 PM, Reuven Lax wrote:
>>
>> Sequence metadata does have the disadvantage that users can no longer use
>> the types coming from the source. You must create a new type that contains
>> a sequence number (unless Beam provides this). It also gets awkward with
>> Flatten - the sequence number is no longer enough, you must also encode
>> which side of the flatten each element came from.
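>>
>> (A hypothetical illustration of the wrapper type this pushes onto users -
>> Sequenced and its fields are made-up names, not a Beam API:)
>>
>> /** Wrapper carrying ordering metadata alongside the original payload. */
>> final class Sequenced<T> {
>>   final long sequence;    // per-key sequence number
>>   final int flattenSide;  // which input of a Flatten the element came from
>>   final T value;          // the type the source actually produced
>>
>>   Sequenced(long sequence, int flattenSide, T value) {
>>     this.sequence = sequence;
>>     this.flattenSide = flattenSide;
>>     this.value = value;
>>   }
>> }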
>>
>> On Tue, May 28, 2019 at 3:18 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> As I understood it, Kenn was supporting the idea that sequence metadata
>>> is preferable over FIFO. I was trying to point out that it should even
>>> provide the same functionality as FIFO, plus one more important property -
>>> reproducibility, and the ability to be persisted and reused the same way
>>> in batch and streaming.
>>>
>>> There is no doubt that sequence metadata can be stored in any
>>> storage. But, regarding some implicit ordering that sources might have -
>>> yes, of course, data written into HDFS or Cloud Storage has an ordering,
>>> but only a partial one - inside some bulk (e.g. a file) - and the ordering
>>> is not well defined on the boundaries of these bulks (between files). That
>>> is why I'd say that the ordering of sources is relevant only for
>>> (partitioned!) streaming sources and generally always reduces to
>>> sequence metadata (e.g. offsets).
>>>
>>> Jan
>>>
>>> On 5/28/19 11:43 AM, Robert Bradshaw wrote:
>>> > Huge +1 to all Kenn said.
>>> >
>>> > Jan, batch sources can have orderings too, just like Kafka. I think
>>> > it's reasonable (for both batch and streaming) that if a source has an
>>> > ordering that is an important part of the data, it should preserve
>>> > this ordering into the data itself (e.g. as sequence numbers, offsets,
>>> > etc.)
>>> >
>>> > On Fri, May 24, 2019 at 10:35 PM Kenneth Knowles <k...@apache.org>
>>> wrote:
>>> >> I strongly prefer explicit sequence metadata over FIFO requirements,
>>> because:
>>> >>
>>> >>   - FIFO is complex to specify: for example Dataflow has "per stage
>>> key-to-key" FIFO today, but it is not guaranteed to remain so (plus "stage"
>>> is not a portable concept, nor even guaranteed to remain a Dataflow concept)
>>> >>   - complex specifications are by definition poor usability (if
>>> necessary, then it is what it is)
>>> >>   - overly restricts the runner, reduces parallelism, for example any
>>> non-stateful ParDo has per-element parallelism, not per "key"
>>> >>   - another perspective on that: FIFO makes everyone pay rather than
>>> just the transform that requires exact sequencing
>>> >>   - previous implementation details like reshuffles become part of
>>> the model
>>> >>   - I'm not even convinced the use cases involved are addressed by
>>> some careful FIFO restrictions; many sinks re-key and they would all have
>>> to become aware of how keying of a sequence of "stages" affects the
>>> end-to-end FIFO
>>> >>
>>> >> A noop becoming a non-noop is essentially the mathematical definition
>>> of moving from higher-level to lower-level abstraction.
>>> >>
>>> >> So this strikes at the core question of what level of abstraction
>>> Beam aims to represent. Lower-level means there are fewer possible
>>> implementations and it is more tied to the underlying architecture, and
>>> anything not near-exact match pays a huge penalty. Higher-level means there
>>> are more implementations possible with different tradeoffs, though they may
>>> all pay a minor penalty.
>>> >>
>>> >> I could be convinced to change my mind, but it needs some extensive
>>> design, examples, etc. I think it is probably about the most consequential
>>> design decision in the whole Beam model, around the same level as the
>>> decision to use ParDo and GBK as the primitives IMO.
>>> >>
>>> >> Kenn
>>> >>
>>> >> On Thu, May 23, 2019 at 10:17 AM Reuven Lax <re...@google.com> wrote:
>>> >>> Not really. I'm suggesting that some variant of FIFO ordering is
>>> necessary, which requires either that runners natively support FIFO ordering
>>> or that transforms add some extra sequence number to each record to sort by.
>>> >>>
>>> >>> I still think your proposal is very useful by the way. I'm merely
>>> pointing out that to solve the state-machine problem we probably need
>>> something more.
>>> >>>
>>> >>> Reuven
>>> >>>
>>> >>> On Thu, May 23, 2019 at 9:50 AM Jan Lukavský <je...@seznam.cz>
>>> wrote:
>>> >>>> Hi,
>>> >>>> yes. It seems that ordering by a user-supplied UDF makes sense and I
>>> will update the design proposal accordingly.
>>> >>>> Would that solve the issues you mention?
>>> >>>> Jan
>>> >>>> ---------- Original e-mail ----------
>>> >>>> From: Reuven Lax <re...@google.com>
>>> >>>> To: dev <dev@beam.apache.org>
>>> >>>> Date: 23. 5. 2019 18:44:38
>>> >>>> Subject: Re: Definition of Unified model
>>> >>>>
>>> >>>> I'm simply saying that timestamp ordering is insufficient for state
>>> machines. I wasn't proposing Kafka as a solution - that was simply an
>>> example of how people solve this problem in other scenarios.
>>> >>>>
>>> >>>> BTW another example of ordering: Imagine today that you have a
>>> triggered Sum aggregation writing out to a key-value sink. In theory we
>>> provide no ordering, so the sink might write the triggered sums in the
>>> wrong order, ending up with an incorrect value in the sink. In this case
>>> you probably want values ordered by trigger pane index.
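>>> >>>>
>>> >>>> (For illustration, the pane index is already observable in a DoFn
>>> >>>> today - a sketch of tagging each triggered sum with it, so a sink
>>> >>>> could keep only the highest index seen per key; names are
>>> >>>> illustrative:)
>>> >>>>
>>> >>>> import org.apache.beam.sdk.transforms.DoFn;
>>> >>>> import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
>>> >>>> import org.apache.beam.sdk.values.KV;
>>> >>>>
>>> >>>> class TagWithPaneIndex extends DoFn<KV<String, Long>, KV<String, KV<Long, Long>>> {
>>> >>>>   @ProcessElement
>>> >>>>   public void process(ProcessContext c) {
>>> >>>>     // PaneInfo.getIndex(): 0 for the first firing of a window, 1 for the next, ...
>>> >>>>     c.output(KV.of(c.element().getKey(),
>>> >>>>         KV.of(c.pane().getIndex(), c.element().getValue())));
>>> >>>>   }
>>> >>>> }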
>>> >>>>
>>> >>>> Reuven
>>> >>>>
>>> >>>> On Thu, May 23, 2019 at 8:59 AM Jan Lukavský <je...@seznam.cz>
>>> wrote:
>>> >>>>
>>> >>>> Hi Reuven,
>>> >>>> I share the viewpoint of Robert. I think the issue you refer to is
>>> >>>> not really related to timestamps, but to the fact that the ordering
>>> >>>> of events in time is observer dependent (whether caused by relativity
>>> >>>> or by time skew, it has essentially the same consequences). And the
>>> >>>> resolution in fact isn't Kafka, but generally an authoritative
>>> >>>> observer that tells you "I saw the events in this order". You either
>>> >>>> have one (and have the outcome of its observation persisted in the
>>> >>>> data - e.g. as an offset in a Kafka partition), and then you should
>>> >>>> be able to use it (maybe that suggests after all that sorting by some
>>> >>>> user-supplied UDF might make sense), or you do not have it, and then
>>> >>>> any interpretation of the data seems to be equally valid. Although
>>> >>>> determinism is fine, of course.
>>> >>>> Jan
>>> >>>> ---------- Original e-mail ----------
>>> >>>> From: Reuven Lax <re...@google.com>
>>> >>>> To: dev <dev@beam.apache.org>
>>> >>>> Date: 23. 5. 2019 17:39:12
>>> >>>> Subject: Re: Definition of Unified model
>>> >>>>
>>> >>>> So an example would be elements of type "startUserSession" and
>>> "endUserSession" (website sessions, not Beam sessions). Logically you may
>>> need to process them in the correct order if you have any sort of
>>> state-machine logic. However timestamp ordering is never guaranteed to
>>> match the logical ordering. Not only might you have several elements with
>>> the same timestamp, but in reality time skew across backend servers can
>>> cause the events to have timestamps in reverse order of the actual
>>> causality order.
>>> >>>>
>>> >>>> People do solve this problem today though. Publish the events to
>>> Kafka, making sure that events for the same user end up in the same Kafka
>>> partition. This ensures that the events appear in the Kafka partitions in
>>> causality order, even if the timestamp order doesn't match. Then your Kafka
>>> subscriber simply processes the elements in each partition in order.
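>>> >>>>
>>> >>>> (A sketch of that recipe using the plain Kafka producer API - keying
>>> >>>> the record by user ID so Kafka's default partitioner sends all of a
>>> >>>> user's events to the same partition; the topic name and types are
>>> >>>> illustrative:)
>>> >>>>
>>> >>>> import org.apache.kafka.clients.producer.KafkaProducer;
>>> >>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>> >>>>
>>> >>>> class UserEventPublisher {
>>> >>>>   private final KafkaProducer<String, String> producer;
>>> >>>>
>>> >>>>   UserEventPublisher(KafkaProducer<String, String> producer) {
>>> >>>>     this.producer = producer;
>>> >>>>   }
>>> >>>>
>>> >>>>   void publish(String userId, String event) {
>>> >>>>     // Same key => same partition => per-user FIFO as seen by any consumer.
>>> >>>>     producer.send(new ProducerRecord<>("user-events", userId, event));
>>> >>>>   }
>>> >>>> }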
>>> >>>>
>>> >>>> I think the ability to impose FIFO causality ordering is what's
>>> needed for any state-machine work. Timestamp ordering has advantages
>>> (though often I think the advantage is in state), but does not solve this
>>> problem.
>>> >>>>
>>> >>>> Reuven
>>> >>>>
>>> >>>> On Thu, May 23, 2019 at 7:48 AM Robert Bradshaw <
>>> rober...@google.com> wrote:
>>> >>>>
>>> >>>> Good point.
>>> >>>>
>>> >>>> The "implementation-specific" way I would do this is
>>> >>>> window-by-instant, followed by a DoFn that gets all the elements
>>> with
>>> >>>> the same timestamp and sorts/acts accordingly, but this counts on
>>> the
>>> >>>> runner producing windows in timestamp order (likely?) and also the
>>> >>>> subsequent DoFn getting them in this order (also likely, due to
>>> >>>> fusion).
>>> >>>>
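>>> >>>> (A sketch of that approach under the stated assumptions - 1 ms fixed
>>> >>>> windows so each window holds exactly one timestamp, then a per-key
>>> >>>> sort within the window; the Event type and its sequence field are
>>> >>>> illustrative:)
>>> >>>>
>>> >>>> import java.util.ArrayList;
>>> >>>> import java.util.Comparator;
>>> >>>> import java.util.List;
>>> >>>> import org.apache.beam.sdk.transforms.*;
>>> >>>> import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
>>> >>>> import org.apache.beam.sdk.transforms.windowing.*;
>>> >>>> import org.apache.beam.sdk.values.KV;
>>> >>>> import org.joda.time.Duration;
>>> >>>>
>>> >>>> input  // PCollection<KV<String, Event>>
>>> >>>>     .apply(Window.<KV<String, Event>>into(FixedWindows.of(Duration.millis(1))))
>>> >>>>     .apply(GroupByKey.create())
>>> >>>>     .apply(ParDo.of(new DoFn<KV<String, Iterable<Event>>, Event>() {
>>> >>>>       @ProcessElement
>>> >>>>       public void process(ProcessContext c) {
>>> >>>>         List<Event> buf = new ArrayList<>();
>>> >>>>         c.element().getValue().forEach(buf::add);
>>> >>>>         // data-dependent tiebreaker for elements sharing a timestamp
>>> >>>>         buf.sort(Comparator.comparingLong(Event::getSequence));
>>> >>>>         buf.forEach(c::output);
>>> >>>>       }
>>> >>>>     }));
>>> >>>>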
>>> >>>> One could make the argument that, though it does not provide
>>> >>>> deterministic behavior, getting elements of the same timestamp in
>>> >>>> different orders should produce equally valid interpretations of the
>>> >>>> data. (After all, due to relativity, timestamps are not technically
>>> >>>> well ordered across space.) I can see how data-dependent tiebreakers
>>> >>>> could be useful, or promises of preservation of order between
>>> >>>> operations.
>>> >>>>
>>> >>>> - Robert
>>> >>>>
>>> >>>> On Thu, May 23, 2019 at 4:18 PM Reuven Lax <re...@google.com>
>>> wrote:
>>> >>>>> So Jan's example of state machines is quite a valid use case for
>>> ordering. However in my experience, timestamp ordering is insufficient for
>>> state machines. Elements that cause state transitions might come in with
>>> the exact same timestamp, yet still have a necessary ordering. Especially
>>> given Beam's decision to use millisecond timestamps this is possible, but
>>> even at microsecond or nanosecond precision this can happen at scale. To
>>> handle state machines you usually need some sort of FIFO ordering along
>>> with an ordered source, such as Kafka, not timestamp ordering.
>>> >>>>>
>>> >>>>> Reuven
>>> >>>>>
>>> >>>>> On Thu, May 23, 2019 at 12:32 AM Jan Lukavský <je...@seznam.cz>
>>> wrote:
>>> >>>>>> Hi all,
>>> >>>>>>
>>> >>>>>> thanks everyone for this discussion. I think I have gathered
>>> enough
>>> >>>>>> feedback to be able to put down a proposition for changes, which
>>> I will
>>> >>>>>> do and send to this list for further discussion. There are still
>>> doubts
>>> >>>>>> remaining about the non-determinism and its relation to output
>>> stability vs.
>>> >>>>>> latency. But I will try to clarify all this in the design
>>> document.
>>> >>>>>>
>>> >>>>>> Thanks,
>>> >>>>>>
>>> >>>>>>    Jan
>>> >>>>>>
>>> >>>>>> On 5/22/19 3:49 PM, Maximilian Michels wrote:
>>> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my
>>> >>>>>>>> current understanding.
>>> >>>>>>> In essence your description of how exactly-once works in Flink is
>>> >>>>>>> correct. The general assumption in Flink is that pipelines must
>>> be
>>> >>>>>>> deterministic and thus produce idempotent writes in the case of
>>> >>>>>>> failures. However, that doesn't mean Beam sinks can't guarantee
>>> a bit
>>> >>>>>>> more with what Flink has to offer.
>>> >>>>>>>
>>> >>>>>>> Luke already mentioned the design discussions for
>>> @RequiresStableInput
>>> >>>>>>> which ensures idempotent writes for non-deterministic pipelines.
>>> This
>>> >>>>>>> is not part of the model but an optional Beam feature.
>>> >>>>>>>
>>> >>>>>>> We recently implemented support for @RequiresStableInput in the
>>> Flink
>>> >>>>>>> Runner. Reuven mentioned the Flink checkpoint confirmation, which
>>> >>>>>>> allows us to buffer (and checkpoint) processed data and only
>>> emit it
>>> >>>>>>> once a Flink checkpoint has completed.
>>> >>>>>>>
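>>> >>>>>>> (For reference, a minimal sketch of how a user opts in - the
>>> >>>>>>> annotation goes on the @ProcessElement method of the DoFn whose
>>> >>>>>>> side effects must not diverge between retries; names are
>>> >>>>>>> illustrative and imports are elided:)
>>> >>>>>>>
>>> >>>>>>> class WriteToSinkFn extends DoFn<String, Void> {
>>> >>>>>>>   @ProcessElement
>>> >>>>>>>   @RequiresStableInput  // runner must replay identical input on retry
>>> >>>>>>>   public void process(ProcessContext c) {
>>> >>>>>>>     // side effects here observe a stable, checkpointed element
>>> >>>>>>>   }
>>> >>>>>>> }
>>> >>>>>>>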
>>> >>>>>>> Cheers,
>>> >>>>>>> Max
>>> >>>>>>>
>>> >>>>>>> On 21.05.19 16:49, Jan Lukavský wrote:
>>> >>>>>>>> Hi,
>>> >>>>>>>>
>>> >>>>>>>>   > Actually, I think it is a larger (open) question whether
>>> exactly
>>> >>>>>>>> once is guaranteed by the model or whether runners are allowed
>>> to
>>> >>>>>>>> relax that. I would think, however, that sources correctly
>>> >>>>>>>> implemented should be idempotent when run atop an exactly once
>>> >>>>>>>> infrastructure such as Flink or Dataflow.
>>> >>>>>>>>
>>> >>>>>>>> I would assume that the model basically inherits the guarantees
>>> >>>>>>>> of the underlying infrastructure. Because Flink does not work as
>>> >>>>>>>> you described (atomic commit of inputs, state and outputs), but
>>> >>>>>>>> rather a checkpoint mark flows through the DAG much like a
>>> >>>>>>>> watermark and on failures operators are restored and data is
>>> >>>>>>>> reprocessed, it (IMHO) implies that you have exactly-once
>>> >>>>>>>> everywhere in the DAG *but* sinks. That is because sinks cannot
>>> >>>>>>>> be restored to a previous state; instead, sinks are supposed to
>>> >>>>>>>> be idempotent in order for exactly-once to really work (or at
>>> >>>>>>>> least be able to commit outputs on checkpoint in the sink). That
>>> >>>>>>>> implies that if you don't have a sink that is able to commit
>>> >>>>>>>> outputs atomically on checkpoint, the pipeline execution should
>>> >>>>>>>> be deterministic upon retries, otherwise shadow writes from
>>> >>>>>>>> failed paths of the pipeline might appear.
>>> >>>>>>>>
>>> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my
>>> >>>>>>>> current understanding.
>>> >>>>>>>>
>>> >>>>>>>>   > Sounds like we should make this clearer.
>>> >>>>>>>>
>>> >>>>>>>> I meant that you are right that, in any thoughts we are having,
>>> >>>>>>>> we must not forget that streams are by definition out-of-order.
>>> >>>>>>>> That is a property that we cannot change. But - that doesn't
>>> >>>>>>>> limit us from creating an operator that presents the data to the
>>> >>>>>>>> UDF as if the stream were ideally sorted. It can do that by
>>> >>>>>>>> introducing latency, of course.
>>> >>>>>>>>
>>> >>>>>>>> On 5/21/19 4:01 PM, Robert Bradshaw wrote:
>>> >>>>>>>>> Reza: One could provide something like this as a utility
>>> class, but
>>> >>>>>>>>> one downside is that it is not scale invariant. It requires a
>>> tuning
>>> >>>>>>>>> parameter that, if too small, won't mitigate the problem, but
>>> >>>>>>>>> if too big, greatly increases latency. (Possibly one could define a
>>> dynamic
>>> >>>>>>>>> session-like window to solve this though...) It also might be
>>> harder
>>> >>>>>>>>> for runners that *can* cheaply present stuff in timestamp
>>> order to
>>> >>>>>>>>> optimize. (That and, in practice, our annotation-style process
>>> methods
>>> >>>>>>>>> don't lend themselves to easy composition.) I think it could
>>> work in
>>> >>>>>>>>> specific cases though.
>>> >>>>>>>>>
>>> >>>>>>>>> More inline below.
>>> >>>>>>>>>
>>> >>>>>>>>> On Tue, May 21, 2019 at 11:38 AM Jan Lukavský <je...@seznam.cz>
>>> wrote:
>>> >>>>>>>>>> Hi Robert,
>>> >>>>>>>>>>
>>> >>>>>>>>>>    > Beam has an exactly-once model. If the data was
>>> consumed, state
>>> >>>>>>>>>> mutated, and outputs written downstream (these three are
>>> committed
>>> >>>>>>>>>> together atomically) it will not be replayed. That does not,
>>> of
>>> >>>>>>>>>> course,
>>> >>>>>>>>>> solve the non-determinism due to ordering (including the fact
>>> that two
>>> >>>>>>>>>> operations reading the same PCollection may view different
>>> ordering).
>>> >>>>>>>>>>
>>> >>>>>>>>>> I think what you describe is a property of a runner, not of
>>> the model,
>>> >>>>>>>>>> right? I think if I run my pipeline on Flink I will not get
>>> this
>>> >>>>>>>>>> atomicity, because although Flink also uses an exactly-once
>>> >>>>>>>>>> model, it might write outputs multiple times.
>>> >>>>>>>>> Actually, I think it is a larger (open) question whether
>>> exactly once
>>> >>>>>>>>> is guaranteed by the model or whether runners are allowed to
>>> relax
>>> >>>>>>>>> that. I would think, however, that sources correctly
>>> implemented
>>> >>>>>>>>> should be idempotent when run atop an exactly once
>>> infrastructure such
>>> >>>>>>>>> as Flink or Dataflow.
>>> >>>>>>>>>
>>> >>>>>>>>>>    > 1) Is it correct for a (Stateful)DoFn to assume elements
>>> are
>>> >>>>>>>>>> received
>>> >>>>>>>>>> in a specific order? In the current model, it is not. Being
>>> able to
>>> >>>>>>>>>> read, handle, and produced out-of-order data, including late
>>> data,
>>> >>>>>>>>>> is a
>>> >>>>>>>>>> pretty fundamental property of distributed systems.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Yes, absolutely. The argument here is not that Stateful ParDo
>>> >>>>>>>>>> should presume to receive elements in any order, but that it
>>> >>>>>>>>>> should _present_ them as such to the user's @ProcessElement
>>> >>>>>>>>>> function.
>>> >>>>>>>>> Sounds like we should make this clearer.
>>> >>>>>>>>>
>>> >>>>>>>>>>    > 2) Given that some operations are easier (or possibly
>>> only
>>> >>>>>>>>>> possible)
>>> >>>>>>>>>> to write when operating on ordered data, and that different
>>> runners
>>> >>>>>>>>>> may
>>> >>>>>>>>>> have (significantly) cheaper ways to provide this ordering
>>> than can be
>>> >>>>>>>>>> done by the user themselves, should we elevate this to a
>>> property of
>>> >>>>>>>>>> (Stateful?)DoFns that the runner can provide? I think a
>>> compelling
>>> >>>>>>>>>> argument can be made here that we should.
>>> >>>>>>>>>>
>>> >>>>>>>>>> +1
>>> >>>>>>>>>>
>>> >>>>>>>>>> Jan
>>> >>>>>>>>>>
>>> >>>>>>>>>> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
>>> >>>>>>>>>>> On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <
>>> je...@seznam.cz> wrote:
>>> >>>>>>>>>>>>     > I don't see batch vs. streaming as part of the model.
>>> One
>>> >>>>>>>>>>>> can have
>>> >>>>>>>>>>>> microbatch, or even a runner that alternates between
>>> different
>>> >>>>>>>>>>>> modes.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Although I understand the motivation of this statement, this
>>> project
>>> >>>>>>>>>>>> name is
>>> >>>>>>>>>>>> "Apache Beam: An advanced unified programming model". What
>>> does the
>>> >>>>>>>>>>>> model unify, if "streaming vs. batch" is not part of the
>>> model?
>>> >>>>>>>>>>> What I mean is that streaming vs. batch is no longer part of
>>> the
>>> >>>>>>>>>>> model
>>> >>>>>>>>>>> (or ideally API), but pushed down to be a concern of the
>>> runner
>>> >>>>>>>>>>> (executor) of the pipeline.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <
>>> je...@seznam.cz>
>>> >>>>>>>>>>> wrote:
>>> >>>>>>>>>>>> Hi Kenn,
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> OK, so if we introduce the annotation, we can have stateful
>>> ParDo
>>> >>>>>>>>>>>> with sorting; that would perfectly resolve my issues. I
>>> still
>>> >>>>>>>>>>>> have some doubts, though. Let me explain. The current
>>> behavior of
>>> >>>>>>>>>>>> stateful ParDo has the following properties:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>     a) might fail in batch, although it runs fine in streaming
>>> (that
>>> >>>>>>>>>>>> is due to the buffering, and unbounded lateness in batch,
>>> which
>>> >>>>>>>>>>>> was discussed back and forth in this thread)
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>     b) might be non-deterministic (this is because the
>>> elements
>>> >>>>>>>>>>>> arrive in a somewhat random order, and even if you do the
>>> operation
>>> >>>>>>>>>>>> "assign unique ID to elements" this might produce different
>>> >>>>>>>>>>>> results when run multiple times)
>>> >>>>>>>>>>> PCollections are *explicitly* unordered. Any operations that
>>> >>>>>>>>>>> assume or
>>> >>>>>>>>>>> depend on a specific ordering for correctness (or
>>> determinism) must
>>> >>>>>>>>>>> provide that ordering themselves (i.e. tolerate "arbitrary
>>> shuffling
>>> >>>>>>>>>>> of inputs"). As you point out, that may be very expensive if
>>> you have
>>> >>>>>>>>>>> very hot keys with very large (unbounded) timestamp skew.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> StatefulDoFns are low-level operations that should be used
>>> with care;
>>> >>>>>>>>>>> the simpler windowing model gives determinism in the face of
>>> >>>>>>>>>>> unordered
>>> >>>>>>>>>>> data (though late data and non-end-of-window triggering
>>> introduces
>>> >>>>>>>>>>> some of the non-determinism back in).
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>> What worries me most is the property b), because it seems
>>> to me
>>> >>>>>>>>>>>> to have serious consequences - not only that if you run a
>>> >>>>>>>>>>>> batch pipeline twice you would get different results, but even on
>>> >>>>>>>>>>>> streaming, when pipeline fails and gets restarted from
>>> >>>>>>>>>>>> checkpoint, produced output might differ from the previous
>>> run
>>> >>>>>>>>>>>> and data from the first run might have already been
>>> persisted
>>> >>>>>>>>>>>> into sink. That would create somewhat messy outputs.
>>> >>>>>>>>>>> Beam has an exactly-once model. If the data was consumed,
>>> state
>>> >>>>>>>>>>> mutated, and outputs written downstream (these three are
>>> committed
>>> >>>>>>>>>>> together atomically) it will not be replayed. That does not,
>>> of
>>> >>>>>>>>>>> course, solve the non-determinism due to ordering (including
>>> the fact
>>> >>>>>>>>>>> that two operations reading the same PCollection may view
>>> different
>>> >>>>>>>>>>> ordering).
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>> These two properties make me think that the current
>>> >>>>>>>>>>>> implementation is more of a _special case_ than the general
>>> one.
>>> >>>>>>>>>>>> The general one would be that your state doesn't have the
>>> >>>>>>>>>>>> properties to be able to tolerate buffering problems and/or
>>> >>>>>>>>>>>> non-determinism. Which is the case where you need sorting
>>> in both
>>> >>>>>>>>>>>> streaming and batch to be part of the model.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Let me point out one more analogy - that is merging vs.
>>> >>>>>>>>>>>> non-merging windows. The general case (merging windows)
>>> implies
>>> >>>>>>>>>>>> sorting by timestamp in both batch case (explicit) and
>>> streaming
>>> >>>>>>>>>>>> (buffering). The special case (non-merging windows) doesn't
>>> rely
>>> >>>>>>>>>>>> on any timestamp ordering, so the sorting and buffering can
>>> be
>>> >>>>>>>>>>>> dropped. The underlying root cause of this is the same for
>>> both
>>> >>>>>>>>>>>> stateful ParDo and windowing (essentially, assigning window
>>> >>>>>>>>>>>> labels is a stateful operation when the windowing function is
>>> merging).
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> The reason for the current behavior of stateful ParDo seems
>>> to be
>>> >>>>>>>>>>>> performance, but is it right to abandon correctness in
>>> favor of
>>> >>>>>>>>>>>> performance? Wouldn't it be more consistent to have the
>>> default
>>> >>>>>>>>>>>> behavior prefer correctness and when you have the specific
>>> >>>>>>>>>>>> conditions of state function having special properties,
>>> then you
>>> >>>>>>>>>>>> can annotate your DoFn (with something like
>>> >>>>>>>>>>>> @TimeOrderingAgnostic), which would yield a better
>>> performance in
>>> >>>>>>>>>>>> that case?
>>> >>>>>>>>>>> There are two separable questions here.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> 1) Is it correct for a (Stateful)DoFn to assume elements are
>>> received
>>> >>>>>>>>>>> in a specific order? In the current model, it is not. Being
>>> able to
>>> >>>>>>>>>>> read, handle, and produced out-of-order data, including late
>>> data, is
>>> >>>>>>>>>>> a pretty fundamental property of distributed systems.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> 2) Given that some operations are easier (or possibly only
>>> possible)
>>> >>>>>>>>>>> to write when operating on ordered data, and that different
>>> runners
>>> >>>>>>>>>>> may have (significantly) cheaper ways to provide this
>>> ordering than
>>> >>>>>>>>>>> can be done by the user themselves, should we elevate this
>>> to a
>>> >>>>>>>>>>> property of (Stateful?)DoFns that the runner can provide? I
>>> think a
>>> >>>>>>>>>>> compelling argument can be made here that we should.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> - Robert
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Thanks for the nice small example of a calculation that
>>> depends
>>> >>>>>>>>>>>> on order. You are right that many state machines have this
>>> >>>>>>>>>>>> property. I agree w/ you and Luke that it is convenient for
>>> batch
>>> >>>>>>>>>>>> processing to sort by event timestamp before running a
>>> stateful
>>> >>>>>>>>>>>> ParDo. In streaming you could also implement "sort by event
>>> >>>>>>>>>>>> timestamp" by buffering until you know all earlier data
>>> will be
>>> >>>>>>>>>>>> dropped - a slack buffer up to allowed lateness.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> I do not think that it is OK to sort in batch and not in
>>> >>>>>>>>>>>> streaming. Many state machines diverge very rapidly when
>>> things
>>> >>>>>>>>>>>> are out of order. So each runner if they see the
>>> >>>>>>>>>>>> "@OrderByTimestamp" annotation (or whatever) needs to
>>> deliver
>>> >>>>>>>>>>>> sorted data (by some mix of buffering and dropping), or to
>>> reject
>>> >>>>>>>>>>>> the pipeline as unsupported.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> And also want to say that this is not the default case -
>>> many
>>> >>>>>>>>>>>> uses of state & timers in ParDo yield different results at
>>> the
>>> >>>>>>>>>>>> element level, but the results are equivalent in the big
>>> >>>>>>>>>>>> picture. In examples like "assign a unique sequence number
>>> >>>>>>>>>>>> to each element" or "group into batches", it doesn't matter
>>> >>>>>>>>>>>> exactly what the result is, only that it meets the spec. And
>>> >>>>>>>>>>>> other cases like user funnels are monotonic enough that you
>>> also
>>> >>>>>>>>>>>> don't actually need sorting.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Kenn
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <
>>> je...@seznam.cz>
>>> >>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>> Yes, the problem will probably arise mostly when your keys are
>>> not
>>> >>>>>>>>>>>>> well distributed (or there are too few keys). I'm really not
>>> sure if
>>> >>>>>>>>>>>>> a pure GBK with a trigger can solve this - it might help
>>> to have
>>> >>>>>>>>>>>>> data driven trigger. There would still be some doubts,
>>> though.
>>> >>>>>>>>>>>>> The main question is still here - people say, that sorting
>>> by
>>> >>>>>>>>>>>>> timestamp before stateful ParDo would be prohibitively
>>> slow, but
>>> >>>>>>>>>>>>> I don't really see why - the sorting is very probably
>>> already
>>> >>>>>>>>>>>>> there. And if not (hash grouping instead of sorted
>>> grouping),
>>> >>>>>>>>>>>>> then the sorting would affect only user-defined
>>> StatefulParDos.
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> This would suggest that the best way out of this would be
>>> really
>>> >>>>>>>>>>>>> to add annotation, so that the author of the pipeline can
>>> decide.
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> If that would be acceptable, I think I can try to prepare
>>> some
>>> >>>>>>>>>>>>> basic functionality, but I'm not sure if I would be able
>>> to
>>> >>>>>>>>>>>>> cover all runners / SDKs.
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> It is read all per key and window and not just read all
>>> (this
>>> >>>>>>>>>>>>> still won't scale with hot keys in the global window). The
>>> GBK
>>> >>>>>>>>>>>>> preceding the StatefulParDo will guarantee that you are
>>> >>>>>>>>>>>>> processing all the values for a specific key and window at
>>> any
>>> >>>>>>>>>>>>> given time. Is there a specific window/trigger that is
>>> missing
>>> >>>>>>>>>>>>> that you feel would remove the need for you to use
>>> StatefulParDo?
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <
>>> je...@seznam.cz>
>>> >>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>> Hi Lukasz,
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Today, if you must have a strict order, you must
>>> guarantee
>>> >>>>>>>>>>>>>>> that your StatefulParDo implements the necessary
>>> "buffering &
>>> >>>>>>>>>>>>>>> sorting" into state.
>>> >>>>>>>>>>>>>> Yes, no problem with that. But this whole discussion
>>> started
>>> >>>>>>>>>>>>>> because *this doesn't work on batch*. You simply cannot
>>> first
>>> >>>>>>>>>>>>>> read everything from distributed storage and then buffer
>>> it all
>>> >>>>>>>>>>>>>> into memory, just to read it again, but sorted. That will
>>> not
>>> >>>>>>>>>>>>>> work. And even if it would, it would be a terrible waste
>>> of
>>> >>>>>>>>>>>>>> resources.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Jan
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <
>>> je...@seznam.cz>
>>> >>>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>>> This discussion brings many really interesting questions
>>> for
>>> >>>>>>>>>>>>>>> me. :-)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>     > I don't see batch vs. streaming as part of the
>>> model. One
>>> >>>>>>>>>>>>>>> can have
>>> >>>>>>>>>>>>>>> microbatch, or even a runner that alternates between
>>> different
>>> >>>>>>>>>>>>>>> modes.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Although I understand the motivation of this statement, this
>>> >>>>>>>>>>>>>>> project name is
>>> >>>>>>>>>>>>>>> "Apache Beam: An advanced unified programming model".
>>> What
>>> >>>>>>>>>>>>>>> does the
>>> >>>>>>>>>>>>>>> model unify, if "streaming vs. batch" is not part of the
>>> model?
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Using microbatching, chaining of batch jobs, or pure
>>> streaming
>>> >>>>>>>>>>>>>>> are
>>> >>>>>>>>>>>>>>> exactly the "runtime conditions/characteristics" I refer
>>> to.
>>> >>>>>>>>>>>>>>> All these
>>> >>>>>>>>>>>>>>> define several runtime parameters, which in turn define
>>> how
>>> >>>>>>>>>>>>>>> well/badly
>>> >>>>>>>>>>>>>>> the pipeline will perform and how many resources might be
>>> >>>>>>>>>>>>>>> needed. From
>>> >>>>>>>>>>>>>>> my point of view, pure streaming should be the most
>>> resource
>>> >>>>>>>>>>>>>>> demanding
>>> >>>>>>>>>>>>>>> (if not, why bother with batch? why not run everything in
>>> >>>>>>>>>>>>>>> streaming
>>> >>>>>>>>>>>>>>> only? what will there remain to "unify"?).
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>     > Fortunately, for batch, only the state for a
>>> single key
>>> >>>>>>>>>>>>>>> needs to be
>>> >>>>>>>>>>>>>>> preserved at a time, rather than the state for all keys
>>> across
>>> >>>>>>>>>>>>>>> the range
>>> >>>>>>>>>>>>>>> of skew. Of course if you have few or hot keys, one can
>>> still
>>> >>>>>>>>>>>>>>> have
>>> >>>>>>>>>>>>>>> issues (and this is not specific to StatefulDoFns).
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Yes, but here is still the presumption that my stateful
>>> DoFn can
>>> >>>>>>>>>>>>>>> tolerate arbitrary shuffling of inputs. Let me explain
>>> the use
>>> >>>>>>>>>>>>>>> case in
>>> >>>>>>>>>>>>>>> more detail.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Suppose you have an input stream consisting of 1s and 0s
>>> (and
>>> >>>>>>>>>>>>>>> some key for
>>> >>>>>>>>>>>>>>> each element, which is irrelevant for the
>>> demonstration). Your
>>> >>>>>>>>>>>>>>> task is
>>> >>>>>>>>>>>>>>> to calculate, in a running global window, the actual number
>>> of
>>> >>>>>>>>>>>>>>> changes
>>> >>>>>>>>>>>>>>> between state 0 and state 1 and vice versa. When the
>>> state
>>> >>>>>>>>>>>>>>> doesn't
>>> >>>>>>>>>>>>>>> change, you don't calculate anything. If input (for
>>> given key)
>>> >>>>>>>>>>>>>>> would be
>>> >>>>>>>>>>>>>>> (tN denotes timestamp N):
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t1: 1
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t2: 0
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t3: 0
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t4: 1
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t5: 1
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t6: 0
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> then the output should yield (supposing that default
>>> state is
>>> >>>>>>>>>>>>>>> zero):
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t1: (one: 1, zero: 0)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t2: (one: 1, zero: 1)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t3: (one: 1, zero: 1)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t4: (one: 2, zero: 1)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t5: (one: 2, zero: 1)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>      t6: (one: 2, zero: 2)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> How would you implement this in current Beam semantics?
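>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> (For concreteness, a naive attempt with today's API might
>>> >>>>>>>>>>>>>>> look like the sketch below - but it is only correct if
>>> >>>>>>>>>>>>>>> elements arrive per key in timestamp order, which is
>>> >>>>>>>>>>>>>>> exactly the guarantee under discussion; names are
>>> >>>>>>>>>>>>>>> illustrative and imports from org.apache.beam.sdk.state,
>>> >>>>>>>>>>>>>>> .transforms and .values are elided:)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> class CountTransitions
>>> >>>>>>>>>>>>>>>     extends DoFn<KV<String, Integer>, KV<String, KV<Long, Long>>> {
>>> >>>>>>>>>>>>>>>   @StateId("last") private final StateSpec<ValueState<Integer>> lastSpec =
>>> >>>>>>>>>>>>>>>       StateSpecs.value();
>>> >>>>>>>>>>>>>>>   @StateId("ones") private final StateSpec<ValueState<Long>> onesSpec =
>>> >>>>>>>>>>>>>>>       StateSpecs.value();
>>> >>>>>>>>>>>>>>>   @StateId("zeros") private final StateSpec<ValueState<Long>> zerosSpec =
>>> >>>>>>>>>>>>>>>       StateSpecs.value();
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>   @ProcessElement
>>> >>>>>>>>>>>>>>>   public void process(ProcessContext c,
>>> >>>>>>>>>>>>>>>       @StateId("last") ValueState<Integer> last,
>>> >>>>>>>>>>>>>>>       @StateId("ones") ValueState<Long> ones,
>>> >>>>>>>>>>>>>>>       @StateId("zeros") ValueState<Long> zeros) {
>>> >>>>>>>>>>>>>>>     int prev = orZero(last.read());  // default state is zero
>>> >>>>>>>>>>>>>>>     int cur = c.element().getValue();
>>> >>>>>>>>>>>>>>>     if (prev == 0 && cur == 1) ones.write(orZero(ones.read()) + 1);   // 0 -> 1
>>> >>>>>>>>>>>>>>>     if (prev == 1 && cur == 0) zeros.write(orZero(zeros.read()) + 1); // 1 -> 0
>>> >>>>>>>>>>>>>>>     last.write(cur);
>>> >>>>>>>>>>>>>>>     c.output(KV.of(c.element().getKey(),
>>> >>>>>>>>>>>>>>>         KV.of(orZero(ones.read()), orZero(zeros.read()))));
>>> >>>>>>>>>>>>>>>   }
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>   private static int orZero(Integer v) { return v == null ? 0 : v; }
>>> >>>>>>>>>>>>>>>   private static long orZero(Long v) { return v == null ? 0L : v; }
>>> >>>>>>>>>>>>>>> }
>>> >>>>>>>>>>>>>>>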
>>> >>>>>>>>>>>>>> I think you're saying here that I know that my input is
>>> ordered
>>> >>>>>>>>>>>>>> in a specific way and since I assume the order when
>>> writing my
>>> >>>>>>>>>>>>>> pipeline I can perform this optimization. But there is
>>> nothing
>>> >>>>>>>>>>>>>> preventing a runner from noticing that you're processing in
>>> the
>>> >>>>>>>>>>>>>> global window with a specific type of trigger and
>>> re-ordering
>>> >>>>>>>>>>>>>> your inputs/processing to get better performance (since
>>> you
>>> >>>>>>>>>>>>>> can't use an AfterWatermark trigger for your pipeline in
>>> >>>>>>>>>>>>>> streaming for the GlobalWindow).
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Today, if you must have a strict order, you must
>>> guarantee that
>>> >>>>>>>>>>>>>> your StatefulParDo implements the necessary "buffering &
>>> >>>>>>>>>>>>>> sorting" into state. I can see why you would want an
>>> annotation
>>> >>>>>>>>>>>>>> that says I must have timestamp ordered elements, since it
>>> >>>>>>>>>>>>>> makes writing certain StatefulParDos much easier.
>>> StatefulParDo
>>> >>>>>>>>>>>>>> is a low-level function, it really is the "here you go
>>> and do
>>> >>>>>>>>>>>>>> whatever you need to but here be dragons" function while
>>> >>>>>>>>>>>>>> windowing and triggering is meant to keep many people from
>>> >>>>>>>>>>>>>> writing StatefulParDo in the first place.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>     > Pipelines that fail in the "worst case" batch
>>> scenario
>>> >>>>>>>>>>>>>>> are likely to
>>> >>>>>>>>>>>>>>> degrade poorly (possibly catastrophically) when the
>>> watermark
>>> >>>>>>>>>>>>>>> falls
>>> >>>>>>>>>>>>>>> behind in streaming mode as well.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> But the worst case is defined by input of size (available
>>> >>>>>>>>>>>>>>> resources +
>>> >>>>>>>>>>>>>>> single byte) -> pipeline failure. Although it could have
>>> >>>>>>>>>>>>>>> finished, given
>>> >>>>>>>>>>>>>>> the right conditions.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>     > This might be reasonable, implemented by default by
>>> >>>>>>>>>>>>>>> buffering
>>> >>>>>>>>>>>>>>> everything and releasing elements as the watermark
>>> (+lateness)
>>> >>>>>>>>>>>>>>> advances,
>>> >>>>>>>>>>>>>>> but would likely lead to inefficient (though *maybe*
>>> easier to
>>> >>>>>>>>>>>>>>> reason
>>> >>>>>>>>>>>>>>> about) code.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Sure, the pipeline will be less efficient, because it
>>> would
>>> >>>>>>>>>>>>>>> have to
>>> >>>>>>>>>>>>>>> buffer and sort the inputs. But at least it will produce
>>> >>>>>>>>>>>>>>> correct results
>>> >>>>>>>>>>>>>>> in cases where updates to state are order-sensitive.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>     > Would it be roughly equivalent to GBK +
>>> FlatMap(lambda
>>> >>>>>>>>>>>>>>> (key, values):
>>> >>>>>>>>>>>>>>> [(key, value) for value in values])?
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> I'd say roughly yes, but the difference would be in the
>>> trigger.
>>> >>>>>>>>>>>>>>> The trigger
>>> >>>>>>>>>>>>>>> should ideally fire as soon as watermark (+lateness)
>>> crosses
>>> >>>>>>>>>>>>>>> element
>>> >>>>>>>>>>>>>>> with the lowest timestamp in the buffer. Although this
>>> >>>>>>>>>>>>>>> could be somehow
>>> >>>>>>>>>>>>>>> emulated by a fixed trigger every X millis.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>     > Or is the underlying desire just to be able to
>>> hint to
>>> >>>>>>>>>>>>>>> the runner
>>> >>>>>>>>>>>>>>> that the code may perform better (e.g. require less
>>> resources)
>>> >>>>>>>>>>>>>>> as skew
>>> >>>>>>>>>>>>>>> is reduced (and hence to order by timestamp iff it's
>>> cheap)?
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> No, the sorting would have to be done in the streaming
>>> >>>>>>>>>>>>>>> case as well. That is an imperative of the unified model. I
>>> >>>>>>>>>>>>>>> think it is possible to sort by timestamp only in the batch
>>> >>>>>>>>>>>>>>> case (and do it for *all* batch stateful pardos without an
>>> >>>>>>>>>>>>>>> annotation), or introduce an annotation, but then make the
>>> >>>>>>>>>>>>>>> same guarantees for the streaming case as well.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Jan
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
>>> >>>>>>>>>>>>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský
>>> >>>>>>>>>>>>>>>> <je...@seznam.cz> wrote:
>>> >>>>>>>>>>>>>>>>> Hi Robert,
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> yes, I think you rephrased my point - although no
>>> >>>>>>>>>>>>>>>>> *explicit* guarantees of ordering are given in either
>>> >>>>>>>>>>>>>>>>> mode, there is *implicit* ordering in the streaming case
>>> >>>>>>>>>>>>>>>>> that is due to the nature of the processing - the
>>> >>>>>>>>>>>>>>>>> difference between the watermark and the timestamps of
>>> >>>>>>>>>>>>>>>>> elements flowing through the pipeline is generally low
>>> >>>>>>>>>>>>>>>>> (too high a difference leads to the overbuffering
>>> >>>>>>>>>>>>>>>>> problem), but there is no such bound in batch.
>>> >>>>>>>>>>>>>>>> Fortunately, for batch, only the state for a single key
>>> needs
>>> >>>>>>>>>>>>>>>> to be
>>> >>>>>>>>>>>>>>>> preserved at a time, rather than the state for all keys
>>> >>>>>>>>>>>>>>>> across the
>>> >>>>>>>>>>>>>>>> range of skew. Of course if you have few or hot keys,
>>> one can
>>> >>>>>>>>>>>>>>>> still
>>> >>>>>>>>>>>>>>>> have issues (and this is not specific to StatefulDoFns).
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> As a result, I see a few possible solutions:
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>       - the best and most natural option seems to be an
>>> >>>>>>>>>>>>>>>>> extension of the model, so that it defines batch not only
>>> >>>>>>>>>>>>>>>>> as a "streaming pipeline executed in batch fashion", but
>>> >>>>>>>>>>>>>>>>> as a "pipeline with at least as good runtime
>>> >>>>>>>>>>>>>>>>> characteristics as in the streaming case, executed in
>>> >>>>>>>>>>>>>>>>> batch fashion". I really don't think that there are any
>>> >>>>>>>>>>>>>>>>> conflicts with the current model, or that this could
>>> >>>>>>>>>>>>>>>>> affect performance, because the required sorting (as
>>> >>>>>>>>>>>>>>>>> pointed out by Aljoscha) is very probably already done
>>> >>>>>>>>>>>>>>>>> during translation of stateful pardos. Also note that
>>> >>>>>>>>>>>>>>>>> this definition only affects user-defined stateful pardos
>>> >>>>>>>>>>>>>>>> I don't see batch vs. streaming as part of the model.
>>> One can
>>> >>>>>>>>>>>>>>>> have
>>> >>>>>>>>>>>>>>>> microbatch, or even a runner that alternates between
>>> >>>>>>>>>>>>>>>> different modes.
>>> >>>>>>>>>>>>>>>> The model describes what the valid outputs are given a
>>> >>>>>>>>>>>>>>>> (sometimes
>>> >>>>>>>>>>>>>>>> partial) set of inputs. It becomes really hard to define
>>> >>>>>>>>>>>>>>>> things like
>>> >>>>>>>>>>>>>>>> "as good runtime characteristics." Once you allow any
>>> >>>>>>>>>>>>>>>> out-of-orderedness, it is not very feasible to try and
>>> define
>>> >>>>>>>>>>>>>>>> (and
>>> >>>>>>>>>>>>>>>> more cheaply implement) an "upper bound" of acceptable
>>> >>>>>>>>>>>>>>>> out-of-orderedness.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Pipelines that fail in the "worst case" batch scenario
>>> are
>>> >>>>>>>>>>>>>>>> likely to
>>> >>>>>>>>>>>>>>>> degrade poorly (possibly catastrophically) when the
>>> watermark
>>> >>>>>>>>>>>>>>>> falls
>>> >>>>>>>>>>>>>>>> behind in streaming mode as well.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>       - another option would be to introduce an
>>> >>>>>>>>>>>>>>>>> annotation for DoFns (e.g.
>>> >>>>>>>>>>>>>>>>> @RequiresStableTimeCharacteristics), which would result
>>> >>>>>>>>>>>>>>>>> in the sorting in the batch case - but - this extension
>>> >>>>>>>>>>>>>>>>> would have to ensure the sorting in streaming mode also -
>>> >>>>>>>>>>>>>>>>> it would require a definition of allowed lateness, and a
>>> >>>>>>>>>>>>>>>>> trigger (essentially similar to a window)
>>> >>>>>>>>>>>>>>>> This might be reasonable, implemented by default by
>>> buffering
>>> >>>>>>>>>>>>>>>> everything and releasing elements as the watermark
>>> (+lateness)
>>> >>>>>>>>>>>>>>>> advances, but would likely lead to inefficient (though
>>> >>>>>>>>>>>>>>>> *maybe* easier
>>> >>>>>>>>>>>>>>>> to reason about) code. Not sure about the semantics of
>>> >>>>>>>>>>>>>>>> triggering
>>> >>>>>>>>>>>>>>>> here, especially data-driven triggers. Would it be
>>> roughly
>>> >>>>>>>>>>>>>>>> equivalent
>>> >>>>>>>>>>>>>>>> to GBK + FlatMap(lambda (key, values): [(key, value) for
>>> >>>>>>>>>>>>>>>> value in
>>> >>>>>>>>>>>>>>>> values])?
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Or is the underlying desire just to be able to hint to
>>> the
>>> >>>>>>>>>>>>>>>> runner that
>>> >>>>>>>>>>>>>>>> the code may perform better (e.g. require less
>>> resources) as
>>> >>>>>>>>>>>>>>>> skew is
>>> >>>>>>>>>>>>>>>> reduced (and hence to order by timestamp iff it's
>>> cheap)?
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>       - last option would be to introduce these
>>> "higher order
>>> >>>>>>>>>>>>>>>>> guarantees" in
>>> >>>>>>>>>>>>>>>>> some extension DSL (e.g. Euphoria), but that seems to
>>> be the
>>> >>>>>>>>>>>>>>>>> worst
>>> >>>>>>>>>>>>>>>>> option to me
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> I see the first two options as roughly equally good,
>>> >>>>>>>>>>>>>>>>> although the latter one is probably more time-consuming
>>> >>>>>>>>>>>>>>>>> to implement. But it would bring an additional feature to
>>> >>>>>>>>>>>>>>>>> the streaming case as well.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> Thanks for any thoughts.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>       Jan
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
>>> >>>>>>>>>>>>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský
>>> >>>>>>>>>>>>>>>>>> <je...@seznam.cz> wrote:
>>> >>>>>>>>>>>>>>>>>>> Hi Reuven,
>>> >>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>> How so? AFAIK stateful DoFns work just fine in batch
>>> >>>>>>>>>>>>>>>>>>>> runners.
>>> >>>>>>>>>>>>>>>>>>> Stateful ParDo works in batch only as far as the logic
>>> >>>>>>>>>>>>>>>>>>> inside the state works for absolutely unbounded
>>> >>>>>>>>>>>>>>>>>>> out-of-orderness of elements. That basically
>>> >>>>>>>>>>>>>>>>>>> (practically) can work only for cases where the order
>>> >>>>>>>>>>>>>>>>>>> of input elements doesn't matter. But "state" can refer
>>> >>>>>>>>>>>>>>>>>>> to a "state machine", and any time you have a state
>>> >>>>>>>>>>>>>>>>>>> machine involved, the ordering of elements matters.
>>> >>>>>>>>>>>>>>>>>> No guarantees on order are provided in *either*
>>> streaming
>>> >>>>>>>>>>>>>>>>>> or batch
>>> >>>>>>>>>>>>>>>>>> mode by the model. However, it is the case that most
>>> >>>>>>>>>>>>>>>>>> streaming runners attempt to limit the amount of
>>> >>>>>>>>>>>>>>>>>> out-of-orderedness of elements (in terms of event time
>>> >>>>>>>>>>>>>>>>>> vs. processing time) to make forward progress, which in turn could
>>> help
>>> >>>>>>>>>>>>>>>>>> cap the
>>> >>>>>>>>>>>>>>>>>> amount of state that must be held concurrently,
>>> whereas a
>>> >>>>>>>>>>>>>>>>>> batch runner
>>> >>>>>>>>>>>>>>>>>> may not allow any state to be safely discarded until
>>> the whole
>>> >>>>>>>>>>>>>>>>>> timeline from infinite past to infinite future has
>>> been
>>> >>>>>>>>>>>>>>>>>> observed.
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>> Also, as pointed out, state is not preserved "batch to
>>> >>>>>>>>>>>>>>>>>> batch" in batch mode.
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels
>>> >>>>>>>>>>>>>>>>>> <m...@apache.org> wrote:
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>       batch semantics and streaming semantics
>>> differs only
>>> >>>>>>>>>>>>>>>>>>>> in that I can have GlobalWindow with default
>>> trigger on
>>> >>>>>>>>>>>>>>>>>>>> batch and cannot on stream
>>> >>>>>>>>>>>>>>>>>>> You can have a GlobalWindow in streaming with a
>>> default
>>> >>>>>>>>>>>>>>>>>>> trigger. You
>>> >>>>>>>>>>>>>>>>>>> could define additional triggers that do early
>>> firings.
>>> >>>>>>>>>>>>>>>>>>> And you could
>>> >>>>>>>>>>>>>>>>>>> even trigger the global window by advancing the
>>> watermark
>>> >>>>>>>>>>>>>>>>>>> to +inf.
>>> >>>>>>>>>>>>>>>>>> IIRC, as a pragmatic note, we prohibited global
>>> window with
>>> >>>>>>>>>>>>>>>>>> default
>>> >>>>>>>>>>>>>>>>>> trigger on unbounded PCollections in the SDK because
>>> this
>>> >>>>>>>>>>>>>>>>>> is more
>>> >>>>>>>>>>>>>>>>>> likely to be user error than an actual desire to have
>>> no
>>> >>>>>>>>>>>>>>>>>> output until
>>> >>>>>>>>>>>>>>>>>> drain. But it's semantically valid in the model.