Expanding the dimensionality could be the basis for loops within the graph, since loops could be modeled as (time, loop iteration #, nested loop iteration #, nested nested loop iteration #, ...).
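For illustration, a minimal sketch of what such an "expanded" sequence could look like - a hypothetical comparable tuple, not an existing Beam type - where (time, loop iteration #, nested loop iteration #, ...) is compared lexicographically:

    import java.util.Arrays;

    // Hypothetical sequence "coordinate" for an element produced inside
    // (possibly nested) loops; dims[0] is time, dims[1] the outer loop
    // iteration, dims[2] the nested loop iteration, and so on.
    public class LoopSequence implements Comparable<LoopSequence> {
      private final long[] dims;

      public LoopSequence(long... dims) {
        this.dims = dims.clone();
      }

      @Override
      public int compareTo(LoopSequence other) {
        int n = Math.min(dims.length, other.dims.length);
        for (int i = 0; i < n; i++) {
          int c = Long.compare(dims[i], other.dims[i]);
          if (c != 0) {
            return c;
          }
        }
        // A shorter tuple sorts before its own nested expansions.
        return Integer.compare(dims.length, other.dims.length);
      }

      @Override
      public String toString() {
        return Arrays.toString(dims);
      }
    }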
On Tue, May 28, 2019 at 12:10 PM Jan Lukavský <je...@seznam.cz> wrote:
> Could this be solved by "expanding the dimensionality" of the time field? What I mean by that - if an input element to FlatMap has sequence number T, then the (stateless) FlatMap knows the ordering of output elements, right? If it would expand the field by which it will next sort the elements to (X, 1), (X, 2), ... (X, N), then it would be possible to sort the elements back later. There seems to be no need for state to achieve that, or?
>
> Jan
>
> On 5/28/19 6:52 PM, Reuven Lax wrote:
> > A slightly larger concern: it also will force users to create stateful DoFns everywhere to generate these sequence numbers. If I have a ParDo that is not a simple 1:1 transform (i.e. not MapElements), then the ParDo will need to generate its own sequence numbers for ordering, and the only safe way to do so is to use a stateful DoFn. This turns what used to be a simple in-memory DoFn into one that has to access state. Also I believe many runners will not fuse stateful DoFns. While none of this poses a problem for the model, it could make ordering extremely expensive to achieve.
> >
> > Reuven
> >
> > On Tue, May 28, 2019 at 6:09 AM Jan Lukavský <je...@seznam.cz> wrote:
>> Hi Reuven,
>>
>> > It also gets awkward with Flatten - the sequence number is no longer enough, you must also encode which side of the flatten each element came from.
>>
>> That is a generic need. Even if you read data from Kafka, the offsets are comparable only inside a single partition. So, for Kafka to work as a FIFO for ordering, elements with the same key have to be pushed to the same partition (otherwise Kafka cannot act as a FIFO, because different partitions can be handled by different brokers, which means different observers, and they therefore might not agree on the order of events). So if we want to emulate FIFO per key, then the sequence IDs also have to be per key.
>>
>> On 5/28/19 2:33 PM, Reuven Lax wrote:
>> Sequence metadata does have the disadvantage that users can no longer use the types coming from the source. You must create a new type that contains a sequence number (unless Beam provides this). It also gets awkward with Flatten - the sequence number is no longer enough, you must also encode which side of the flatten each element came from.
>>
>> On Tue, May 28, 2019 at 3:18 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> As I understood it, Kenn was supporting the idea that sequence metadata is preferable over FIFO. I was trying to point out that it should even provide the same functionality as FIFO, plus one important extra - reproducibility and the ability to be persisted and reused the same way in batch and streaming.
>>>
>>> There is no doubt that sequence metadata can be stored in every storage. But, regarding some implicit ordering that sources might have - yes, of course, data written into HDFS or Cloud Storage has ordering, but only partial - inside some bulk (e.g. a file) - and the ordering is not defined correctly on the boundaries of these bulks (between files). That is why I'd say that ordering of sources is relevant only for (partitioned!) streaming sources and generally always reduces to sequence metadata (e.g. offsets).
>>>
>>> Jan
>>>
>>> On 5/28/19 11:43 AM, Robert Bradshaw wrote:
>>> > Huge +1 to all Kenn said.
>>> >
>>> > Jan, batch sources can have orderings too, just like Kafka.
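A minimal sketch of the stateless "expanding the dimensionality" FlatMap Jan describes above, using the Beam Java SDK. The KV-of-KV sequence tagging is purely illustrative (Beam has no built-in sequence-tagged element type), and expand() stands in for the real FlatMap logic:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Receives an element already tagged with sequence number T and tags
    // each of its outputs with (T, i), so downstream can re-sort without
    // any state being involved.
    class ExpandingFlatMap extends DoFn<KV<Long, String>, KV<KV<Long, Integer>, String>> {
      @ProcessElement
      public void process(@Element KV<Long, String> in,
                          OutputReceiver<KV<KV<Long, Integer>, String>> out) {
        long t = in.getKey();  // sequence number of the input element
        List<String> expanded = expand(in.getValue());
        for (int i = 0; i < expanded.size(); i++) {
          // (T, i) totally orders the outputs of this stateless expansion.
          out.output(KV.of(KV.of(t, i), expanded.get(i)));
        }
      }

      private List<String> expand(String value) {
        return Arrays.asList(value.split(" "));  // stand-in expansion logic
      }
    }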
>>> > I think it's reasonable (for both batch and streaming) that if a source has an ordering that is an important part of the data, it should preserve this ordering into the data itself (e.g. as sequence numbers, offsets, etc.)
>>> >
>>> > On Fri, May 24, 2019 at 10:35 PM Kenneth Knowles <k...@apache.org> wrote:
>>> >> I strongly prefer explicit sequence metadata over FIFO requirements, because:
>>> >>
>>> >> - FIFO is complex to specify: for example Dataflow has "per stage key-to-key" FIFO today, but it is not guaranteed to remain so (plus "stage" is not a portable concept, nor even guaranteed to remain a Dataflow concept)
>>> >> - complex specifications are by definition poor usability (if necessary, then it is what it is)
>>> >> - it overly restricts the runner and reduces parallelism; for example, any non-stateful ParDo has per-element parallelism, not per-"key" parallelism
>>> >> - another perspective on that: FIFO makes everyone pay rather than just the transform that requires exact sequencing
>>> >> - previous implementation details like reshuffles become part of the model
>>> >> - I'm not even convinced the use cases involved are addressed by some careful FIFO restrictions; many sinks re-key and they would all have to become aware of how keying of a sequence of "stages" affects the end-to-end FIFO
>>> >>
>>> >> A noop becoming a non-noop is essentially the mathematical definition of moving from a higher-level to a lower-level abstraction.
>>> >>
>>> >> So this strikes at the core question of what level of abstraction Beam aims to represent. Lower-level means there are fewer possible implementations and it is more tied to the underlying architecture, and anything that is not a near-exact match pays a huge penalty. Higher-level means there are more implementations possible with different tradeoffs, though they may all pay a minor penalty.
>>> >>
>>> >> I could be convinced to change my mind, but it needs some extensive design, examples, etc. I think it is probably about the most consequential design decision in the whole Beam model, around the same level as the decision to use ParDo and GBK as the primitives, IMO.
>>> >>
>>> >> Kenn
>>> >>
>>> >> On Thu, May 23, 2019 at 10:17 AM Reuven Lax <re...@google.com> wrote:
>>> >>> Not really. I'm suggesting that some variant of FIFO ordering is necessary, which requires either that runners natively support FIFO ordering or that transforms add some extra sequence number to each record to sort by.
>>> >>>
>>> >>> I still think your proposal is very useful, by the way. I'm merely pointing out that to solve the state-machine problem we probably need something more.
>>> >>>
>>> >>> Reuven
>>> >>>
>>> >>> On Thu, May 23, 2019 at 9:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> >>>> Hi,
>>> >>>> yes. It seems that ordering by a user-supplied UDF makes sense and I will update the design proposal accordingly.
>>> >>>> Would that solve the issues you mention?
>>> >>>> Jan
>>> >>>> ---------- Original e-mail ----------
>>> >>>> From: Reuven Lax <re...@google.com>
>>> >>>> To: dev <dev@beam.apache.org>
>>> >>>> Date: 23. 5. 2019 18:44:38
>>> >>>> Subject: Re: Definition of Unified model
>>> >>>>
>>> >>>> I'm simply saying that timestamp ordering is insufficient for state machines. I wasn't proposing Kafka as a solution - that was simply an example of how people solve this problem in other scenarios.
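A sketch of the per-key sequence numbering Reuven describes: it requires keyed input and a stateful DoFn, which is exactly the cost being debated. The KV wrapper layout is illustrative only:

    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Tags each value with a monotonically increasing per-key sequence
    // number, held in runner-managed state.
    class AssignSequenceNumbers extends DoFn<KV<String, String>, KV<String, KV<Long, String>>> {
      @StateId("seq")
      private final StateSpec<ValueState<Long>> seqSpec = StateSpecs.value(VarLongCoder.of());

      @ProcessElement
      public void process(@Element KV<String, String> in,
                          @StateId("seq") ValueState<Long> seq,
                          OutputReceiver<KV<String, KV<Long, String>>> out) {
        Long read = seq.read();
        long next = read == null ? 0L : read;
        seq.write(next + 1);
        out.output(KV.of(in.getKey(), KV.of(next, in.getValue())));
      }
    }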
>>> >>>>
>>> >>>> BTW another example of ordering: Imagine today that you have a triggered Sum aggregation writing out to a key-value sink. In theory we provide no ordering, so the sink might write the triggered sums in the wrong order, ending up with an incorrect value in the sink. In this case you probably want values ordered by trigger pane index.
>>> >>>>
>>> >>>> Reuven
>>> >>>>
>>> >>>> On Thu, May 23, 2019 at 8:59 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> >>>>
>>> >>>> Hi Reuven,
>>> >>>> I share the viewpoint of Robert. I think the issue you refer to is not in reality related to timestamps, but to the fact that ordering of events in time is observer dependent (either caused by relativity or by time skew; essentially this has the same consequences). And the resolution in fact isn't Kafka, but generally an authoritative observer that tells you "I saw the events in this order". And you either have one (and have the outcome of this observation persisted in the data - e.g. as an offset in a Kafka partition), then you should be able to use it (maybe that suggests after all that sorting by some user-supplied UDF might make sense), or you do not have it, and then any interpretation of the data seems to be equally valid. Although determinism is fine, of course.
>>> >>>> Jan
>>> >>>> ---------- Original e-mail ----------
>>> >>>> From: Reuven Lax <re...@google.com>
>>> >>>> To: dev <dev@beam.apache.org>
>>> >>>> Date: 23. 5. 2019 17:39:12
>>> >>>> Subject: Re: Definition of Unified model
>>> >>>>
>>> >>>> So an example would be elements of type "startUserSession" and "endUserSession" (website sessions, not Beam sessions). Logically you may need to process them in the correct order if you have any sort of state-machine logic. However timestamp ordering is never guaranteed to match the logical ordering. Not only might you have several elements with the same timestamp, but in reality time skew across backend servers can cause the events to have timestamps in reverse order of the actual causality order.
>>> >>>>
>>> >>>> People do solve this problem today though. Publish the events to Kafka, making sure that events for the same user end up in the same Kafka partition. This ensures that the events appear in the Kafka partitions in causality order, even if the timestamp order doesn't match. Then your Kafka subscriber simply processes the elements in each partition in order.
>>> >>>>
>>> >>>> I think the ability to impose FIFO causality ordering is what's needed for any state-machine work. Timestamp ordering has advantages (though often I think the advantage is in state), but does not solve this problem.
>>> >>>>
>>> >>>> Reuven
>>> >>>>
>>> >>>> On Thu, May 23, 2019 at 7:48 AM Robert Bradshaw <rober...@google.com> wrote:
>>> >>>>
>>> >>>> Good point.
>>> >>>>
>>> >>>> The "implementation-specific" way I would do this is window-by-instant, followed by a DoFn that gets all the elements with the same timestamp and sorts/acts accordingly, but this counts on the runner producing windows in timestamp order (likely?) and also the subsequent DoFn getting them in this order (also likely, due to fusion).
>>> >>>>
>>> >>>> One could make the argument that, though it does not provide deterministic behavior, getting elements of the same timestamp in different orders should produce equally valid interpretations of the data.
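A sketch of the "ordered by trigger pane index" idea from Reuven's key-value-sink example above: keep the last pane index seen per key in state and drop any firing that arrives out of order, so the sink value never regresses. This assumes the triggered sums arrive keyed, and the usual per-key-and-window scoping of stateful-DoFn state applies:

    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class DropStalePanes extends DoFn<KV<String, Long>, KV<String, Long>> {
      @StateId("lastPane")
      private final StateSpec<ValueState<Long>> lastPaneSpec = StateSpecs.value(VarLongCoder.of());

      @ProcessElement
      public void process(ProcessContext c,
                          @StateId("lastPane") ValueState<Long> lastPane) {
        long index = c.pane().getIndex();  // index of this trigger firing
        Long last = lastPane.read();
        if (last == null || index > last) {
          lastPane.write(index);
          c.output(c.element());  // in-order pane: let it through
        }
        // else: a stale pane overtook a newer one somewhere upstream; drop it.
      }
    }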
>>> >>>> (After all, due to relativity, timestamps are not technically well ordered across space.) I can see how data-dependent tiebreakers could be useful, or promises of preservation of order between operations.
>>> >>>>
>>> >>>> - Robert
>>> >>>>
>>> >>>> On Thu, May 23, 2019 at 4:18 PM Reuven Lax <re...@google.com> wrote:
>>> >>>>> So Jan's example of state machines is quite a valid use case for ordering. However in my experience, timestamp ordering is insufficient for state machines. Elements that cause state transitions might come in with the exact same timestamp, yet still have a necessary ordering. Especially given Beam's decision to have millisecond timestamps this is possible, but even at microsecond or nanosecond precision this can happen at scale. To handle state machines you usually need some sort of FIFO ordering along with an ordered source, such as Kafka, not timestamp ordering.
>>> >>>>>
>>> >>>>> Reuven
>>> >>>>>
>>> >>>>> On Thu, May 23, 2019 at 12:32 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> >>>>>> Hi all,
>>> >>>>>>
>>> >>>>>> thanks everyone for this discussion. I think I have gathered enough feedback to be able to put down a proposition for changes, which I will do and send to this list for further discussion. There are still doubts remaining about the non-determinism and its relation to output stability vs. latency. But I will try to clarify all this in the design document.
>>> >>>>>>
>>> >>>>>> Thanks,
>>> >>>>>>
>>> >>>>>> Jan
>>> >>>>>>
>>> >>>>>> On 5/22/19 3:49 PM, Maximilian Michels wrote:
>>> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my current understanding.
>>> >>>>>>> In essence your description of how exactly-once works in Flink is correct. The general assumption in Flink is that pipelines must be deterministic and thus produce idempotent writes in the case of failures. However, that doesn't mean Beam sinks can't guarantee a bit more with what Flink has to offer.
>>> >>>>>>>
>>> >>>>>>> Luke already mentioned the design discussions for @RequiresStableInput which ensures idempotent writes for non-deterministic pipelines. This is not part of the model but an optional Beam feature.
>>> >>>>>>>
>>> >>>>>>> We recently implemented support for @RequiresStableInput in the Flink Runner. Reuven mentioned the Flink checkpoint confirmation, which allows us to buffer (and checkpoint) processed data and only emit it once a Flink checkpoint has completed.
>>> >>>>>>>
>>> >>>>>>> Cheers,
>>> >>>>>>> Max
>>> >>>>>>>
>>> >>>>>>> On 21.05.19 16:49, Jan Lukavský wrote:
>>> >>>>>>>> Hi,
>>> >>>>>>>>
>>> >>>>>>>> > Actually, I think it is a larger (open) question whether exactly once is guaranteed by the model or whether runners are allowed to relax that. I would think, however, that sources correctly implemented should be idempotent when run atop an exactly-once infrastructure such as Flink or Dataflow.
>>> >>>>>>>>
>>> >>>>>>>> I would assume that the model basically inherits the guarantees of the underlying infrastructure.
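For reference, a minimal sketch of the @RequiresStableInput feature Max mentions above: the annotation asks the runner to replay identical input on retry, so a non-deterministic upstream cannot cause differing writes. The sink call here is a hypothetical stand-in:

    import org.apache.beam.sdk.transforms.DoFn;

    class StableInputSink extends DoFn<String, Void> {
      // The runner must checkpoint (stabilize) the input before this
      // method sees it, so retries observe exactly the same elements.
      @RequiresStableInput
      @ProcessElement
      public void process(@Element String record) {
        writeToExternalSystem(record);  // assumed-idempotent external write
      }

      private void writeToExternalSystem(String record) {
        // stand-in for the actual sink call
      }
    }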
>>> >>>>>>>> Because Flink does not work as you described (atomic commit of inputs, state and outputs), but rather a checkpoint mark is flowing through the DAG much like a watermark, and on failures operators are restored and data reprocessed, it (IMHO) implies that you have exactly once everywhere in the DAG *but* sinks. That is because sinks cannot be restored to a previous state; instead sinks are supposed to be idempotent in order for the exactly once to really work (or at least be able to commit outputs on checkpoint in the sink). That implies that if you don't have a sink that is able to commit outputs atomically on checkpoint, the pipeline execution should be deterministic upon retries, otherwise shadow writes from failed paths of the pipeline might appear.
>>> >>>>>>>>
>>> >>>>>>>> Someone from Flink might correct me if I'm wrong, but that's my current understanding.
>>> >>>>>>>>
>>> >>>>>>>> > Sounds like we should make this clearer.
>>> >>>>>>>>
>>> >>>>>>>> I meant that you are right that we must not, in any thoughts we are having, forget that streams are by definition out-of-order. That is a property that we cannot change. But - that doesn't limit us from creating an operator that presents the data to the UDF as if the stream was ideally sorted. It can do that by introducing latency, of course.
>>> >>>>>>>>
>>> >>>>>>>> On 5/21/19 4:01 PM, Robert Bradshaw wrote:
>>> >>>>>>>>> Reza: One could provide something like this as a utility class, but one downside is that it is not scale invariant. It requires a tuning parameter that, if too small, won't mitigate the problem, but if too big, greatly increases latency. (Possibly one could define a dynamic session-like window to solve this though...) It also might be harder for runners that *can* cheaply present stuff in timestamp order to optimize. (That and, in practice, our annotation-style process methods don't lend themselves to easy composition.) I think it could work in specific cases though.
>>> >>>>>>>>>
>>> >>>>>>>>> More inline below.
>>> >>>>>>>>>
>>> >>>>>>>>> On Tue, May 21, 2019 at 11:38 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> >>>>>>>>>> Hi Robert,
>>> >>>>>>>>>>
>>> >>>>>>>>>> > Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operations reading the same PCollection may view different orderings).
>>> >>>>>>>>>>
>>> >>>>>>>>>> I think what you describe is a property of a runner, not of the model, right? I think if I run my pipeline on Flink I will not get this atomicity, because although Flink also uses an exactly-once model it might write outputs multiple times.
>>> >>>>>>>>> Actually, I think it is a larger (open) question whether exactly once is guaranteed by the model or whether runners are allowed to relax that.
>>> >>>>>>>>> I would think, however, that sources correctly implemented should be idempotent when run atop an exactly-once infrastructure such as Flink or Dataflow.
>>> >>>>>>>>>
>>> >>>>>>>>>> > 1) Is it correct for a (Stateful)DoFn to assume elements are received in a specific order? In the current model, it is not. Being able to read, handle, and produce out-of-order data, including late data, is a pretty fundamental property of distributed systems.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Yes, absolutely. The argument here is not that Stateful ParDo should presume to receive elements in any order, but to _present_ it as such to the user's @ProcessElement function.
>>> >>>>>>>>> Sounds like we should make this clearer.
>>> >>>>>>>>>
>>> >>>>>>>>>> > 2) Given that some operations are easier (or possibly only possible) to write when operating on ordered data, and that different runners may have (significantly) cheaper ways to provide this ordering than can be done by the user themselves, should we elevate this to a property of (Stateful?)DoFns that the runner can provide? I think a compelling argument can be made here that we should.
>>> >>>>>>>>>>
>>> >>>>>>>>>> +1
>>> >>>>>>>>>>
>>> >>>>>>>>>> Jan
>>> >>>>>>>>>>
>>> >>>>>>>>>> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
>>> >>>>>>>>>>> On Mon, May 20, 2019 at 5:24 PM Jan Lukavský <je...@seznam.cz> wrote:
>>> >>>>>>>>>>>> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Although I understand the motivation of this statement, this project's name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
>>> >>>>>>>>>>> What I mean is that streaming vs. batch is no longer part of the model (or ideally the API), but pushed down to be a concern of the runner (executor) of the pipeline.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> On Tue, May 21, 2019 at 10:32 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> >>>>>>>>>>>> Hi Kenn,
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> OK, so if we introduce an annotation, we can have stateful ParDo with sorting; that would perfectly resolve my issues. I still have some doubts, though. Let me explain. The current behavior of stateful ParDo has the following properties:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> a) might fail in batch, although it runs fine in streaming (that is due to the buffering, and unbounded lateness in batch, which was discussed back and forth in this thread)
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> b) might be non-deterministic (this is because the elements arrive in somewhat random order, and even if you do the operation "assign unique ID to elements" this might produce different results when run multiple times)
>>> >>>>>>>>>>> PCollections are *explicitly* unordered.
>>> >>>>>>>>>>> Any operations that assume or depend on a specific ordering for correctness (or determinism) must provide that ordering themselves (i.e. tolerate "arbitrary shuffling of inputs"). As you point out, that may be very expensive if you have very hot keys with very large (unbounded) timestamp skew.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> StatefulDoFns are low-level operations that should be used with care; the simpler windowing model gives determinism in the face of unordered data (though late data and non-end-of-window triggering introduces some of the non-determinism back in).
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>> What worries me most is the property b), because it seems to me to have serious consequences - not only that if you run a batch pipeline twice you would get different results, but even in streaming, when the pipeline fails and gets restarted from a checkpoint, the produced output might differ from the previous run, and data from the first run might have already been persisted into the sink. That would create somewhat messy outputs.
>>> >>>>>>>>>>> Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operations reading the same PCollection may view different orderings).
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>> These two properties make me think that the current implementation is more of a _special case_ than the general one. The general one would be that your state doesn't have the properties to be able to tolerate buffering problems and/or non-determinism. Which is the case where you need sorting in both streaming and batch to be part of the model.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Let me point out one more analogy - that is merging vs. non-merging windows. The general case (merging windows) implies sorting by timestamp in both the batch case (explicit) and streaming (buffering). The special case (non-merging windows) doesn't rely on any timestamp ordering, so the sorting and buffering can be dropped. The underlying root cause of this is the same for both stateful ParDo and windowing (essentially, assigning window labels is a stateful operation when the windowing function is merging).
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> The reason for the current behavior of stateful ParDo seems to be performance, but is it right to abandon correctness in favor of performance? Wouldn't it be more consistent to have the default behavior prefer correctness, and when you have the specific conditions of the state function having special properties, then you can annotate your DoFn (with something like @TimeOrderingAgnostic), which would yield better performance in that case?
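To make the proposal concrete, a sketch of the hypothetical @TimeOrderingAgnostic opt-out - this annotation does not exist in Beam; it is shown only as one possible shape for the idea:

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Hypothetical marker: "my state updates commute, so the runner may
    // skip any sorting/buffering and keep today's faster execution path."
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface TimeOrderingAgnostic {}

A DoFn that merely counts elements per key could carry it; an order-sensitive state machine (like the 0/1 example later in this thread) could not.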
>>> >>>>>>>>>>> There are two separable questions here.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> 1) Is it correct for a (Stateful)DoFn to assume elements are received in a specific order? In the current model, it is not. Being able to read, handle, and produce out-of-order data, including late data, is a pretty fundamental property of distributed systems.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> 2) Given that some operations are easier (or possibly only possible) to write when operating on ordered data, and that different runners may have (significantly) cheaper ways to provide this ordering than can be done by the user themselves, should we elevate this to a property of (Stateful?)DoFns that the runner can provide? I think a compelling argument can be made here that we should.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> - Robert
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Thanks for the nice small example of a calculation that depends on order. You are right that many state machines have this property. I agree w/ you and Luke that it is convenient for batch processing to sort by event timestamp before running a stateful ParDo. In streaming you could also implement "sort by event timestamp" by buffering until you know all earlier data will be dropped - a slack buffer up to allowed lateness.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> I do not think that it is OK to sort in batch and not in streaming. Many state machines diverge very rapidly when things are out of order. So each runner, if it sees the "@OrderByTimestamp" annotation (or whatever), needs to deliver sorted data (by some mix of buffering and dropping), or to reject the pipeline as unsupported.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> And I also want to say that this is not the default case - many uses of state & timers in ParDo yield different results at the element level, but the results are equivalent in the big picture. Such as the examples of "assign a unique sequence number to each element" or "group into batches": it doesn't matter exactly what the result is, only that it meets the spec. And other cases like user funnels are monotonic enough that you also don't actually need sorting.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Kenn
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský <je...@seznam.cz> wrote:
>>> >>>>>>>>>>>>> Yes, the problem will arise probably mostly when you have not well distributed keys (or too few keys). I'm really not sure if a pure GBK with a trigger can solve this - it might help to have a data-driven trigger. There would still be some doubts, though. The main question is still here - people say that sorting by timestamp before a stateful ParDo would be prohibitively slow, but I don't really see why - the sorting is very probably already there.
>>> >>>>>>>>>>>>> And if not (hash grouping instead of sorted grouping), then the sorting would affect only user-defined StatefulParDos.
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> This would suggest that the best way out of this would really be to add the annotation, so that the author of the pipeline can decide.
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> If that would be acceptable I think I can try to prepare some basic functionality, but I'm not sure if I would be able to cover all runners / SDKs.
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> It is read all per key and window and not just read all (this still won't scale with hot keys in the global window). The GBK preceding the StatefulParDo will guarantee that you are processing all the values for a specific key and window at any given time. Is there a specific window/trigger that is missing that you feel would remove the need for you to use StatefulParDo?
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský <je...@seznam.cz> wrote:
>>> >>>>>>>>>>>>>> Hi Lukasz,
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state.
>>> >>>>>>>>>>>>>> Yes, no problem with that. But this whole discussion started because *this doesn't work on batch*. You simply cannot first read everything from distributed storage and then buffer it all into memory, just to read it again, but sorted. That will not work. And even if it would, it would be a terrible waste of resources.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Jan
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>> >>>>>>>>>>>>>>> This discussion brings many really interesting questions for me. :-)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Although I understand the motivation of this statement, this project's name is "Apache Beam: An advanced unified programming model". What does the model unify, if "streaming vs. batch" is not part of the model?
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Using microbatching, chaining of batch jobs, or pure streaming are exactly the "runtime conditions/characteristics" I refer to. All these define several runtime parameters, which in turn define how well/badly the pipeline will perform and how many resources might be needed.
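A sketch of the "buffering & sorting into state" being discussed here (and of Kenn's "slack buffer up to allowed lateness" above): hold elements in a bag, set an event-time timer, and emit the buffer sorted by timestamp when the watermark passes. The fixed slack is a placeholder; a real version would track allowed lateness and re-set the timer as elements keep arriving:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;

    // Assumes timestamps were reified upstream (e.g. with Reify.timestamps()).
    class SortBuffer extends DoFn<KV<String, TimestampedValue<String>>, String> {
      private static final Duration SLACK = Duration.standardMinutes(1);

      @StateId("buffer")
      private final StateSpec<BagState<TimestampedValue<String>>> bufferSpec = StateSpecs.bag();

      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(@Element KV<String, TimestampedValue<String>> e,
                          @StateId("buffer") BagState<TimestampedValue<String>> buffer,
                          @TimerId("flush") Timer flush) {
        buffer.add(e.getValue());
        // Fire once the watermark passes this element's timestamp plus slack.
        flush.set(e.getValue().getTimestamp().plus(SLACK));
      }

      @OnTimer("flush")
      public void flush(@StateId("buffer") BagState<TimestampedValue<String>> buffer,
                        OutputReceiver<String> out) {
        List<TimestampedValue<String>> sorted = new ArrayList<>();
        buffer.read().forEach(sorted::add);
        sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
        sorted.forEach(v -> out.output(v.getValue()));
        buffer.clear();
      }
    }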
>>> >>>>>>>>>>>>>>> From my point of view, pure streaming should be the most resource demanding (if not, why bother with batch? why not run everything in streaming only? what would there remain to "unify"?).
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> > Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Yes, but here is still the presumption that my stateful DoFn can tolerate arbitrary shuffling of inputs. Let me explain the use case in more detail.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Suppose you have an input stream consisting of 1s and 0s (and some key for each element, which is irrelevant for the demonstration). Your task is to calculate, in a running global window, the actual number of changes between state 0 and state 1 and vice versa. When the state doesn't change, you don't calculate anything. If the input (for a given key) were (tN denotes timestamp N):
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> t1: 1
>>> >>>>>>>>>>>>>>> t2: 0
>>> >>>>>>>>>>>>>>> t3: 0
>>> >>>>>>>>>>>>>>> t4: 1
>>> >>>>>>>>>>>>>>> t5: 1
>>> >>>>>>>>>>>>>>> t6: 0
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> then the output should yield (supposing that the default state is zero):
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> t1: (one: 1, zero: 0)
>>> >>>>>>>>>>>>>>> t2: (one: 1, zero: 1)
>>> >>>>>>>>>>>>>>> t3: (one: 1, zero: 1)
>>> >>>>>>>>>>>>>>> t4: (one: 2, zero: 1)
>>> >>>>>>>>>>>>>>> t5: (one: 2, zero: 1)
>>> >>>>>>>>>>>>>>> t6: (one: 2, zero: 2)
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> How would you implement this in current Beam semantics?
>>> >>>>>>>>>>>>>> I think you're saying here that I know that my input is ordered in a specific way, and since I assume the order when writing my pipeline I can perform this optimization. But there is nothing preventing a runner from noticing that you're processing in the global window with a specific type of trigger and re-ordering your inputs/processing to get better performance (since you can't use an AfterWatermark trigger for your pipeline in streaming for the GlobalWindow).
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state. I can see why you would want an annotation that says I must have timestamp-ordered elements, since it makes writing certain StatefulParDos much easier.
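A sketch of Jan's 0/1 transition counter above as a stateful DoFn. Note that, per the whole thread, it produces the expected table only if elements actually arrive in timestamp order, which the model does not guarantee:

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class CountTransitions extends DoFn<KV<String, Integer>, String> {
      @StateId("last") private final StateSpec<ValueState<Integer>> lastSpec =
          StateSpecs.value(VarIntCoder.of());
      @StateId("toOne") private final StateSpec<ValueState<Integer>> toOneSpec =
          StateSpecs.value(VarIntCoder.of());
      @StateId("toZero") private final StateSpec<ValueState<Integer>> toZeroSpec =
          StateSpecs.value(VarIntCoder.of());

      @ProcessElement
      public void process(@Element KV<String, Integer> e,
                          @StateId("last") ValueState<Integer> last,
                          @StateId("toOne") ValueState<Integer> toOne,
                          @StateId("toZero") ValueState<Integer> toZero,
                          OutputReceiver<String> out) {
        int prev = orZero(last.read());  // default state is zero
        int cur = e.getValue();
        int one = orZero(toOne.read());
        int zero = orZero(toZero.read());
        if (cur == 1 && prev == 0) { one++; toOne.write(one); }    // 0 -> 1
        if (cur == 0 && prev == 1) { zero++; toZero.write(zero); } // 1 -> 0
        last.write(cur);
        out.output(String.format("(one: %d, zero: %d)", one, zero));
      }

      private static int orZero(Integer v) { return v == null ? 0 : v; }
    }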
>>> >>>>>>>>>>>>>> StatefulParDo is a low-level function; it really is the "here you go and do whatever you need to, but here be dragons" function, while windowing and triggering are meant to keep many people from writing StatefulParDo in the first place.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> > Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> But the worst case is defined by input of size (available resources + single byte) -> pipeline fail. Although it could have finished, given the right conditions.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> > This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Sure, the pipeline will be less efficient, because it would have to buffer and sort the inputs. But at least it will produce correct results in cases where updates to state are order-sensitive.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> > Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> I'd say roughly yes, but the difference would be in the trigger. The trigger should ideally fire as soon as the watermark (+lateness) crosses the element with the lowest timestamp in the buffer. Although this could be somehow emulated by a fixed trigger firing every X millis.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> > Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> No, the sorting would have to be done in the streaming case as well. That is an imperative of the unified model. I think it is possible to sort by timestamp only in the batch case (and do it for *all* batch stateful pardos without annotation), or to introduce an annotation, but then make the same guarantees for the streaming case as well.
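A sketch of the "GBK + re-emit" emulation being discussed, with an explicit sort by timestamp inside each per-key group. It assumes timestamps were reified upstream (e.g. with Reify.timestamps()) and only orders elements within a single window/trigger firing:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TimestampedValue;

    class SortGroupByTimestamp
        extends DoFn<KV<String, Iterable<TimestampedValue<String>>>, KV<String, String>> {
      @ProcessElement
      public void process(@Element KV<String, Iterable<TimestampedValue<String>>> group,
                          OutputReceiver<KV<String, String>> out) {
        // Materialize and sort one key's group by event timestamp.
        List<TimestampedValue<String>> values = new ArrayList<>();
        group.getValue().forEach(values::add);
        values.sort(Comparator.comparing(TimestampedValue::getTimestamp));
        for (TimestampedValue<String> v : values) {
          out.output(KV.of(group.getKey(), v.getValue()));
        }
      }
    }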
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Jan
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
>>> >>>>>>>>>>>>>>>> On Mon, May 20, 2019 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
>>> >>>>>>>>>>>>>>>>> Hi Robert,
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in the streaming case that is due to the nature of the processing - the difference between the watermark and the timestamps of elements flowing through the pipeline is generally low (too high a difference leads to the overbuffering problem), but there is no such bound on batch.
>>> >>>>>>>>>>>>>>>> Fortunately, for batch, only the state for a single key needs to be preserved at a time, rather than the state for all keys across the range of skew. Of course if you have few or hot keys, one can still have issues (and this is not specific to StatefulDoFns).
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> As a result, I see a few possible solutions:
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> - the best and most natural seems to be an extension of the model, so that it defines batch as not only "streaming pipeline executed in batch fashion", but "pipeline with at least as good runtime characteristics as in the streaming case, executed in batch fashion"; I really don't think that there are any conflicts with the current model, or that this could affect performance, because the required sorting (as pointed out by Aljoscha) is very probably already done during translation of stateful pardos. Also note that this definition only affects user-defined stateful pardos
>>> >>>>>>>>>>>>>>>> I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes. The model describes what the valid outputs are given a (sometimes partial) set of inputs. It becomes really hard to define things like "as good runtime characteristics." Once you allow any out-of-orderedness, it is not very feasible to try and define (and more cheaply implement) an "upper bound" of acceptable out-of-orderedness.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Pipelines that fail in the "worst case" batch scenario are likely to degrade poorly (possibly catastrophically) when the watermark falls behind in streaming mode as well.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> - another option would be to introduce an annotation for DoFns (e.g. @RequiresStableTimeCharacteristics), which would result in the sorting in the batch case - but - this extension would have to ensure the sorting in streaming mode also - it would require a definition of allowed lateness, and trigger (essentially similar to a window)
>>> >>>>>>>>>>>>>>>> This might be reasonable, implemented by default by buffering everything and releasing elements as the watermark (+lateness) advances, but would likely lead to inefficient (though *maybe* easier to reason about) code. Not sure about the semantics of triggering here, especially data-driven triggers. Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): [(key, value) for value in values])?
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Or is the underlying desire just to be able to hint to the runner that the code may perform better (e.g. require less resources) as skew is reduced (and hence to order by timestamp iff it's cheap)?
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> - the last option would be to introduce these "higher order guarantees" in some extension DSL (e.g. Euphoria), but that seems to be the worst option to me
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> I see the first two options as quite equally good, although the latter one is probably more time consuming to implement. But it would bring an additional feature to the streaming case as well.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> Thanks for any thoughts.
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> Jan
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
>>> >>>>>>>>>>>>>>>>>> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský <je...@seznam.cz> wrote:
>>> >>>>>>>>>>>>>>>>>>> Hi Reuven,
>>> >>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>> How so? AFAIK stateful DoFns work just fine in batch runners.
>>> >>>>>>>>>>>>>>>>>>> Stateful ParDo works in batch as far as the logic inside the state works for absolutely unbounded out-of-orderness of elements. That basically (practically) can work only for cases where the order of input elements doesn't matter. But "state" can refer to a "state machine", and any time you have a state machine involved, the ordering of elements would matter.
>>> >>>>>>>>>>>>>>>>>> No guarantees on order are provided in *either* streaming or batch mode by the model.
>>> >>>>>>>>>>>>>>>>>> However, it is the case that in order to make forward progress most streaming runners attempt to limit the amount of out-of-orderedness of elements (in terms of event time vs. processing time), which in turn could help cap the amount of state that must be held concurrently, whereas a batch runner may not allow any state to be safely discarded until the whole timeline from infinite past to infinite future has been observed.
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>> Also, as pointed out, state is not preserved "batch to batch" in batch mode.
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels <m...@apache.org> wrote:
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>> batch semantics and streaming semantics differ only in that I can have GlobalWindow with default trigger on batch and cannot on stream
>>> >>>>>>>>>>>>>>>>>>> You can have a GlobalWindow in streaming with a default trigger. You could define additional triggers that do early firings. And you could even trigger the global window by advancing the watermark to +inf.
>>> >>>>>>>>>>>>>>>>>> IIRC, as a pragmatic note, we prohibited the global window with the default trigger on unbounded PCollections in the SDK because this is more likely to be user error than an actual desire to have no output until drain. But it's semantically valid in the model.
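For reference, a sketch of Max's point that a GlobalWindow works in streaming once you add triggering - here with repeated processing-time early firings (the one-minute delay is arbitrary):

    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    class GloballyWindowed {
      static PCollection<String> withEarlyFirings(PCollection<String> input) {
        return input.apply(
            Window.<String>into(new GlobalWindows())
                // Fire repeatedly, one minute after the first element of each pane.
                .triggering(Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1))))
                .withAllowedLateness(Duration.ZERO)
                .accumulatingFiredPanes());
      }
    }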