Hi everyone,

From the mailing list archives, I can see this question comes up a lot, but I can't seem to find a solution to my problem. I would appreciate some help.
The requirement for our project is that we neither lose data nor produce duplicate records. Our pipelines are written with Apache Beam (2.35.0) and run on a single Flink (1.13.1) node (for now, while we transition to Kubernetes). We read a topic from Kafka, join it with another topic, and write the result to a third topic.

When we introduce an exception into the pipeline artificially (by listening to a debug topic that throws an exception on every message it receives), we observe that restarting the pipeline from the last checkpoint does not pick up where it left off, and I'm not sure why.

On the Beam side, the read is configured with .withReadCommitted() and .commitOffsetsInFinalize(), enable.auto.commit is set to false for the consumer, and the write uses .withEOS(1, "sink-group-name"). On the Flink side, --externalizedCheckpointsEnabled is set to true and --checkpointingInterval is set to 1 minute.

I let the pipeline run for 4 checkpoints. Between checkpoints #2 and #3, I observe that the Kafka consumer group for the main topic, which started from the beginning of the topic, has already reached the end. I trigger the exception between checkpoints #4 and #5, and the pipeline stops because --numberOfExecutionRetries=2.

When I restart the pipeline and specify the metadata file in the chk-4 directory, I would expect it to continue processing the items that were still pending after checkpoint #4 (an estimated 80k of them). Unfortunately, no item is processed and nothing is read from Kafka; the pipeline just sits there waiting for new messages on the main topic.

Could anyone help me figure out what's going on? I'm sure it's a user mistake, but I'm not sure how to debug it.

Cheers,
Cristian
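
P.S. In case it helps, here is a minimal sketch of the relevant KafkaIO configuration. The topic names, broker address, and consumer group id are placeholders rather than our real code, and the join step is elided; the read/write options are the ones described above.

// Minimal sketch of the pipeline's Kafka read/write configuration.
// Topic names, the broker address, and the consumer group id are placeholders.
import java.util.HashMap;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaEosPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Consumer-side settings: offsets are committed by Beam when checkpoints
    // finalize, not by the Kafka client itself.
    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put("group.id", "main-consumer-group");   // placeholder
    consumerConfig.put("enable.auto.commit", false);

    p.apply("ReadMainTopic", KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")              // placeholder
            .withTopic("main-topic")                          // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withConsumerConfigUpdates(consumerConfig)
            .withReadCommitted()           // only read committed records
            .commitOffsetsInFinalize()     // commit offsets on checkpoint finalization
            .withoutMetadata())
        // ... join with the second topic happens here ...
        .apply("WriteResultTopic", KafkaIO.<String, String>write()
            .withBootstrapServers("broker:9092")              // placeholder
            .withTopic("output-topic")                        // placeholder
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            .withEOS(1, "sink-group-name"));                  // exactly-once sink

    p.run().waitUntilFinish();
  }
}

We launch it with roughly --runner=FlinkRunner --externalizedCheckpointsEnabled=true --checkpointingInterval=60000 --numberOfExecutionRetries=2, and restart it by pointing the runner at the metadata file in the chk-4 directory.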