Hello Flink Community,

We have a particularly challenging scenario that we’d like to run by the community experts, to check:
1. If anything can be done with existing functionality that we’re overlooking, or
2. The feasibility of this proposal.

I tried to keep it concise, in a 1-pager type format.

Thanks in advance,
Sergio Chong

Goal

We have a use case with a team evaluating and looking to adopt Flink. Their pipelines have an especially intricate and long bootstrapping sequence. The main objective: minimal downtime (~1 minute as a starting point). The situation that affects this downtime the most for this team is a (re)deployment. All changes proposed here are the minimum necessary, within a single Flink job’s lifecycle boundaries only.

Non-goal

Anything that requires coordination of 2 or more jobs belongs in the orchestration layer and is outside the scope of this document; no inter-job awareness/communication should be considered (initially). However, any necessary “hooks” should be provided to make that integration smoother.

Scenario

The (re)deployments are of particular concern here. Given the stateful nature of Flink, we of course need to “snapshot” the state at some point in time so that the next deployment can take over. However, if we consider a typical sequence on EKS, these would be the (simplified) steps. All the points marked [*] are “long-running” steps where potentially no progress/data processing occurs:

1. [*] Take a checkpoint
2. [*] Issue a restart (pod teardown, resource deallocation, etc.)
3. [*] Start deployment (resource allocation, pod setup, etc.)
4. [*] Flink job submission
5. [*] State loading from the checkpoint
6. [*] Task Manager initialization (callbacks to user code via open())
7. Begin processing data

These typical steps will already take more than 1 minute, but there’s more: the team has a requirement of statically loading about 120 GB of reference data into direct (non-heap) memory. This data is only read by the pipeline and needs to be reloaded/refreshed every day. Fetching the data from storage (S3) and loading it into direct memory takes around 4.5 minutes.

NOTES:
- Reloading this data on the fly (online) is not the first choice, since the pipeline would need twice the memory capacity to guarantee a live copy at all times. (A sketch of this off-heap loading pattern follows the Proposal notes below.)
- The (re)deployment process is our common denominator, since we’ll still have to deal with it during normal code changes/updates. Hence...

Proposal

The idea is to piggyback on the concept of blue/green deployments, but:
- With a slightly modified Flink startup sequence, and
- Still relying on a clean cutover between deployments (A and B) via incremental checkpoints.

Why? To have deployment B bootstrap as much as possible and only cut over from deployment A to B when the latter is fully ready to proceed. This means B can “take its time” and prepare what it needs first, then “signal” A and potentially “wait” for A to deliver the desired checkpoint.

The “signaling” mechanism does not need to be fancy. One idea is to define a “future” checkpoint as the cutover point (since checkpoints are sequential and we know their S3 paths). Deployment B would just implement a monitor on said checkpoint (sketched below) and wait until it’s ready; when it is, B loads it and proceeds. Deployment A should terminate right after it produces that checkpoint.

NOTES:
- Rely on the existence of the ...chk-XX/_metadata file. This file gets created when the checkpoint is complete.
- We might need confirmation that the status is actually COMPLETED (successful). A checkpoint can fail; if that’s the case, we should “increment” the counter and expect the next one to succeed.
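For reference, a minimal sketch of the off-heap loading pattern mentioned in the Scenario section, assuming the reference data arrives as a single InputStream (e.g. from an S3 client). The class and method names are hypothetical; it mainly illustrates why an online reload needs double the memory, since the old slices must stay live while the new ones fill up:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

public final class ReferenceDataLoader {

    // A single direct ByteBuffer is capped at Integer.MAX_VALUE bytes,
    // so a ~120 GB data set has to be sliced across many buffers.
    private static final int SLICE_BYTES = 1 << 30; // 1 GiB per slice

    /** Reads the whole stream into off-heap (direct) memory, slice by slice. */
    public static List<ByteBuffer> loadOffHeap(InputStream in) throws IOException {
        List<ByteBuffer> slices = new ArrayList<>();
        ReadableByteChannel channel = Channels.newChannel(in);
        boolean eof = false;
        while (!eof) {
            ByteBuffer slice = ByteBuffer.allocateDirect(SLICE_BYTES);
            while (slice.hasRemaining()) {
                if (channel.read(slice) < 0) { // -1 signals end of stream
                    eof = true;
                    break;
                }
            }
            slice.flip();
            if (slice.limit() > 0) {
                slices.add(slice); // keep only non-empty slices
            }
        }
        return slices;
    }
}
```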
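And a minimal sketch of the monitor deployment B would run, using Flink’s FileSystem abstraction so the same code works against S3 (checkpointDir would be something like s3://bucket/checkpoints/<job-id>; the class name and polling interval are my own). As noted above, the presence of _metadata alone does not prove the JobManager registered the checkpoint as COMPLETED, so a stricter check could combine this with the REST query sketched at the end of this message:

```java
import java.net.URI;
import java.time.Duration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public final class CutoverCheckpointMonitor {

    /**
     * Blocks until <checkpointDir>/chk-<id>/_metadata exists, or the timeout
     * elapses. The _metadata file is written when the checkpoint completes,
     * so its presence is used here as the "checkpoint ready" signal.
     */
    public static boolean awaitCheckpoint(
            String checkpointDir, long checkpointId, Duration timeout) throws Exception {
        Path metadata = new Path(checkpointDir + "/chk-" + checkpointId + "/_metadata");
        FileSystem fs = FileSystem.get(new URI(checkpointDir));
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            if (fs.exists(metadata)) {
                return true;
            }
            Thread.sleep(5_000); // poll every 5s; HEAD/LIST calls on S3 are cheap at this rate
        }
        return false; // caller decides: extend the wait, or fall back to the latest checkpoint
    }
}
```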
TBD: what alternatives do we have to query the status of a checkpoint, e.g. IN_PROGRESS, COMPLETED, FAILED, etc.? (One option via the REST API is sketched at the end of this message.)

The proposed sequence of steps would look like this:

1. [A] ...already running...
2. [B] Start deployment with a “future” checkpoint of, for example, chk-117 (resource allocation, pod setup, etc. happen here)
3. [B] Flink job submission
4. [B] Time-consuming user reference data loading
5. [B] Job is ready and will wait until our example checkpoint chk-117 exists/becomes ready; it transitions to a new state, e.g. WAITING_FOR_START_SIGNAL
6. [A] As an example, the current checkpoint could be chk-112; the job continues taking checkpoints normally (chk-113, -114, ...) until it reaches chk-117
7. [*] [A] Job is terminated after checkpoint chk-117 is complete
8. [*] [B] Detects chk-117 is ready, so it continues with the state loading from it
9. [B] Task Manager initialization
10. [B] Transitions to RUNNING. Begin processing data

NOTES:
- Only steps 7-8 ([*]) should potentially “block” progress/data processing.
- There should be metrics emitted indicating whether a checkpoint was ready before the job was. The new job is expected to wait for the checkpoint, not the other way around.
- There are no guarantees that steps 7-8 will take less than a minute, but:
  - That’s the best this solution can do.
  - The actual time will depend on the state size, which entirely depends on the target team’s design; they’re aware of this.

Challenges

At the moment of writing, I’m not aware of any user code that can be executed BEFORE the state is initialized from a checkpoint/savepoint. It is not ideal to change the existing Flink Task/Operator lifecycle, but perhaps we could:
- Add a new handle to execute user code, statically on a Task Manager, BEFORE the state is loaded (a hypothetical shape for such a hook is sketched at the end of this message).
- If this is possible, we can implement a monitor so that the startup sequence simply waits until the desired checkpoint is ready. Optionally, a timeout could be added to take action if the checkpoint is not found before then.

An alternative version

A slightly modified version of this proposal that could reduce the state loading time even further (non-trivial but worth discussing): instead of waiting for a specific full checkpoint for the handover, deployment B could perform something like a “2-stage state loading”. The modified step list would look like this:

...
5. [B] As an example, the current checkpoint could be chk-112, so the job starts preloading it; when it’s done, it should transition to a new state, e.g. WAITING_FOR_START_SIGNAL
6. [A] The job continues taking checkpoints normally (chk-113, -114, ...) until it reaches chk-117
7. [A] Job is terminated after checkpoint chk-117 is complete
8. [*] [B] Detects chk-117 is ready, so it loads the remaining incremental checkpoints (-113, -114, -115, -116 and -117)
...
10. [B] Transitions to RUNNING. Begin processing data

With this approach only step 8 would potentially “block” progress, and only while it’s loading a few incremental checkpoints rather than a full one.
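On the TBD above: one existing option is the JobManager’s REST API, whose GET /jobs/:jobid/checkpoints endpoint returns per-checkpoint statistics, including a status field (IN_PROGRESS, COMPLETED, FAILED). A minimal sketch using Java 11’s HttpClient follows; the endpoint address and job id are placeholders. Note that having B query A’s REST endpoint implies some inter-job awareness, so this check may be better placed in the orchestration layer, per the non-goal above:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class CheckpointStatusClient {

    /**
     * Fetches the checkpoint statistics of a running job from the Flink REST
     * API, e.g. fetchCheckpointStats("http://jobmanager:8081", "<job-id>").
     * The JSON response contains a "history" array whose entries carry an
     * "id" and a "status" field (IN_PROGRESS, COMPLETED, FAILED).
     */
    public static String fetchCheckpointStats(String restEndpoint, String jobId) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(restEndpoint + "/jobs/" + jobId + "/checkpoints"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.body(); // parse with Jackson/Gson and check history[i].status
    }
}
```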
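Finally, to make the first Challenges item concrete, here is one possible shape for the proposed handle. To be explicit: nothing like this exists in Flink today; every name below is hypothetical and only illustrates the lifecycle position being asked for (once per Task Manager, before state restore):

```java
// Hypothetical API -- this hook does not exist in Flink today; it is the
// new handle proposed in the Challenges section above.
public interface TaskManagerPreStateHook {

    /**
     * Invoked once per Task Manager, after the process starts but before any
     * operator state is restored. Long-running work (e.g. loading the 120 GB
     * reference data set, or waiting on the cutover checkpoint) would go here.
     */
    void beforeStateRestore(PreStateContext context) throws Exception;

    /** Minimal context a hook implementation might need; also hypothetical. */
    interface PreStateContext {

        /** Externalized checkpoint directory configured for the job. */
        String checkpointDirectory();

        /** The agreed-upon "future" checkpoint id used as the cutover point. */
        long cutoverCheckpointId();
    }
}
```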