Hi everyone,

I'm trying to figure out how pipeline state works with Beam running on
Flink Classic. Would appreciate some help with the below.

My understanding is that on recovery (whether from a checkpoint or
savepoint), Flink recreates the operators (I guess those are DoFns in Beam)
with whatever state they had when the pipeline crashed. For example the
Kafka operator might contain the latest *safe* offset to restart from. But
I'm not seeing this when I introduce exceptions in the pipeline.

My pipeline is as follows:
1. Read a Kafka topic from start
2. Have a DoFn that stores all incoming messages in a BagState
3. Above DoFn triggers a timer set in such a way that it triggers after
there are a few checkpoints created and kept because of
--externalizeCheckpointsEnabled = true. This timer handler then outputs the
elements to the next operator, in this case KafkaIo.Write.
4. Before the timer in #3 is executed manually trigger an exception (listen
to another kafka topic, and throw any time a new message comes in)

What I observe:
1. In #4 above Flink tries to process the exception twice then stops the
pipeline (because numberOfExecutionRetries =2 )
2. After the pipeline is stopped, I see the checkpoints are kept in the
configured directory
3. If I restart the pipeline (with --savepointPath = <path to latest
checkpoint from first run>):
3a. No messages are read from kafka, because the Kafka reader reached the
end of the topic during the first run
3b. StartBundles are not executed for my DoFn. Indicating that the DoFn
isn't even started
3c. The timer in #3 is never executed, hence there is data loss as the
elements I had in my DoFn state are never processed
4. If I manually reset the offset to the start of the topic and restart the
pipeline (with --savepointPath = <path to latest checkpoint from first
run>):
4a. StartBundle methods are called
4b. In ProcessElement, the BagState is empty on the first received message.
If I'm restoring from a checkpoint/savepoint, I would expect this state to
be filled.

Is this correct behaviour? Am I doing something wrong?

Thanks,
Cristian

Other quirks I found:
a. If KafkaIO.Read is configured to read from the latest offset, and there
is an exception thrown in the pipeline before the first checkpoint happens
(let's say on the first message that comes in), when Flink retries KafkaIO
reads from the latest offset again. That means that the message that caused
the exception is not reprocessed. On the other hand, if the exception is
thrown after the first checkpoint, that message will be tried twice
(because numberOfExecutionRetries =2 ), and then the pipeline will exit. I
think this is working as designed but it feels a little weird that the
behaviour is different depending if there's a checkpoint or not.

b. When KafkaIO.Write is configured with .withEOS(number, "group"), and
there is an exception thrown in the pipeline, the Flink job doesn't exit. I
think there is a kafka producer in KafkaExactlyOnceSink that is not closed
correctly.

Reply via email to