Thank you for the suggestions, guys!

@Andrew Otto
This is most likely the way we will go. However, it requires us to modify
the Flink Kafka consumer code, and it looks like there is no way around
that. We will add some custom code that resets the offsets when starting
from specific savepoints.
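
Roughly, the offset resetting we have in mind could look like the sketch
below. It is plain Java rather than actual Flink consumer code, and the
class, method, and field names (OffsetResetSketch, translate, OffsetPair)
are hypothetical. It assumes that for each partition we can obtain one pair
of corresponding offsets (source cluster and mirrored cluster), for example
from the MirrorMaker 2 checkpoints, and that the delta stays constant after
that point, which may not hold for compacted or transactional topics.

```java
import java.util.HashMap;
import java.util.Map;

final class OffsetResetSketch {

    /** Hypothetical pair of corresponding offsets for one partition. */
    static final class OffsetPair {
        final long sourceOffset; // offset on the old cluster
        final long targetOffset; // offset of the same record on the new cluster

        OffsetPair(long sourceOffset, long targetOffset) {
            this.sourceOffset = sourceOffset;
            this.targetOffset = targetOffset;
        }
    }

    /**
     * Shifts the offsets restored from a savepoint (partition -> offset)
     * by the per-partition delta between the two clusters.
     */
    static Map<Integer, Long> translate(Map<Integer, Long> savepointOffsets,
                                        Map<Integer, OffsetPair> checkpoints) {
        Map<Integer, Long> translated = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : savepointOffsets.entrySet()) {
            OffsetPair pair = checkpoints.get(entry.getKey());
            if (pair == null) {
                // Assumption: fail fast rather than guess an offset.
                throw new IllegalStateException(
                        "No offset pair for partition " + entry.getKey());
            }
            long delta = pair.targetOffset - pair.sourceOffset;
            // Never go below the first offset of the partition.
            translated.put(entry.getKey(), Math.max(0L, entry.getValue() + delta));
        }
        return translated;
    }

    public static void main(String[] args) {
        Map<Integer, Long> savepointOffsets = new HashMap<>();
        savepointOffsets.put(0, 1500L);
        savepointOffsets.put(1, 2000L);

        Map<Integer, OffsetPair> checkpoints = new HashMap<>();
        checkpoints.put(0, new OffsetPair(1000L, 400L));
        checkpoints.put(1, new OffsetPair(1800L, 1750L));

        System.out.println(translate(savepointOffsets, checkpoints));
    }
}
```

Since our apps can tolerate a few duplicates, erring towards a slightly
earlier offset on the new cluster would be acceptable for us.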

@Konstantin Knauf
This is a valuable suggestion, and we did consider it. However, we are
migrating 25+ applications, and it would require code changes in each of
them, which would be quite expensive. So, even though this method would
definitely get us to the goal, we cannot afford to adopt it at this time.
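
For anyone following along, the split by event time boils down to two
complementary predicates on the record timestamp, each chained to the
sources as a filter. The sketch below is plain Java, not Flink code, and
the names (MigrationSplitSketch, keepOnOldCloud, keepOnNewCloud) are
hypothetical; it only illustrates the boundary handling, assuming
timestamps are epoch milliseconds.

```java
import java.util.function.LongPredicate;

final class MigrationSplitSketch {

    /** Cloud1 drops every record with a timestamp > tMigration. */
    static LongPredicate keepOnOldCloud(long tMigration) {
        return timestamp -> timestamp <= tMigration;
    }

    /** Cloud2 drops every record with a timestamp <= tMigration. */
    static LongPredicate keepOnNewCloud(long tMigration) {
        return timestamp -> timestamp > tMigration;
    }

    public static void main(String[] args) {
        // Assumption: t_migration is passed in as configuration (here: args[0]).
        long tMigration = args.length > 0 ? Long.parseLong(args[0]) : 1_652_180_400_000L;
        long[] sampleTimestamps = {tMigration - 1, tMigration, tMigration + 1};

        for (long ts : sampleTimestamps) {
            boolean old = keepOnOldCloud(tMigration).test(ts);
            boolean neu = keepOnNewCloud(tMigration).test(ts);
            // Exactly one of the two jobs processes any given record.
            System.out.println(ts + " -> cloud1=" + old + ", cloud2=" + neu);
        }
    }
}
```

Because the two predicates are exact complements, every record is handled
by exactly one of the two jobs, but each of our apps would still need this
filter added by hand.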


On Tue, May 10, 2022 at 11:59 AM Konstantin Knauf <kna...@apache.org> wrote:

> Hi there,
>
> to me the simplest and most reliable solution still seems to be to split
> the stream based on event time. It requires a bit of preparation and assumes
> that you can tolerate some downtime when doing the migration.
>
> 1) For Cloud1, you chain a filter to your sources that filters out any
> records with a timestamp > t_migration. Ideally, make this timestamp
> configurable.
> 2) For Cloud2, you chain a filter to your sources that filters out any
> records with a timestamp <= t_migration. This should also be configurable.
> 3) When you do the migration, you configure t_migration to be, let's say, 1
> hour in the future. You let the job run in Cloud1 until you are sure that
> no more data with an event timestamp <= t_migration will arrive. Then you
> take a savepoint.
> 4) You start your application in Cloud2 with the same value for
> t_migration and manually configured Kafka offsets from which you are sure
> to read all records with a timestamp > t_migration.
>
> Could this work for you?
>
> Cheers,
>
> Konstantin
>
>
>
>
> On Wed, May 4, 2022 at 10:26 PM Andrew Otto <o...@wikimedia.org> wrote:
>
>> Have you tried MirrorMaker 2's consumer offset translation feature?  I
>> have not used this myself, but it sounds like what you are looking for!
>> https://issues.apache.org/jira/browse/KAFKA-9076
>>
>> https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/mirror/Checkpoint.html
>> https://strimzi.io/blog/2020/03/30/introducing-mirrormaker2/
>>
>> I tried to find some better docs to link for you, but that's the best I
>> got :)  It looks like there is just the Java API.
>>
>>
>>
>> On Wed, May 4, 2022 at 3:29 PM Hemanga Borah <borah.hema...@gmail.com>
>> wrote:
>>
>>> Thank you for the suggestions, guys!
>>>
>>> @Austin Cawley-Edwards
>>> Your idea is spot on! This approach would surely work. We could take a
>>> savepoint of each of our apps, load it using the State Processor API,
>>> create another savepoint that accounts for the delta in the offsets, and
>>> start the app on the new cloud using this modified savepoint.
>>> However, the solution will not be generic, and we have to do this for
>>> each of our applications. This can be quite cumbersome as we have several
>>> applications (around 25).
>>>
>>> We are thinking of overriding the FlinkKafkaConsumerBase to account for
>>> the offset deltas during the start-up of any app. Do you think it is safe
>>> to do that? Is there a better way of doing this?
>>>
>>> @Schwalbe Matthias
>>> Thank you for your suggestion. We do use exactly-once semantics, but,
>>> our apps can tolerate a few duplicates in rare cases like this one where we
>>> are migrating clouds. However, your suggestion is really helpful and we
>>> will use it in case some of the apps cannot tolerate duplicate data.
>>>
>>>
>>> On Wed, May 4, 2022 at 12:00 AM Schwalbe Matthias <
>>> matthias.schwa...@viseca.ch> wrote:
>>>
>>>> Hello Hemanga,
>>>>
>>>>
>>>>
>>>> MirrorMaker can cause havoc in many respects; for one, it does not
>>>> provide strict exactly-once semantics…
>>>>
>>>>
>>>>
>>>> The way I would tackle this problem (and have done in similar
>>>> situations):
>>>>
>>>>
>>>>
>>>>    - For the source topics that need to have exactly-once semantics
>>>>    and that are not intrinsically idempotent:
>>>>    - Add one extra operator after the source that deduplicates events
>>>>    by unique id for a rolling time range (on the source cloud provider)
>>>>    - Take a savepoint after the rolling time range has passed (at
>>>>    least once completely)
>>>>    - Move your job to the target cloud provider
>>>>    - Reconfigure the resp. source with a new Kafka consumer group.id
>>>>    - Change the uid() of the resp. Kafka source
>>>>    - Configure start-by-timestamp for the resp. source with a
>>>>    timestamp that lies within the rolling time range (from above)
>>>>    - Configure the job to ignore recovery for state that does not
>>>>    have a corresponding operator in the job (the previous Kafka source
>>>>    uid()s)
>>>>    - Start the job on the new cloud provider, wait for it to pick
>>>>    up/back-fill
>>>>    - Take a savepoint
>>>>    - Remove the deduplication operator if it causes too much
>>>>    load/latency/whatever
>>>>
>>>>
>>>>
>>>> This scheme sounds more complicated than it really is … and has saved
>>>> my sanity quite a number of times 😊
>>>>
>>>>
>>>>
>>>> Good luck, and I am happy to provide more details
>>>>
>>>>
>>>>
>>>> Thias
>>>>
>>>>
>>>>
>>>> *From:* Hemanga Borah <borah.hema...@gmail.com>
>>>> *Sent:* Tuesday, May 3, 2022 3:12 AM
>>>> *To:* user@flink.apache.org
>>>> *Subject:* Migrating Flink apps across cloud with state
>>>>
>>>>
>>>>
>>>> Hello,
>>>>  We are attempting to port our Flink applications from one cloud
>>>> provider to another.
>>>>
>>>>  These Flink applications consume data from Kafka topics and output to
>>>> various destinations (Kafka or databases). The applications have states
>>>> stored in them. Some of these stored states are aggregations, for example,
>>>> at times we store hours (or days) worth of data to aggregate over time.
>>>> Some other applications have cached information for data enrichment, for
>>>> example, we store data in Flink state for days, so that we can join them
>>>> with newly arrived data. The volume of data on the input topics is
>>>> large, and it would be expensive to reprocess it from the beginning of
>>>> the topics.
>>>>
>>>>  As such, we want to retain the state of the application when we move
>>>> to a different cloud provider so that we can retain the aggregations and
>>>> cache, and do not have to start from the beginning of the input topics.
>>>>
>>>>  We are replicating the Kafka topics using MirrorMaker 2. This is our
>>>> procedure:
>>>>
>>>>    - Replicate the input topics of each Flink application from source
>>>>    cloud to destination cloud.
>>>>    - Take a savepoint of the Flink application on the source cloud
>>>>    provider.
>>>>    - Start the Flink application on the destination cloud provider
>>>>    using the savepoint from the source cloud provider.
>>>>
>>>>
>>>> However, this does not work as we want because the offsets of the
>>>> mirrored topics on the new cloud provider differ from the originals
>>>> (because of how MirrorMaker is implemented). The offsets of the new
>>>> topics do not match the ones stored in the Flink savepoint; hence, Flink
>>>> cannot map to the offsets of the new topics during startup.
>>>>
>>>> Has anyone tried to move clouds while retaining the Flink state?
>>>>
>>>> Thanks,
>>>> Hemanga
>>>>
>>>>
>>>
>
> --
> https://twitter.com/snntrable
> https://github.com/knaufk
>
