On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský <je...@seznam.cz> wrote:

>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the watermark from
> updating as they are still in flight until after finish bundle. Therefore
> simply caching the records should always be watermark safe, regardless of
> the runner. You will only run into problems if you try and move timestamps
> "backwards" - which is why Beam strongly discourages this.
>
> This is not aligned with FlinkRunner's implementation. And I actually
> think it is not aligned conceptually. As mentioned, Flink does not have
> the concept of bundles at all. It achieves fault tolerance via
> checkpointing, essentially checkpoint barrier flowing from sources to
> sinks, safely snapshotting state of each operator on the way. Bundles are
> implemented as a somewhat arbitrary set of elements between two consecutive
> checkpoints (there can be multiple bundles between checkpoints). A bundle
> is 'committed' (i.e. persistently stored and guaranteed not to retry) only
> after the checkpoint barrier passes over the elements in the bundle (every
> bundle is finished at the very latest exactly before a checkpoint). But
> watermark propagation and bundle finalization are completely unrelated. This
> might be a bug in the runner, but requiring a checkpoint for watermark
> propagation would introduce insane delays between processing time and
> watermarks: every executable stage would delay watermark propagation until a
> checkpoint (which is typically on the order of seconds). This delay would add
> up after each stage.
>

It's not bundles that hold up processing, rather it is elements, and
elements are not considered "processed" until FinishBundle.
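
As a rough illustration of that point, here is a minimal sketch of the semantics described above, not of any runner's actual code (and Jan notes FlinkRunner does not behave this way). The function name and the use of plain integers as timestamps are assumptions made for the example.

```python
from typing import Iterable


def output_watermark(input_watermark: int, in_flight_timestamps: Iterable[int]) -> int:
    """Model of the hold: the output watermark may not pass the oldest
    element that has been received but not yet finished with its bundle."""
    oldest = min(in_flight_timestamps, default=None)
    if oldest is None:
        # Nothing is buffered, so the input watermark propagates unchanged.
        return input_watermark
    return min(input_watermark, oldest)


# Elements cached in ProcessElement, waiting for FinishBundle.
buffered = [105, 98, 120]
held = output_watermark(110, buffered)   # held back by the oldest cached element
released = output_watermark(110, [])     # after FinishBundle the hold is gone
```

Under this model, caching elements inside a bundle is watermark safe as long as their timestamps are not moved backwards.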

You are right about Flink. In many cases this is fine - if Flink rolls back
to the last checkpoint, the watermark will also roll back, and everything
stays consistent. So in general, one does not need to wait for checkpoints
for watermark propagation.

Where things get a bit weirder with Flink is whenever one has external side
effects. In theory, one should wait for checkpoints before letting a Sink
flush, otherwise one could end up with incorrect outputs (especially with a
sink like TextIO). Flink itself recognizes this, and that's why they
provide TwoPhaseCommitSinkFunction
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>,
which waits for a checkpoint. In Beam, this is the reason we introduced
RequiresStableInput. Of course in practice many Flink users don't do this -
in which case they are prioritizing latency over data correctness.
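
To make the "wait for a checkpoint before flushing" idea concrete, here is a toy sketch. It is not Flink's TwoPhaseCommitSinkFunction API and not Beam's RequiresStableInput; the class and method names are hypothetical, and recovery is simplified to "drop everything that was not confirmed".

```python
class BufferingSink:
    """Toy sink whose output becomes visible only after a checkpoint completes."""

    def __init__(self):
        self.pending = []        # records written since the last snapshot
        self.pre_committed = {}  # checkpoint id -> records staged at that snapshot
        self.committed = []      # externally visible output

    def write(self, record):
        self.pending.append(record)

    def snapshot(self, checkpoint_id):
        # Phase 1: stage everything written so far under this checkpoint.
        self.pre_committed[checkpoint_id] = self.pending
        self.pending = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Phase 2: the checkpoint is durable, so staged records can be published.
        done = sorted(c for c in self.pre_committed if c <= checkpoint_id)
        for cid in done:
            self.committed.extend(self.pre_committed.pop(cid))

    def abort(self):
        # Simplified failure handling: unconfirmed records are dropped and
        # are expected to be replayed from the last completed checkpoint.
        self.pending = []
        self.pre_committed.clear()


sink = BufferingSink()
sink.write("a")
sink.write("b")
sink.snapshot(1)
sink.write("c")                      # belongs to the next checkpoint
sink.notify_checkpoint_complete(1)   # only "a" and "b" become visible
```

The cost is latency: nothing is visible until the checkpoint completes, which is the trade-off against flushing early.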

>
> Reuven
>
> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>> There was a thread [1], where the conclusion seemed to be that updating
>> watermark is possible even in the middle of a bundle. Actually, handling
>> watermarks is runner-dependent (e.g. Flink does not store watermarks in
>> checkpoints, they are always recomputed from scratch on restore).
>>
>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>
>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>
>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <dev@beam.apache.org>
>>> wrote:
>>>
>>>> I've actually wondered about this specifically for streaming... if
>>>> you're writing a pipeline there it seems like you're often going to want to
>>>> put high fixed cost things like database connections even outside of the
>>>> bundle setup. You really only want to do that once in the lifetime of the
>>>> worker itself, not the bundle. Seems like having that boundary be somewhere
>>>> other than an arbitrary (and, in streaming, probably small to avoid
>>>> latency) group of elements might be more useful? I suppose this depends
>>>> heavily on the object lifecycle in the sdk worker though.
>>>>
>>>
>>> +1. This is the difference between @Setup and @StartBundle. The
>>> start/finish bundle operations should be used for bracketing element
>>> processing that must be committed as a unit for correct failure recovery
>>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>>> in FinishBundle). On the other hand, things like open database connections
>>> can and likely should be shared across bundles.
>>>
>>> This is correct, but the caching between @StartBundle and @FinishBundle
>>> has some problems. First, users need to manually set a watermark hold for
>>> min(timestamp in bundle), otherwise the watermark might overtake the buffered
>>> elements.
>>>
>>
>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>>
>>> Users have no option other than using timer.withOutputTimestamp for
>>> that, as we don't have a user-facing API to set a watermark hold otherwise,
>>> thus the in-bundle caching implies a stateful DoFn. The question might then
>>> be, why not use "classical" stateful caching involving state, as there is
>>> full control over the caching in user code. This gave me an idea: would it
>>> be useful to add the information about caching to the API (e.g. in
>>> Java @StartBundle(caching=true)), which could maybe solve the above issues
>>> (the runner would know to set the hold, and it could work with "stateless" DoFns)?
>>>
>>
>> Really, this is one of the areas that the streaming/batch abstraction
>> leaks. In batch it was a common pattern to have local DoFn instance state
>> that persisted from start to finish bundle, and these were also used as
>> convenient entry points for other operations (like opening
>> database connections) 'cause bundles were often "as large as possible."
>> With the advent of streaming it makes sense to put this in
>> explicitly managed runner state to allow for cross-bundle amortization and
>> there's more value in distinguishing between @Setup and @StartBundle.
>>
>> (Were I to do things over I'd probably encourage an API that discouraged
>> non-configuration instance state on DoFns altogether, e.g. in the style of
>> Python context managers (and an equivalent API could probably be put
>> together with AutoCloseables in Java) one would have something like
>>
>> ParDo(X)
>>
>> which would logically (though not necessarily physically) lead to an
>> execution like
>>
>> with X.bundle_processor() as bundle_processor:
>>   for bundle in bundles:
>>     with bundle_processor.element_processor() as process:
>>       for element in bundle:
>>         process(element)
>>
>> where the traditional setup/start_bundle/finish_bundle/teardown logic
>> would live in the __enter__ and __exit__ methods (made even easier with
>> coroutines.) For convenience one could of course provide a raw bundle
>> processor or element processor to ParDo if the enter/exit contexts are
>> trivial. But this is getting somewhat off-topic...)
>>
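
A runnable version of the context-manager idea above might look like the following. This is only a sketch of the hypothetical API from the message, not something Beam provides; the class name, the `log` list, and the `run` driver are assumptions added for illustration.

```python
from contextlib import contextmanager


class BatchingWriter:
    """Hypothetical transform object; method names mirror the sketch above."""

    def __init__(self, log):
        self.log = log  # records lifecycle events so the order is visible

    @contextmanager
    def bundle_processor(self):
        self.log.append("setup")        # expensive, shared across bundles
        try:
            yield self
        finally:
            self.log.append("teardown")

    @contextmanager
    def element_processor(self):
        buffer = []                     # bundle-local state, not an instance field
        self.log.append("start_bundle")
        yield buffer.append             # the per-element "process" callable
        # Reached only if the bundle body finished without raising, so a
        # failed bundle never flushes its partial buffer.
        self.log.append(f"finish_bundle flushed {buffer}")


def run(transform, bundles):
    # Stand-in for the runner's logical execution loop.
    with transform.bundle_processor() as bundle_processor:
        for bundle in bundles:
            with bundle_processor.element_processor() as process:
                for element in bundle:
                    process(element)


events = []
run(BatchingWriter(events), [[1, 2], [3]])
```

Here the buffer lives in the generator's local scope, so there is no mutable instance state to leak from one bundle into the next.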
>>
>>>
>>>>
>>>> Best,
>>>> B
>>>>
>>>> On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> (I notice that you replied only to yourself, but there has been a
>>>>> whole thread of discussion on this - are you subscribed to dev@beam?
>>>>> https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
>>>>>
>>>>> It sounds like you want what everyone wants: to have the biggest
>>>>> bundles possible.
>>>>>
>>>>> So for bounded data, basically you make even splits of the data and
>>>>> each split is one bundle. And then dynamic splitting to redistribute work
>>>>> to eliminate stragglers, if your engine has that capability.
>>>>>
>>>>> For unbounded data, you more-or-less bundle as much as you can without
>>>>> waiting too long, like Jan described.
>>>>>
>>>>> Users know to put their high fixed costs in @StartBundle and then it
>>>>> is the runner's job to put as many calls to @ProcessElement as possible to
>>>>> amortize.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, Sep 22, 2023 at 9:39 AM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Whoops, I typoed my last email. I meant to write "this isn't the
>>>>>> greatest strategy for high *fixed* cost transforms", e.g. a
>>>>>> transform that takes 5 minutes to get set up and then maybe a microsecond
>>>>>> per input
>>>>>>
>>>>>> I suppose one solution is to move the responsibility for handling
>>>>>> this kind of situation to the user and expect users to use a bundling
>>>>>> transform (e.g. BatchElements [1]) followed by a Reshuffle+FlatMap. Is
>>>>>> this what other runners expect? Just want to make sure I'm not missing some
>>>>>> smart generic bundling strategy that might handle this for users.
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
>>>>>>
>>>>>>
>>>>>> On Thu, Sep 21, 2023 at 7:23 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm writing a runner, and my first strategy for determining bundle
>>>>>>> size was to just start with a bundle size of one and double it until we
>>>>>>> reach a size that we expect to take some target per-bundle runtime
>>>>>>> (e.g. maybe 10 minutes). I realize that this isn't the greatest strategy for
>>>>>>> high sized cost transforms. I'm curious what kind of strategies other runners
>>>>>>> take?
>>>>>>>
>>>>>>
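
For reference, the doubling strategy from the first message in this thread can be sketched as below. This is a simplified reading of that description, not code from any runner; `find_bundle_size`, `process_bundle`, `max_size`, and the injectable `clock` are all names assumed for the example.

```python
import time


def find_bundle_size(process_bundle, target_seconds, max_size=1 << 30,
                     clock=time.monotonic):
    """Double the bundle size until one bundle takes at least target_seconds.

    process_bundle(n) is assumed to process a bundle of n elements and to pay
    any per-bundle fixed cost (e.g. setup) on every call.
    """
    size = 1
    while size < max_size:
        start = clock()
        process_bundle(size)
        if clock() - start >= target_seconds:
            break
        size *= 2
    return size
```

The weakness with high fixed costs shows up in the probe bundles. With a 5 minute setup, a microsecond per element, and a 10 minute target, this loop would run about 30 probes (sizes 1 through 2^29), each paying the full setup cost, before it settles.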
