Hi there,

to me, the simplest and most reliable solution still seems to be splitting the stream based on event time. It requires a bit of preparation and assumes that you can tolerate some downtime during the migration.
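The event-time split can be pictured as two complementary predicates on the record's event timestamp, sketched here without any framework. This assumes the timestamp is available as epoch milliseconds and that t_migration is supplied as configuration; the property name `t.migration` is hypothetical. Because the predicates are exact complements, every record is handled by exactly one of the two clouds. In a real Flink job, each predicate could be wrapped in a `FilterFunction` and chained directly after the source.

```java
import java.util.List;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class EventTimeSplit {

    // Cloud1 keeps records with an event timestamp <= tMigration,
    // i.e. it filters out everything with a timestamp > tMigration.
    static LongPredicate keepInCloud1(long tMigration) {
        return ts -> ts <= tMigration;
    }

    // Cloud2 keeps records with an event timestamp > tMigration,
    // i.e. it filters out everything with a timestamp <= tMigration.
    static LongPredicate keepInCloud2(long tMigration) {
        return ts -> ts > tMigration;
    }

    public static void main(String[] args) {
        // Hypothetical config key; in practice t_migration would be a job parameter
        // set to, e.g., one hour in the future when the migration is planned.
        long tMigration = Long.parseLong(System.getProperty("t.migration", "1000"));

        LongPredicate cloud1Filter = keepInCloud1(tMigration);
        LongPredicate cloud2Filter = keepInCloud2(tMigration);

        List<Long> cloud1 = LongStream.of(999L, 1000L, 1001L)
                .filter(cloud1Filter).boxed().collect(Collectors.toList());
        List<Long> cloud2 = LongStream.of(999L, 1000L, 1001L)
                .filter(cloud2Filter).boxed().collect(Collectors.toList());

        // With the default t.migration=1000 this prints: Cloud1: [999, 1000], Cloud2: [1001]
        System.out.println("Cloud1: " + cloud1 + ", Cloud2: " + cloud2);
    }
}
```

The boundary record (timestamp exactly equal to t_migration) goes to Cloud1 only, matching the `>` / `<=` split described in the steps.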
1) For Cloud1, chain a filter to your sources that filters out any records with a timestamp > t_migration. Ideally, make this timestamp configurable.
2) For Cloud2, chain a filter to your sources that filters out any records with a timestamp <= t_migration. Also configurable.
3) When you do the migration, configure t_migration to be, let's say, 1 hour in the future. Let the job run in Cloud1 until you are sure that no more data with an event timestamp <= t_migration will arrive. Take a savepoint.
4) Start your application in Cloud2 with the same value for t_migration and manually configured Kafka offsets that you are sure include all records with a timestamp > t_migration.

Could this work for you?

Cheers,

Konstantin

On Wed, May 4, 2022 at 10:26 PM Andrew Otto <o...@wikimedia.org> wrote:

> Have you tried MirrorMaker 2's consumer offset translation feature? I
> have not used this myself, but it sounds like what you are looking for!
> https://issues.apache.org/jira/browse/KAFKA-9076
>
> https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/mirror/Checkpoint.html
> https://strimzi.io/blog/2020/03/30/introducing-mirrormaker2/
>
> I tried to find some better docs to link for you, but that's the best I
> got :) It looks like there is just the Java API.
>
> On Wed, May 4, 2022 at 3:29 PM Hemanga Borah <borah.hema...@gmail.com>
> wrote:
>
>> Thank you for the suggestions, guys!
>>
>> @Austin Cawley-Edwards
>> Your idea is spot on! This approach would surely work. We could take a
>> savepoint of each of our apps, load it using the State Processor API,
>> create another savepoint accounting for the delta in the offsets, and
>> start the app on the new cloud using this modified savepoint.
>> However, the solution will not be generic, and we would have to do this
>> for each of our applications. This can be quite cumbersome, as we have
>> several applications (around 25).
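The "delta on the offsets" step can be illustrated with a small, framework-free sketch of offset translation for a single topic partition. It assumes you have known sync points mapping an upstream (source-cluster) offset to the corresponding downstream (replica) offset; MirrorMaker 2's checkpoints, linked above, carry upstream and downstream offsets, but how you collect them is out of scope here. `OffsetTranslation` and `translate` are hypothetical names, not a Flink or Kafka API. Rather than assuming a constant per-partition delta, the sketch rewinds to the latest sync point at or before the saved offset, which can re-read some records (duplicates) but should not skip any, provided replication preserves record order within a partition.

```java
import java.util.Map;
import java.util.TreeMap;

public class OffsetTranslation {

    /**
     * Hypothetical helper: maps the next offset to read on the source cluster
     * to a safe starting offset on the replicated cluster for one partition.
     * syncPoints maps upstream offset -> downstream offset of the same record.
     */
    static long translate(long savedUpstreamOffset, TreeMap<Long, Long> syncPoints) {
        // Latest sync point that is not after the saved position.
        Map.Entry<Long, Long> sync = syncPoints.floorEntry(savedUpstreamOffset);
        if (sync == null) {
            throw new IllegalArgumentException(
                    "No sync point at or before offset " + savedUpstreamOffset);
        }
        // Start from the sync point's downstream offset: records between the sync
        // point and the saved offset are read again (at-least-once, no data loss).
        return sync.getValue();
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> syncPoints = new TreeMap<>();
        syncPoints.put(0L, 0L);
        syncPoints.put(500L, 480L);
        syncPoints.put(1000L, 970L);

        System.out.println(translate(750L, syncPoints)); // prints 480
    }
}
```

The trade-off is deliberate: a few duplicates in exchange for never skipping data, which fits applications that can tolerate duplicates during a one-off migration.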
>>
>> We are thinking of overriding FlinkKafkaConsumerBase to account for
>> the offset deltas during the start-up of any app. Do you think it is safe
>> to do that? Is there a better way of doing this?
>>
>> @Schwalbe Matthias
>> Thank you for your suggestion. We do use exactly-once semantics, but our
>> apps can tolerate a few duplicates in rare cases like this one, where we are
>> migrating clouds. However, your suggestion is really helpful, and we will
>> use it in case some of the apps cannot tolerate duplicate data.
>>
>> On Wed, May 4, 2022 at 12:00 AM Schwalbe Matthias <
>> matthias.schwa...@viseca.ch> wrote:
>>
>>> Hello Hemanga,
>>>
>>> MirrorMaker can cause havoc in many respects; for one, it does not have
>>> strict exactly-once semantics…
>>>
>>> The way I would tackle this problem (and have done in similar
>>> situations):
>>>
>>>   - For the source topics that need to have exactly-once semantics
>>>     and that are not intrinsically idempotent:
>>>     - Add one extra operator after the source that deduplicates events
>>>       by unique id for a rolling time range (on the source cloud provider)
>>>     - Take a savepoint after the rolling time range has passed (at least
>>>       once completely)
>>>     - Move your job to the target cloud provider
>>>     - Reconfigure the respective source with a new Kafka consumer group.id
>>>     - Change the uid() of the respective Kafka source
>>>     - Configure start-by-timestamp for the respective source with a
>>>       timestamp that lies within the rolling time range (from above)
>>>     - Configure the job to ignore recovery for state that does not have
>>>       a corresponding operator in the job (the previous Kafka source uid()s)
>>>     - Start the job on the new cloud provider, wait for it to pick
>>>       up/back-fill
>>>     - Take a savepoint
>>>     - Remove the deduplication operator if it causes too much
>>>       load/latency/whatever
>>>
>>> This scheme sounds more complicated than it really is … and has saved my
>>> sanity quite a number of times 😊
>>>
>>> Good luck, and I'm ready to answer with more details.
>>>
>>> Thias
>>>
>>> *From:* Hemanga Borah <borah.hema...@gmail.com>
>>> *Sent:* Tuesday, May 3, 2022 3:12 AM
>>> *To:* user@flink.apache.org
>>> *Subject:* Migrating Flink apps across cloud with state
>>>
>>> Hello,
>>> We are attempting to port our Flink applications from one cloud
>>> provider to another.
>>>
>>> These Flink applications consume data from Kafka topics and output to
>>> various destinations (Kafka or databases). The applications hold
>>> state. Some of that state consists of aggregations; for example,
>>> at times we store hours (or days) worth of data to aggregate over time.
>>> Other applications have cached information for data enrichment; for
>>> example, we store data in Flink state for days so that we can join it
>>> with newly arrived data. The volume of data on the input topics is large,
>>> and it would be expensive to reprocess the data from the beginning of the
>>> topic.
>>>
>>> As such, we want to retain the state of the application when we move to
>>> a different cloud provider so that we can keep the aggregations and
>>> cache, and do not have to start from the beginning of the input topics.
>>>
>>> We are replicating the Kafka topics using MirrorMaker 2. This is our
>>> procedure:
>>>
>>>   - Replicate the input topics of each Flink application from source
>>> - Take a savepoint of the Flink application on the source cloud >>> provider. >>> - Start the Flink application on the destination cloud provider >>> using the savepoint from the source cloud provider. >>> >>> >>> However, this does not work as we want because there is a difference in >>> offset in the new topics in the new cloud provider (because of MirrorMaker >>> implementation). The offsets of the new topic do not match the ones stored >>> on the Flink savepoint, hence, Flink cannot map to the offsets of the new >>> topic during startup. >>> >>> Has anyone tried to move clouds while retaining the Flink state? >>> >>> Thanks, >>> Hemanga >>> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und >>> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die >>> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, >>> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und >>> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir >>> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie >>> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung >>> dieser Informationen ist streng verboten. >>> >>> This message is intended only for the named recipient and may contain >>> confidential or privileged information. As the confidentiality of email >>> communication cannot be guaranteed, we do not accept any responsibility for >>> the confidentiality and the intactness of this message. If you have >>> received it in error, please advise the sender by return e-mail and delete >>> this message and any attachments. Any unauthorised use or dissemination of >>> this information is strictly prohibited. >>> >> -- https://twitter.com/snntrable https://github.com/knaufk