Hi Stefan,

These settings work differently in the Flink runner than you might expect. In the Beam model, a "bundle" is a unit of work that either completes successfully as a whole or is restarted from scratch (i.e. it is perfectly fine for a bundle to be processed halfway, fail, and then be restarted from the beginning). Some runners use bundles for fault tolerance (i.e. they commit each bundle separately), but Flink does not. Flink uses checkpoint barriers that flow from the sources to the sinks, persisting any state along the way. As a consequence, any bundle that starts after checkpoint N will be processed again if the job fails before checkpoint N+1 completes, even if that bundle had already finished.
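To make the replay rule concrete, here is a toy model in plain Python (not Beam or Flink code). It assumes that a checkpoint completes after every `checkpoint_every` input elements, that bundles are fixed runs of `bundle_size` elements, that checkpoints fall on bundle boundaries, and that the job fails right after `processed` elements. The function name and its parameters are hypothetical; it only computes which bundles would be processed again after restoring from the last completed checkpoint.

```python
from typing import List, Tuple

Bundle = Tuple[int, int]  # half-open range [start, end) of input offsets


def bundles_to_replay(bundle_size: int, checkpoint_every: int,
                      processed: int) -> Tuple[int, List[Bundle], List[Bundle]]:
    """Return (restore_offset, finished_but_replayed, in_flight).

    Toy assumptions (hypothetical): a checkpoint completes after every
    `checkpoint_every` elements, checkpoints fall on bundle boundaries,
    and the job fails right after `processed` elements.
    """
    assert checkpoint_every % bundle_size == 0, "checkpoints must align with bundles"
    # Recovery restarts from the last checkpoint that completed before the failure.
    restore_offset = (processed // checkpoint_every) * checkpoint_every
    finished: List[Bundle] = []
    in_flight: List[Bundle] = []
    for start in range(restore_offset, processed, bundle_size):
        end = start + bundle_size
        if end <= processed:
            finished.append((start, end))  # finish_bundle already ran; replayed anyway
        else:
            in_flight.append((start, end))  # failed halfway through this bundle
    return restore_offset, finished, in_flight


if __name__ == "__main__":
    print(bundles_to_replay(bundle_size=10, checkpoint_every=40, processed=65))
    # prints (40, [(40, 50), (50, 60)], [(60, 70)])
```

Here the job restores from offset 40: the bundles [40, 50) and [50, 60) had already finished, yet they are replayed together with the in-flight bundle [60, 70), because in this model only a completed checkpoint makes progress durable.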

Therefore, setting the bundle limits (maxBundleSize, maxBundleTimeMills) in Flink mostly affects how often your @StartBundle and @FinishBundle lifecycle methods are called; it does not (directly) influence latency.
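The toy simulation below (again plain Python, not the Flink runner's actual implementation) contrasts the behaviour described above, where each output is forwarded downstream as soon as it is produced, with the buffering the question expected. It assumes one element per second, as in the Kafka test, and checks the bundle limits only when an element arrives, whereas a real runner may also close a bundle from a timer. `simulate` and its parameters are hypothetical names.

```python
from typing import List, Tuple


def simulate(arrivals_ms: List[int], max_bundle_size: int,
             max_bundle_time_ms: int, buffer_output: bool) -> Tuple[List[int], int]:
    """Return (emit_times_ms, finish_bundle_calls) for a toy bundling operator.

    buffer_output=False: every output is forwarded as soon as it is produced.
    buffer_output=True: outputs are held back until the bundle finishes.
    An open bundle at the end of the input is left unfinished (and, in
    buffered mode, its outputs are never emitted).
    """
    emit_times: List[int] = []
    in_bundle: List[int] = []  # arrival times of the elements in the open bundle
    finish_calls = 0
    for t in arrivals_ms:
        # The first element of a new bundle corresponds to start_bundle
        # (@StartBundle in the Java SDK).
        in_bundle.append(t)
        if not buffer_output:
            emit_times.append(t)  # no waiting for the bundle to close
        # Simplification: limits are only checked when an element arrives.
        if (len(in_bundle) >= max_bundle_size
                or t - in_bundle[0] >= max_bundle_time_ms):
            finish_calls += 1  # finish_bundle (@FinishBundle in the Java SDK)
            if buffer_output:
                emit_times.extend([t] * len(in_bundle))  # flushed together
            in_bundle = []
    return emit_times, finish_calls


if __name__ == "__main__":
    arrivals = [i * 1000 for i in range(300)]  # 1 element/second for 300 s
    for buffered in (False, True):
        emits, calls = simulate(arrivals, 10000, 100000, buffered)
        worst = max(e - a for a, e in zip(arrivals, emits))
        print(f"buffered={buffered}: finish_bundle calls={calls}, max latency={worst} ms")
```

With max_bundle_size=10000 and max_bundle_time_ms=100000, both modes call finish_bundle twice over 300 seconds of input, but only the buffered mode delays output (by up to 100000 ms); the unbuffered mode emits every element at its arrival time, which matches the one-message-per-second observation. Lowering max_bundle_size to 10 raises the number of finish_bundle calls to 30 without changing any emit time.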

Hope this helps,

 Jan

On 6/6/24 19:48, Ahmet Altay via dev wrote:
I sent you an invite for Slack. Welcome to the Beam community.

On Thu, Jun 6, 2024 at 10:47 AM Pedratscher Stefan <stefan.pedratsc...@student.uibk.ac.at> wrote:

    Dear Apache Beam Team,

    I am a student at the University of Innsbruck, currently using Apache
    Beam for my research.

    I am using Flink v1.19 and the Flink runner
    (https://beam.apache.org/documentation/runners/flink/) to execute
    pipelines. I managed to run the pipeline with several options:
    - parallelism
    - operatorChaining
    - attachedMode
    - ...
    All of these options were applied correctly, and I could immediately
    see their effects.

    However, the "maxBundleTimeMills" and "maxBundleSize" settings do not
    seem to work properly. I tried setting them, but I observed no effect
    when gathering metrics. I created an artificial pipeline where I set
    maxBundleTimeMills=100000 (100 seconds) and maxBundleSize=10000
    elements. I configured a Kafka topic that gets 1 element/second. I
    expected to receive a whole bundle of messages after 100 seconds.
    However, while measuring, I noticed that I actually receive 1 message
    per second continuously, indicating that these two settings are not
    being applied correctly. Am I right, or am I missing something?

    Could you provide me with assistance in solving this issue?

    I also tried different Flink versions (e.g., 1.16.3).

    It would be wonderful if I could join the Slack community. Could you
    provide me with an invitation, or is it restricted to @apache.org
    users?

    Thank you very much for your help, and I look forward to your reply.

    Best regards,
    Stefan Pedratscher
