Hi Flink users,

Greetings. I have a question on how Flink invokes checkpoints with a slow pipeline.
I have a Beam streaming pipeline with one Map() call. It is a Python program running on Flink with PortableRunner. I’ve experimented with varying amounts of sleep inside this call to simulate slowness. The pipeline reads from Kafka, windows into 1-minute fixed windows, and writes to a file. The pipeline parallelism is 1, and the bundle size is 2. The checkpoint interval is 30s and the timeout is 1min.

I post messages to Kafka with the kcat utility. The messages are all 32 bytes, but I can vary the number of messages posted.

With sleep() < 0.6 seconds, i.e. a fast pipeline, I see checkpoints getting started even when the Kafka backlog is > 0, i.e. when the Kafka messages are not yet fully drained. However, with a longer sleep(), i.e. a slower pipeline, I don’t see a checkpoint getting started until the backlog goes all the way down to 0. I also don’t see a “Received barrier” message until the backlog gets to zero. Annotated example logs are below.

I’m happy to provide additional details and logs, or run experiments on my setup.

My question is: what causes a fast pipeline to be able to start checkpoints even when there are outstanding Kafka messages, while a slow pipeline fails to do so?

Thanks,
Deepak

# Posting to Kafka
2022-05-03 09:53:37,184 DEBUG org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-0: backlog 17157

# Checkpoint was triggered
2022-05-03 09:53:38,924 DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Trigger checkpoint 2@1651596818922 for fb40570da5f4dd41e458af269c1a2eaf.

# Messages were slowly getting drained
# ...
2022-05-03 09:53:53,184 DEBUG org.apache.beam.sdk.io.kafka.KafkaUnboundedReader [] - Reader-0: backlog 11696

# Even with non-zero backlog, I saw a checkpoint getting triggered (this is what I want!)
2022-05-03 09:53:53,369 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Starting checkpoint (2) CHECKPOINT on task Source: ReadMessages/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource) -> Flat Map -> Map -> [1]ReadMessages/Remove Kafka Metadata -> [7]{CreateKafkaRecord, Process1, Window, Write to file} -> ToKeyedWorkItem (1/1)#0

# Q: I cannot get Beam/Flink to behave this way, i.e. trigger checkpoint with non-zero backlog,
# with slower pipelines (sleep() > 0.6s). Why?