Hi there,

to me, the simplest and most reliable solution still seems to be splitting
the stream based on event time. It requires a bit of preparation and assumes
that you can tolerate some downtime during the migration.

1) For Cloud1, you chain a filter to your sources that drops any
records with a timestamp > t_migration. Ideally, you make this timestamp
configurable.
2) For Cloud2, you chain a filter to your sources that drops any
records with a timestamp <= t_migration. This should also be configurable.
3) When you do the migration, you set t_migration to, let's say, 1
hour in the future. You let the job run in Cloud1 until you are sure that
no more data with an event timestamp <= t_migration will arrive. Then you
take a savepoint.
4) You start your application in Cloud2 with the same value for t_migration
and with manually configured Kafka offsets that you are sure include
all records with a timestamp > t_migration.
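
As a rough sketch of the filter logic in steps 1) and 2), in plain Python
rather than actual Flink code: the names Record, keep_in_cloud1,
keep_in_cloud2 and split_by_event_time are made up for illustration, and I
assume event timestamps in epoch milliseconds. The only point is that the two
predicates are exact complements, so every record is processed in exactly one
cloud.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List, Tuple


@dataclass(frozen=True)
class Record:
    """Hypothetical record type; only the event timestamp matters here."""
    key: str
    event_time_ms: int  # assumed unit: epoch milliseconds


def keep_in_cloud1(t_migration_ms: int) -> Callable[[Record], bool]:
    """Cloud1 keeps records with event time <= t_migration (drops later ones)."""
    return lambda record: record.event_time_ms <= t_migration_ms


def keep_in_cloud2(t_migration_ms: int) -> Callable[[Record], bool]:
    """Cloud2 keeps records with event time > t_migration (drops earlier ones)."""
    return lambda record: record.event_time_ms > t_migration_ms


def split_by_event_time(
    records: Iterable[Record], t_migration_ms: int
) -> Tuple[List[Record], List[Record]]:
    """Apply both filters to the same input to show that they partition it."""
    records = list(records)
    in_cloud1 = keep_in_cloud1(t_migration_ms)
    in_cloud2 = keep_in_cloud2(t_migration_ms)
    cloud1 = [r for r in records if in_cloud1(r)]
    cloud2 = [r for r in records if in_cloud2(r)]
    return cloud1, cloud2
```

In a real job, each predicate would presumably be the body of a filter
chained to the sources, with t_migration read from the job configuration.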

Could this work for you?

Cheers,

Konstantin




On Wed, May 4, 2022 at 10:26 PM Andrew Otto <o...@wikimedia.org> wrote:

> Have you tried MirrorMaker 2's consumer offset translation feature?  I
> have not used this myself, but it sounds like what you are looking for!
> https://issues.apache.org/jira/browse/KAFKA-9076
>
> https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/mirror/Checkpoint.html
> https://strimzi.io/blog/2020/03/30/introducing-mirrormaker2/
>
> I tried to find some better docs to link for you, but that's the best I
> got :)  It looks like there is just the Java API.
>
>
>
> On Wed, May 4, 2022 at 3:29 PM Hemanga Borah <borah.hema...@gmail.com>
> wrote:
>
>> Thank you for the suggestions, guys!
>>
>> @Austin Cawley-Edwards
>> Your idea is spot on! This approach would surely work. We could take a
>> savepoint of each of our apps, load it using the State Processor API,
>> create another savepoint that accounts for the delta in the offsets, and
>> start the app on the new cloud using this modified savepoint.
>> However, the solution would not be generic, and we would have to do this
>> for each of our applications. This can be quite cumbersome as we have
>> several applications (around 25).
>>
>> We are thinking of overriding the FlinkKafkaConsumerBase to account for
>> the offset deltas during the start-up of any app. Do you think it is safe
>> to do that? Is there a better way of doing this?
>>
>> @Schwalbe Matthias
>> Thank you for your suggestion. We do use exactly-once semantics, but our
>> apps can tolerate a few duplicates in rare cases like this one, where we
>> are migrating clouds. However, your suggestion is really helpful and we
>> will use it in case some of the apps cannot tolerate duplicate data.
>>
>>
>> On Wed, May 4, 2022 at 12:00 AM Schwalbe Matthias <
>> matthias.schwa...@viseca.ch> wrote:
>>
>>> Hello Hemanga,
>>>
>>>
>>>
>>> MirrorMaker can cause havoc in many respects; for one, it does not have
>>> strict exactly-once semantics…
>>>
>>>
>>>
>>> The way I would tackle this problem (and have done in similar
>>> situations):
>>>
>>>
>>>
>>>    - For the source topics that need to have exactly-once semantics
>>>    and that are not intrinsically idempotent:
>>>    - Add one extra operator after the source that deduplicates events
>>>    by unique id for a rolling time range (on the source cloud provider)
>>>    - Take a savepoint after the rolling time-range has passed (at least
>>>    once completely)
>>>    - Move your job to the target cloud provider
>>>    - Reconfigure the respective source with a new Kafka consumer group.id
>>>    - Change the uid() of the respective Kafka source
>>>    - Configure start-by-timestamp for the respective source with a timestamp
>>>    that lies within the rolling time range (from above)
>>>    - Configure the job to ignore recovery for state that does not have
>>>    a corresponding operator in the job (the previous Kafka source uid()s)
>>>    - Start the job on new cloud provider, wait for it to pick
>>>    up/back-fill
>>>    - Take a savepoint
>>>    - Remove deduplication operator if that causes too much
>>>    load/latency/whatever
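
A minimal sketch of the deduplication step from the list above, in plain
Python rather than Flink code. RollingDeduplicator and its parameters are
hypothetical names; the sketch assumes that event timestamps arrive in
non-decreasing order and that event ids are hashable. In a Flink job the same
idea would more likely be keyed state that expires after the rolling time
range.

```python
from collections import OrderedDict
from typing import Hashable


class RollingDeduplicator:
    """Drops events whose id was already seen within the last `window_ms`.

    Assumption: timestamps passed to accept() never decrease, so insertion
    order in the OrderedDict equals timestamp order.
    """

    def __init__(self, window_ms: int) -> None:
        self.window_ms = window_ms
        # Maps event id -> timestamp at which the id was first accepted.
        self._first_seen: "OrderedDict[Hashable, int]" = OrderedDict()

    def accept(self, event_id: Hashable, event_time_ms: int) -> bool:
        """Return True if the event should be forwarded, False if duplicate."""
        self._expire(event_time_ms)
        if event_id in self._first_seen:
            return False
        self._first_seen[event_id] = event_time_ms
        return True

    def _expire(self, now_ms: int) -> None:
        """Forget ids first seen more than `window_ms` before `now_ms`."""
        cutoff = now_ms - self.window_ms
        while self._first_seen:
            oldest_id, seen_at = next(iter(self._first_seen.items()))
            if seen_at >= cutoff:
                break
            del self._first_seen[oldest_id]
```

The window would need to be at least as long as the overlap between the old
and the new source, so that records re-read after the start-by-timestamp are
still recognised as duplicates.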
>>>
>>>
>>>
>>> This scheme sounds more complicated than it really is … and has saved my
>>> sanity quite a number of times 😊
>>>
>>>
>>>
>>> Good luck; I am happy to provide more details.
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>> *From:* Hemanga Borah <borah.hema...@gmail.com>
>>> *Sent:* Tuesday, May 3, 2022 3:12 AM
>>> *To:* user@flink.apache.org
>>> *Subject:* Migrating Flink apps across cloud with state
>>>
>>>
>>>
>>> Hello,
>>>  We are attempting to port our Flink applications from one cloud
>>> provider to another.
>>>
>>>  These Flink applications consume data from Kafka topics and output to
>>> various destinations (Kafka or databases). The applications have states
>>> stored in them. Some of these stored states are aggregations, for example,
>>> at times we store hours (or days) worth of data to aggregate over time.
>>> Some other applications have cached information for data enrichment, for
>>> example, we store data in Flink state for days, so that we can join them
>>> with newly arrived data. The amount of data on the input topics is a lot,
>>> and it will be expensive to reprocess the data from the beginning of the
>>> topic.
>>>
>>>  As such, we want to retain the state of the application when we move to
>>> a different cloud provider so that we can retain the aggregations and
>>> cache, and do not have to start from the beginning of the input topics.
>>>
>>>  We are replicating the Kafka topics using MirrorMaker 2. This is our
>>> procedure:
>>>
>>>    - Replicate the input topics of each Flink application from source
>>>    cloud to destination cloud.
>>>    - Take a savepoint of the Flink application on the source cloud
>>>    provider.
>>>    - Start the Flink application on the destination cloud provider
>>>    using the savepoint from the source cloud provider.
>>>
>>>
>>> However, this does not work as we want because there is a difference in
>>> offset in the new topics in the new cloud provider (because of MirrorMaker
>>> implementation). The offsets of the new topic do not match the ones stored
>>> on the Flink savepoint, hence, Flink cannot map to the offsets of the new
>>> topic during startup.
>>>
>>> Has anyone tried to move clouds while retaining the Flink state?
>>>
>>> Thanks,
>>> Hemanga
>>>
>>> This message is intended only for the named recipient and may contain
>>> confidential or privileged information. As the confidentiality of email
>>> communication cannot be guaranteed, we do not accept any responsibility for
>>> the confidentiality and the intactness of this message. If you have
>>> received it in error, please advise the sender by return e-mail and delete
>>> this message and any attachments. Any unauthorised use or dissemination of
>>> this information is strictly prohibited.
>>>
>>

-- 
https://twitter.com/snntrable
https://github.com/knaufk
