Resending ->

Hello Sergio,
I'm a newbie (not an expert), but I'll give it a try.

Assuming you only have reference state in Flink and no other state, and
Kafka is the source, the approach below may be an option, though it should
be tested.

a. Create two applications: Primary and Secondary (blue and green).
b. Create two S3 buckets: Primary and Secondary (S3/A and S3/B).
c. Keep S3/A and S3/B in sync; both hold reference data only.
d. Refresh the reference data in both the Primary and Secondary buckets
every day, using a separate Flink job (refresh reference data) with a
separate topic.
e. Shut down the primary cluster/instance. Note the latest offset of the
Kafka topic(s) carrying transaction events up to which the primary was able
to complete processing.
f. Any partially completed processing can be ignored, since processing will
start from the latest successful offset on the secondary side.
g. Start the secondary cluster with the newer deployment, with the Kafka
offset setting at latest. There should be no gap in Kafka topic offsets
between where the primary stopped and where the secondary starts.
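
To make the "no gap" condition concrete, here is a minimal sketch in plain
Python (no Flink or Kafka client involved). It assumes the offset noted in
step e is the offset of the last record the primary fully processed, per
partition; the topic name and the helper names are made up for illustration.
If "latest" instead means the end of the topic, records that arrive between
the shutdown and the start would be skipped, which is why the sketch uses
explicit offsets.

```python
from typing import Dict, Tuple

# (topic, partition) -> offset; a hypothetical key type for this sketch.
Partition = Tuple[str, int]


def secondary_start_offsets(last_processed: Dict[Partition, int]) -> Dict[Partition, int]:
    """Offset the secondary should start reading from, per partition.

    Assumes last_processed holds the offset of the last record the primary
    fully processed, so the next record to read is that offset plus one.
    """
    return {tp: offset + 1 for tp, offset in last_processed.items()}


def has_gap_or_overlap(last_processed: Dict[Partition, int],
                       start_offsets: Dict[Partition, int]) -> bool:
    """True if any partition would skip or re-read records at the handover."""
    return any(start_offsets.get(tp) != offset + 1
               for tp, offset in last_processed.items())


# Example values only; "transactions" is a placeholder topic name.
primary_last = {("transactions", 0): 1041, ("transactions", 1): 987}
secondary_start = secondary_start_offsets(primary_last)
```

In a real job, a mapping like this would be handed to the Kafka source as
its explicit starting offsets.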

Logically, it should work with minimal downtime. The downside is that you
need to maintain two S3 buckets.

I leave it to Flink committers and experts to comment as I may have
completely misunderstood the problem and internal operations.

-A

> On Tue, Mar 19, 2024 at 11:29 AM Sergio Chong Loo
> <schong...@apple.com.invalid> wrote:
>
>> Hello Flink Community,
>>
>> We have a particularly challenging scenario that we’d like to run by the
>> community experts to check:
>>
>> 1. If anything can be done with existing functionality that we’re
>> overlooking, or
>> 2. The feasibility of this proposal.
>>
>> I tried to keep it concise in a 1-pager type format.
>>
>> Thanks in advance,
>> Sergio Chong
>>
>> —
>>
>> Goal
>>
>> We have a particular use case with a team evaluating and looking to adopt
>> Flink. Their pipelines have an especially intricate and long bootstrapping
>> sequence.
>>
>> The main objective: to keep downtime to a minimum (~1 minute as a starting
>> point). The situation that would affect this downtime the most for this
>> team is a (re)deployment. All changes proposed here are the minimum
>> necessary and stay within a Flink job’s lifecycle boundaries.
>>
>>
>> Non-goal
>>
>> Anything that requires coordination of 2 jobs or more belongs in the
>> orchestration layer and is outside of the scope of this document; no
>> inter-job awareness/communication should be considered (initially).
>> However, any necessary “hooks” should be provided to make that integration
>> smoother.
>>
>>
>> Scenario
>>
>> The (re)deployments are particularly of concern here. Given the stateful
>> nature of Flink we of course need to “snapshot” the state at any point in
>> time so that the next deployment can take over. However, if we consider a
>> typical sequence in EKS these would be the (simplified) list of steps:
>>
>> All the points marked with [*] are “long running” steps where potentially
>> no progress/data processing occurs:
>> 1. [*] Take a Checkpoint
>> 2. [*] Issue a restart (pod teardown, resource deallocation, etc.)
>> 3. [*] Start deployment (resource allocation, pod setup, etc.)
>> 4. [*] Flink job submission
>> 5. [*] State loading from the Checkpoint
>> 6. [*] Task Managers initialization (callbacks to user code via open)
>> 7. Begin processing data
>>
>> These typical steps will already take more than 1 minute but there’s
>> more:
>>
>> The team has a requirement of statically loading about 120GB of reference
>> data into direct (non-heap) memory. This data is only read by the pipeline
>> and will need to be reloaded/refreshed every day. This process of fetching
>> the data from storage (S3) and loading it into direct memory takes around
>> 4.5 minutes.
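
As a quick back-of-the-envelope check of the figures above (120GB, 4.5
minutes, ~1 minute target), the arithmetic below shows how far the reference
data load alone is from the downtime budget. It assumes decimal GB and a
constant load rate, neither of which is stated in the original message.

```python
# Figures taken from the message; a constant transfer rate is an assumption.
reference_data_gb = 120.0
load_seconds = 4.5 * 60            # 4.5 minutes
downtime_budget_seconds = 60.0     # the ~1 minute starting target

# Effective end-to-end load rate (S3 fetch plus copy into direct memory).
throughput_gb_per_s = reference_data_gb / load_seconds

# How much reference data could be loaded inside the budget at that rate.
loadable_within_budget_gb = throughput_gb_per_s * downtime_budget_seconds

# How many times over budget the reference data load is on its own.
budget_overrun_factor = load_seconds / downtime_budget_seconds
```

At roughly 0.44 GB/s only about 27GB would fit into the one-minute window,
so the load has to happen outside the cutover path.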
>>
>> NOTES:
>> - Reloading this data on the fly (online) is not the first choice since
>>   the pipeline would need twice the memory capacity to guarantee a live
>>   copy at all times.
>> - The (re)deployment process becomes our common denominator since we’ll
>>   still have to deal with this during normal code changes/updates. Hence...
>>
>>
>> Proposal
>>
>> The idea is to piggyback on the concept of blue/green deployments but
>> - With a slightly modified Flink startup sequence and...
>> - Still relying on a clean cutover between deployments (A and B) via
>>   incremental checkpoints.
>> Why? To have deployment B bootstrap as much as possible and only cut over
>> from deployment A to B when the latter is fully ready to proceed. This
>> means B can “take its time” and prepare what it needs first, then “signal”
>> A and potentially “wait” for A to deliver the desired checkpoint.
>>
>> The “signaling” mechanism does not need to be fancy. An idea is to define
>> a “future” checkpoint as the cutover point (since checkpoints are
>> sequential and we know their S3 paths). Deployment B would just implement a
>> monitor on said checkpoint and wait until it’s ready; once it is, B loads
>> it and proceeds. Deployment A should just terminate right after it
>> produces the checkpoint.
>>
>> NOTES:
>> - Rely on the existence of the ...chk-XX/_metadata file. This file gets
>>   created when the CP is complete. Might need confirmation that the status
>>   is actually COMPLETE (successful).
>> - A checkpoint can fail; if that's the case we should “increment” the
>>   counter and expect the next one to succeed.
>> - TBD: what alternatives do we have to query the status of a checkpoint?
>>   e.g. IN_PROGRESS, COMPLETED, FAILED, etc.
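
A minimal sketch of the "future checkpoint" lookup described in these notes,
including the "increment on failure" rule. It is not Flink code: a local
directory stands in for the S3 checkpoint prefix, the function name is made
up, and the presence of _metadata is treated as "complete", which the notes
themselves say may still need confirmation.

```python
import re
from pathlib import Path
from typing import Optional

# Matches checkpoint directory names such as chk-117.
CHK_DIR = re.compile(r"^chk-(\d+)$")


def first_completed_checkpoint(checkpoint_root: Path, target_id: int) -> Optional[int]:
    """Return the lowest checkpoint id >= target_id that has a _metadata file.

    If the target checkpoint failed and never produced _metadata, the next
    completed one is returned instead. Returns None when nothing at or after
    the target is complete yet.
    """
    if not checkpoint_root.is_dir():
        return None
    completed = []
    for entry in checkpoint_root.iterdir():
        match = CHK_DIR.match(entry.name)
        if match and (entry / "_metadata").is_file():
            chk_id = int(match.group(1))
            if chk_id >= target_id:
                completed.append(chk_id)
    return min(completed) if completed else None
```

Deployment B would call this repeatedly with its agreed cutover id (117 in
the example) and restore from whichever id comes back first.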
>>
>> The proposed sequence of steps would look like this:
>> 1. [A] ... already running...
>> 2. [B] Start deployment with a “future” checkpoint of chk-117 for example
>>    (resource allocation, pod setup, etc. happens here)
>> 3. [B] Flink job submission
>> 4. [B] Time consuming user reference data loading
>> 5. [B] Job is ready and will wait until our example checkpoint chk-117
>>    exists/becomes ready. Transitions to a new state e.g.
>>    WAITING_FOR_START_SIGNAL
>> 6. [A] As an example the current checkpoint could be chk-112, the job
>>    continues taking cps normally chk-113, -114... until it reaches chk-117
>> 7. [*] [A] Job is terminated after chk-117 checkpoint is complete
>> 8. [*] [B] Detects chk-117 is ready so it continues with the state loading
>>    from it
>> 9. [B] Task Managers initialization
>> 10. [B] Transitions to RUNNING. Begin processing data
>>
>> NOTES:
>> - Only steps 7-8 ([*]) should potentially “block” the progress/data
>>   processing.
>> - There should be metrics emitted indicating if a checkpoint was ready
>>   before the job was. The new job is expected to wait for the CP, not the
>>   other way around.
>> - There are no guarantees that steps 7-8 will take less than a minute but:
>>   - That’s the best this solution can do
>>   - The actual time will depend on the state size, which depends entirely
>>     on the target team’s design; they’re aware of this.
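
The proposed sequence for deployment B can be read as a small state machine.
The sketch below is only an illustration of that reading, not an existing
Flink lifecycle: WAITING_FOR_START_SIGNAL and RUNNING come from the proposal,
while BOOTSTRAPPING, RESTORING_STATE, the class and its method names are
hypothetical. It also records the "checkpoint was ready before the job was"
condition that the notes ask to expose as a metric.

```python
from enum import Enum, auto


class JobState(Enum):
    BOOTSTRAPPING = auto()             # hypothetical: loading reference data
    WAITING_FOR_START_SIGNAL = auto()  # from the proposal
    RESTORING_STATE = auto()           # hypothetical: loading the checkpoint
    RUNNING = auto()                   # from the proposal


# Each state has exactly one legal successor in this simplified model.
NEXT_STATE = {
    JobState.BOOTSTRAPPING: JobState.WAITING_FOR_START_SIGNAL,
    JobState.WAITING_FOR_START_SIGNAL: JobState.RESTORING_STATE,
    JobState.RESTORING_STATE: JobState.RUNNING,
}


class GreenDeployment:
    """Deployment B: bootstrap first, then wait for A's cutover checkpoint."""

    def __init__(self) -> None:
        self.state = JobState.BOOTSTRAPPING
        self.checkpoint_ready_before_job = False  # candidate metric
        self._checkpoint_ready = False

    def on_reference_data_loaded(self) -> None:
        self._advance(JobState.WAITING_FOR_START_SIGNAL)
        self._maybe_restore()

    def on_checkpoint_ready(self) -> None:
        self._checkpoint_ready = True
        if self.state is JobState.BOOTSTRAPPING:
            # The checkpoint beat the job: the unwanted ordering.
            self.checkpoint_ready_before_job = True
        self._maybe_restore()

    def on_state_restored(self) -> None:
        self._advance(JobState.RUNNING)

    def _maybe_restore(self) -> None:
        if self.state is JobState.WAITING_FOR_START_SIGNAL and self._checkpoint_ready:
            self._advance(JobState.RESTORING_STATE)

    def _advance(self, new_state: JobState) -> None:
        if NEXT_STATE.get(self.state) is not new_state:
            raise ValueError(f"illegal transition {self.state} -> {new_state}")
        self.state = new_state
```

Only the time spent in RESTORING_STATE corresponds to the blocking steps 7-8;
everything before it overlaps with deployment A still processing data.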
>>
>> Challenges
>>
>> At the time of writing I’m not aware of user code that can be executed
>> BEFORE the state is initialized with a checkpoint/savepoint.
>>
>> It is not ideal to change the existing Flink Task/Operator lifecycle, but
>> perhaps we could:
>> - Add a new handle to execute user code, statically on a Task Manager,
>>   BEFORE the state is loaded, if this is possible...
>> - Implement a monitor so that the startup sequence just waits until
>>   the desired checkpoint is ready. Optionally a timeout could be added to
>>   take action if the checkpoint is not found before then.
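
The "wait with an optional timeout" part of the monitor could be as simple
as the polling helper below. This is a generic sketch, not a Flink hook: the
function name, the polling approach and the default interval are all
assumptions, and the readiness check itself is passed in as a callable.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool],
               timeout_s: float,
               poll_interval_s: float = 1.0) -> bool:
    """Poll predicate until it returns True or timeout_s elapses.

    Returns True if the condition was met, False on timeout so the caller
    can decide what action to take (alert, abort, fall back, ...).
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_s, remaining))
```

The startup sequence would call it with a check for the desired checkpoint
and treat a False result as the "not found before the timeout" case.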
>>
>>
>> An alternative version
>>
>> A slightly modified version of this proposal that could reduce the state
>> loading time even further (non-trivial but worth discussing) could be:
>> Instead of waiting for a specific full checkpoint for the handover,
>> deployment B could start performing something like a “2 stage state
>> loading”.
>> The modified step list would look like this:
>> ...
>> 5. [B] As an example the current checkpoint could be chk-112, so the job
>>    starts preloading it; when it’s done it should transition to a new
>>    state e.g. WAITING_FOR_START_SIGNAL
>> 6. [A] The job continues taking cps normally chk-113, -114... until it
>>    reaches chk-117
>> 7. [A] Job is terminated after chk-117 checkpoint is complete
>> 8. [*] [B] Detects chk-117 is ready so it loads the remaining incremental
>>    checkpoints (-113, -114, -115, -116 and -117)
>> ...
>> 10. [B] Transitions to RUNNING. Begin processing data
>>
>> With this approach only step 8 would potentially “block” the progress and
>> only while it’s loading a few incremental checkpoints rather than a full
>> one.
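
To illustrate why the second stage should be cheaper, here is a toy model of
"2 stage state loading" in which each incremental checkpoint is a small set
of key-level changes applied on top of a preloaded base. This is a
deliberate simplification: Flink's actual incremental checkpoint format is
not modeled here, and all names and values are invented.

```python
from typing import Dict, List, Optional

State = Dict[str, int]
# key -> new value, or None when the key was deleted since the last checkpoint
Delta = Dict[str, Optional[int]]


def apply_deltas(base: State, deltas: List[Delta]) -> State:
    """Apply incremental changes in checkpoint order on top of a base state."""
    state = dict(base)  # leave the preloaded base untouched
    for delta in deltas:
        for key, value in delta.items():
            if value is None:
                state.pop(key, None)
            else:
                state[key] = value
    return state


# Stage 1 (while A is still running): preload the full base, e.g. chk-112.
preloaded = apply_deltas({"a": 1, "b": 2}, [])

# Stage 2 (the blocking step): apply only what changed up to the cutover.
remaining_deltas = [{"a": 5}, {"c": 7}, {"b": None}]
final_state = apply_deltas(preloaded, remaining_deltas)
```

The blocking work is proportional to the size of the remaining deltas rather
than to the full state, which is the point of the alternative.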
>
>
