Flink operators are long-running classes with a life-cycle of open() and close(), so any amortization can be done between those methods, see [1]. Essentially, in vanilla Flink the complete (unbounded) input can be viewed as a single "bundle". The crucial point is that state is checkpointed while this "bundle" is still open.
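
A minimal sketch of that pattern in plain Flink (ExpensiveClient and its methods are made up for illustration; the life-cycle hooks are the standard RichFunction ones):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichingMapper extends RichMapFunction<String, String> {

  // Hypothetical heavy resource (stands in for e.g. a DB connection);
  // created once per operator instance, i.e. amortized over the whole
  // (unbounded) "bundle".
  private transient ExpensiveClient client;

  @Override
  public void open(Configuration parameters) {
    client = ExpensiveClient.connect();
  }

  @Override
  public String map(String value) {
    return client.enrich(value);
  }

  @Override
  public void close() {
    if (client != null) {
      client.close();
    }
  }
}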

 Jan

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/internals/task_lifecycle/

On 9/22/23 15:21, Kenneth Knowles wrote:
What is the best way to amortize heavy operations across elements in Flink? (that is what bundles are for, basically)

On Fri, Sep 22, 2023 at 5:09 AM Jan Lukavský <je...@seznam.cz> wrote:

    Flink defines bundles in terms of number of elements and
    processing time, by default 1000 elements or 1000 milliseconds,
    whichever happens first. But bundles are not a "natural" concept
    in Flink; it uses them merely to comply with the Beam model. By
    default, checkpoints are not aligned with bundle boundaries.
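
    For completeness, a minimal sketch of tuning those two knobs on
    the Beam Flink runner (assuming the options are still called
    maxBundleSize and maxBundleTimeMills; check against your Beam
    version):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class BundleTuning {
      public static void main(String[] args) {
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        // Defaults: 1000 elements or 1000 ms, whichever is hit first.
        options.setMaxBundleSize(5000L);
        options.setMaxBundleTimeMills(2000L);
        Pipeline pipeline = Pipeline.create(options);
        // ... build and run the pipeline here ...
      }
    }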

     Jan

    On 9/22/23 01:58, Robert Bradshaw via dev wrote:
    Dataflow uses a work-stealing protocol. The FnAPI includes a way
    to ask the worker to stop at a certain element among those it has
    already been sent.
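
    As a rough illustration of the shape of that exchange (a made-up
    model, not the actual FnAPI proto messages; the real ones live in
    beam_fn_api.proto):

    // Runner -> worker: "of the elements already sent for this bundle,
    // only keep processing a fraction of what remains".
    final class SplitRequest {
      final String bundleId;
      final double fractionOfRemainder; // 0.0 ~ "stop as soon as possible"

      SplitRequest(String bundleId, double fractionOfRemainder) {
        this.bundleId = bundleId;
        this.fractionOfRemainder = fractionOfRemainder;
      }
    }

    // Worker -> runner: where the split landed, so the runner can
    // reschedule the residual elements elsewhere (work stealing).
    final class SplitResponse {
      final long lastPrimaryElement;   // worker finishes up to this index
      final long firstResidualElement; // runner takes over from this index

      SplitResponse(long lastPrimaryElement, long firstResidualElement) {
        this.lastPrimaryElement = lastPrimaryElement;
        this.firstResidualElement = firstResidualElement;
      }
    }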

    On Thu, Sep 21, 2023 at 4:24 PM Joey Tran
    <joey.t...@schrodinger.com> wrote:

        I'm writing a runner, and my first strategy for determining
        bundle size was to just start with a bundle size of one and
        double it until we reach a size that we expect to take some
        target per-bundle runtime (e.g. maybe 10 minutes). I realize
        that this isn't the greatest strategy for transforms with
        high, size-dependent costs. I'm curious what kinds of
        strategies other runners take?
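
        In code, the heuristic I have now is roughly the following
        (just a sketch; runBundleAndMeasureMillis is a made-up stand-in
        for actually executing a bundle of that size and timing it):

        import java.util.function.LongToDoubleFunction;

        final class BundleSizer {
          /**
           * Doubles the bundle size, starting from 1, until a bundle
           * of that size takes at least targetMillis to process.
           */
          static long chooseBundleSize(
              LongToDoubleFunction runBundleAndMeasureMillis,
              double targetMillis) {
            long size = 1;
            while (runBundleAndMeasureMillis.applyAsDouble(size) < targetMillis) {
              size *= 2;
            }
            return size;
          }
        }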
