Oh neat, Preserving Keys. I didn't think we had/provided a mechanism for declaring that.
Good doc. I do know there's no annotation for FinishBundle. It's generally
optional and an SDK-side-only concern. Finalize Bundle is a different
mechanism which does have an annotation, and requires both SDK and runner
support to trigger after a bundle has been committed/checkpointed.

On Tue, Sep 26, 2023, 8:54 AM Kenneth Knowles <k...@apache.org> wrote:

>
> On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Kenn and Reuven,
>>
>> I agree with all these points. The only issue here seems to be that
>> FlinkRunner does not fulfill these constraints. This is a bug that can be
>> fixed, though we need to change some defaults, as the 1000 ms default
>> bundle "duration" can be too much for lower-traffic pipelines. We are also
>> probably missing some @ValidatesRunner tests for this. I created [1] and
>> [2] to track this.
>>
>> One question still remains: the bundle vs. element life-cycle is relevant
>> only for cases where processing of element X can affect processing of
>> element Y later in the same bundle. Once this influence is ruled out (i.e.
>> no caching), this information can result in runner optimization that yields
>> better performance. Should we consider propagating this information from
>> user code to the runner?
>>
> Yes!
>
> This was the explicit goal of the move to annotation-driven DoFn in
> https://s.apache.org/a-new-dofn to make it so that the SDK and runner can
> get good information about what the DoFn requirements are.
>
> When there is no @FinishBundle method, the runner can make additional
> optimizations. This should have been included in the ParDoPayload in the
> proto when we moved to portable pipelines. I cannot remember if there was a
> good reason that we did not do so. Maybe we (incorrectly) thought that this
> was an issue that only the Java SDK harness needed to know about.
>
> Kenn
>
>> [1] https://github.com/apache/beam/issues/28649
>>
>> [2] https://github.com/apache/beam/issues/28650
>>
>> On 9/25/23 18:31, Reuven Lax via dev wrote:
>>
>> On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> On 9/23/23 18:16, Reuven Lax via dev wrote:
>>>
>>> Two separate things here:
>>>
>>> 1. Yes, a watermark can update in the middle of a bundle.
>>> 2. The records in the bundle themselves will prevent the watermark from
>>> updating as they are still in flight until after finish bundle. Therefore
>>> simply caching the records should always be watermark safe, regardless of
>>> the runner. You will only run into problems if you try and move timestamps
>>> "backwards" - which is why Beam strongly discourages this.
>>>
>>> This is not aligned with FlinkRunner's implementation. And I actually
>>> think it is not aligned conceptually. As mentioned, Flink does not have
>>> the concept of bundles at all. It achieves fault tolerance via
>>> checkpointing, essentially a checkpoint barrier flowing from sources to
>>> sinks, safely snapshotting the state of each operator on the way. Bundles
>>> are implemented as a somewhat arbitrary set of elements between two
>>> consecutive checkpoints (there can be multiple bundles between
>>> checkpoints). A bundle is 'committed' (i.e. persistently stored and
>>> guaranteed not to retry) only after the checkpoint barrier passes over the
>>> elements in the bundle (every bundle is finished at the very latest
>>> exactly before a checkpoint). But watermark propagation and bundle
>>> finalization are completely unrelated. This might be a bug in the runner,
>>> but requiring a checkpoint for watermark propagation will introduce insane
>>> delays between processing time and watermarks: every executable stage
>>> would delay watermark propagation until a checkpoint (which is typically
>>> on the order of seconds). This delay would add up after each stage.
>>>
>>
>> It's not bundles that hold up processing, rather it is elements, and
>> elements are not considered "processed" until FinishBundle.
>>
>> You are right about Flink. In many cases this is fine - if Flink rolls
>> back to the last checkpoint, the watermark will also roll back, and
>> everything stays consistent. So in general, one does not need to wait for
>> checkpoints for watermark propagation.
>>
>> Where things get a bit weirder with Flink is whenever one has external
>> side effects. In theory, one should wait for checkpoints before letting a
>> Sink flush, otherwise one could end up with incorrect outputs (especially
>> with a sink like TextIO). Flink itself recognizes this, and that's why they
>> provide TwoPhaseCommitSinkFunction
>> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>
>> which waits for a checkpoint. In Beam, this is the reason we introduced
>> RequiresStableInput. Of course in practice many Flink users don't do this -
>> in which case they are prioritizing latency over data correctness.
>>
>>>
>>> Reuven
>>>
>>> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>
>>>> There was a thread [1], where the conclusion seemed to be that updating
>>>> watermark is possible even in the middle of a bundle. Actually, handling
>>>> watermarks is runner-dependent (e.g. Flink does not store watermarks in
>>>> checkpoints, they are always recomputed from scratch on restore).
>>>>
>>>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>>>>
>>>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>>>
>>>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>>>
>>>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
>>>>> <dev@beam.apache.org> wrote:
>>>>>
>>>>>> I've actually wondered about this specifically for streaming... if
>>>>>> you're writing a pipeline there it seems like you're often going to
>>>>>> want to put high fixed cost things like database connections even
>>>>>> outside of the bundle setup. You really only want to do that once in
>>>>>> the lifetime of the worker itself, not the bundle. Seems like having
>>>>>> that boundary be somewhere other than an arbitrary (and probably
>>>>>> small, in streaming, to avoid latency) group of elements might be more
>>>>>> useful? I suppose this depends heavily on the object lifecycle in the
>>>>>> sdk worker though.
>>>>>>
>>>>>
>>>>> +1. This is the difference between @Setup and @StartBundle. The
>>>>> start/finish bundle operations should be used for bracketing element
>>>>> processing that must be committed as a unit for correct failure recovery
>>>>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>>>>> in FinishBundle). On the other hand, things like open database connections
>>>>> can and likely should be shared across bundles.
>>>>>
>>>>> This is correct, but the caching between @StartBundle and
>>>>> @FinishBundle has some problems. First, users need to manually set a
>>>>> watermark hold for min(timestamp in bundle), otherwise the watermark
>>>>> might overtake the buffered elements.
>>>>>
>>>>
>>>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>>>> committed, as there's no guarantee that this work won't be discarded.
>>>>
>>>>
>>>>> Users have no option other than using timer.withOutputTimestamp for
>>>>> that, as we don't have a user-facing API to set a watermark hold
>>>>> otherwise, thus the in-bundle caching implies a stateful DoFn. The
>>>>> question might then be, why not use "classical" stateful caching
>>>>> involving state, as there is full control over the caching in user code.
>>>>> This gave me the idea that it might be useful to add the information
>>>>> about caching to the API (e.g. in Java @StartBundle(caching=true)),
>>>>> which could maybe solve the above issues (the runner would know to set
>>>>> the hold, and it could work with "stateless" DoFns)?
>>>>>
>>>>
>>>> Really, this is one of the areas where the streaming/batch abstraction
>>>> leaks. In batch it was a common pattern to have local DoFn instance state
>>>> that persisted from start to finish bundle, and these were also used as
>>>> convenient entry points for other operations (like opening
>>>> database connections) 'cause bundles were often "as large as possible."
>>>> With the advent of streaming it makes sense to put this in
>>>> explicitly managed runner state to allow for cross-bundle amortization and
>>>> there's more value in distinguishing between @Setup and @StartBundle.
>>>>
>>>> (Were I to do things over I'd probably encourage an API that
>>>> discouraged non-configuration instance state on DoFns altogether, e.g.
>>>> in the style of Python context managers (and an equivalent API could
>>>> probably be put together with AutoCloseables in Java) one would have
>>>> something like
>>>>
>>>>     ParDo(X)
>>>>
>>>> which would logically (though not necessarily physically) lead to an
>>>> execution like
>>>>
>>>>     with X.bundle_processor() as bundle_processor:
>>>>         for bundle in bundles:
>>>>             with bundle_processor.element_processor() as process:
>>>>                 for element in bundle:
>>>>                     process(element)
>>>>
>>>> where the traditional setup/start_bundle/finish_bundle/teardown logic
>>>> would live in the __enter__ and __exit__ methods (made even easier with
>>>> coroutines.) For convenience one could of course provide a raw bundle
>>>> processor or element processor to ParDo if the enter/exit contexts are
>>>> trivial. But this is getting somewhat off-topic...
>>>>
>>>>
>>>>>
>>>>>>
>>>>>> Best,
>>>>>> B
>>>>>>
>>>>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> (I notice that you replied only to yourself, but there has been a
>>>>>>> whole thread of discussion on this - are you subscribed to dev@beam?
>>>>>>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>>>>>>
>>>>>>> It sounds like you want what everyone wants: to have the biggest
>>>>>>> bundles possible.
>>>>>>>
>>>>>>> So for bounded data, basically you make even splits of the data and
>>>>>>> each split is one bundle. And then dynamic splitting to redistribute
>>>>>>> work to eliminate stragglers, if your engine has that capability.
>>>>>>>
>>>>>>> For unbounded data, you more-or-less bundle as much as you can
>>>>>>> without waiting too long, like Jan described.
>>>>>>>
>>>>>>> Users know to put their high fixed costs in @StartBundle and then it
>>>>>>> is the runner's job to put as many calls to @ProcessElement as
>>>>>>> possible to amortize.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran <joey.t...@schrodinger.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Whoops, I typoed my last email. I meant to write "this isn't the
>>>>>>>> greatest strategy for high *fixed* cost transforms", e.g. a
>>>>>>>> transform that takes 5 minutes to get set up and then maybe a
>>>>>>>> microsecond per input.
>>>>>>>>
>>>>>>>> I suppose one solution is to move the responsibility for handling
>>>>>>>> this kind of situation to the user and expect users to use a bundling
>>>>>>>> transform (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap.
>>>>>>>> Is this what other runners expect? Just want to make sure I'm not
>>>>>>>> missing some smart generic bundling strategy that might handle this
>>>>>>>> for users.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran
>>>>>>>> <joey.t...@schrodinger.com> wrote:
>>>>>>>>
>>>>>>>>> I'm writing a runner, and the first strategy for determining bundle
>>>>>>>>> size was to just start with a bundle size of one and double it
>>>>>>>>> until we reach a size that we expect to take some target per-bundle
>>>>>>>>> runtime (e.g. maybe 10 minutes). I realize that this isn't the
>>>>>>>>> greatest strategy for high sized cost transforms. I'm curious what
>>>>>>>>> kind of strategies other runners take?
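
For anyone reading the archive later, the doubling strategy from the first message in the thread can be sketched in plain Python as below. This is only an illustration and is not taken from any runner: `grow_bundle_size`, `linear_cost`, and the `max_size` cap are hypothetical names, and the "fixed setup cost plus per-element cost" model is an assumption used to show why a high fixed cost hurts during ramp-up.

```python
from typing import Callable, List, Tuple


def grow_bundle_size(
    measure_runtime: Callable[[int], float],
    target_seconds: float,
    max_size: int = 1 << 20,
) -> List[Tuple[int, float]]:
    """Start at a bundle size of one and double it until a bundle takes at
    least target_seconds (or the hypothetical max_size cap is reached).

    Returns the (bundle_size, runtime_seconds) pairs tried, in order.
    """
    history = []
    size = 1
    while True:
        runtime = measure_runtime(size)
        history.append((size, runtime))
        if runtime >= target_seconds or size >= max_size:
            return history
        size *= 2


def linear_cost(fixed_seconds: float, per_element_seconds: float) -> Callable[[int], float]:
    """Assumed cost model: one fixed setup cost per bundle plus a per-element cost."""
    return lambda size: fixed_seconds + per_element_seconds * size


if __name__ == "__main__":
    target = 600.0  # the "maybe 10 minutes" target mentioned in the thread

    # No setup cost, one second per element: doubling converges quickly.
    cheap = grow_bundle_size(linear_cost(0.0, 1.0), target)
    print("cheap setup: bundles tried =", len(cheap), "final size =", cheap[-1][0])

    # Assumed 5-minute setup and a microsecond per element: every ramp-up
    # bundle pays the full setup cost again.
    costly = grow_bundle_size(linear_cost(300.0, 1e-6), target)
    setup_paid = 300.0 * len(costly)
    print("costly setup: bundles tried =", len(costly), "setup seconds paid =", setup_paid)
```

Under this assumed cost model the high-fixed-cost transform pays its setup once per doubling step and still never reaches the target runtime before hitting the cap, which is the weakness the follow-up message points out.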