Hi Flink users,

Greetings. I have a question about how Flink triggers checkpoints when the
pipeline is slow.

I have a Beam streaming pipeline with one Map() call. It is a Python
program running on Flink with PortableRunner. I’ve experimented with
varying amounts of sleep inside this call to simulate slowness. The
pipeline reads from Kafka, windows into 1-minute fixed windows, and writes
to a file. The pipeline parallelism is 1 and the bundle size is 2; the
checkpoint interval is 30 s and the checkpoint timeout is 1 min.
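
For reference, the pipeline is roughly as follows. This is a minimal sketch:
the broker address, topic name, output path, and job-server endpoint are
placeholders, and the Flink-runner option names are spelled the way I pass
them through the PortableRunner.

import time

import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

SLEEP_SECS = 0.6  # varied per run to make the Map() fast or slow

def process(element):
    # The single Map() call; sleep() simulates slow processing.
    time.sleep(SLEEP_SECS)
    return element

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',      # Flink job server (placeholder)
    '--environment_type=LOOPBACK',
    '--streaming',
    '--parallelism=1',
    '--max_bundle_size=2',
    '--checkpointing_interval=30000',     # 30 s
    '--checkpoint_timeout_millis=60000',  # 1 min
])

with beam.Pipeline(options=options) as p:
    (p
     | 'ReadMessages' >> ReadFromKafka(
           consumer_config={'bootstrap.servers': 'localhost:9092'},
           topics=['test-topic'])
     | 'Remove Kafka Metadata' >> beam.Map(lambda kv: kv[1])  # keep value bytes
     | 'CreateKafkaRecord' >> beam.Map(lambda v: v.decode('utf-8'))
     | 'Process1' >> beam.Map(process)
     | 'Window' >> beam.WindowInto(window.FixedWindows(60))   # 1-minute fixed windows
     | 'Write to file' >> beam.io.WriteToText('/tmp/out'))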

I post messages to Kafka with the kcat utility. The messages are all 32 bytes,
but I can vary the number of messages posted.
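
The posting step amounts to the following (sketched in Python with
confluent-kafka just to keep one language in this message; the broker, topic,
and message count are placeholders):

import os

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})  # placeholder broker
num_messages = 20000  # varied per experiment

for _ in range(num_messages):
    # Each message is a fixed 32-byte payload.
    producer.produce('test-topic', value=os.urandom(32))

producer.flush()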

With sleep() < 0.6 seconds, i.e. a fast pipeline, I see checkpoints getting
started even when the Kafka backlog is > 0, i.e. before the Kafka messages have
been fully drained.

However, with a longer sleep(), i.e. a slower pipeline, I don’t see a
checkpoint start until the backlog goes all the way down to 0. I also don’t
see a “Received barrier” message until the backlog reaches zero.

Annotated example logs are included below. I’m happy to provide additional
details and logs, or to run further experiments on my setup.


My question is: why is a fast pipeline able to start checkpoints even when
there are outstanding Kafka messages, while a slow pipeline is not?

Thanks,

Deepak

# Posting to Kafka

2022-05-03 09:53:37,184 DEBUG
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader            [] -
Reader-0:  backlog 17157

# Checkpoint was triggered

2022-05-03 09:53:38,924 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Trigger
checkpoint 2@1651596818922 for fb40570da5f4dd41e458af269c1a2eaf.

# Messages were slowly getting drained

# ...

2022-05-03 09:53:53,184 DEBUG
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader            [] -
Reader-0:  backlog 11696

# Even with a non-zero backlog, I saw a checkpoint getting triggered (this is
#   what I want!)

2022-05-03 09:53:53,369 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Starting
checkpoint (2) CHECKPOINT on task Source:
ReadMessages/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource)
-> Flat Map -> Map -> [1]ReadMessages/Remove Kafka Metadata ->
[7]{CreateKafkaRecord, Process1, Window, Write to file} -> ToKeyedWorkItem
(1/1)#0

# Q: With slower pipelines (sleep() > 0.6s), I cannot get Beam/Flink to behave
#   this way, i.e. trigger a checkpoint with a non-zero backlog. Why?
