Hi Jean-Marc,

(Thanks, Yaroslav, for your excellent answer 😊)

Before I come back to your question, I would like to mention an important consideration that you might have overlooked: if you replicate your Kafka cluster by means of e.g. MirrorMaker, the offsets on the replicated cluster will generally differ, per topic partition, from those of the original cluster! Hence, using the original offsets will be in vain or even lead to data glitches …

In addition to what Yaroslav has written:

* If you are only ever interested in the latest committed checkpoint/savepoint, you can enable your source connectors to record the offsets per consumer group in the __consumer_offsets topic, and then read them from that topic (cf. https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kafka/#consumer-offset-committing ).
* Also, when you restart your job from the original savepoint, change the consumer group of your source connectors, so that the Flink source connector considers your configuration instead of what is recorded in the savepoint (no need to patch the savepoint). Also set https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/#execution-state-recovery-ignore-unclaimed-state , otherwise Flink will complain on init about state in the savepoint that is not consumed by any operator.

Back to your original request: I’m not sure what your scenario is.

* If this is a one-time migration to another Kafka cluster, you have the option of copying all files over to the new cluster (I’ve never done this before).
* If your scenario is failover in case of disruption:
  * You can create a federated cluster over 2 data centers
  * Configure replication such that at least one copy exists in every data center
  * Configure consumers to prefer the data center that is closer
  * Similar for producers (the leader partition needs to be in the right data center)
  * This would resolve the offset question for good …

I hope this helps

Sincere greetings

Thias

From: Yaroslav Tkachenko <[email protected]>
Sent: Wednesday, April 1, 2026 8:18 PM
To: [email protected]
Subject: [External] Re: Q: Trying to read the kafka offset stored in a savepoint

Hi JM,

Here's the snippet for you: https://gist.github.com/sap1ens/6b894c52279b6d2339e50dde35070013

First, you need to get an operator UID hash for your Kafka source. ReadSavepoint.java shows how you can do that using the savepoint_metadata table function.

ReadKafkaSourceState.java shows how to use the State Processor API to read KafkaPartitionSplit stored in a savepoint.

You'd get an output like this (my consumer read the messages up to offset 199 for partition 0):

> KafkaPartitionSplitState{topic='loadtest.json', partition=0,
> startingOffset=200, stoppingOffset=NONE}

Note: I'm using Flink 2.1.1 and Kafka connector 4.0.1-2.0, but I hope the changes are minimal for 1.x (if any).

Hope this helps.

On Wed, Apr 1, 2026 at 4:42 AM Jean-Marc Paulin <[email protected]> wrote:

Hi,

We are using Flink 1.20.3 and the Flink connector 3.4.0-1.20. It's a streaming application using a Kafka source and sink. I was wondering if anyone here has a tip/sample to read the content of the savepoints to extract the Kafka offset per partition.

The scenario is around resuming from a replicated cluster. We plan to replicate Kafka as well as the savepoints.
What we do not know is whether the latest savepoint that we replicated contains the Kafka offset that Flink would resume from with that savepoint, or whether we need to resume from a previous savepoint.

Thanks for any tips.

JM
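
Addendum: once the splits have been printed as in Yaroslav's sample output, the per-partition offsets can be collected with plain Java. This is only a minimal sketch. The class name SplitOffsetParser and the regular expression are hypothetical and modeled on the single sample line quoted in this thread; the actual toString() format of KafkaPartitionSplitState is an assumption and may differ between connector versions. Bear in mind that these are offsets of the original cluster and are not necessarily valid on a replicated one.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SplitOffsetParser {
    // Assumption: the pattern mirrors the sample line quoted in this thread.
    // It is not a documented format and may change between connector versions.
    private static final Pattern SPLIT = Pattern.compile(
        "KafkaPartitionSplitState\\{topic='([^']*)',\\s*partition=(\\d+),\\s*startingOffset=(-?\\d+)");

    /** Returns "topic-partition" -> starting offset for every split found in the text. */
    public static Map<String, Long> parse(String text) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        Matcher m = SPLIT.matcher(text);
        while (m.find()) {
            offsets.put(m.group(1) + "-" + m.group(2), Long.parseLong(m.group(3)));
        }
        return offsets;
    }

    public static void main(String[] args) {
        String sample = "KafkaPartitionSplitState{topic='loadtest.json', partition=0, "
            + "startingOffset=200, stoppingOffset=NONE}";
        // Prints: loadtest.json-0 resumes at offset 200
        parse(sample).forEach((tp, offset) ->
            System.out.println(tp + " resumes at offset " + offset));
    }
}
```

The resulting map can then be compared against the offsets available on the target cluster before deciding which savepoint to resume from.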
