Hello Flink Community,

We have a particularly challenging scenario that we’d like to run by the rest 
of the community experts to check:

1. If anything can be done with existing functionality that we’re overlooking, 
or
2. The feasibility of this proposal.

I tried to keep it concise in a 1-pager type format.

Thanks in advance,
Sergio Chong

—

Goal

We have a use case from a team that is evaluating Flink with a view to 
adopting it. Their pipelines have an especially intricate and long 
bootstrapping sequence.

The main objective is to keep downtime to a minimum (~1 minute as a starting 
point). The situation that affects downtime the most for this team is a 
(re)deployment. All changes proposed here are the minimum necessary and stay 
within the boundaries of a single Flink job’s lifecycle.


Non-goal

Anything that requires coordination of 2 jobs or more belongs in the 
orchestration layer and is outside of the scope of this document; no inter-job 
awareness/communication should be considered (initially). However, any 
necessary “hooks” should be provided to make that integration smoother.


Scenario

The (re)deployments are of particular concern here. Given the stateful nature 
of Flink, we of course need to “snapshot” the state at some point in time so 
that the next deployment can take over. However, a typical (simplified) 
sequence of steps in EKS looks like this:

All the points marked with [*] are “long running” steps where potentially no 
progress/data processing occurs:

1. [*] Take a Checkpoint
2. [*] Issue a restart (pod teardown, resource deallocation, etc.)
3. [*] Start deployment (resource allocation, pod setup, etc.)
4. [*] Flink job submission
5. [*] State loading from the Checkpoint
6. [*] Task Managers initialization (callbacks to user code via open)
7. Begin processing data

These typical steps already take more than 1 minute, but there’s more:

The team has a requirement of statically loading about 120GB of reference data 
into direct (non-heap) memory. This data is only read by the pipeline and will 
need to be reloaded/refreshed every day. This process of fetching the data from 
storage (S3) and loading it into direct memory takes around 4.5 minutes.

NOTES:
- Reloading this data on the fly (online) is not the first choice, since the 
  pipeline would need twice the memory capacity to guarantee a live copy at 
  all times.
- The (re)deployment process becomes our common denominator, since we’ll still 
  have to deal with it during normal code changes/updates. Hence...


Proposal

The idea is to piggyback on the concept of blue/green deployments, but:

1. With a slightly modified Flink startup sequence, and
2. Still relying on a clean cutover between deployments (A and B) via 
   incremental checkpoints.

Why? To have deployment B bootstrap as much as possible and only cut over from 
deployment A to B when the latter is fully ready to proceed. This means B can 
“take its time” and prepare what it needs first, then “signal” A and 
potentially “wait” for A to deliver the desired checkpoint.

The “signaling” mechanism does not need to be fancy. One idea is to define a 
“future” checkpoint as the cutover point (since checkpoints are sequential and 
we know their S3 paths). Deployment B would just implement a monitor on said 
checkpoint and wait until it’s ready; once it is, B loads it and proceeds. 
Deployment A should just terminate right after it has produced the checkpoint.

NOTES:
- Rely on the existence of the ...chk-XX/_metadata file. This file gets 
  created when the CP is complete. Might need confirmation that the status is 
  actually COMPLETED (successful).
- A checkpoint can fail; if that's the case we should “increment” the counter 
  and expect the next one to succeed.
- TBD: what alternatives do we have to query the status of a checkpoint? e.g. 
  IN_PROGRESS, COMPLETED, FAILED, etc.
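
As a rough illustration of the monitor described in these notes, the sketch 
below looks for the first checkpoint at or after the agreed cutover ID whose 
_metadata file exists, which is one way to “increment” past a failed 
checkpoint. Everything here is an assumption for discussion: the class and 
method names are hypothetical, the checkpoint root passed to metadataKey is a 
placeholder, and the existence check is left as a caller-supplied predicate 
(e.g. an S3 HEAD request) rather than tied to any particular client. It also 
inherits the open question above: the presence of _metadata is taken as 
“completed”.

```java
import java.util.OptionalLong;
import java.util.function.LongPredicate;

class CutoverCheckpointProbe {

    /** Builds "<base>/chk-<id>/_metadata"; `base` is a hypothetical checkpoint root. */
    static String metadataKey(String base, long id) {
        return base + "/chk-" + id + "/_metadata";
    }

    /**
     * Returns the first checkpoint ID in [targetId, targetId + lookahead] for which
     * `metadataExists` reports a _metadata file, or empty if none is there yet.
     * Skipping forward covers the case where the target checkpoint itself failed.
     */
    static OptionalLong firstCompletedAtOrAfter(
            long targetId, int lookahead, LongPredicate metadataExists) {
        for (long id = targetId; id <= targetId + lookahead; id++) {
            if (metadataExists.test(id)) {
                return OptionalLong.of(id);
            }
        }
        return OptionalLong.empty();
    }
}
```

For this to be a clean cutover, deployment A would have to apply the same 
rule, i.e. terminate after the first checkpoint at or after the target that 
actually completes.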

The proposed sequence of steps would look like this:

1. [A] ... already running...
2. [B] Start deployment with a “future” checkpoint of chk-117 for example 
   (resource allocation, pod setup, etc. happens here)
3. [B] Flink job submission
4. [B] Time consuming user reference data loading
5. [B] Job is ready and will wait until our example checkpoint chk-117 
   exists/becomes ready. Transitions to a new state e.g. 
   WAITING_FOR_START_SIGNAL
6. [A] As an example the current checkpoint could be chk-112, the job 
   continues taking cps normally chk-113, -114... until it reaches chk-117
7. [*] [A] Job is terminated after chk-117 checkpoint is complete
8. [*] [B] Detects chk-117 is ready so it continues with the state loading 
   from it
9. [B] Task Managers initialization
10. [B] Transitions to RUNNING. Begin processing data

NOTES:
- Only steps 7-8 ([*]) should potentially “block” the progress/data processing.
- There should be metrics emitted indicating whether a checkpoint was ready 
  before the job was. The new job is expected to wait for the CP, not the 
  other way around.
- There are no guarantees that steps 7-8 will take less than a minute, but:
  - That’s the best this solution can do.
  - The actual time will depend on the state size, which depends entirely on 
    the target team’s design; they’re aware of this.
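
To make the lifecycle of deployment B in the sequence above more concrete, 
here is a small sketch of the states it would pass through, plus the value 
for the “was the checkpoint ready before the job?” metric. 
WAITING_FOR_START_SIGNAL and RUNNING are the names used in this proposal; 
BOOTSTRAPPING, RESTORING_STATE, the class name and the method names are 
hypothetical and only there for illustration. None of this exists in Flink 
today.

```java
class DeploymentBLifecycle {

    // WAITING_FOR_START_SIGNAL and RUNNING come from the proposal;
    // BOOTSTRAPPING and RESTORING_STATE are hypothetical names for this sketch.
    enum State { BOOTSTRAPPING, WAITING_FOR_START_SIGNAL, RESTORING_STATE, RUNNING }

    private State state = State.BOOTSTRAPPING;

    State state() {
        return state;
    }

    /** Advances to `next`; only single forward steps are allowed. */
    void advanceTo(State next) {
        if (next.ordinal() != state.ordinal() + 1) {
            throw new IllegalStateException(state + " -> " + next + " is not allowed");
        }
        state = next;
    }

    /**
     * Value for the metric suggested in the notes: true if the cutover checkpoint
     * completed before deployment B was ready to consume it.
     */
    static boolean checkpointReadyBeforeJob(long checkpointCompletedMillis, long jobReadyMillis) {
        return checkpointCompletedMillis < jobReadyMillis;
    }
}
```

In this reading, BOOTSTRAPPING covers resource allocation, job submission and 
the reference data load, and RESTORING_STATE covers loading the cutover 
checkpoint and Task Manager initialization.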

Challenges

At the time of writing I’m not aware of any way to execute user code BEFORE 
the state is initialized from a checkpoint/savepoint.

It is not ideal to change the existing Flink Task/Operator lifecycle, but 
perhaps we could:

1. Add a new hook to execute user code, statically on a Task Manager, BEFORE 
   the state is loaded, if this is possible...
2. Implement a monitor so that the startup sequence just waits until the 
   desired checkpoint is ready. Optionally, a timeout could be added to take 
   action if the checkpoint is not found before then.
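
A minimal sketch of the “wait with an optional timeout” idea, using plain 
polling. The class name, method name and parameters are hypothetical; the 
readiness check is passed in as a supplier so that it could wrap whatever 
mechanism ends up answering “is the checkpoint complete?”.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class CheckpointWaiter {

    /**
     * Polls `ready` until it returns true or `timeout` elapses.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean awaitReady(BooleanSupplier ready, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (ready.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // caller decides what "take action" means
            }
            try {
                // Sleep for the poll interval, but never past the deadline.
                Thread.sleep(Math.min(pollInterval.toMillis(),
                        Math.max(1, remainingNanos / 1_000_000)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }
}
```

On a false return, the startup sequence could for example fail the deployment 
and leave A running, which keeps the decision in the orchestration layer.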


An alternative version

A slightly modified version of this proposal could reduce the state loading 
time even further (non-trivial but worth discussing): instead of waiting for a 
specific full checkpoint for the handover, deployment B could perform 
something like a “2 stage state loading”. The modified step list would look 
like this:
...
5. [B] As an example the current checkpoint could be chk-112, so the job 
   starts preloading it; when it’s done it should transition to a new state 
   e.g. WAITING_FOR_START_SIGNAL
6. [A] The job continues taking cps normally chk-113, -114... until it reaches 
   chk-117
7. [A] Job is terminated after chk-117 checkpoint is complete
8. [*] [B] Detects chk-117 is ready so it loads the remaining incremental 
   checkpoints (-113, -114, -115, -116 and -117)
...
10. [B] Transitions to RUNNING. Begin processing data

With this approach only step 8 would potentially “block” the progress and only 
while it’s loading a few incremental checkpoints rather than a full one.
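
For the second stage, deployment B needs to know which checkpoints are still 
outstanding once the cutover checkpoint appears. The sketch below only 
computes that list of IDs; the class and method names are hypothetical. It 
assumes every ID in between completed (a failed checkpoint would leave a gap 
to handle) and says nothing about how the increments would actually be 
applied on top of the preloaded state, which is the non-trivial part.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class TwoStageRestorePlan {

    /**
     * IDs still to be loaded after `preloadedId`, up to and including `cutoverId`.
     * Returns an empty list when the preloaded checkpoint is already the cutover one.
     */
    static List<Long> remainingCheckpoints(long preloadedId, long cutoverId) {
        if (cutoverId < preloadedId) {
            throw new IllegalArgumentException("cutover checkpoint precedes the preloaded one");
        }
        return LongStream.rangeClosed(preloadedId + 1, cutoverId)
                .boxed()
                .collect(Collectors.toList());
    }
}
```

With the example IDs used here, remainingCheckpoints(112, 117) yields 113 
through 117.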
